[Hadoop] Replicated Joins with DistributedCache

 

A prerequisite for using DistributedCache is that one side of the join is small enough to fit in memory. As the code below shows, we control exactly how the file is loaded into memory, so we can also filter records during loading. Note, however, that if the file is large, loading it into memory is itself time-consuming.
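For example, here is a minimal sketch of filtering during the load. The wantedKeys whitelist is a hypothetical, application-specific filter (not part of the original code), and java.util.Set / java.util.Map imports are assumed:

// Sketch: load only whitelisted records from the cached file into memory.
// "wantedKeys" is a hypothetical filter standing in for any predicate.
private void loadFiltered(BufferedReader br, Set<String> wantedKeys,
		Map<String, String> joinData) throws IOException {
	String line;
	while ((line = br.readLine()) != null) {
		String[] tokens = line.split(",", 2);
		if (tokens.length == 2 && wantedKeys.contains(tokens[0])) {
			joinData.put(tokens[0], tokens[1]);
		}
	}
}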

DistributedCache works by copying the small file to every node that runs a task.

We call DistributedCache.addCacheFile() to register the file to be distributed, and then, in the mapper's setup() method, we call DistributedCache.getLocalCacheFiles() to locate the node-local copy and load it into memory.


import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;


public class DistributedCacheJoin extends Configured implements Tool {

	
	public static class MyMapper extends Mapper<Text, Text, Text, Text> {
		// In-memory copy of the small side of the join, keyed by join key.
		private HashMap<String, String> joinData = new HashMap<String, String>();
		@Override
		public void map(Text key, Text value, Context context)
				throws IOException, InterruptedException {
			// The key must be converted with toString(): a Text object and a
			// String with the same contents have different hashCode()s.
			String joinValue = joinData.get(key.toString());
			if (null != joinValue) {
				context.write(key, new Text(joinValue + "," + value.toString()));
			}
		}
		
		@Override
		protected void setup(Context context) {
			try {
				// Locate the node-local copy of the file registered in run().
				Path[] cacheFiles = DistributedCache.getLocalCacheFiles(context.getConfiguration());
				if (null != cacheFiles && cacheFiles.length > 0) {
					String line;
					String[] tokens;
					BufferedReader br = new BufferedReader(new FileReader(cacheFiles[0].toString()));
					try {
						while ((line = br.readLine()) != null) {
							tokens = line.split(",", 2);
							// Skip malformed lines that contain no separator.
							if (tokens.length == 2) {
								joinData.put(tokens[0], tokens[1]);
							}
						}
					} finally {
						br.close();
					}
				}
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
	}
	public int run(String[] args) throws Exception {
		Configuration conf = getConf();
		Job job = new Job(conf,"DistributedCacheJoin");
		job.setJarByClass(DistributedCacheJoin.class);
		
		// args[0]: the small file to distribute; args[1]: input; args[2]: output.
		DistributedCache.addCacheFile(new Path(args[0]).toUri(), job.getConfiguration());
		Path in = new Path(args[1]);
		Path out = new Path(args[2]);
		
		
		FileInputFormat.setInputPaths(job, in);
		FileOutputFormat.setOutputPath(job, out);
		
		job.setMapperClass(MyMapper.class);
		job.setNumReduceTasks(0);
		job.setInputFormatClass(KeyValueTextInputFormat.class);
		job.setOutputFormatClass(TextOutputFormat.class);
		// In the new API the separator property is no longer
		// key.value.separator.in.input.line; see KeyValueLineRecordReader.java.
		job.getConfiguration()
			.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", ",");

		// Return the status to ToolRunner instead of calling System.exit() here;
		// main() handles the process exit code.
		return job.waitForCompletion(true) ? 0 : 1;
	}
	
	public static void main(String args[]) throws Exception{
		int res = ToolRunner.run(new Configuration(), new DistributedCacheJoin(), args);
		System.exit(res);
	}
	
}
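Because the job runs with zero reduce tasks, the join completes entirely on the map side. To make the behavior concrete, here is a small walk-through with hypothetical data (the file names and records are illustrative, not from the original post); customers.csv is the small cached file and orders.csv is the map input:

# customers.csv -- the small file, loaded into joinData in setup()
1,Alice
2,Bob

# orders.csv -- the map input; KeyValueTextInputFormat splits each
# line on the first ","
1,order-1001
2,order-1002
3,order-1003

$ hadoop jar distcachejoin.jar DistributedCacheJoin customers.csv orders.csv out

# out/part-m-00000 -- TextOutputFormat separates key and value with a
# tab; key 3 is dropped because it has no match in the cached file
1	Alice,order-1001
2	Bob,order-1002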

References:

[1] Chuck Lam, Hadoop in Action, Manning.
