본문 바로가기

IT for researcher/Cloud Computing

How MapReduce Works

Hadoop : The Definitive Guide 책 6장 내용을 일부 발췌해서 발번역함. 이미 한글판 서적이 나왔지만 정리한다는 생각으로 번역~

(3rd Edition을 요즘 보고 있어서 내용을 수정 추가~)


이 장에서 우리는 하둡에서 맵리듀스가 어떻게 동작하는지 상세하게 볼 것이다.이는 다음 두 장에서 살펴볼 좀 더 개선된 맵리듀스 프로그램 만들기 위한 좋은 지침을 제공한다. 

Anatomy of a MapReduce Job Run

당신은 한 라인 코드를 가지고 맵리듀스 job을 실행할 수 있다. : JobClient.runJob(conf). 매우 짧다. 그러나 이 뒤에는 수많은 처리가 숨겨져 있다. 이번 섹션은 하둡이 job을 수행하기위한 절차들을 알아본다.
(Hadoop이 0.2로 버전업되면서 API가 바뀌었다. submit(), waitForComplete() 메소드로 Job을 실행시킨다.)

하둡 0.23.0에서 새로운 맵리듀스 구현방법이 소개된다. 이를 MapReduce 2라고 부르겠다.

MapReduce 1
다음 그림은 전체 과정을 보여주고 있다.  최고 레벨에는 네 가지 독립적인 엔티티가 있다.
  • 맵리듀스 job을 제출하는 클라이언트
  • job 실행을 조정하는 잡트랙커. 잡트랙커의 메인 클래스는 JobTracker이다.
  • job은 태스크로 나누어 지는데, 이러한 task를 실행하는 태스크트랙커. 태스크트랙커들의 메인 클래스는 TaskTracker 이다.
  • 다른 엔티티들 사이에서 job 파일들을 공유하는데 사용되는 분산 파일시스템 (HDFS)



Job Submission
hadoop 0.2 부터
Job 의 submit() 메소드는 내부 JobSummitter 인스턴스를 생성하고 submitJobInternal() 호출 (스텝 1)

JobClient 의 runJob 메소드는 새로운 jobClient를 생성하고 submitJob 메소드를 호출하는 편리한 메소드이다 (스텝 1) . 제출된 job을 가지고 runJob은 매초마다 job의 진행을 검사하여 마지막 리포트 이후 변화가 있다면 콘솔에 진행과정을 리포트한다. job이 끝났을 때, 성공했다면, job 카운터가 보여진다. 그렇지 않으면 실패하게된 원인이 되는 에러를 콘솔에 보여준다.
Job 제출 과정은 JobClient의 submitJob() 메소드에 구현되었다. 
  • 잡트랙커에게 새로운 job 아이디를 요청한다. (JobTracker.getNewJobId() 호출. 스텝 2)
  • job의 출력 명세를 검사한다. 예를 들어, 출력 디렉토리가 지정되지 않았거나 존재하지 않는다면, job은 제출되지 않고 에러를 발생한다.
  • job에 대한 입력 조각(split)을 계산한다. 조각들이 계산될 수 없다면, (입력 경로가 존재하지 않는 이유등으로) job은 제출되지 않고 에러를 발생한다.
  • job을 실행하기 위해 필요한 리소스를 job 아이디 디렉토리안에 잡트랙커 파일 시스템으로 복사한다. (job Jar 파일, 설정 파일, 계산된 입력 분할 파일). job JAR는 태스크들을 수행할 때 태스크트랙커들이 접근하는 클러스터에 많은 복사본들을 놓이게 하기 위하여 높은 복제 인자를 가지고 복사된다. (mapred.submit.replication 속성 - 디폴트 10) (스텝 3)

  • job이 실행될 준비가 됐다는 것을 잡트랙커에게 알린다. (JobTracker.submitJob() 호출, 스텝 4)
