본문 바로가기

IT for developer/Hadoop+Nosql

Hadoop YARN (MapReduce 2)

Hadoop 0.23.0 부터 새로운 Map/Reduce 구현 방법을 지원하는데 Hadoop The Definitive Guide 3rd 를 발번역해서 살펴봐야겠다.~

YARN (MapReduce 2)

4000노드 이상의 큰 클러스터에 대해 이전 방식의 맵리듀스 시스템은 확장성 병목이 생긴다. 그래서 야후에서는 맵리듀스의 다음 세대를 설계하였다. 그 결과가 YARN (Yet Another Resource Negotiator 또는 YARN Apllication Resource Negotiator) 이였다. 

YARN은  잡트래커의 책임을 분리된 엔티티로 나누어서 기존 맵리듀스의 확장성 단점을 극복하고 있다. 잡트랙커는 잡 스케쥴링(태스크를 태스크트랙커들에게 매칭)과 태스크 진행상태 모니터링을 다룬다.(태스크의 상태를 살피고 실패하거나 느린 태스크를 재시작하고 전체 카운터를 유지하는것과 같은 기록(bookkeeping)을 하는 태스크를 수행한다.)

YARN은 이 두개의 롤을 독립적인 데몬으로 나눈다.

리소스 매니저(Resource Manager): 클러스터에서 리소스들의 사용을 관리한다.

애플리케이션 마스터(Application Master): 클러스터에 실행중인 애플리케이션의 라이프사이클을 관리한다.

노드 매너저(Node Manager): 컨테이너들을 관리한다.

애플리케이션 마스터는 클러스터의 리소스를 위해 리소스 관리자와 교섭하고  컨테이너에 애플리케이션 프로세스를 실행한다. 컨테이너들은 클러스터 노드에 실행중인 노드 매니저에 의해 관리된다. (할당된 것보다 더 많은 리소스를 사용할 수 없도록 관리)

잡트랙커와 비교해서, 애플리케이션 각 인스턴스 (맵리듀스 Job) 은 자신만의 애플리케이션 마스터를 가지고 있다. 이는 애플리케이션 실행 내내 동작한다. 이 모델은 구글 맵리듀스 논문에 가깝다.(마스터 프로세스가 워커들 집합에서 실행중인 맵과 리듀스 태스크들을 어떻게 조직화하는지에 대해 기술)

이와같이 YARN은 맵리듀스보다 좀 더 보편적으로 적용할 수 있다. 사실 MapReduce는 YARN 애플리케이션의 한 타입이다.

몇 가지 다른 YARN 애플리케이션이 있다.(클러스터 노드들위에서 스크립트를 실행할 수 있는 분산 쉘) 그리고 다른 활동들도 수행 중이다. http://wiki.apache.org/hadoop/PoweredByYarn
YARN 설계의 아름다움은 다양한 애플리케이션들이 같은 클러스터안에 공존할 수 있다는 것이다. 예를들어  맵리듀스 애플리케이션은 MPI 애플리케이션로써 동시에 실행 될 수 있다. 이는 관리성과 클러스터 이용에 대한 커다란 이득을 가져다 준다.

더욱이, 동일한 YARN 클러스터에서 다양한 맵리듀스 버전을 실행시킬 수 있다. 이는 맵리듀스 업그레이딩을 수행할 수 있게 해준다. (YARN 자체 뿐만아니라 잡 히스토리 서버, 셔플 핸들러와 같은  맵리듀스의 일부분)

YARN에서 맵리듀스는 예전 맵리듀스보다 많은 엔티티와 연관되어 있다.

  • 클라이언트 - 맵리듀스 잡을 제출
  • YARN 리소스 매니저 - 클러스터에서 리소스 할당 조율
  • YARN 노드 매니저 - 클러스터에 머신들에 컨테이너들을 시작하고 모니터링
  • 맵리듀스 애플리케이션 마스터 - 맵리듀스 잡을 실행시키는 태스크들을 조율 . 애플리케이션 마스터와 맵리듀스 태스크들은 컨테이너에서 실행된다. (컨테이너들은 리소스 매니저에 의해 스케쥴되고 노드 매니저에 의해 관리된다.)
  • 분산 파일 시스템 (일반적으로 HDFS) - 다른 엔티티들에게 잡 파일을 공유하기 위해 사용된다.

 

