본문 바로가기

IT for developer/Storm

[스톰] Storm Tutorial

원문 : https://github.com/nathanmarz/storm/wiki/Tutorial

Tutorial

튜토리얼에서, 어떻게 스톰 토폴로지를 생성하고 스톰 클러스에 그것들을 배치하는지 배울 것이다. 자바가 사용될 주요 언어가 되겠지만 몇몇 예제는 스톰의 다중 언어 능력을 보이기 위하여 파이썬을 사용할 것이다. 

Preliminaries

이 튜토리얼은 storm-starter 프로젝트의 예제를 사용한다. 프로젝트를 복제해서 해당 예제를 따라하길 추천한다. 다음을 읽고 설정하자. Setting up development environment (https://github.com/nathanmarz/storm/wiki/Setting-up-development-environment) 와 Creating a new Storm project 

Components of a Storm Cluster


스톰 클러스터는 외관상 하둡 클러스터와 유사하다.  하둡에서 "MapReduce jobs"을 실행하는 반면에 스톰에서는 토폴로지(topologies)를 실행한다. 잡(job)과 토폴로지는 매우 다르다. 주요 차이점은 맵리듀스 잡은 결국 끝나지만 토폴로지는 영원히 메세지를 처리한다.

스톰 클러스터에 두 종류의 노드가 있다. 마스터 노드와 워커 노드들. 마스터 노드는 하둡의 잡트랙커(JobTracker)와 유사한 Nimbus라고 불리는 데몬을 실행한다. Nimbus는 클러스터에 코드 분산하고 머신들에게 태스크들을 할당하고 실패에 대한 모니터링을 책임지고 있다.

각 워커 노드는 "슈퍼바이저(Supervisor)"라고 불리는 데몬을 실행한다. 슈퍼바이저는 워커 노드 머신에 할당된 일이 있는지 주시(listen)하고 Nimbus가 워커노드에 무엇을 할당했느냐에 따라 워커 프로세스를 시작/정지 한다. 각 워커 프로세스는 토폴로지의 한 부분을 수행한다. 실행중인 토폴로지는 많은 머신들에 걸쳐 퍼져있는 많은 워커 프로세스들로 구성된다.

Nimbus와 슈퍼바이저들 사이에 모든 조정은 쥬키퍼(zookeeper) 클러스터를 통해서 이루어진다. 추가적으로, Nimbus 데몬과 슈퍼바이저 데몬들은 fail-fast 와 무상태이다. 모든 상태는 쥬키퍼 또는 로컬 디스크에 유지된다. 이는 kill -9를 통해 Nimbus 또는 수퍼바이져를 죽일 수 있으며 아무일 없었던 것처럼 다시 시작할 수 있음을 의미 한다.

Topologies


스톰에서 리얼타임 계산을 수행하기 위해서는 "토폴로지"라고 불리는 것을 생성한다. 토폴로지는 계산 그래프이다. 토폴로지에서 각 노드는 로직 처리를 포함하고, 노드들 사이에 링크들은 어떻게 데이터가 노드들 사이로 지나가야되는지를 나타낸다.

토폴로지를 실행하는 것은 간단하다. 첫째, jar파일에 코드와 종속된 라이브러리들을 패키지한다. 그런 후에 다음과 같은 커맨드를 실행한다.

storm jar all-my-code.jar backtype.storm.MyTopology arg1 arg2

인수 arg1, arg2를 가지고 있는 MyTopology 클래스를 실행한다. 이 클래스의 main 함수는 토폴로지를 정의하고 Nimbus에게 전달한다. storm jar 부분은 Nimbus와 연결하고 jar를 업로딩한다.

토폴로지 정의는 Thrift 구조이고 Nimbus는 Thrift 서비스이기 때문에, 어떠한 언어를 사용해서도 토폴로지를 생성하고 제출할 수 있다. 위에 예제는 JVM 기반 언어로 부터 이를 수행하는 가장 간단한 방법이다. 토폴로지를 시작하고 정지하기 위한 더 많은 정보를 얻으려면 다음 링크를 살펴보자.  Running topologies on a production cluster

Streams


스톰에서 핵심 추상화는 스트림(stream) 이다. 스트림은 튜플들의 무한 시퀀스이다. 스톰은 분산되고 신뢰할 만한 방법으로 어떤 하나의 스트림을 새로운 스트림으로 변형하기 위한 기본 요소들(primitives)를 제공한다. 예를 들어, 트윗의 스트림을 유행 토픽의 스트림으로 변형할 수 있다.

스톰이 스트림 변형을 위해 제공하는 기본적인 요소들로 "spouts" 와 "bolts"가 있다. spouts와 bolts는 당신의 애플리케이션에 구체적인 로직을 수행하도록 구현하기 위한 인터페이스들을 가지고 있다.

Spout는 스트림의 소스이다. 예를 들어, spout은 Kestrel 큐에 튜플들을 읽고 그것들을 스트림으로 뽑아낸다. 또는 spout는 트윗터 API와 연결하고 그를 트윗터 스트림으로 뽑아낸다. (원시 데이터를 스트림으로 뽑아 내는 역할)

bolt는 입력 스트립의 소비하고 일부를 처리하고 가능하면 새로운 스트림을 뽑아낸다. 트윗의 스트림으로 부터 유행하는 토픽을의 스트림을 계산하는 것과 같은 복잡한 스트림 변형은 다중 스텝, 다중의 bolts가 요구된다. Bolts는 함수 실행, 튜플 필터, 스트림 취합, 스트림 참여, 데이터베이스와 연계등 어떠한 것도 할 수 있다. (볼트의 연결이 결국 출력을 만들어냄)

spouts와 bolts의 네트워크들은 탑-레벨 추상화인 "토폴로지"안에 패키지된다. 토폴로지는 각 노드가 spout 또는 bolt인 스트림 변환 그래프이다. 그래프에서 화살표 모양은 어느 bolts가 어떤 스트림에 가입할 지를 나타낸다.  spout 또는 bolt가 스트림에 어떤 튜플을 내보내면, 그 스트림을 구독하는 모든 bolt에게 그 튜플을 보낸다. 




토폴로지에서 노드들 사이의 링크들은 튜플들이 어떻게 지나가는지 나타낸다. 예를 들어, Spout A와 Bolt B 사이에 링크, Spout A에서 Bolt C 까지 링크와 Bolt B와 Bolt C 사이의 링크가 있다면, 매번 Spout A는 어떤 튜플을 내보낼 때마다 이 튜플을 Bolt B와 Bolt C에 보낼 것이다.

스톰 토폴로지에서 각 노드는 병렬로 수행한다. 토폴로지에서, 각 노드가 어느정도의 병렬성을 원하는지 명시할 수 있고, 그 후 스톰은 실행을 위한 클러스터의 쓰레드의 수만큼을 생성할 것이다.

토폴로지는 영원히 수행되거나 kill될때 까지 수행된다. 스톰은 자동적으로 실패한 태스크들을 재할당할 것이다. 그리고 머신이 다운되고 메세지가 드랍됐더라도 데이터 무손실을 보장한다.

Data model


스톰은 데이터 모델로 튜플들을 사용한다. 튜플은 이름을 갖는 값들의 리스트이고 튜플안에 필드는 어떤 타입의 객체도 될 수 있다. 특별히, 스톰은 튜플 필드 타입으로 원시 타입, 스트링, 바이트 배열 모두 지원한다. 다른 타입의 객체를 사용하기 위하여 해당 타입을 위한 serializer를 구현할 필요가 있다.

토폴로지에 모든 노드들은 내보내는 튜플들을 위한 출력 필드들을 선언해야만 한다. 아래 예제에서, bolt 는 필드 "double"과 "triple"를 가진 2-튜플를  내보내도록 선언하고 있다.

public class DoubleAndTripleBolt extends BaseRichBolt {
    private OutputCollectorBase _collector; 

    @Override 
    public void prepare(Map conf, TopologyContext context, OutputCollectorBase collector) {
        _collector = collector; 
    } 

    @Override 
    public void execute(Tuple input) {
        int val = input.getInteger(0);        
        _collector.emit(input, new Values(val*2, val*3)); 
        _collector.ack(input); 
    } 

    @Override 
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("double", "triple")); 
    }     
}