실제 입력을 단편화하는 코드가 있는데 다음에 나오는 입력단편하고 무슨 차이가 있는지 아직 모르겠음. 입력파일들중 크기가 큰 파일을 짜르는 작업일 수 있다는 어렴풋한 추측!

Job Initialization :job을 실행 해달라고 요청했으니 이제 job을 실행하기 위한 초기화 작업이 실행된다. - job 추상화. 입력 단편파일들을 가져와서 각 단편마다 맵처리를 위한 태스크를 생성한다.

JobTracker.submitJob() 호출을 받을 때, 이것은 앞으로 job 스케쥴러로부터 선출되고 초기화될 내부 큐에 놓이게 된다. 초기화는 job 실행을 나타내는 (태스크를 캡슐화, 태스크 상태와 진행정도를 추적하기 위한 정보) 객체를 생성하는 것을 수반한다. (스텝 5)

LocalJobRunner인 경우 소스를 따라가다보면 이해가 쉬웠는데; JobTracker인 경우 아무리 봐도 어디서 다음단계로 넘어가는지 찾아도 안나와서 엄청 헤맴.. 알고보니 JobTracker는 별도의 JVM으로 실행됨. main부터 다시 추적이 필요함;;
실행하기 위한 테스크 목록을 생성하기 위하여, job 스케쥴러는 우선 공유된 파일 시스템으로부터 JobClient에 의해 계산된 입력 조각들을 가져온다. (스텝 6). 그런 후 각 조각에 대해 맵태스크를 생성한다. 리듀스 태스크의 수는 JobConf에 mapred.reduce.tasks 속성에 의해 결정된다. setNumReduceTasks() 메소드에 의해 설정되고 스케쥴러는 실행을 위한 리듀스태스크를 해당 수 만큼 생성한다.


Task Assignment :

