传统的的计算机系统通过I/O操作与外界交流,,Hadoop的I/O由传统的I/O系统发展而来,但是又有些不同,Hadoop需要处理P、T级别的数据,所以在org.apache.hadoop.io包中包含了一些面向海量数据处理的基本输入输出工具,本文会对其中的序列化和压缩进行研究。
1 序列化
对象的序列化用于将对象编码成一个字节流,以及从字节流中重新构建对象。将一个对象编码成一个字节流称为序列化对象(Serializing),相反的处理过程称为反序列化。
序列化有三种主要的用途:
A、作为一种持久化格式化:一个对象被序列化以后,它的编码可以被储存到磁盘上,供以后反序列化使用。
B、作为一种通信数据格式:序列化结果可以从一个正在运行的虚拟机,通过网络被传递到另外一个虚拟机上。
C、作为一种拷贝、克隆机制:将对象序列化到内存的缓存区中,然后通过反序列化,可以得到一个已存对象进行深拷贝的新对象。
在分布式数据处理中,主要使用上面提到的前两种功能:数据持久化和通信数据化格式。在分析Hadoop的序列化机制前,先介绍一下Java內建的序列化机制。
1.1 Java內建序列化机制
Java序列化机制将对象转换为连续的byte数据,这些数据可以在日后还原为原先的对象状态,该机制还能自动处理不同操作系统的差异,在windows上序列化的Java对象,可以在Unix系统上被重建出来,不需要担心不同机器上的数据表示方法,也不需要担心字节排列次序,如大端、小端或者其他细节。
在Java中,让一个类的实例可被序列化非常简单,只需在类声明中加入implements序列化即可。Serializable接口是一个标志,不具备任何成员函数,其定义如下:
public interface Serializable{}
1.2 Hadoop序列化机制
与Java序列化机制不同,Hadoop的序列化机制通过调用对象的write()方法,将对象序列化到流中,反序列化的过程也类似,通过对象的readFields()方法,从流中读取数据。值得一提的是,Java序列化机制中,反序列化过程中会不断地创建新的对象,但在Hadoop的序列化机制的反序列化机制中,用户可以复用对象。
1.3 Hadoop序列化机制的特征
对于处理大数据的Hadoop平台,其序列化机制需要具有如下特征:
A、紧凑:由于带宽是Hadoop集群中最稀缺的资源,一个紧凑的序列化机制可以充分利用数据中心的带宽。
B、快速:在进程间通信(包括MapReduce过程涉及的数据交互)时会大量使用序列化机制,因此,必须尽量减少序列化和反序列化的开销。
C、可扩展:随着系统的发展,系统间通信的协议会升级。类的定义会发生变化,序列化机制需要支持这些升级和变化。
D、互操作:可以支持不同开发语言的通信,如C++和Java间的通信。这样的通信可以通过文件(需要精心设计文件的格式)或者后面介绍的IPC机制实现。
1.4 Hadoop Writable机制
为了支持1.3描述的特性,Hadoop引入了org.apache.hadoop.io.Writable接口,作为所有可序列化对象必须支持的接口。Writable机制紧凑、快速,该接口不是标志性接口,它包含了两个方法:
ublic interface Writable {
/**
* 输出(序列化)对象到流中
*
* @param out 序列化的结果保存到流中
* @throws IOException
*/
void write(DataOutput out) throws IOException;
/**
* 从流中读取(反序列化)对象
*
*
* 为了效率,请尽可能复用现有的对象
*
* @param in 从该流中读取数据
* @throws IOException
*/
void readFields(DataInput in) throws IOException;
}
Hadoop序列化机制中还包括几个重要接口:WritableComparable、RawComparator、和WritableComparator。
WritableComparable,顾名思义,它提供类型比较的能力,这对MapReduce至关重要。该接口继承自Writable接口和Comparable接口。其中Comparable接口用于类型进行比较。
RawComparator接口具有高效比较能力。该接口允许执行者比较流中读取未被反序列化为对象的记录,从而省去了创建对象的所有开销。
WritableComparator是WritableComparable和RawComparator的一个通用实现,先调用RawComparator的compare()方法。然后调用对象的compare()方法。
1.5 典型的Writable类详解
Hadoop将很多Writable类归入了org.apache.hadoop.io包。本节主要讲解Java基本类和ObjectWritable的实现。
1.5.1 Java基本类型的Writable封装
目前Java基本类型对应的Writable封装见下表,所有这些Writable类都继承自WritableComparable,也就是说他们是可以比较的,同时,它们都有get与set方法,用于获得和设置封装的值。
Java基本类型 | Writable | 序列化后的长度 |
布尔型 | BooleanWritable | 1 |
字节型 | ByteWritable | 1 |
整型 | IntWritable VIntWritable | 4 1-5 |
浮点型 | FloatWritable | 4 |
长整型 | LongWritable VLongWritable | 8 1-9 |
双精度浮点型 | DoubleWritable | 8 |
1.5.2 ObjectWritable类封装
针对Java基本类型、字符串、枚举、Writable、空值、Writable的其他子类,ObjectWritable提供了一个封装,适用于字段需要使用多种类型。
该类有三个成员变量,包括被封装的对象实例instance、该对象运行时类的Class对象和Configuration对象。
1.6 Hadoop序列化框架
大部分的Mapreduce程序都是用Writable键-值作为输入输出,但这并不是HadoopAPI指定的,其他序列化机制也能和Hadoop配合,并应用于MapReduce中。
目前除了前面介绍的Java序列化机制和Hadoop的Writable机制,还流行其他序列化框架。列表如下:
A、Avro:是一个数据序列化系统,用于支持大批量数据交换的应用。它的主要特点有:支持二进制序列化方式,可以便捷、快速地处理大量数据,动态语言友好,AVRO提供的机制使动态语言可以方便处理avro数据。
B、Thrift:是一个可伸缩、跨语言的服务开发框架,由FaceBook贡献给开源社区,是Facebook的核心框架之一。
C、Google Protocol Buffer是Google内部的混合语言数据标准,提供了一种轻便高效的结构化数据存储格式。
2 压缩
一般说来,计算机处理的数据都存在一些冗余度,同时数据中间,尤其是相邻数据间存在着相关性,所以可以通过一些有别于原始编码的特殊编码方式来保存数据,使数据占用的存储空间比较小,这个过程一般叫压缩。和压缩对应的概念是解压缩,就是将被压缩的数据从特殊编码方式还原为原始数据的过程。
压缩广泛应用于海量数据处理中,对数据文件进行压缩,可以有效减少存储文件所需的空间,并加快数据在网络上或者磁盘上的传输速度。在Hadoop中,压缩应用于文件存储,Map阶段和Reduce阶段的数据交换等情景。
2.1 Hadoop压缩简介
Hadoop作为一个较通用的海量数据处理平台,在使用压缩方式方面,主要考虑压缩速度和压缩文件的可分割性。
所有的压缩算法都会考虑时间和空间的权衡,更快的压缩和解压缩速度通常会耗费更多的空间。
当使用MapReduce处理压缩文件时,需要考虑压缩文件的可分割性。考虑我们需要对保持在HDFS上的一个大小为1GB的文本文件进行处理,当前HDFS的数据块大小为64MB的情况下,该文件被存储为16块,对应的MapReduce作业将会将该文件分为16个输入分片,提供给16个独立的Map任务进行处理。
但如果该文件是一个gzip格式的压缩文件,这时,MapReduce作业不能够将该文件分为16个分片,因为不可能从gzip数据流中的某个点开始,进行数据解压。
但如果该文件是一个bzip2格式的压缩文件,那么,MapReduce作业可以通过bzip2格式压缩文件中的块,将输入划分为若干输入分片,并从块开始处于开始解压缩数据。bzip2格式压缩文件中,块与块之间提供了一个48位的同步标记,因此bzip2支持数据分割。
压缩格式 | UNIX工具 | 算法 | 文件扩展名 | 支持多文件 | 可分割 |
deflate | 无 | DEFLATE | .deflate | 否 | 否 |
gzip | gzip | DEFLATE | .gz | 否 | 否 |
zip | zip | DEFLATE | .zip | 是 | 是 |
bzip | bzip2 | bzip2 | .bz2 | 否 | 是 |
LZO | lzop | LZO | .lzo | 否 | 否 |
为了支持多种压缩解压缩算法,Hadoop引入了编码/×××。与Hadoop序列化框架类似,编码/×××也是使用抽象工厂的设计模式。目前,Hadoop支持的编码/×××如下表所示:
压缩格式 | 对应的编码/××× |
DEFLATE | org.apache.hadoop.io.compress.DeflateCodec |
gzip | org.apache.hadoop.io.compress.GzipCodec |
Bzip | org.apache.hadoop.io.compress.BZip2Codec |
Snappy | org.apache.hadoop.io.compress.SnappyCodec |
同一个压缩方法对应的压缩、解压缩相关工具,都可以通过相应的编码×××获得。
2.2 Hadoop压缩框架
2.2.1 编码/×××
CompressionCodec接口实现了编码/×××,使用的是抽象工厂的设计模式,该接口提供了一系列方法,用于创建特定压缩算法的相关设施。
该接口中的方法很对称,一个压缩功能总对应着一个解压缩功能,其中与压缩有关的方法包括:
A、createOutputStream()用于通过底层输出流创建对应压缩算法的压缩流,重载的createOutputStream()方法可使用压缩器创建压缩流。
B、createCompressor()用于创建压缩算法对应的压缩器。
该接口中还提供了获取对应文件扩展名的方法getDafaultExtension(),该方法返回扩展名的字符串。
CompressionCodecFactory是Hadoop压缩框架的另一个类,它应用了工厂方法,使用者可以通过它提供的方法获得CompressionCodec。
2.2.2 压缩器/解压器
Compressor可以插入压缩输出流的实现中,提供具体的压缩功能。该类通过setInput()方法接收数据到内部缓冲区,自然可以多次调用setInput()方法,但是内部缓冲区总是会被写满。可以通过needInput()的返回值,如果是false,表明缓冲区已经满了,这时必须通过compress()方法获取压缩后的数据,释放缓冲区空间。
为了提高压缩效率,并不是每次用户调用setInout()方法,压缩器就会立即工作,所以为了通知压缩器所有数据已经写入,必须使用finish()方法。finish()调用结束后,压缩器缓冲区中保持的已经压缩的数据,可以继续通过compress()方法获得。至于要判断压缩器是否还有未读取的压缩数据,则需要利用finished()方法来判断。
将inbuf分成几部分,通过setInput()方法送入压缩器,而在finish()调用结束后,通过finished()循序怕段压缩器是否还有未读取得数据,并使用compress()方法获取数据。
end()方法用于关闭解压缩器并放弃所有未处理的输入;
reset()方法用于重置压缩器,以处理新的输入数据集合;
reinit()方法更进一步允许使用Hadoop的配置系统,重置并重新配置压缩器。
Decompressor提供具体的解压功能并插入CompressionInputStream中。
2.2.3 压缩流/解压缩流
CompressionOutputStream:该类继承OutputStream,也是个抽象类,该类实现了close()和flush()方法,但用于输出数据的write()方法、用于结束压缩过程并输入写到底层流的finish()方法和重置压缩状态的resetState()方法还是抽象方法,需要该类的子类去实现。
CompressorStream使用压