declareOutputFields 함수는 출력 필드 ["double", "triple"]를 선언하고 있다. 나머지 부분은 추후에 설명한다.


A simple topology



개념에 대해 좀더 알아보고 어떻게 코드가 구성되는지 알기 위하여 간단한 토폴로지를 살펴보자.

storm-starter로 부터 ExclamationTopology 정의를 살펴보자.

API는 여기에서 확인할 수 있다.

public BoltDeclarer setBolt(java.lang.String id, IBasicBolt bolt,  java.lang.Integer parallelism_hint)

public SpoutDeclarer setSpout(java.lang.String id, IRichSpout spout,  java.lang.Integer parallelism_hint)


TopologyBuilder builder = new TopologyBuilder();        

builder.setSpout("words", new TestWordSpout(), 10);        

builder.setBolt("exclaim1", new ExclamationBolt(), 3).shuffleGrouping("words");

builder.setBolt("exclaim2", new ExclamationBolt(), 2).shuffleGrouping("exclaim1");

이 토폴로지는 하나의 spout과 두개의 bolt를 포함하고 있다. spout은 단어들을 내보내고 각 bolt는 스트링 "!!!"을 입력값에  추가한다. 노드들은 하나의 라인으로 되어있다. spout는 추후 두 번째 bolt로 내보내는 첫 번째 bolt로 내보낸다. 만약에 spout가  ["bob"] 과 ["john"]을 내보낸다면 두번째 bolt는 ["bob!!!!!!"] 과 ["john!!!!!!"]을 내보낼 것이다.

