본문 바로가기

IT for researcher/Cloud Computing

Distributed Cache in Hadoop


Hadoop 책에서 발췌해서 임시 발번역함.

 데이터를 직렬화하기 보다 하둡의 분산 캐시 매커니즘을 이용하여 데이터셋을 분산하는 것이 더 유용하다. 첫문장 부터 이해안됨;  태스크들이 실행할 때 사용되는  파일과 아카이브를 태스크 노드에 복사하기 위한 서비스를 제공한다.(맵퍼와 리듀서에서 필요한 리소스들을 저장하기 위한 프로그래밍 매커니즘을 제공함 - 다른 책에서 정의한 문장) 네트워크 대역폭을 절약하기 위하여 파일들은 일반적으로 job 마다 한번 특정 노드에 복사된다.

Usage
GenericOptionsParser를 사용하는 많은 툴들을 위하여 당신은 -files 라는 옵션을 인수로 지정함으로써 콤마로 구분된 URI 목록으로 분산 파일들을 지정할 수 있다. 파일들은 로컬 파일 시스템 HDFS 또는 다른 하둡 파일 시스템(s3와 같은) 에 있을 수 있다. 별다른 스킴이 없다면 이 파일들은 로컬에 있다고 추정된다.
당신은 또한 아카이브 파일들(JAR 파일들, ZIP 파일들, tar 파일들, gzipped tar 파일들) 을 -archives 옵션을 이용하여 태스크들에게 복사할 수 있다;이는  태스크 노드에서 풀어진다. -libjars 옵션은 맵퍼와 리듀서 태스크들의 클래스 패스로 JAR 파일들을 추가할 것이다. 이 옵션은 당신의 job JAR 파일안에 라이브러리 JAR 파일들을 포함하고 있지 않다면 유용할 것이다.

스트리밍은 클러스터를 통해 스트리밍 스크립트를 복사하기 위해 분산 캐시를 사용하지는 않는다. 당신은 -file 옵션을 사용하여 복사할 파일을 지정해야한다. 이는 복사될 각 파일마다 반복해야한다. 더욱이, -file 옵션을 사용하여 지정된 파일들은 URI가 아닌 파일 경로에 있어야만 한다. 그래서 그들은 스트리밍 job을 실행한 클라이언트의 로컬 파일 시스템으로 부터 접근할 수 있어야만 한다.
스트리밍은 스트리밍 스크립들에 의해 사용하기 위한 분산 캐시에 파일들을 복사하기 위해 -files, -archives 옵션을 사용할 수 있다.

관측소 이름들에 대한 메타데이터를 공유하기 위하여 분산 캐시를 어떻게 사용하는지 한번 보자.

% hadoop jar job.jar MaxTemperatureByStationNameUsingDistributedCacheFile \
-files input/ncdc/metadata/stations-fixed-width.txt input/ncdc/all output

이 명령은 stations-fixed-width.txt 로컬 파일을 태스크 노드들에 복사한다. 그래서 우리는 관측소이름을 찾기 위해 이것을 이용할 수 있다.
public class MaxTemperatureByStationNameUsingDistributedCacheFile extends Configured implements Tool {
static class StationTemperatureMapper extends MapReduceBase 
implements Mapper<LongWritable, Text, Text, IntWritable> {
private NcdcRecordParser parser = new NcdcRecordParser();
public void map(LongWritable key, Text value,OutputCollector<Text, IntWritable> output,
Reporter reporter) throws IOException {
parser.parse(value);
if (parser.isValidTemperature()) {
output.collect(new Text(parser.getStationId()),
new IntWritable(parser.getAirTemperature()));
}
}
}
static class MaxTemperatureReducerWithStationLookup extends MapReduceBase
implements Reducer<Text, IntWritable, Text, IntWritable> {
private NcdcStationMetadata metadata;
@Override
public void configure(JobConf conf) {
metadata = new NcdcStationMetadata();
try {
metadata.initialize(new File("stations-fixed-width.txt"));
} catch (IOException e) {
throw new RuntimeException(e);
}
}
public void reduce(Text key, Iterator<IntWritable> values,
OutputCollector<Text, IntWritable> output, Reporter reporter) 
throws IOException {
String stationName = metadata.getStationName(key.toString());
int maxValue = Integer.MIN_VALUE;
while (values.hasNext()) {
maxValue = Math.max(maxValue, values.next().get());
}
output.collect(new Text(stationName), new IntWritable(maxValue));
}

}
public int run(String[] args) throws IOException {
JobConf conf = JobBuilder.parseInputAndOutput(this, getConf(), args);
if (conf == null) {
return -1;
}
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);
conf.setMapperClass(StationTemperatureMapper.class);
conf.setCombinerClass(MaxTemperatureReducer.class);
conf.setReducerClass(MaxTemperatureReducerWithStationLookup.class);
JobClient.runJob(conf);
return 0;
}
public static void main(String[] args) throws Exception {
int exitCode = ToolRunner.run(
new MaxTemperatureByStationNameUsingDistributedCacheFile(), args);
System.exit(exitCode);
}
}