YARN 사용시 맵리듀스 잡을 어떻게 실행시키는가


Job Submission

잡은 맵리듀스1처럼 동일한 사용자 API를 사용하여 맵리듀스2에 제출된다. (스텝 1) 맵리듀스2는 ClientProtocol을 구현하고 있다 (mapreduce.framework.name=yarn 일때 활성화됨). 제출 프로세스는 이전 구현과 매우 유사하다. 비록 YARN에서 애플리케이션 ID로 불리지만 동일한 의미의 새로운 잡 ID를 리소스 매니저로부터 가져온다.(이전에는 잡트랙커로부터 가져옴). (스텝2) 잡 클라이언트는 잡의 출력 파트 명세를 검사한다. 입력 조각(split)들 계산 (yarn.app.mapresuce.am.compute-splits-in-cluster 라는 옵션을 통해 클러스터에서 그것들을 생성하는 하지만 이는 많은 조각들을 가지고 있는 잡들에게 유용할 수 있다.)
잡 리소소들을 HDFS에 복사한다 (잡 JAR, 설정, 조각 정보 포함) (스텝 3).
결국, 잡들은 submitApplication()이 호출로 리소스 매너저에 제출된다 (스텝 4).


Job Initialization

리소스 매니저가 submitApplication()에 호출을 받으면, 스케쥴러에게 요청을 넘긴다. 스케쥴러는 컨테이너를 할당하고 리소스 매니저는 애플리케이션 마스터 프로세스를 실행한다 - 이는 노드매니저 관리하에 수행된다 (스텝 5a 와 스텝 5b).

맵리듀스 잡을 위한 애플리케이션 마스터는 MRAppMaster라는 매인 클래스를 가진 자바 애플리케이션이다. 잡은 상태를 추적하기 위한 수많은 북킵핑 객체들을 생성하여 잡을 초기화한다 (스텝 6).
다음, 공유 파일 시스템으로 부터 클라이언트에서 계산된 입력 조각들을 가져온다 (스텝 7)
각 조각에 대해 맵 태스크 객체와  mapreduce.job.reduces 속성에 의해 결정되는 리듀스 태스크 객체들을 생성한다.

애플리케이션 마스터가 해야할 다음 것은 맵 리듀스 잡을 구성하는 태스크들을 어떻게 실행시킬지 결정하는 것이다. 만약에 잡이 작다면 애플리케이션 마스터는 동일한 JVM에서 그것들을 실행시키도록 선택할 것이다. 이는 한노드에서 시퀀스하게 태스크들을 실행하는 것과 비교해서 새로운 컨테이너를 할당하고 태스크들을 실행하려는 오버헤드가 병렬로 처리해서 얻는것보다 크다는 판단되면 동일한 JVM에서 실행한다. 그러한 잡을 uberized라고 부른다. uber 태스크

작은 잡으로써 어떻게 판단할까? 디폴트로 10 매퍼이하, 하나의 리듀서, HDFS 블럭사이즈보다 작은 입력 사이즈등이다.
이는 mapreduce.job.ubertask.maxmaps,  mapreduce.job.ubertask.maxreduce, mapreduce.job.ubertask.maxbytes 를 설정하여 변경할 수 있다. uber 태스크를 활성화/비활성화하는 것은 mapreduce.job.ubertask.enable을 설정)

태스크들은 실행되기전에 출력 디렉토리를 생성하기 위해 잡 setup 메소드가 호출된다. 맵리듀스1과 비교하여 YARN에서는 이 메소드가 애플리케이션 마스터에 의해 직접 호출된다. (맵리듀스1에서는 태스크트랙커에 의해 실행되는 특별한 태스크에 의해 호출 됨)


Task Assignment

만약에 잡이 uber 태스크로 실행되는게 아니라면 애플리케이션 마스터는 모든 맵, 리듀스 태스크들에 대한 컨테이너들을 리소스 매니저에게 요청한다 (스텝 8). 이 요청은 hearbeat 통해 맵 태스크의 데이터 지역성에 대한 정보(입력 조각이 있는 호스트와 랙)를 포함한다. 스케쥴러는 스케쥴링 결정을 할때 이 정보를 이용한다: 이상적인 경우 태스크를 데이터 지역성이 있는 곳에 놓이도록 시도한다. 그러나 불가능하다면 지역성이 아예 없는 것보다 랙-지역성이 있는 것을 좋아한다.