이 코드는 setSpout과 setBolt 메소드를 사용하여 노드들을 정의한다. 이 메소드들은 사용자가 지정한 아이디, 처리 로직을 포함하고 있는 객체와 노드에 대해 원하는 병렬성의 정도를 인수로 가진다. 이 예제에서, spout는 id는 "words"이고 bolt의 각각 아이디는 "exclaim1", "exclaim2"이다.

처리 로직을 포함하고 있는 객체는 각각 IRichSpout와 IRichBolt 인터페이스를 구현하고있다.

마지막 파라미터, 노드에게 원하는 병렬성 정도는 선택사항이다. 이는 클러스터에 얼마나 많은 스레드가 이 컴포넌트를 수행할지를 나타낸다. 만약에 이를 생략하면, 스톰은 노드당 하나의 쓰레드를 할당 할 것이다.

setBolt 는 Bolt에 입력으로 정의되어 사용될 InputDeclarer를 리턴한다. 여기에, 컴포넌트 "exclaim1"은 셔플 그룹핑을 사용하여 컴포넌트 "words"에 의해 내보내지는 모든 튜플들을 읽기 원한다고 선언하고 있고 컴포넌트 "exclaim2"는 셔플 그룹핑을 사용하여 컴포넌트 "exclaim1"에서 내보내지는 모든 튜플들을 읽기 원한다고 선언하고 있다. 셔플 그룹핑 (shuffle grouping)은 튜플들이 입력 태스크로부터 bolt  태스크까지 임의로 분산되야한다는 것을 의미한다. 컴포넌트들 사이에서 데이터를 그룹화하기 위한 여러가지 방법이 있다. 나중에 설명하도록 한다.

만약에 컴포넌트 "word"와 컴포넌트 "exclaim1" 에서 내보내는 모든 튜플들을 읽는 컴포넌트 "exclaim2" 를 정의한다면 다음과 같이 정의하면 된다.

builder.setBolt("exclaim2", new ExclamationBolt(), 5).shuffleGrouping("words").shuffleGrouping("exclaim1")

입력 선언들은 bolt에 대해 다중의 소스들을 지정하기 위해 연결될 수 있다.

이 토폴로지에서 spout와 bolt의 구현을 좀 더 깊이 살펴보자. Spout는 토폴로지에 새로운 메세지를 내보내는 책임이 있다. 이 토폴로지에서 TestWordSpout는  100ms 마다 1-튜플로써 ["nathan", "mike", "jackson", "golda", "bertels"] 리스트로 부터 임의의 한개의 단어를 내보낸다. TestWordSpout에서 nextTuple() 구현은 다음과 같다.

public void nextTuple() { 

    Utils.sleep(100); 

    final String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"}; 

    final Random rand = new Random();

    final String word = words[rand.nextInt(words.length)]; 

    _collector.emit(new Values(word)); 

}

살펴본 것처럼, 구현은 매우 간단하다.

ExclamationBolt는 문자열 "!!!"를 입력값에 추가한다. 전체 구현을 살펴보자.

public static class ExclamationBolt implements IRichBolt {

    OutputCollector _collector; 

    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {

        _collector = collector; 

    } 

    public void execute(Tuple tuple) {

        _collector.emit(tuple, new Values(tuple.getString(0) + "!!!")); 
        _collector.ack(tuple); 

    } 

    public void cleanup() {

    } 