태스크트랙커는 주기적으로 잡트랙커에게 heartbeat 메소드를 호출하는 간단한 반복문을 실행한다. heartbeat는 태스크트랙커가 살아있다는 것을 잡트랙커에게 알리지만 메세지들을 위한 두 채널로 사용된다. heartbeat 파트에서, 태스크트랙커는 새로운 태스크를 실행할 준비가 되었는지를 알리고 만약에 그렇다면, 잡트랙커는 태스크를 할당한다. 그리고 이것은 heartbeat 리턴 값을 사용하여 태스크트랙커와 통신한다. (스텝 7

태스크트랙커를 위한 태스크선택하기 전에, 잡트랙커는 job으로 부터 태스크를 선출하기 위하여 job을 선택해야만 한다. 여러가지 스케쥴링 알고리즘이 있다. 디폴트는 job에 대한 우선순위 목록를 간단하게 유지한다. job을 선택하면, 잡트랙커는 job을 위한 태스크를 선택한다.

태스크트랙커는 맵과 리듀스태스크들을 위한 고정된 슬롯 수를 가진다. 예를 들어 태스크트랙커는 두개의 맵 태스크와 두개의 리듀스 태스크를 동시에 수행할 수 있을 것이다. (정확한 숫자는 core 수와 메모리 양에 의존한다.) 디폴트 스케쥴러는 리듀스 태스크 전에 비어있는 맵 태스크 슬롯을 채운다. 그래서 만약 태스크트랙커가 적어도 하나의 비어있는 맵 태스크 슬롯이 있다면 잡트랙커는 맵태스크를 선택할 것이다. 그렇지 않으면 리듀스태스크를 선택할 것이다.

리듀스 태스크를 선택하기 위하여 잡트랙커는 아직 실행되지 않은 리스트에서 다음 리듀스 태스크를 가져온다. 거기에 데이터 지역성에 대한 고려는 없다. 그러나 맵태스크에 대해서는 태스크트랙커의 네트워크 위치를 고려하고 태스크트랙커에게 가능한 가까운곳에 있는 입력 조각의 태스크를 선택한다. 최적인 경우에 태스크는 데이터 지역적, 다시 말해 조각이 남겨져 있는 같은 노드에서 실행된다. 일부 태스크들은 데이터 지역성 또는 랙 지역성이지 않고 그들이 실행되고 있는 곳과 다른 랙으로 부터 데이터를 가져온다. 당신은 job 카운터를 보는 것으로 태스크의 각 타입에 대한 비율을 알 수 있을 것이다. 

Task execution

태스크트랙커에 태스크가 할당되고 이제 태스크를 실행하기 위한 다음 절차를 알아 본다. 첫째, 공유된 파일 시스템으로부터 태스크트랙커의 파일시스템으로 복사하여 job JAR를 로컬화한다. 또한 분산 캐시로 부터 애플리케이션에서 필요한 파일들을 로컬 디스크에 복사한다. (스텝 8) 둘째, 태스크를 위한 로컬 워킹 디렉토리를 만들고 JAR파일을 디렉토리에 푼다. 셋째, 태스크를 실행하기 위한 TaskRunner 인스턴스를 생성한다.

TaskRunner는 각 태스크를 실행 (스텝 10) 하기 위하여 새로운 JVM에서 시작된다. (스텝 9) 그래서 사용자가 정의하는 맵과 리듀스 함수에서 발생하는 버그는 태스크트랙커에 아무런 영향을 미치지 않는다. 그러나 태스크들 간의 JVM 재사용도 가능하게 할 수 있다. 

자식 프로세스는 umbilical 인터페이스를 통하여 부모와 통신한다.  이 방법은 태스크가 완료될 때까지 수초 내마다 태스크의 진행상태를 부모에게 알린다.

각 태스크는 setup과 cleanup 액션을 수행한다. 같은 JVM에서 태스크로써 수행되고 OutputCommitter에 의해 결정된다. cleanup 액션은 태스크를 커밋하기위해 사용된다. 이는 이 태스크의 출력이 최종 위치에 쓰여질 것이라는 것을 의미한다. 중복 태스크중 한개만 커밋되고 다른것은 중단된다. (MapReduce의 map, reduce 메소드를 수행하기 전/후 메소드라고 보면된다.)

스트리밍과 파이프는 사용자가 제공하는 프로그램을 시작하고 통신하기 위한 목적으로 특별한 맵과 리듀스 태스크를 실행한다.
스트리밍인 경우, 스트리밍 태스크는 표준 입력과 출력 스트립을 사용하여 프로세스와 통신한다. 반면에, 파이프 태스크는 소켓을 리슨하고 C++ 프로세스에게 포트넘버를 전달한다. 그래서 시작하고, C++ 프로세스는 부모 자바 파이프 태스크에 지속적 소켓 연결을 수행할 수 있다.
두 경우에, 태스크를 실행하는 동안 자바 프로세스는 입력 키-값 쌍을 외부 프로세스에 전달하고 사용자 정의 맵 또는 리듀스 함수를 통해 실행하고 자바프로세스에 키-값 쌍 출력을 전달한다. 태스크트랙커의 관점에서 보면, 마치 태스크트랙커 자식 프로세스가 맵 또는 리듀스 자신의 코드를 실행한것과 같다.



Progress and Status Updates

맵리듀스 job들은 일괄 작업으로 오랫동안 수행된다. 이러한 시간이 상당히 길기 때문에 사용자가 job이 어떻게 진행되고 있는지에 대한 피드백을 얻는것이 중요하다. job과 각 태스크들은 다음과 같은 상황(status)를 가지고 있다. job 또는 태스크의 상태 (running, successfully completed, failed). 맵과 리듀스의 진행과정, job 카운터 값, 상황 메세지. 이러한 상황들은 job의 진행과정중에 변경한다. 그렇다면 어떻게 클라이언트와 통신할까? 

태스크가 실행할 때,  진행과정을 추적한다. 다시말해, 완료된 태스크의 비율. 맵 태스크들에서는  처리된 입력의 비율이 되고, 리듀스 태스크에서는 좀더 복잡하다. 그러나 시스템은 진행된 리듀스 입력 비율을 여전히 평가할 수 있다. 진행정도는 셔플의 3개 페이즈에 따라  세 부분으로 나뉜다. 예를 들어, 태스크가 입력의 절반에 대해 리듀스를 실행한다면 태스크의 진행정도는 5/6이다. 이것은 복사와 소트 페이즈를 완료하고 (각각 1/3) 리듀스 페이즈의 중간정도 수행한 것이다.(1/6)  1/3 + 1/3 + 1/6 ==> 6/5

진행상태가 항상 측정가능한 것은 아니다. 그럼에도 불구하고 태스크가 무엇을 하고 있는지 하둡에게 알릴 수는 있다. 예를 들어 비록 쓰여질 전체 갯수를 퍼센트로써 표현할 수 없지만 출력 레코드들을 쓰고 있는 태스크가 진행되고 있음을 나태낼수는 있다.
진행중인 태스크가 실패하지 않을 것이라는 것을 의미함으로 써 진행상태 리포팅은 중요하다. 다음 오퍼레이션들로 진행을 구성한다.
  • 입력 레코드를 읽기 (매퍼 또는 리듀서에서)
  • 출력 레코드를 쓰기 (매퍼 또는 리듀서에서)
  • 리포터에 상태 표시로 설정 (Reporter에 setStatus() 메소드 사용)
  • 카운터 증가시키기  (Reporter의 incrCounter() 메소드 사용)
  • Reporter의 progress() 메소드 호출

  태스크는 쓰여진 맵 출력 레코드의 수와 같이 프레임워크에 빌드되었거나 사용자들에 의해 정의된 것들을 실행함으로써 다양한 이벤트를 세는 카운터들의 집합을 가지고 있다.

태스크가 진행상태를 리포트한다면 상태 변화를 태스크트랙커에게 보내져야만 한다는 플래그를 설정한다. 플래그는 3초마다 분리된 쓰레드에서 체크되고 설정되었다면 태스크 트랙커에서 현재 태스크 상황을 알린다. 그동안, 태스크트랙커는 heartbeat를 잡트랙커에게 5초마다 보낸다.(이 값은 최소값이다 클러스터의 크기에 따라 달라질 수 있다.) 그리고 태스크트랙커에의 실행된 모든 태스크들의 상태는 그 호출에서 보내진다. 카운터들은 매 5초보다 덜 자주 보내진다. 

잡 트래커는 실행되고 있는 모든 잡과 구성 태스크들의 상태를 거시적인 뷰를 제공하기 위하여 이러한 갱신들을 취합한다. 결국 이전에 얘기한 것처럼, Job은 매초 잡트랙커를 폴링하여 가장 마지막 상태를 받는다. 클라이언트는 또한 Job의 getStatus() 메소드를 사용하여 JobStatus 인스턴스를 얻을 수 있고 이 인스턴스는 job에 대한 모든 상태 정보를 포함하고 있다.

Job Completion

잡트랙커가 잡에 대한 마지막 태스크가 완료됐다는 알림을 받았을 때, 잡에 대한 상황을 "성공적"으로 변경한다. 그런 후 JobClient가 이 상태를 폴링하며 잡이 성공적으로 완료되 었음을 알게 된다. 그래서 사용자에게 메세지를 출력하고 runJob() 메소드를 리턴한다. - in 0.20 waitCompletion()
잡트랙커는 설정에 따라 HTTP job 알림을 보낼 수도 있다. 콜백을 받기 원하는 클라이언트에 의해 설정될 수 있다. (job.end.notificaton.url 속성)
마지막으로, 잡트랙커는 job을 위한 워킹상태를 깨끗이하고 태스크트랙커에게도 동일하게 수행하도록 한다. (예를 들어 임시파일 삭제)