요청들 또한 태스크들을 위한 메모리 요구사항을 명시한다. 디폴트로, 맵과 리듀스 태스크들은 1024MB 메모리가 할당된다 . 그러나 mapreduce.map.memory.mb 와 mapreduce.reduce.memory.mb 를 설정하여 변경할 수 있다.

매모리를 할당하는 방법은  맵 리듀스1과 다르다. 태스크트랙커들은 클러스터 설정 시간을 지정되는 고정된 슬롯수를 가지고 있고 각 태스크들은 하나의 슬롯에서 실행된다. 슬롯들이 최대 메모리를 허락받고 다시 하나의 클러스터에 대해 고정되는데 이는 메모리를 덜 사용하는 문제와 충분한 메모리가 없어서 태스크를 완료할수 없을때 잡 실패의 문제를 가진다.

YARN에서 리소스들은 좀 더 세밀하다. 위에 두 가지 문제를 피할 수 있다. 특히, 애플리케이션들은 최소 할당과 최대 할당사이에 어느정도의 메모리를 요청한다. 디폴트 메모리 할당은 스케쥴러에 따라 다르다 그리고 디폴트 최소 1024MB (yarn.scheduler.capacity.minimum-allocation-mb), 그리고 최대 10240MB(yarn.scheduler.capacity.maximum-allocation-mb)로 설정된다. 그래서 태스크들은 1~10G 사이의 메모리를 요구할 수 있다. (mapreduce.map.memory.mb 와 mapreduce.reduce.memory.mb로 지정)


Task Execution

일단 태스크가 리소스매니저의 스케쥴러에 의해 컨테이너에 할당되면, 애플리케이션 마스터는 노드매니저와 연결하여 컨테이너를 시작한다 (스텝 9a 와 스텝 9b). 태스크는 YarnChild 메인 클래스 자바 애플리케이션에 의해 실행된다. 실행전에 태스크가 필요로 하는 리소스들을 지역화한다 (잡 설정, Jar 파일, 분산 캐시로 부터의 파일들) (스텝10). 결국, 이는 맵과 리듀스 태스크를 실행한다 (스텝 11).

YarnChild는 전용 JVM에서 실행된다. 이는 맵리듀스1에서 태스크트랙커들이 태스크들을 위한 새로운 JVM을 생성하는 것과 같은 이유이다. 그러나 맵리듀스1과 다르게 YARN은 JVM 재사용을 지원하지 않는다.

스트리밍과 파이프 프로그램들은 맵리듀스1과 동일한 방법으로 동작한다. YarnChild는 스트리밍 또는 파이프 프로세스를 시작하고 표준 입출력 또는 소켓을 사용하여 통신한다. (자식과 서브플로세서들이 태스크트랙커가 아니라 노드 매니저위에서 실행된다는 차이점만 있다.)


Progress and Status Updates

YARN에서 실행하면, 태스크는 진행상태를 애플리케이션 마스터에게 3초마다 리포트한다 (umblical 인터페이스로). 태스크트랙커를 통한 자식부터 잡트랙커까지 플로우를 갱신하던 맵리듀스1과 비교하라.

Task --> YarnChild --> ApplicationMaster  

클라이언트는 매초마다 애플리케이션을 폴링 한다. (mapreduce.client.progressmonitor.pollinterval)


Job Completion

진행상황에 대한 애플리케이션 폴링 뿐만 아니라, 매 5초마다 클라이언트는 잡이 끝났는지 검사한다. (waitCompletion() 메소드를 사용한 경우). 폴링 주기는 mapreduce.client.completion.pollinterval 속성으로 지정할 수 있다.

HTTP 콜백을 통해 잡 완료를 알리는 것은 맵리듀스1에서 지원하는 것과 같다. 맵리듀스2에서 애플리케이션 마스터가 콜백을 초기화시킨다.

애플리케이션 마스트와 태스크 컨테이너들은 그들의 작업상태를 깨끗하게 하고 OutputCommitter의 잡 cleanup 메소드가 호출된다. 잡 정보는 잡 히스토리 서버에 의해 얻을 수 있다.