    public void declareOutputFields(OutputFieldsDeclarer declarer) {

        declarer.declare(new Fields("word")); 

    } 

    public Map getComponentConfiguration() { 

        return null; 

    } 

}

prepare 메소드는 bolt로 부터 내보내지는 튜플들에 사용되는 OutputCollector를 bolt에게 제공한다. 튜플들은 bolt로부터 언제든 내보내질 수 있다. prepare, execute, cleanup 메소드 또는 다른 쓰레드에서 비동기적으로, prepare 구현은 execute 메소드에서 후에 사용될 인스턴스 변수 OutputCollector를 저장한다.

execute 메소드는 bolt의 입력들중 하나로 부터 한개의 튜플을 받는다. ExclamationBolt 는 그 튜플로부터 첫번째 필드를 잡고 문자열 "!!!"를 추가하여 내보낸다. 지금은 1개의 필드 "word" 밖에 없으므로 첫번째 필드가 "word"가 된다. tuple.getString(0) 대신 tuple.getStringByField("word")로 써도 동일하다. 만약에 다중의 입력 소스들로 부터 구독하는 bolt를 구현한다면 Tuple#getSourceComponent 메소드를 사용하여 어느 컴포넌트로 부터 온 튜플인지 알아 낼 수 있다.

첫 번 째 필드를 emit하는 것 외에 마지막 라인에 입력 튜플을 ack하는 것이 있는데 이는 Storm의 신뢰성 API로써 데이터 손실이 없을을 지원한다. 추후에 설명하도록 한다.

cleanup 메소드는 bolt가 셧다운될 때 불려지며 열어놨던 리소스들을 처리해야만 한다. 클러스터에서 이 메소드가 호출되는것을 보장하지는 않는다. 예를 들어 태스크가 수행중이던 머신이 폭발한다면 이 메소드가 호출될 방법이 없다. cleanup 메소드는 로컬모드에서 토폴로지를 수행할 때와 어떤 리소스의 누수없이 토폴로지를 시작하고 종료하기 원할 때를 위한 의도이다.

declareOutputFields 메소드는 ExclamationBolt가 "word"라고 불리는 하나의 필드를 1-튜플 내보내도록 선언하고 있다. 이 예제에서는 "word"라고 안해도 어차피 정상적으로 동작한다. 튜플에서 데이터를 가져올 때 tuple.getString(0) 하기 땜시롱. 

만약에 tuple.getStringByField("word")로 수정하였다면 무조건 "word"로 써야 한다. 그렇지 않으면 exclaim2 bolt에서 오류가 발생할 것이다.

getComponentConfiguration 메소드는 어떻게 컴포넌트를 실행시킬지 다양한 설정을 할 수 있도록 한다. 이는 Configuration 토픽에서 설명될 것이다.

cleaup 과 getComponentConfiguration 은 bolt 구현에서 종종 필요치 않다. 이럴때 디폴트 구현을 제공하는 베이스 클래스를 사용하여 bolt 정의를 만족스럽게 정의할 수 있다. ExclamationBolt 는 BaseRichBolt를 상속하여 좀더 간결하게 구현할 수 있다.

public static class ExclamationBolt extends BaseRichBolt {

    OutputCollector _collector;


    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {

        _collector = collector;

    }

    public void execute(Tuple tuple) {

        _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));

        _collector.ack(tuple);

    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {

        declarer.declare(new Fields("word"));

    }    

}


Running ExclamationTopology in local mode


로컬 모드에서 ExclamationTopology를 어떻게 실행 시키는지 살펴보자.

스톰은 두 가지 동작 모드를 가진다. 로컬과 분산 모드. 로컬 모드에서 스톰은 쓰레드들을 가지고 워커 노드를 모의적으로 수행한다. 로컬모드는 토폴로지들을 개발하고 테스트하기 위해 유용하다. storm-starter에서 토폴로지들을 실행하면 로컬 모드로 실행될 것이고 각 컴포넌트가 나보내고 있는 메시지가 무엇인지 알 수 있을 것이다. 로컬 모드에서 좀 더 자세한 내용을 볼 수 있다.

분산 모드에서, 스톰은 머신들의 클러스터로써 동작한다. 마스터에서 토폴로지를 제출하면 토폴로지를 실행하기 위해 필요한 모든 코드를 제출한다. 마스터는 코드를 분산하고 토폴로지를 실행시킬 워커를 할당할 것이다. 만약에 워커가 다운되면 다른 곳에 그것들을 재 할당할 것이다. 좀 더 자세한 내용은  Running topologies on a production cluster 에서 살펴 볼 수 있다.