관측소에  최고 온도를 찾는 애플리케이션, 분산된 캐시 파일로써 전달된 룩업 테이블로부터 관측소 이름을 보여주기

이 프로그램은 기상 관측소에서 최고 온도를 찾는 프로그램이다. 그래서 맵퍼(StationTemperatureMapper)는 간단하게 (관측소 ID, 온도) 쌍을 추출한다. 컴바인너를 위하여 우리는 맵의 결과 그룹에서 최고 온도를 가져오기 위해 MaxTemperatureReducer를 재사용한다. 리듀서(MaxTemperatureReducerWithStationLookup)은 컴바이너와 구분되어 최고온도를 찾는 것과 더불어 관측소 이름을 찾기 위해 캐시 파일을 사용한다.
우리는 원본 이름을 사용하여 캐시 파일을 가져오기 위하여 리듀서의 configure() 메소드를 사용한다. 태스크의 워킹디렉토리의 상대적 경로로 찾는다(?)

결과는 다음같이 출력된다.

PEATS RIDGE WARATAH 372
STRATHALBYN RACECOU 410
SHEOAKS AWS 399
WANGARATTA AERO 409


How it works
당신이 job을 실행할 때 하둡은 -files와 -archives 옵션에 의해 지정된 파일들을 잡트랙커의 파일시스템(일반적으로 HDFS)에 복사한다. 그 후 태스크가 실행되기 전에, 태스크트랙커는 잡트랙커파일시스템에서 로컬디스크(캐시)로 파일들을 복사한다. 그래서 태스크는 파일들을 접근할 수 있다. 태스크의 관점에서 파일들은 단지 로컬에 있는것이다. (HDFS로 부터 왔는지는 신경쓸필요 없다.)
태스크트랙커는 캐시에서 각 파일을 사용한 태스크들의 레퍼런스 카운트를 유지한다. 태스크가 실행된 후 파일들의 레퍼런스 카운트는 1 감소하고 0이 되면 삭제된다. 캐시가 특정 사이즈를 넘기면 파일들은 새로운 파일들을 위한 공간을 만들기 위하여 지워진다.-디폴트 10 GB 이다. 캐시 사이즈는 local.cache.size 속서을 설정하여 변경할 수 있다.
이러한 디자인이 같은 태스크트랙커에서 수행되는 같은 job의 다음 태스크들이 캐시에서 파일을 찾을 것이라는 확실한 보장은 못하지만, 그렇게 될 경우가 꽤 많을 것이다.job의 태스크들은 동시에실행되도록 스케쥴된다. 그래서 다른 job이 실행되고 원본 태스크의 파일이 캐시로부터 지워지는 경우가 그다지 많지 않을 것이다.
파일들은 태스크트랙커에 ${mapred.local.dir}/taskTracker/archive 디렉토리에 지역회된다. 그러나 애플리케이션은 이런것을 알필요 없으며 그 파일들은 태스크의 워킹 디렉토리로 부터 심볼릭 링크되어 있다.