본문 바로가기

IT for developer/Flume

클라우데라 플룸(Flume) 1


※ 링크들은 수시로 변경되므로 오류가 발생할 수도 있음.


주의사항: 해보면서 글을 쓰고 있으므로 잘못된 내용이 상당할 수 있음.


Flume에 관한 많은 문서를 클라우데라에서 제공해 준다.

https://cwiki.apache.org/confluence/display/FLUME/Index


설치 시에도 각 운영체제 환경마다 별도의 패키지를 이용하여 설치할 수 있다.

https://ccp.cloudera.com/display/CDH4B2/Flume+Installation

https://ccp.cloudera.com/display/CDHDOC/Flume+1.x+Installation

버전 0.9 에서는 Agent, Master, Collector를 별도를 실행한 듯 하지만, 버전 1.1  부터는 Agent만 실행한다.


설정에 따라 Agent가 Collector의 역할을 하는 듯 하다. 개념상으로 Collector이지만 별도의 구분된 소스는 없는듯. (소스를 아직 확인하지는 못했다.)


우선 단일 머신에서 테스트 해봐야겠다.

설정파일에서 source, channel, sink를 설정해야한다. (flume.conf)

source, channel, sink에 대한 설명은 사용자 가이드에 자세히 설명되어 있다.

(http://archive.cloudera.com/cdh/3/flume-ng-1.1.0-cdh3u4/FlumeUserGuide.html)

(https://people.apache.org/~mpercy/flume/flume-1.2.0-incubating-SNAPSHOT/docs/FlumeUserGuide.html)

source는 이벤트를 발생시키는 대상을 정의, channel은 최종 목적지인 sink로 전달하기 위한 임시 저장소라고 보면된다. 하나의 source에서 여러 channel로 보내질 수 있다. 그리고 각 channel에서 자신의 sink로 전달해서 서로 다른 저장소에 저장할 수 있겠다. sink는 채널에서 이벤트를 하나씩 꺼내어 저장소에 저장하는 역할을 한다.


예를 들어 source에 netcat, channel에 memory, sink를 logger로 설정한다면 다음과 같다.

foo.sources = netcatSrc

foo.channels = memoryChannel

foo.sinks = loggerSink


foo.sources.netcatSrc.type = netcat

foo.sources.netcatSrc.bind = 0.0.0.0

foo.sources.netcatSrc.port = 3333


foo.sinks.loggerSink.type = logger


foo.sources.netcatSrc.channels = memoryChannel

foo.sinks.loggerSink.channel = memoryChannel


foo.channels.memoryChannel.type = memory

foo.channels.memoryChannel.capacity = 100


source와 sink 모두 중간 channel을 설정해서 channel에 이벤트를 공급하고 (source), 이벤트를 소비하는 역할 (sink)을 하도록 한다.

netcatSrc는 3333 포트를 데이트를 받도록 설정하고 sink는 log4 포멧 (INFO) 으로 출력하도록 한다.


실행해 보도록 하자.

1) agent를 실행하자. 

sudo bin/flume-ng  agent --conf ./conf/ -f conf/flume.conf -n foo

2) 3333 포트에 연결해서 데이터를 전달해 보자.

telnet localhost 3333

Trying 127.0.0.1...

Connected to localhost.

Escape character is '^]'.

1231

OK


다음과 같은 메시지가 agent를 실행한 화면에 출력된다.

2012-08-07 14:31:06,996 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{} body: 31 32 33 31 0D                                  1231. }


이것을 이용하여 사용자 가이드에 다양한 형태로 변경해보도록 하자.

1. 아래의 그림처럼 다중의 에이전트를 서로 연결해보자.


우선 Sink 부분에서 logger를 사용하는 대신 avro를 사용해서 연결하면 될 듯

foo.sinks.avroSink.type = avro

foo.sinks.avroSink.hostname = 192.168.40.151 #bar 에이전트가 실행된 ip 주소
foo.sinks.avroSink.port = 4444

나머지 loggerSink부분을 avroSink로 수정하면 된다.

bar 에이전트 설정하기

bar 에이전트는 기존 foo 에이전트 source 부분만 avro로 수정해주면 되겠다.
bar.sources.avroSrc.type = avro
bar.sources.avroSrc.bind = 0.0.0.0
bar.sources.avroSrc.port = 4444

나머지 netcatSrc부분을 avroSrc로 수정하면된다.


netcat으로 연결 후 데이터를 전송하면 bar 에이전트에 전달되는 것을 확인 할 수 있다.

2. 컬렉터 처럼 수행하도록 하는 예제



기존에는 별도의 컬렉터를 두었지만 1.x 버전부터는 Sink와 Source의 연결로 컬럭터의 역할을 수행하는 듯하다.
1번 예제에 Avro Sink를 사용하는 다중의 머신이 있다는 점만 차이가 있는 듯 하다. 패스~~

3. 다중의 채널과 싱크를 사용하는 예


다중의 소스, 채널, 싱크를 선언 방법은 다음과 같다.

<agent>.sources = <Source1> <Source2>

<agent>.sinks = <Sink1> <Sink2>

<agent>.channels = <Channel1> <Channel2>


foo.channels = Channel1 Channel2 Channel3
foo.sinks = Sink1 Sink2 Sink3

foo.sinks.Sink1.channel = Channel1
foo.sinks.Sink2.channel = Channel2
foo.sinks.Sink3.channel = Channel3

foo.sources.Source1.channels = Channel1 Channel2 Channel3