다음은 로컬 모드에서 ExclamationToplogy를 실행하는 코드이다.

Config conf = new Config();

conf.setDebug(true);

conf.setNumWorkers(2);


LocalCluster cluster = new LocalCluster();

cluster.submitTopology("test", conf, builder.createTopology());

Utils.sleep(10000);

cluster.killTopology("test");

cluster.shutdown();

LocalCluster 객체를 생성하여 클러스터를 정의한다. 가상 클러스터에 토폴로지를 제출하는 것은 분산 클러스터에 토폴로지를 제출하는 것과 동일하다. submitTopology를 호출하여 LocalCluster에 토폴로지를  제출한다. (토폴로지 이름, 설정, 토폴로지)

이름은 토폴로지를 식별하기 위해 사용되고 후에 종료하는데 사용할 수 있다.

설정은  토폴로지를 실행하는 다양한 곳에서 사용된다. 명시된 두개의 설정이 매우 일반적인 것이다.

1. TOPOLOGY_WORKER (setNumWorkers) 는 토폴로지를 수행하기 위하여 클러스터에 얼마나 많은 프로세스 할당되기 원하는지 명시한다. 토폴로지에서 각 컴포넌트는 많은 쓰레드로써 수행될 것이다. 주어진 컴포넌트에 할당된 쓰레드의 수는 setBolt, setSpout메소드를 통하여 설정된다. 이러한 쓰레드들은 워커 프로세스들안에 존재한다. 각 워커 프로세스는 컴포넌트의 수에 따라 쓰레드의 수가 결정된다. 예를 들어 모든 컴포넌트에 걸쳐 300개의 쓰레드와 50개의 워커 프로세스를 명시하고 있다면 각 워커 프로세스는 6개의 쓰레드를 수행하고 각각 쓰레드는 별개의 컴포넌트 소속이다. 각 컴포넌트를 위한 병렬성을 수정과 쓰레드가 실행된 워커 프로세스의 수에 따라 퍼포먼스를 튜닝할 수 있다.

2. TOPOLOGY_DEBUG (setDebug) 는 true로 설정하면 컴포넌트에 의해 내보내지는 모든 메세지를 남긴다. 이는 테스트 토폴로지일때 로컬 노드에서 유용하다. 클러스터에서 토폴로지를 실행하는 경우는 꺼두는것이 좋다.


Stream groupings


스트림 그룹핑은 두 컴포넌트 사이에서 어떻게 튜플을 보낸지 토폴로지에게 말한다. 기억해라, spouts와 bolt는 클러스터에 있는 수많은 태스크로 병렬로 수행한다.  토폴로지거 어떻게 태스크 레벨에서 실행되는지 본다면 다음과 같은 것을 볼 것이다.



bolt A는 bolt B에 튜플 하나를 내보내는 태스크일 때, 어느 태스크가 이 튜플을 보낼 것인가?

"스트림 그룹핑"은 태스크 집합들 간에 어떻게 튜플을 보내는지 스톰에게 알려줌으로써 이 질문의 답하고 있다. 자세한 스트림 그룹핑에 대해서 살펴보기 전에 storm-starter로 부터 다른 토폴로지를 살펴보자. WordCountTopology는 spout에서 문장을 읽고 WordCountBolt에 내보낸다.

TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("sentences", new RandomSentenceSpout(), 5);        

builder.setBolt("split", new SplitSentence(), 8) .shuffleGrouping("sentences");

builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));


SplitSentence는 각 문장에서 각 단어에 대해 튜플을 내보내고 WordCount는 단어의 개수를 메모리에 가지고 있다. WordCount가 단어를 받을 때마다 상태를 갱신하고 새로운 단어 개수를 추출한다.

스트림 그룹핑의 몇 가지 다른 종류가 있다.

shuffle grouping은 그 튜플을 랜덤한 태스크에 보낸다.

좀더 흥미로운 그룹핑의 종류로 "fields grouping" 이라는 것이 있다. 동일한 필드 값을 가지고 있는 것은 동일한 태스크에 보낸다.


나머지 부분은 좀 더 상세하게 설명한 페이지 참조

Defining Bolts in other languages 

Guaranteeing message processing

Transactional topologies

Distributed RPC