1. 数据完整性:任何语言对IO的操作都要保持其数据的完整性。hadoop当然希望数据在存储和处理中不会丢失或损坏。检查数据完整性的常用方法是校验和。
- HDFS的数据完整性:客户端在写或者读取HDFS的文件时,都会对其进行校验和验证,当然我们可以通过在Open()方法读取之前,将false传给FileSystem中的setVerifyCheckSum()来禁用校验和。
- 本地文件系统,hadoop的本地文件系统执行客户端校验,这意味着,在写一个filename文件时,文件系统的客户端以透明方式创建了一个隐藏的文件.filename.crc,块的大小做为元数据存于此,所以读取文件时会进行校验和验证。
- ChecksumFileSystem:可以通过它对其数据验证。
2. 压缩:压缩后能够节省空间和减少网络中的传输。所以在hadoop中压缩是非常重要的。hadoop的压缩格式
压缩格式 |
算法 |
文件扩展名 |
多文件 |
可分割性 |
DEFLATEa |
DEFLATE |
.deflate |
no |
no |
gzip(zip) |
DEFLATE |
.gz(.zip) |
no(yes) |
no(yes) |
bzip2 |
bzip2 |
.bz2 |
no |
yes |
LZO |
LZO |
.lzo |
no |
no |
Compression format Hadoop CompressionCodec
DEFLATE org.apache.hadoop.io.compress.DefaultCodec
gzip org.apache.hadoop.io.compress.GzipCodec
bzip2 org.apache.hadoop.io.compress.BZip2Codec
LZO com.hadoop.compression.lzo.LzopCodec
可以用ComressionCodec轻松的压缩和解压缩。我们可以用CompressionOutput创建一个CompressionOutputStream(未压缩的数据写到此)。相反,可以用compressionInputStream进行解压缩。
/**
* @param args
*/
public static void main(String[] args) throws Exception
{
// TODO Auto-generated method stub
String codecClassname = args[0];
Class<?> codecClass = Class.forName(codecClassname);
Configuration configuration = new Configuration();
CompressionCodec codec = (CompressionCodec)ReflectionUtils.newInstance(codecClass, configuration);
CompressionOutputStream outputStream = codec.createOutputStream(System.out);
IOUtils.copyBytes(System.in, outputStream, 4096,false);
outputStream.finish();
}
public static void main(String[] args) throws IOException {
if (args.length != 2) {
System.err.println("Usage: MaxTemperatureWithCompression <input path> " +
"<output path>");
System.exit(-1);
}
JobConf conf = new JobConf(MaxTemperatureWithCompression.class);
conf.setJobName("Max temperature with output compression");
FileInputFormat.addInputPath(conf, new Path(args[0]));
FileOutputFormat.setOutputPath(conf, new Path(args[1]));
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);
/*[*/conf.setBoolean("mapred.output.compress", true);
conf.setClass("mapred.output.compression.codec", GzipCodec.class,
CompressionCodec.class);/*]*/
conf.setMapperClass(MaxTemperatureMapper.class);
conf.setCombinerClass(MaxTemperatureReducer.class);
conf.setReducerClass(MaxTemperatureReducer.class);
JobClient.runJob(conf);
}
3.序列化:将字节流和机构化对象的转化。hadoop是进程间通信(RPC调用),PRC序列号结构特点:紧凑,快速,可扩展,互操作,hadoop使用自己的序列化格式Writerable,
package org.apache.hadoop.io;
import java.io.DataOutput;
import java.io.DataInput;
import java.io.IOException;
public interface Writable {
void write(DataOutput out) throws IOException;// 将序列化流写入DataOutput
void readFields(DataInput in) throws IOException; //从DataInput流读取二进制
}
package WritablePackage;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.StringUtils;
import org.hsqldb.lib.StringUtil;
public class WritableTestBase
{
public static byte[] serialize(Writable writable) throws IOException
{
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
DataOutputStream dataOutputStream = new DataOutputStream(outputStream);
writable.write(dataOutputStream);
dataOutputStream.close();
return outputStream.toByteArray();
}
public static byte[] deserialize(Writable writable,byte[] bytes) throws IOException
{
ByteArrayInputStream inputStream = new ByteArrayInputStream(bytes);
DataInputStream dataInputStream = new DataInputStream(inputStream);
writable.readFields(dataInputStream);
dataInputStream.close();
return bytes;
}
public static String serializeToString(Writable src) throws IOException
{
return StringUtils.byteToHexString(serialize(src));
}
public static String writeTo(Writable src, Writable des) throws IOException
{
byte[] data = deserialize(des, serialize(src));
return StringUtils.byteToHexString(data);
}
}
Writerable 类
Java primitive Writable implementation Serialized size (bytes)
boolean BooleanWritable 1
byte ByteWritable 1
int IntWritable 4
VIntWritable 1–5
float FloatWritable 4
long LongWritable 8
VLongWritable 1–9
4. 基于文件的数据结构
package WritablePackage;
import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.SequenceFile;
public class SequenceFileWriteDemo
{
private static final String[] DATA = {
"One, two, buckle my shoe",
"Three, four, shut the door",
"Five, six, pick up sticks",
"Seven, eight, lay them straight",
"Nine, ten, a big fat hen"
};
/**
* @param args
* @throws IOException
*/
public static void main(String[] args) throws IOException
{
// TODO Auto-generated method stub
String url = args[0];
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(url),conf);
Path path = new Path(url);
IntWritable key = new IntWritable();
Text value = new Text();
SequenceFile.Writer writer =null;
try
{
writer = SequenceFile.createWriter(fs,conf,path,key.getClass(),value.getClass());
for(int i = 0; i< 100; i++)
{
key.set(100-i);
value.set(DATA[i%DATA.length]);
System.out.printf("[%s]\t%s\t%s\n", writer.getLength(), key, value);
writer.append(key,value);
}
}
catch (Exception e)
{
// TODO: handle exception
}
finally
{
IOUtils.closeStream(writer);
}
}
}
package WritablePackage;
import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;
public class SequenceFileReadDemo
{
/**
* @param args
* @throws IOException
*/
public static void main(String[] args) throws IOException
{
// TODO Auto-generated method stub
String url = args[0];
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(url),conf);
Path path = new Path(url);
SequenceFile.Reader reader = null;
try
{
reader = new SequenceFile.Reader(fs,path,conf);
Writable key =(Writable)ReflectionUtils.newInstance(reader.getKeyClass(), conf);
Writable value =(Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
long position = reader.getPosition();
while(reader.next(key,value))
{
String syncSeen = reader.syncSeen()? "*":"";
System.out.printf("[%s%s]\t%s\t%s\n", position, syncSeen, key, value);
position = reader.getPosition(); // beginning of next record
}
}
finally
{
IOUtils.closeStream(reader);
}
}
}
分享到:
相关推荐
ch04 - Hadoop I/O ch05 - Developing a MapReduce Application ch06 - How MapReduce Works ch07 - MapReduce Types and Formats ch08 - MapReduce Features ch09 - Setting Up a Hadoop Cluster ch10 - ...
Hadoop I/O操作8. 下一代MapReduce: Yarn9. HDFS简介10. HDFS文件结构11. Hive详解12. HBase详解13. Mahout简介14. Pig详解15. ZooKeeper详解16. Avro详解17. Chukwa详解18. Hadoop的常用插件与开发19. Hadoop在...
Hadoop I/O操作8. 下一代MapReduce: Yarn9. HDFS简介10. HDFS文件结构11. Hive详解12. HBase详解13. Mahout简介14. Pig详解15. ZooKeeper详解16. Avro详解17. Chukwa详解18. Hadoop的常用插件与开发19. Hadoop在...
第一章:初识Hadoop 第2章 关于MapReduce 第3章 Hadoop分布式文件系统 第4章 Hadoop I/O 第5章:MapReduce 应用开发 第6章:MapReduce 的工作原理 第7章:MapReduce 的类型与格式
Hadoop I/O Part II. MapReduce Chapter 6. Developing a MapReduce Application Chapter 7. How MapReduce Works Chapter 8. MapReduce Types and Formats Chapter 9. MapReduce Features Part III. Hadoop ...
Hadoop权威指南 第一章 初始Hadoop 第二章 关于MapReduce 第三章 Hadoop分布式文件系统 第四章 Hadoop I/O 第五章 MapReduce应用开发
第一章:初始hadoop 第二章:关于MapReduce 第三章:Hadoop分布式文件系统 第四章:Hadoop I/O
Hadoop权威指南-中文版 第1章 初识Hadoop 第2章 关于MapReduce 第3章 Hadoop分布式文件系统 第4章 Hadoop I/O 第5章 MapReduce应用开发 ...... 第16章 实例分析
Hadoop的I/O、MapReduce应用程序开发;MapReduce的工作机制;MapReduce的类型和格式;MapReduce的特性;如何构建Hadoop集群,如何管理Hadoop;Pig简介;Hbase简介;Hive简介;ZooKeeper简介;开源工具Sqoop,最后还...
目前这个技术于2018年4月已经在 Facebook 大规模使用了,作业整体的 I/O 提升了两倍,计算效率提高10%。值得高兴的是,这项技术 Facebook 打算共享给社区。 本地址是这项技术的视频介绍。关注Hadoop技术博文...
这种 shuffle 技术有效地将大量小的 shuffle 读请求转换成少并且大的顺序 I/O 请求。目前这个技术于2018年4月已经在 Facebook 大规模使用了,作业整体的 I/O 提升了两倍,计算效率提高10%。值得高兴的是,这项技术...
想想:Hadoop I/O CLI。 “hio”是 Hadoop I/O 的简写,因为在 CLI 上输入越少越好! 这也是对我们在幕后使用的库的一种呼喊。 目录参考 哲学及其运作方式我们努力尽可能地模仿现有的 Unix CLI 工具,例如在 hio ...
第4章 Hadoop I/O 数据完整性 HDFS的数据完整性 LocalFileSystem ChecksumFileSystem 压缩 codec 压缩和输入切分 在MapReduce中使用压缩 序列化 Writable接口 Writable类 实现定制的Writable类型 序列化框架 Avro ...
4. Hadoop I/O . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 83 Data Integrity Data Integrity in HDFS LocalFileSystem ...
包括初识Hadoop、Hadoop基础知识、Hadoop开发环境配置与搭建、Hadoop分布式文件系统、Hadoop的I/O操作、MapReduce编程基础、MapReduce高级编程、初识HBase、初识Hive。通过本书的学习,读者可以较全面地了解Hadoop的...
Hadoop的I/O、MapReduce应用程序开发;MapReduce的工作机制;MapReduce的类型和格式;MapReduce的特性;如何安装Hadoop集群,如何管理Hadoop;Pig简介;Hbase简介;ZooKeeper简介,最后还提供了丰富的案例分析。
Hadoop的I/O、MapReduce应用程序开发;MapReduce的工作机制;MapReduce的类型和格式;MapReduce的特性;如何安装Hadoop集群,如何管理Hadoop;Pig简介;Hbase简介;ZooKeeper简介,最后还提供了丰富的案例分析。
Hadoop的I/O、MapReduce应用程序开发;MapReduce的工作机制;MapReduce的类型和格式;MapReduce的特性;如何安装Hadoop集群,如何管理Hadoop;Pig简介;Hbase简介;ZooKeeper简介,最后还提供了丰富的案例分析。
hadoop的i/o、mapreduce应用程序开发;mapreduce的工作机制:mapreduce的类型和格式;mapreduce的特性:如何安装hadoop集群,如何管理hadoop;pig简介:hbase简介:zookeeper简介,最后还提供了丰富的案例分析。 ...
4. Hadoop I/O . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 83 Data Integrity 83 Data Integrity in HDFS 83 LocalFileSystem 84 ...