在map之前会对要处理的文件进行拆分,按照定义的格式进行都写操作。主要是在InputFormat中,
InputFormat是一个抽象类,主要有两个抽象方法:
1,public abstract List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException;
确认输入的且分原则
2, public abstract RecordReader<K,V> createRecordReader(InputSplit split, TaskAttemptContext context ) throws IOException,InterruptedException;
按照指定格式读取数据
在起子类中需要实现这两个方法:
FileInputFormat:
配置FileInputFormat的参数:
1,mapred.input.pathFilter.class:输入文件过滤器,通过过滤器的文件才会加入InputFormat
public static void setInputPathFilter(Job job,
Class<? extends PathFilter> filter) {
job.getConfiguration().setClass("mapred.input.pathFilter.class", filter,
PathFilter.class);
}
2, mapred.min.split.size:最小的划分大
public static void setMinInputSplitSize(Job job,
long size) {
job.getConfiguration().setLong("mapred.min.split.size", size);
}
3, mapred.max.split.size:最大的划分大小;
public static void setMaxInputSplitSize(Job job,
long size) {
job.getConfiguration().setLong("mapred.max.split.size", size);
}
4, mapred.input.dir:输入路径,用逗号做分割。
conf.set("mapred.input.dir", dirs == null ? dirStr : dirs + "," + dirStr);
FileInputFormat实现了InputFormat的getSplits()方法,将输入的文件划分为InputSplit(输入块)。
/**
* Generate the list of files and make them into FileSplits.
*/
public List<InputSplit> getSplits(JobContext job
) throws IOException {
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
long maxSize = getMaxSplitSize(job);
// generate splits
List<InputSplit> splits = new ArrayList<InputSplit>();
for (FileStatus file: listStatus(job)) {
Path path = file.getPath();
FileSystem fs = path.getFileSystem(job.getConfiguration());
long length = file.getLen();
BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
if ((length != 0) && isSplitable(job, path)) {
long blockSize = file.getBlockSize();
long splitSize = computeSplitSize(blockSize, minSize, maxSize);
long bytesRemaining = length;
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(new FileSplit(path, length-bytesRemaining, splitSize,
blkLocations[blkIndex].getHosts()));
bytesRemaining -= splitSize;
}
if (bytesRemaining != 0) {
splits.add(new FileSplit(path, length-bytesRemaining, bytesRemaining,
blkLocations[blkLocations.length-1].getHosts()));
}
} else if (length != 0) {
splits.add(new FileSplit(path, 0, length, blkLocations[0].getHosts()));
} else {
//Create empty hosts array for zero length files
splits.add(new FileSplit(path, 0, length, new String[0]));
}
}
LOG.debug("Total # of splits: " + splits.size());
return splits;
}
文件的划分是依据maxsize,BlockSize,minsize来的,
protected long computeSplitSize(long blockSize, long minSize,
long maxSize) {
return Math.max(minSize, Math.min(maxSize, blockSize));
}
另一个方法是:protected List<FileStatus> listStatus(JobContext job ) throws IOException
递归获取输入数据中的文件,其中的job包含前面的那几个参数,是系统的配置Configuration
/** List input directories.
* Subclasses may override to, e.g., select only files matching a regular
* expression.
*
* @param job the job to list input paths for
* @return array of FileStatus objects
* @throws IOException if zero items.
*/
protected List<FileStatus> listStatus(JobContext job
) throws IOException {
List<FileStatus> result = new ArrayList<FileStatus>();
Path[] dirs = getInputPaths(job);
if (dirs.length == 0) {
throw new IOException("No input paths specified in job");
}
List<IOException> errors = new ArrayList<IOException>();
// creates a MultiPathFilter with the hiddenFileFilter and the
// user provided one (if any).
List<PathFilter> filters = new ArrayList<PathFilter>();
filters.add(hiddenFileFilter);
PathFilter jobFilter = getInputPathFilter(job);
if (jobFilter != null) {
filters.add(jobFilter);
}
PathFilter inputFilter = new MultiPathFilter(filters);
for (int i=0; i < dirs.length; ++i) {
Path p = dirs[i];
FileSystem fs = p.getFileSystem(job.getConfiguration());
FileStatus[] matches = fs.globStatus(p, inputFilter);
if (matches == null) {
errors.add(new IOException("Input path does not exist: " + p));
} else if (matches.length == 0) {
errors.add(new IOException("Input Pattern " + p + " matches 0 files"));
} else {
for (FileStatus globStat: matches) {
if (globStat.isDir()) {
for(FileStatus stat: fs.listStatus(globStat.getPath(),
inputFilter)) {
result.add(stat);
}
} else {
result.add(globStat);
}
}
}
}
if (!errors.isEmpty()) {
throw new InvalidInputException(errors);
}
LOG.info("Total input paths to process : " + result.size());
return result;
}
切分之后有RecordReader来读取,
FileInputFormat没有对应的RecordReader,他的两个子类:
SequenceFileInputFormat二进制形式存放的键/值文件
TextInputFormat是文本文件的处理,
他们的createRecordReader()分别返回SequenceFileRecordReader,LineRecordReader实例
hadoop默认的InputFormat是TextInputFormat,重写了FileInputFormat中的createRecordReader和isSplitable方法。该类使用的reader是LineRecordReader,即以回车键(CR
= 13)或换行符(LF = 10)为行分隔符。
分享到:
相关推荐
基于MapReduce作业拆分组合机制的并行ETL组件实现.pdf基于MapReduce作业拆分组合机制的并行ETL组件实现.pdf基于MapReduce作业拆分组合机制的并行ETL组件实现.pdf基于MapReduce作业拆分组合机制的并行ETL组件实现.pdf...
MapReduce Shuffle 过程图解 Xmind文件
了解map和reduce工作原理,以及排序,分组,分区设置,有详细的注释,方便查看学习,适合入门初学者练手
22、MapReduce使用Gzip压缩、Snappy压缩和Lzo压缩算法写文件和读取相应的文件 网址:https://blog.csdn.net/chenwewi520feng/article/details/130456088 本文的前提是hadoop环境正常。 本文最好和MapReduce操作常见...
对于两个输入文件,即文件 A 和文件 B,请编写 MapReduce 程序,对两个文件进行合并, 并剔除其中重复的内容,得到一个新的输出文件 C。下面是输入文件和输出文件的一个样例 供参考。 输入文件 A 的样例如下:
针对目前物联网和云计算技术结合后,物联网RFID产生的小型数据致使云计算中MapReduce算法产生运算瓶颈问题进行了研究。运用PML和EPC编码技术保证了数据存储的完整性,采用快速排序和改进XGrind压缩技术对MapReduce...
用于在idea编写hadoop(mapreduce)程序时的maven pom文件。可直接使用搭建mapreduce工程。
包中含有hadoop-eclipse-plugin-2.6.0.jar ,hadoop.dll,winutils.exe 三个文件,是windows 运行mapreduce 的配置文件。hadoop2.8.1亲测可用
mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce ...
本文对MapReduce中的数据处理模型进行整体说明,分别对输入和输出的各种类及可口进行讲解,从而可以处理比如文件不分片,非文本文件,多个文件合并等问题
对应博客:MapReduce 运行原理(万字长篇 原理 + 案例) 链接:https://blog.csdn.net/weixin_47243236/article/details/121581689?spm=1001.2014.3001.5501
4 分别在自编 MapReduce 程序 WordCount 运行过程中和运行结束后查看 MapReduce Web 界面。 5. 分别在自编 MapReduce 程序 WordCount 运行过程中和运行结束后练习 MapReduce Shell 常用命令。 。。
大数据Mapreduce(1)编程实现文件合并和去重操作.docx
(2)打开网站localhost:8088和localhost:50070,查看MapReduce任务启动情况 (3)写wordcount代码并把代码生成jar包 (4)运行命令 (1):把linus下的文件放到hdfs上 (2):运行MapReduce (5):查看运行结果 ...
任务执行基本流程 基本流程图见下一页 首先输入收据文件被Mapreduce库函数分割成M个split集。用户定义的程序被 拷贝到机群中,其中一个是master,其它的都是worker。M个map任务和R个reduc e任务将被分配。Master...
hadoop mapreduce开发需要的pom文件,复制内容后,点击编译器的import导入即可使用
MapReduce--->实现简单的数据清洗需要的数据文件
https://blog.csdn.net/qq_39063526/article/details/105968494 本文案例中用到的文件
MapReduce2.0程序设计,包括编程模型介绍,编程接口介绍,Java编程与多语言编程的理论与实践
单词计数是最简单也是最能体现 MapReduce 思想的程序之一,可以称为 MapReduce 版“Hello World”。单词计数的主要功能是统计一系列文本文件中每个单词出现的次数。本节通过单词计数实例来阐述采用 MapReduce 解决...