使用DistributedCache有一个前提,就是进行联结的数据有一个足够小,可以装入内存中。注意我们可以从代码中看出它是如何被装入内存中的,因此,我们也可以在装入的过程中进行过滤。但是值得指出的是,如果文件很大,那么装入内存中也是很费时的。
DistributedCache的原理是将小的那个文件复制到所有节点上。
我们使用DistributedCache.addCacheFile()来设定要传播的文件,然后在mapper的初始化方法setup中取用DistributedCache.getLocalCacheFiles()方法获取该文件并装入内存中。
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class DistributedCacheJoin extends Configured implements Tool{
public static class MyMapper extends Mapper<Text,Text,Text,Text>{
private HashMap<String,String> joinData = new HashMap<String,String>();
public void map(Text key, Text value, Context context)
throws IOException,InterruptedException{
String joinValue = joinData.get(key.toString());//注意要toString(),hashcode()你懂的
if(null != joinValue){
context.write(key, new Text(joinValue + ","+ value.toString()));
}
}
public void setup(Context context){
try {
Path [] cacheFiles = DistributedCache.getLocalCacheFiles(context.getConfiguration());
if(null != cacheFiles && cacheFiles.length > 0){
String line;
String []tokens;
BufferedReader br = new BufferedReader(new FileReader(cacheFiles[0].toString()));
try{
while((line = br.readLine()) != null){
tokens = line.split(",", 2);
joinData.put(tokens[0], tokens[1]);
}
}finally{
br.close();
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
public int run(String[] args) throws Exception {
Configuration conf = getConf();
Job job = new Job(conf,"DistributedCacheJoin");
job.setJarByClass(DistributedCacheJoin.class);
DistributedCache.addCacheFile(new Path(args[0]).toUri(), job.getConfiguration());
Path in = new Path(args[1]);
Path out = new Path(args[2]);
FileInputFormat.setInputPaths(job, in);
FileOutputFormat.setOutputPath(job, out);
job.setMapperClass(MyMapper.class);
job.setNumReduceTasks(0);
job.setInputFormatClass(KeyValueTextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.getConfiguration()
.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", ",");
//在新API 中不再是key.value.separator.in.input.line,你可以在源码KeyValueLineRecordReader.java中看见。
System.exit(job.waitForCompletion(true)?0:1);
return 0;
}
public static void main(String args[]) throws Exception{
int res = ToolRunner.run(new Configuration(), new DistributedCacheJoin(), args);
System.exit(res);
}
}
参考:
[1] <<hadoop in action>>
分享到:
相关推荐
hadoop使用distcp问题解决 然后用distcp从1.0.3的集群拷数据到2.0.1的集群中。 遇到问题处理
Hadoop使用常见问题以及解决方法,简单实用
5.2.1 Reduce侧的联结 5.2.2 基于DistributedCache的复制联结 5.2.3 半联结:map侧过滤后在reduce侧联结 5.3 创建一个Bloom filter 5.3.1 Bloom filter做了什么 5.3.2 实现一个Bloom filter 5.3.3 Hadoop 0.20...
hadoop 使用 maven3.3 仓库 5hadoop 使用 maven3.3 仓库 7
全面教你在Linux上使用hadoop 启动与关闭 启动HADOOP 1. 进入HADOOP_HOME目录。 2. 执行sh bin/start-all.sh 单个起 :/opt/hadoop-1.0.3/bin/hadoop-daemon.sh start datanode /opt/hadoop-1.0.3/bin/hadoop-...
在windows环境下开发hadoop时,需要配置HADOOP_HOME环境变量,变量值D:\hadoop-common-2.7.3-bin-master,并在Path...解决方案:下载本资源解压将hadoop.dll和winutils.exe文件复制到hadoop2.7.3的bin目录下即可解决。
Hadoop使用常见问题以及解决方法.doc Hadoop使用常见问题以及解决方法.doc
hadoop 使用 maven3.3 仓库3
hadoop 使用 maven3.3 仓库 5hadoop 使用 maven3.3 仓库 5
hadoop 使用 maven3.3 仓库 1
hadoop 使用 maven3.3 仓库 4
ES和HADOOP使用问题和需求
hadoop 使用 maven3.3 仓库 6
网上下的hadoop权威指南,通过软件进行了图像识别,文件体积变小到三分之一,内容完整保留,并能进行复制粘贴。
windows平台上,使用Eclipse hadoop插件,开发基于hdfs文件的中文分词统计和排序功能,以唐诗三百首为例,找出其中使用频率最高的词语。
hadoop 使用 maven3.3 仓库 7
安装hadoop和Eclipse后,想在Eclipse中使用hadoop,必须先安装插件,我安装的是hadoop-eclipse-plugin-2.5.2,亲测有效。
Hadoop是大数据时代不可或缺的一个分布式系统基础架构,用户可以轻松地在Hadoop上开发和运行处理...那么对于初学者来说怎么能够更快的掌握Hadoop的使用技巧呢?本电子书汇聚了业界知名专家撰写的精品博文,分享给大家。
项目负责人tomwhite透过本书详细阐述了如何使用hadoop构建可靠、可伸缩的分布式系统,程序员可从中探索如何分析海量数据集,管理员可以从中了解如何安装和运行hadoop集群。 本书结合丰富的案例来展示如何用hadoop...
win10下hadoop2.7.2安装包及hadoop.dll和winutils.exe,解决win10下安装hadoop无法使用问题