大数据技术原理与应用(第2版)
第2章 大数据处理架构 hadoop
2.1 概述
Hadoop的核心是分布式文件系统(Hadoop Distributed File System,HDFS)和MapReduce。HDFS是针对谷歌文件系统(Google File System,GFS)的开源实现,是面向普通硬件环境的分布式文件系统,具有较高的读写速度、很好的容错性和可伸缩性,支持大规模数据的分布式存储,其冗余数据存储的方式很好地保证了数据的安全性。MapReduce是针对谷歌 MapReduce 的开源实现,允许用户在不了解分布式系统底层细节的情况下开发并行应用程序,采用MapReduce来整合分布式文件系统上的数据,可保证分析和处理数据的高效性。借助于Hadoop,程序员可以轻松地编写分布式并行程序,将其运行于廉价计算机集群上,完成海量数据的存储与计算。
它具有以下几个方面的特性。
- 高可靠性
- 高效性。
- 高可扩展性
- 高容错性
- 成本低。
- 运行在Linux平台上。
- 支持多种编程语言。
2.2 Hadoop生态系统
除了核心的HDFS和MapReduce以外,Hadoop生态系统还包括Zookeeper、HBase、Hive、Pig、Mahout、Sqoop、Flume、Ambari等功能组件。需要说明的是,Hadoop 2.0中新增了一些重要的组件,即HDFS HA和分布式资源调度管理框架YARN等,
Hadoop分布式文件系统(Hadoop Distributed File System,HDFS)是Hadoop项目的两大核心之一,是针对谷歌文件系统(Google File System,GFS)的开源实现。
HBase是一个提供高可靠性、高性能、可伸缩、实时读写、分布式的列式数据库,一般采用HDFS作为其底层数据存储。HBase是针对谷歌BigTable的开源实现,二者都采用了相同的数据模型,具有强大的非结构化数据存储能力。HBase与传统关系数据库的一个重要区别是,前者采用基于列的存储,而后者采用基于行的存储。HBase具有良好的横向扩展能力,可以通过不断增加廉价的商用服务器来增加存储能力。
Hadoop MapReduce是针对谷歌MapReduce的开源实现。MapReduce是一种编程模型,用于大规模数据集(大于1 TB)的并行运算,它将复杂的、运行于大规模集群上的并行计算过程高度地抽象到了两个函数——Map 和 Reduce 上,并且允许用户在不了解分布式系统底层细节的情况下开发并行应用程序,并将其运行于廉价计算机集群上,完成海量数据的处理。
Hive是一个基于Hadoop的数据仓库工具,可以用于对Hadoop文件中的数据集进行数据整理、特殊查询和分析存储。
Pig是一种数据流语言和运行环境,适合于使用Hadoop和MapReduce平台来查询大型半结构化数据集。
Mahout是Apache软件基金会旗下的一个开源项目,提供一些可扩展的机器学习领域经典算法的实现,旨在帮助开发人员更加方便快捷地创建智能应用程序。
Zookeeper是针对谷歌Chubby的一个开源实现,是高效和可靠的协同工作系统,提供分布式锁之类的基本服务(如统一命名服务、状态同步服务、集群管理、分布式应用配置项的管理等),用于构建分布式应用,减轻分布式应用程序所承担的协调任务。
Flume是Cloudera提供的一个高可用的、高可靠的、分布式的海量日志采集、聚合和传输的系统
Sqoop是SQL-to-Hadoop的缩写,主要用来在Hadoop和关系数据库之间交换数据,可以改进数据的互操作性。通过Sqoop可以方便地将数据从MySQL、Oracle、PostgreSQL等关系数据库中导入Hadoop(可以导入HDFS、HBase或Hive),或者将数据从Hadoop导出到关系数据库,使得传统关系数据库和Hadoop之间的数据迁移变得非常方便。
Apache Ambari是一种基于Web的工具,支持Apache Hadoop集群的安装、部署、配置和管理。
第3章 分布式文件系统 HDFS
3.1 分布式文件系统
相对于传统的本地文件系统而言,分布式文件系统(Distributed File System)是一种通过网络实现文件在多台主机上进行分布式存储的文件系统。分布式文件系统的设计一般采用“客户机/服务器”(Client/Server)模式,客户端以特定的通信协议通过网络与服务器建立连接,提出文件访问请求,客户端和服务器可以通过设置访问权来限制请求方对底层数据存储块的访问。目前,已得到广泛应用的分布式文件系统主要包括GFS和HDFS等,后者是针对前者的开源实现。
与普通文件系统类似,分布式文件系统也采用了块的概念,文件被分成若干个块进行存储,块是数据读写的基本单元,只不过分布式文件系统的块要比操作系统中的块大很多。比如,HDFS默认的一个块的大小是64 MB。与普通文件不同的是,在分布式文件系统中,如果一个文件小于一个数据块的大小,它并不占用整个数据块的存储空间。
这些节点分为两类:一类叫“主节点”(Master Node),或者也被称为“名称节点”(NameNode);另一类叫“从节点”(Slave Node),或者也被称为“数据节点”(DataNode)。名称节点负责文件和目录的创建、删除和重命名等,同时管理着数据节点和文件块的映射关系,因此客户端只有访问名称节点才能找到请求的文件块所在的位置,进而到相应位置读取所需文件块。数据节点负责数据的存储和读取,在存储时,由名称节点分配存储位置,然后由客户端把数据直接写入相应数据节点;在读取时,客户端从名称节点获得数据节点和文件块的映射关系,然后就可以到相应位置访问文件块。数据节点也要根据名称节点的命令创建、删除数据块和冗余复制。
3.2 HDFS简介
总体而言,HDFS要实现以下目标:
- 兼容廉价的硬件设备。
- 流数据读写。
- 大数据集。
- 简单的文件模型。
- 强大的跨平台兼容性
- 不适合低延迟数据访问
- 无法高效存储大量小文件
首先,HDFS 采用名称节点(NameNode)来管理文件系统的元数据,这些元数据被保存在内存中,从而使客户端可以快速获取文件实际存储位置。通常,每个文件、目录和块大约占150字节,如果有1 000万个文件,每个文件对应一个块,那么,名称节点至少要消耗3 GB的内存来保存这些元数据信息
不支持多用户写入及任意修改文件。HDFS只允许一个文件有一个写入者,不允许多个用户对同一个文件执行写操作,而且只允许对文件执行追加操作,不能执行随机写操作。
3.3 HDFS的相关概念
本节介绍HDFS中的相关概念,包括块、名称节点、数据节点、第二名称节点。
HDFS也同样采用了块的概念,默认的一个块大小是64 MB。在HDFS中的文件会被拆分成多个块,每个块作为独立的单元进行存储。
HDFS这么做的原因,是为了最小化寻址开销。
当客户端需要访问一个文件时,首先从名称节点获得组成这个文件的数据块的位置列表,然后根据位置列表获取实际存储各个数据块的数据节点的位置,最后数据节点根据数据块信息在本地Linux 文件系统中找到对应的文件,并把数据返回给客户端。
块的大小也不宜设置过大,因为,通常MapReduce中的Map任务一次只处理一个块中的数据,如果启动的任务太少,就会降低作业并行处理速度。
在 HDFS 中,名称节点(NameNode)负责管理分布式文件系统的命名空间(Namespace),保存了两个核心的数据结构(见图3-3),即FsImage和EditLog。FsImage用于维护文件系统树以及文件树中所有的文件和文件夹的元数据,操作日志文件EditLog中记录了所有针对文件的创建、删除、重命名等操作。
名称节点记录了每个文件中各个块所在的数据节点的位置信息,但是并不持久化存储这些信息,而是在系统每次启动时扫描所有数据节点重构得到这些信息
名称节点在启动时,会将 FsImage 的内容加载到内存当中,然后执行 EditLog 文件中的各项操作,使得内存中的元数据保持最新。这个操作完成以后,就会创建一个新的 FsImage 文件和一个空的EditLog文件。名称节点启动成功并进入正常运行状态以后,HDFS中的更新操作都会被写入到 EditLog,而不是直接写入 FsImage,这是因为对于分布式文件系统而言,FsImage文件通常都很庞大(一般都是GB级别以上),如果所有的更新操作都直接往FsImage文件中添加,那么系统就会变得非常缓慢。相对而言,EditLog通常都要远远小于FsImage,更新操作写入到EditLog是非常高效的。名称节点在启动的过程中处于“安全模式”,只能对外提供读操作,无法提供写操作。启动过程结束后,系统就会退出安全模式,进入正常运行状态,对外提供读写操作。
数据节点(DataNode)是分布式文件系统HDFS的工作节点,负责数据的存储和读取,会根据客户端或者名称节点的调度来进行数据的存储和检索,并且向名称节点定期发送自己所存储的块的列表。每个数据节点中的数据会被保存在各自节点的本地Linux文件系统中。
当名称节点重启时,需要将FsImage加载到内存中,然后逐条执行EditLog中的记录,使得FsImage保持最新。可想而知,如果EditLog很大,就会导致整个过程变得非常缓慢,使得名称节点在启动过程中长期处于“安全模式”,无法正常对外提供写操作,影响了用户的使用。
为了有效解决EditLog逐渐变大带来的问题,HDFS在设计中采用了第二名称节点(Secondary NameNode)。第二名称节点是HDFS架构的一个重要组成部分,具有两个方面的功能:首先,可以完成EditLog与FsImage的合并操作,减小EditLog文件大小,缩短名称节点重启时间;其次,可以作为名称节点的“检查点”,保存名称节点中的元数据信息。具体如下。
- (1)EditLog与FsImage的合并操作。
- (2)作为名称节点的“检查点”。
3.4 HDFS体系结构
本节首先简要介绍HDFS的体系结构,然后介绍HDFS的命名空间管理、通信协议、客户端,最后指出HDFS体系结构的局限性。
HDFS采用了主从(Master/Slave)结构模型,一个HDFS集群包括一个名称节点和若干个数据节点(见图3-5)。名称节点作为中心服务器,负责管理文件系统的命名空间及客户端对文件的访问。集群中的数据节点一般是一个节点运行一个数据节点进程,负责处理文件系统客户端的读/写请求,在名称节点的统一调度下进行数据块的创建、删除和复制等操作。每个数据节点的数据实际上是保存在本地Linux文件系统中的。每个数据节点会周期性地向名称节点发送“心跳”信息,报告自己的状态,没有按时发送心跳信息的数据节点会被标记为“宕机”,不会再给它分配任何I/O请求。
HDFS的命名空间包含目录、文件和块。命名空间管理是指命名空间支持对HDFS中的目录、文件和块做类似文件系统的创建、修改、删除等基本操作。在当前的HDFS体系结构中,在整个HDFS 集群中只有一个命名空间,并且只有唯一一个名称节点,该节点负责对这个命名空间进行管理。
HDFS是一个部署在集群上的分布式文件系统,因此很多数据需要通过网络进行传输。所有的HDFS通信协议都是构建在TCP/IP协议基础之上的。客户端通过一个可配置的端口向名称节点主动发起TCP连接,并使用客户端协议与名称节点进行交互。名称节点和数据节点之间则使用数据节点协议进行交互。客户端与数据节点的交互是通过 RPC(Remote Procedure Call)来实现的。在设计上,名称节点不会主动发起RPC,而是响应来自客户端和数据节点的RPC请求。
明显的局限性
- (1)命名空间的限制。名称节点是保存在内存中的,因此名称节点能够容纳对象(文件、块)的个数会受到内存空间大小的限制。
- (2)性能的瓶颈
- (3)隔离问题。
- (4)集群的可用性
3.5 HDFS的存储原理
本节介绍HDFS的存储原理,包括数据的冗余存储、数据存取策略、数据错误与恢复。
- 1.数据存放为了提高数据的可靠性与系统的可用性,以及充分利用网络带宽,HDFS采用了以机架(Rack)为基础的数据存放策略。HDFS 默认的冗余复制因子是 3,每一个文件块会被同时保存到 3 个地方,其中,有两份副本放在同一个机架的不同机器上面,第三个副本放在不同机架的机器上面,这样既可以保证机架发生异常时的数据恢复,也可以提高数据读写性能。
- 2.数据读取HDFS提供了一个API可以确定一个数据节点所属的机架ID,客户端也可以调用API获取自己所属的机架ID
- 3.数据复制HDFS的数据复制采用了流水线复制的策略,大大提高了数据复制过程的效率。
HDFS具有较高的容错性,可以兼容廉价的硬件,它把硬件出错看成一种常态,而不是异常,并设计了相应的机制检测数据错误和进行自动恢复,主要包括以下3种情形。
(1).名称节点出错
Hadoop采用两种机制来确保名称节点的安全:第一,把名称节点上的元数据信息同步存储到其他文件系统(比如远程挂载的网络文件系统NFS)中;第二,运行一个第二名称节点,当名称节点宕机以后,可以把第二名称节点作为一种弥补措施,利用第二名称节点中的元数据信息进行系统恢复,但是从前面对第二名称节点的介绍中可以看出,这样做仍然会丢失部分数据。
(2).数据节点出错
每个数据节点会定期向名称节点发送“心跳”信息,向名称节点报告自己的状态。当数据节点发生故障,或者网络发生断网时,名称节点就无法收到来自一些数据节点的“心跳”信息,这时这些数据节点就会被标记为“宕机”,节点上面的所有数据都会被标记为“不可读”,名称节点不会再给它们发送任何I/O请求
(3).数据出错
客户端在读取到数据后,会采用md5和sha1对数据块进行校验,以确定读取到正确的数据。
3.7 HDFS编程实践
关于HDFS的Shell命令有一个统一的格式。hadoop command [genericOptions][commandOptions]
HDFS有很多命令,其中fs命令可以说是HDFS最常用的命令,利用fs命令可以查看HDFS文件系统的目录结构、上传和下载数据、创建文件等。该命令的用法如下。hadoop fs [genericOptions][commandOptions]
第三篇 大数据处理与分析
分布式并行编程框架MapReduce可以大幅提高程序性能,实现高效的批量数据处理。基于内存的分布式计算框架 Spark,是一个可应用于大规模数据处理的快速、通用引擎
流计算框架Storm是一个低延迟、可扩展、高可靠的处理引擎
大数据中包括很多图结构数据,但是MapReduce不适合用来解决大规模图计算问题,因此新的图计算框架应运而生,Pregel 就是其中一种具有代表性的产品
第7章 MapReduce
大数据时代除了需要解决大规模数据的高效存储问题,还需要解决大规模数据的高效处理问题。
MapReduce是一种并行编程模型,用于大规模数据集(大于1 TB)的并行运算,它将复杂的、运行于大规模集群上的并行计算过程高度抽象到两个函数:Map 和 Reduce。
7.1 概述
MapReduce以及它的核心函数Map和Reduce。
谷歌公司最先提出了分布式并行编程模型MapReduce,Hadoop MapReduce是它的开源实现。谷歌的MapReduce运行在分布式文件系统GFS上,与谷歌类似,Hadoop MapReduce运行在分布式文件系统HDFS上。
谷歌在2003年~2006年连续发表了3篇很有影响力的文章,分别阐述了GFS、MapReduce和BigTable的核心思想。其中,MapReduce是谷歌公司的核心计算模型。MapReduce将复杂的、运行于大规模集群上的并行计算过程高度地抽象到两个函数:Map和Reduce,这两个函数及其核心思想都源自函数式编程语言。
在MapReduce中,一个存储在分布式文件系统中的大规模数据集会被切分成许多独立的小数据块,这些小数据块可以被多个Map任务并行处理。MapReduce框架会为每个Map任务输入一个数据子集,Map任务生成的结果会继续作为Reduce任务的输入,最终由Reduce任务输出最后结果,并写入分布式文件系统。特别需要注意的是,适合用MapReduce来处理的数据集需要满足一个前提条件:待处理的数据集可以分解成许多小的数据集,而且每一个小数据集都可以完全并行地进行处理。
MapReduce编程之所以比较容易,是因为程序员只要关注如何实现Map和Reduce函数,而不需要处理并行编程中的其他各种复杂问题,如分布式存储、工作调度、负载均衡、容错处理、网络通信等,这些问题都会由MapReduce框架负责处理。
Map函数和Reduce函数都是以作为输入,按一定的映射规则转换成另一个或一批进行输出(见表7-1)
7.2 MapReduce的工作流程
理解MapReduce的工作流程,是开展MapReduce编程的前提。本节首先给出工作流程概述,并阐述MapReduce的各个执行阶段,最后对MapReduce的核心环节——Shuffle过程进行详细剖析。
大规模数据集的处理包括分布式存储和分布式计算两个核心环节
MapReduce的核心思想可以用“分而治之”来描述,如图7-1所示,也就是把一个大的数据集拆分成多个小数据块在多台机器上并行处理,也就是说,一个大的MapReduce作业,首先会被拆分成许多个Map任务在多台机器上并行执行,每个Map任务通常运行在数据存储的节点上,这样,计算和数据就可以放在一起运行,不需要额外的数据传输开销。当Map任务结束后,会生成以形式表示的许多中间结果。然后,这些中间结果会被分发到多个Reduce任务在多台机器上并行执行,具有相同key的会被发送到同一个Reduce任务那里,Reduce任务会对中间结果进行汇总计算得到最后结果,并输出到分布式文件系统中。
需要指出的是,不同的Map任务之间不会进行通信,不同的Reduce任务之间也不会发生任何信息交换;用户不能显式地从一台机器向另一台机器发送消息,所有的数据交换都是通过MapReduce框架自身去实现的。
为了让Reduce可以并行处理Map的结果,需要对Map的输出进行一定的分区(Portition)、排序(Sort)、合并(Combine)、归并(Merge)等操作,得到形式的中间结果,再交给对应的 Reduce 进行处理,这个过程称为 Shuffle。从无序的到有序的,这个过程用Shuffle(洗牌)来称呼是非常形象的。
所谓Shuffle,是指对Map输出结果进行分区、排序、合并等处理并交给Reduce的过程。因此,Shuffle过程分为Map端的操作和Reduce端的操作,如图7-3所示,主要执行以下操作
在Map端的Shuffle过程Map的输出结果首先被写入缓存,当缓存满时,就启动溢写操作,把缓存中的数据写入磁盘文件,并清空缓存。当启动溢写操作时,首先需要把缓存中的数据进行分区,然后对每个分区的数据进行排序(Sort)和合并(Combine),之后再写入磁盘文件。每次溢写操作会生成一个新的磁盘文件,随着Map任务的执行,磁盘中就会生成多个溢写文件。在Map任务全部结束之前,这些溢写文件会被归并(Merge)成一个大的磁盘文件,然后通知相应的Reduce任务来领取属于自己处理的数据。(2)在Reduce端的Shuffle过程Reduce任务从Map端的不同Map机器领回属于自己处理的那部分数据,然后对数据进行归并(Merge)后交给Reduce处理。
2.Map 端的Shuffle过程
- (1)输入数据和执行Map任务
- (2)写入缓存
- (3)溢写(分区、排序和合并)。并非所有场合都可以使用Combiner,因为Combiner的输出是Reduce任务的输入,Combiner绝不能改变Reduce任务最终的计算结果,一般而言,累加、最大值等场景可以使用合并操作。经过分区、排序以及可能发生的合并操作之后,这些缓存中的键值对就可以被写入磁盘,并清空缓存。每次溢写操作都会在磁盘中生成一个新的溢写文件,写入溢写文件中的所有键值对都是经过分区和排序的。
- (4)文件归并每次溢写操作都会在磁盘中生成一个新的溢写文件,随着MapReduce任务的进行,磁盘中的溢写文件数量会越来越多。当然,如果Map输出结果很少,磁盘上只会存在一个溢写文件,但是通常都会存在多个溢写文件。最终,在Map任务全部结束之前,系统会对所有溢写文件中的数据进行归并(Merge),生成一个大的溢写文件,这个大的溢写文件中的所有键值对也是经过分区和排序的。
所谓“归并”,是指对于具有相同key的键值对会被归并成一个新的键值对。具体而言,对于若干个具有相同 key 的键值对, …… 会被归并成一个新的键值对>。
经过上述4个步骤以后,Map端的Shuffle过程全部完成,最终生成的一个大文件会被存放在本地磁盘上。这个大文件中的数据是被分区的,不同的分区会被发送到不同的Reduce任务进行并行处理。JobTracker会一直监测Map任务的执行,当监测到一个Map任务完成后,就会立即通知相关的Reduce任务来“领取”数据,然后开始Reduce端的Shuffle过程。
3.Reduce端的Shuffle过程
Reduce端的Shuffle过程非常简单,只需要从Map端读取Map结果,然后执行归并操作,最后输送给Reduce任务进行处理。具体而言,Reduce端的Shuffle过程包括3个步骤,如图7-5所示。
- (1)“领取”数据
Map端的Shuffle过程结束后,所有Map输出结果都保存在Map机器的本地磁盘上,Reduce任务需要把这些数据“领取”(Fetch)回来存放到自己所在机器的本地磁盘上。因此,在每个Reduce任务真正开始之前,它大部分时间都在从Map端把属于自己处理的那些分区的数据“领取”过来。每个Reduce任务会不断地通过RPC向JobTracker询问Map任务是否已经完成;JobTracker监测到一个Map任务完成后,就会通知相关的Reduce任务来“领取”数据;一旦一个Reduce任务收到JobTracker的通知,它就会到该Map任务所在机器上把属于自己处理的分区数据领取到本地磁盘中。一般系统中会存在多个Map机器,因此Reduce任务会使用多个线程同时从多个Map机器领回数据。
- (2)归并数据
从Map端领回的数据会首先被存放在Reduce任务所在机器的缓存中,如果缓存被占满,就会像Map端一样被溢写到磁盘中。
当溢写过程启动时,具有相同key的键值对会被归并(Merge),如果用户定义了Combiner,则归并后的数据还可以执行合并操作,减少写入磁盘的数据量
最终,当所有的 Map 端数据都已经被领回时,和 Map端类似,多个溢写文件会被归并成一个大文件,归并的时候还会对键值对进行排序,从而使得最终大文件中的键值对都是有序的
假设磁盘中生成了50个溢写文件,每轮可以归并10个溢写文件,则需要经过5轮归并,得到5个归并后的大文件。
- (3)把数据输入给Reduce任务
磁盘中经过多轮归并后得到的若干个大文件,不会继续归并成一个新的大文件,而是直接输入给Reduce任务,这样可以减少磁盘读写开销。由此,整个Shuffle过程顺利结束。接下来,Reduce任务会执行 Reduce 函数中定义的各种映射,输出最终结果,并保存到分布式文件系统中(比如GFS或HDFS)。
7.3 实例分析:WordCount
首先,需要检查WordCount程序任务是否可以采用MapReduce来实现。
MapReduce来处理的数据集需要满足一个前提条件:待处理的数据集可以分解成许多小的数据集,而且每一个小数据集都可以完全并行地进行处理。
7.6 本章小结
MapReduce 执行的全过程包括以下几个主要阶段:从分布式文件系统读入数据、执行 Map任务输出中间结果、通过Shuffle阶段把中间结果分区排序整理后发送给Reduce任务、执行Reduce任务得到最终结果并写入分布式文件系统。在这几个阶段中,Shuffle阶段非常关键,必须深刻理解这个阶段的详细执行过程。
第8章 hadoop 再讨论
8.1 Hadoop的优化与发展
Hadoop1.0的核心组件(仅指MapReduce和HDFS,不包括Hadoop生态系统内的Pig、Hive、HBase等其他组件)主要存在以下不足。
在后续发展过程中,Hadoop对MapReduce和HDFS的许多方面做了有针对性的改进提升(见表8-1),同时在Hadoop生态系统中也融入了更多的新成员,使得Hadoop功能更加完善,比较有代表性的产品包括Pig、Oozie、Tez、Kafka等(见表8-2)。
8.2 HDFS2.0的新特性
在HDFS1.0中,只存在一个名称节点,这就是常说的“单点故障问题”
为了解决单点故障问题,HDFS2.0采用了HA(High Availability)架构。
在一个典型的HA集群中,一般设置两个名称节点,其中一个名称节点处于“活跃(Active)”状态,另一个处于“待命(Standby)”状态,如图8-1所示。
由于待命名称节点是活跃名称节点的“热备份”,因此活跃名称节点的状态信息必须实时同步到待命名称节点。
两种名称节点的状态同步,可以借助于一个共享存储系统来实现,比如 NFS (Network File System)、QJM(Quorum Journal Manager)或者Zookeeper。
Zookeeper可以确保任意时刻只有一个名称节点提供对外服务
HDFS1.0采用单名称节点的设计,不仅会带来单点故障问题,还存在可扩展性、性能和隔离性等问题
HDFS1.0中只有一个名称节点,不可以水平扩展,而单个名称节点的内存空间是有上限的,这限制了系统中数据块、文件和目录的数目。是否可以通过纵向扩展的方式(即为单个名称节点增加更多的CPU、内存等资源)解决这个问题呢?答案是否定的。
HDFS HA在本质上还是单名称节点,只是通过“热备份”设计方式解决了单点故障问题,并没有解决可扩展性、系统性能和隔离性三个方面的问题。
HDFS 联邦可以很好地解决上述三个方面的问题。
HDFS 联邦并不是真正的分布式设计,但是采用这种简单的“联合”设计方式,在实现和管理复杂性方面,都要远低于真正的分布式设计,而且可以快速满足需求。
在 HDFS 联邦中,所有名称节点会共享底层的数据节点存储资源,如图 8-2 所示。每个数据节点要向集群中所有的名称节点注册,并周期性地向名称节点发送“心跳”和块信息,报告自己的状态,同时也会处理来自名称节点的指令。
HDFS1.0不同的是,HDFS 联邦拥有多个独立的命名空间,其中,每一个命名空间管理属于自己的一组块,这些属于同一个命名空间的块构成一个“块池”(Block Pool)。每个数据节点会为多个块池提供块的存储。可以看出,数据节点是一个物理概念,而块池则属于逻辑概念,一个块池是一组块的逻辑集合,块池中的各个块实际上是存储在各个不同的数据节点中的。因此,HDFS 联邦中的一个名称节点失效,也不会影响到与它相关的数据节点继续为其他名称节点提供服务。
对于HDFS联邦中的多个命名空间,可以采用客户端挂载表(Client Side Mount Table)方式进行数据共享和访问。
客户可以访问不同的挂载点来访问不同的子命名空间。这就是 HDFS 联邦中命名空间管理的基本原理,即把各个命名空间挂载到全局“挂载表”(Mount-table)中,实现数据全局共享;
8.3 新一代资源管理调度框架YARN
MapReduce1.0 采用 Master/Slave 架构设计(见图 8-4),包括一个 JobTracker 和若干个TaskTracker,前者负责作业的调度和资源的管理,后者负责执行JobTracker指派的具体任务
- (1)存在单点故障。
- (2)JobTracker“大包大揽”导致任务过重。
- (3)容易出现内存溢出。
- (4)资源划分不合理。
资源(CPU、内存)被强制等量划分成多个“槽”(Slot),槽又被进一步划分为Map槽和Reduce槽两种,分别供Map任务和Reduce任务使用,彼此之间不能使用分配给对方的槽,也就是说,当Map任务已经用完Map槽时,即使系统中还有大量剩余的Reduce槽,也不能拿来运行Map任务,反之亦然
为了克服 MapReduce1.0 版本的缺陷,Hadoop2.0 以后的版本对其核心子项目 MapReduce1.0的体系结构进行了重新设计,生成了MapReduce2.0和YARN(Yet Another Resource Negotiator)。
YARN架构设计思路如图8-5所示,基本思路就是“放权”,即不让JobTracker这一个组件承担过多的功能,把原JobTracker三大功能(资源管理、任务调度和任务监控)进行拆分,分别交给不同的新组件去处理。重新设计后得到的 YARN 包括 ResourceManager、ApplicationMaster 和NodeManager,其中,由ResourceManager负责资源管理,由ApplicationMaster负责任务调度和监控,由 NodeManager 负责执行原 TaskTracker 的任务。通过这种“放权”的设计,大大降低了JobTracker的负担,提升了系统运行的效率和稳定性。
在Hadoop1.0中,其核心子项目MapReduce1.0既是一个计算框架,也是一个资源管理调度框架。到了Hadoop2.0以后,MapReduce1.0中的资源管理调度功能被单独分离出来形成了YARN,它是一个纯粹的资源管理调度框架,而不是一个计算框架;而被剥离了资源管理调度功能的MapReduce 框架就变成了MapReduce2.0,它是运行在YARN之上的一个纯粹的计算框架,不再自己负责资源调度管理服务,而是由YARN为其提供资源管理调度服务。
如图8-6所示,YARN体系结构中包含了三个组件:ResourceManager、ApplicationMaster和NodeManager。YARN各个组件的功能见表8-3。
ResourceManager(RM)是一个全局的资源管理器,负责整个系统的资源管理和分配,主要包括两个组件,即调度器(Scheduler)和应用程序管理器(Applications Manager)
调度器主要负责资源管理和分配,不再负责跟踪和监控应用程序的执行状态,也不负责执行失败恢复,因为这些任务都已经交给ApplicationMaster组件来负责。
调度器接收来自ApplicationMaster的应用程序资源请求,并根据容量、队列等限制条件(如每个队列分配一定的资源,最多执行一定数量的作业等),把集群中的资源以“容器”的形式分配给提出申请的应用程序,容器的选择通常会考虑应用程序所要处理的数据的位置,进行就近选择,从而实现“计算向数据靠拢”。
而在 YARN 中是以容器(Container)作为动态资源分配单位,每个容器中都封装了一定数量的CPU、内存、磁盘等资源,从而限定每个应用程序可以使用的资源量
在Hadoop平台上,用户的应用程序是以作业(Job)的形式提交的,然后一个作业会被分解成多个任务(包括Map任务和Reduce任务)进行分布式执行。
ApplicationMaster。ApplicationMaster 的主要功能是:(1)当用户作业提交时,ApplicationMaster 与 ResourceManager 协商获取资源,ResourceManager 会以容器的形式为ApplicationMaster分配资源;(2)把获得的资源进一步分配给内部的各个任务(Map任务或Reduce任务),实现资源的“二次分配”;(3)与NodeManager保持交互通信进行应用程序的启动、运行、监控和停止,监控申请到的资源的使用情况,对所有任务的执行进度和状态进行监控,并在任务发生失败时执行失败恢复(即重新申请资源重启任务);(4)定时向ResourceManager发送“心跳”消息,报告资源的使用情况和应用的进度信息;(5)当作业完成时,ApplicationMaster 向ResourceManager注销容器,执行周期完成。
NodeManager是驻留在一个YARN集群中的每个节点上的代理,主要负责容器生命周期管理,监控每个容器的资源(CPU、内存等)使用情况,跟踪节点健康状况,并以“心跳”的方式与ResourceManager保持通信,向ResourceManager汇报作业的资源使用情况和每个容器的运行状态,同时,它还要接收来自 ApplicationMaster 的启动/停止容器的各种请求
在集群部署方面,YARN 的各个组件是和 Hadoop 集群中的其他组件进行统一部署的
YARN的ResourceManager组件和HDFS的名称节点(NameNode)部署在一个节点上,YARN的ApplicationMaster及NodeManager是和HDFS的数据节点(DataNode)部署在一起的。YARN中的容器代表了CPU、内存、网络等计算资源,它也是和HDFS的数据节点一起的。
8.3.4 YARN工作流程
YARN的工作流程如图8-8所示,在YARN框架中执行一个MapReduce程序时,从提交到完成需要经历如下8个步骤。① 用户编写客户端应用程序,向 YARN 提交应用程序,提交的内容包括 ApplicationMaster程序、启动ApplicationMaster的命令、用户程序等。② YARN中的ResourceManager负责接收和处理来自客户端的请求。接到客户端应用程序请求后,ResourceManager里面的调度器会为应用程序分配一个容器。同时,ResourceManager的应用程序管理器会与该容器所在的 NodeManager 通信,为该应用程序在该容器中启动一个ApplicationMaster(即图8-8中的“MR App Mstr”)。③ ApplicationMaster 被创建后会首先向 ResourceManager 注册,从而使得用户可以通过ResourceManager 来直接查看应用程序的运行状态。接下来的步骤 4~7 是具体的应用程序执行步骤。④ ApplicationMaster采用轮询的方式通过RPC协议向ResourceManager申请资源。⑤ ResourceManager 以“容器”的形式向提出申请的 ApplicationMaster 分配资源,一旦ApplicationMaster 申请到资源后,就会与该容器所在的 NodeManager 进行通信,要求它启动任务。⑥ 当ApplicationMaster要求容器启动任务时,它会为任务设置好运行环境(包括环境变量、JAR 包、二进制程序等),然后将任务启动命令写到一个脚本中,最后通过在容器中运行该脚本来启动任务。⑦ 各个任务通过某个 RPC 协议向 ApplicationMaster 汇报自己的状态和进度,让ApplicationMaster可以随时掌握各个任务的运行状态,从而可以在任务失败时重新启动任务。⑧ 应用程序运行完成后,ApplicationMaster向ResourceManager的应用程序管理器注销并关闭自己。若 ApplicationMaster 因故失败,ResourceManager 中的应用程序管理器会监测到失败的情形,然后将其重新启动,直到所有的任务执行完毕。
8.3.5 YARN框架与MapReduce1.0框架的对比分析
而 YARN 则是一个纯粹的资源调度管理框架,在它上面可以运行包括 MapReduce 在内的不同类型的计算框架,默认类型是MapReduce
YARN 有着更加“宏伟”的发展构想,即发展成为集群中统一的资源管理调度框架,在一个集群中为上层的各种计算框架提供统一的资源管理调度服务。
8.4 Hadoop生态系统中具有代表性的功能组件
Pig通常用于 ETL(Extraction、Transformation、Loading)过程,即来自各个不同数据源的数据被收集过来以后,采用Pig进行统一加工处理,然后加载到数据仓库Hive中,由Hive实现对海量数据的分析。
图8-11中,group by和join操作都“跨越”了Map和Reduce两个阶段,这是因为,group by和join操作都涉及到Shuffle过程,根据“第7章MapReduce”可以知道,Shuffle过程包含了Map端和Reduce端,所以图中表示group by和join操作的矩形框与Map和Reduce两个阶段都存在重叠区域。
因此Tez框架可以发挥重要的作用。可以让 Tez 框架运行在 YARN 框架之上,如图 8-13所示,然后让MapReduce、Pig和Hive等计算框架运行在 Tez 框架之上,从而借助于 Tez 框架实现对MapReduce、Pig和Hive等的性能优化,更好地解决现有MapReduce框架在迭代计算(如PageRank计算)和交互式计算方面存在的问题。
Kafka是由LinkedIn公司开发的一种高吞吐量的分布式发布订阅消息系统,用户通过Kafka系统可以发布大量的消息,同时也能实时订阅消费消息。Kafka 设计的初衷是构建一个可以处理海量日志、用户行为和网站运营统计等的数据处理框架。为了满足上述应用需求,就需要同时提供实时在线处理的低延迟和批量离线处理的高吞吐量。
其他工具加入大数据生态系统后,只需要开发和这款通用工具的数据交换方案,就可以通过这个交换枢纽轻松实现和其他Hadoop组件的数据交换。Kafka就是一款可以实现这种功能的产品。
在公司的大数据生态系统中,可以把Kafka作为数据交换枢纽,不同类型的分布式系统(如关系数据库、NoSQL数据库、流处理系统、批处理系统等)可以统一接入Kafka,如图8-14所示,实现和Hadoop各个组件之间的不同类型数据的实时高效交换,较好地满足各种企业的应用需求
第9章 Spark
而为了使编写程序更为容易,Spark使用简练、优雅的Scala语言编写,基于Scala提供了交互式的编程体验。
9.1 概述
Spark具有如下4个主要特点。
- ① 运行速度快。Spark使用先进的DAG(Directed Acyclic Graph,有向无环图)执行引擎,以支持循环数据流与内存计算,基于内存的执行速度可比Hadoop MapReduce快上百倍,基于磁盘的执行速度也能快十倍。
- ② 容易使用。Spark支持使用Scala、Java、Python和R语言进行编程,简洁的API设计有助于用户轻松构建并行程序,并且可以通过Spark Shell进行交互式编程。
- ③ 通用性。Spark提供了完整而强大的技术栈,包括SQL查询、流式计算、机器学习和图算法组件,这些组件可以无缝整合在同一个应用中,足以应对复杂的计算。
- ④ 运行模式多样。Spark可运行于独立的集群模式中,或者运行于Hadoop中,也可运行于Amazon EC2等云环境中,并且可以访问HDFS、Cassandra、HBase、Hive等多种数据源。
scala 优势:
- ① Scala具备强大的并发性
- ② Scala语法简洁,
- ③ Scala兼容Java,
spark 优势:
- ① Spark的计算模式也属于MapReduce,但不局限于Map和Reduce操作,还提供了多种数据集操作类型,编程模型比MapReduce更灵活。
- ② Spark提供了内存计算,中间结果直接放到内存中,带来了更高的迭代运算效率。
- ③ Spark基于DAG的任务调度执行机制,要优于MapReduce的迭代执行机制。
9.2 Spark生态系统
Spark 的设计遵循“一个软件栈满足不同应用场景”的理念,逐渐形成了一套完整的生态系统,既能够提供内存计算框架,也可以支持SQL即席查询、实时流式计算、机器学习和图计算等。Spark可以部署在资源管理器YARN之上,提供一站式的大数据解决方案。
9.3 Spark运行架构
本节首先介绍Spark的基本概念和架构设计方法,然后介绍Spark运行基本流程,最后介绍RDD的运行原理。
9.3.1 基本概念
在具体讲解Spark运行架构之前,需要先了解以下7个重要的概念。① RDD:是弹性分布式数据集(Resilient Distributed Dataset)的英文缩写,是分布式内存的一个抽象概念,提供了一种高度受限的共享内存模型。② DAG:是Directed Acyclic Graph(有向无环图)的英文缩写,反映RDD之间的依赖关系。③ Executor:是运行在工作节点(Worker Node)上的一个进程,负责运行任务,并为应用程序存储数据。④ 应用:用户编写的Spark应用程序。⑤ 任务:运行在Executor上的工作单元。⑥ 作业:一个作业包含多个RDD及作用于相应RDD上的各种操作。⑦ 阶段:是作业的基本调度单位,一个作业会分为多组任务,每组任务被称为“阶段”,或者也被称为“任务集”。
Spark运行架构如图9-5所示,包括集群资源管理器(Cluster Manager)、运行作业任务的工作节点(Worker Node)、每个应用的任务控制节点(Driver)和每个工作节点上负责具体任务的执行进程(Executor)。
与Hadoop MapReduce计算框架相比,Spark所采用的Executor有两个优点:一是利用多线程来执行具体的任务(Hadoop MapReduce采用的是进程模型),减少任务的启动开销;二是Executor中有一个BlockManager存储模块,会将内存和磁盘共同作为存储设备,当需要多轮迭代计算时,可以将中间结果存储到这个存储模块里,下次需要时就可以直接读该存储模块里的数据,而不需要读写到HDFS等文件系统里,因而有效减少了IO开销;或者在交互式查询场景下,预先将表缓存到该存储系统上,从而可以提高读写IO性能。
Spark运行基本流程如图9-7 所示,流程如下。(1)当一个Spark应用被提交时,首先需要为这个应用构建起基本的运行环境,即由任务控制节点(Driver)创建一个SparkContext,由SparkContext负责和资源管理器(Cluster Manager)的通信以及进行资源的申请、任务的分配和监控等。SparkContext 会向资源管理器注册并申请运行Executor的资源。(2)资源管理器为Executor分配资源,并启动Executor进程,Executor运行情况将随着“心跳”发送到资源管理器上。(3)SparkContext 根据 RDD 的依赖关系构建 DAG 图,DAG 图提交给 DAG 调度器(DAGScheduler)进行解析,将DAG图分解成多个“阶段”(每个阶段都是一个任务集),并且计算出各个阶段之间的依赖关系,然后把一个个“任务集”提交给底层的任务调度器(TaskScheduler)进行处理;Executor 向 SparkContext 申请任务,任务调度器将任务分发给 Executor 运行,同时SparkContext将应用程序代码发放给Executor。(4)任务在Executor上运行,把执行结果反馈给任务调度器,然后反馈给DAG调度器,运行完毕后写入数据并释放所有资源。
RDD 的设计理念源自 AMP 实验室发表的论文《Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing》
一个RDD就是一个分布式对象集合,本质上是一个只读的分区记录集合,每个RDD可以分成多个分区,每个分区就是一个数据集片段,并且一个 RDD 的不同分区可以被保存到集群中不同的节点上,从而可以在集群中的不同节点上进行并行计算
总体而言,Spark采用RDD以后能够实现高效计算的主要原因如下。
- (1)高效的容错性
- (2)中间结果持久化到内存
- (3)存放的数据可以是Java对象,避免了不必要的对象序列化和反序列化开销。
4.RDD之间的依赖关系
RDD中不同的操作会使得不同RDD中的分区产生不同的依赖。RDD中的依赖关系分为窄依赖(Narrow Dependency)与宽依赖(Wide Dependency),两种依赖之间的区别如图9-10所示。
窄依赖表现为一个父RDD的分区对应于一个子RDD的分区,或多个父RDD的分区对应于一个子RDD的分区。
宽依赖则表现为存在一个父 RDD的一个分区对应一个子 RDD的多个分区。
窄依赖典型的操作包括map、filter、union等,宽依赖典型的操作包括groupByKey、sortByKey等。对于连接(Join)操作,可以分为两种情况。
- (1)对输入进行协同划分,属于窄依赖
- (2)对输入做非协同划分,属于宽依赖,
对于窄依赖的RDD,可以以流水线的方式计算所有父分区,不会造成网络之间的数据混合。对于宽依赖的RDD,则通常伴随着Shuffle操作,即首先需要计算好所有父分区数据,然后在节点之间进行Shuffle。
5.阶段的划分
Spark通过分析各个RDD的依赖关系生成了DAG,再通过分析各个RDD中的分区之间的依赖关系来决定如何划分阶段,具体划分方法是:在DAG中进行反向解析,遇到宽依赖就断开,遇到窄依赖就把当前的RDD加入到当前的阶段中;将窄依赖尽量划分在同一个阶段中,可以实现流水线计算(具体的阶段划分算法请参见 AMP 实验室发表的论文《Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing》)
由上述论述可知,把一个 DAG 图划分成多个阶段以后,每个阶段都代表了一组关联的、相互之间没有 Shuffle 依赖关系的任务组成的任务集合。每个任务集合会被提交给任务调度器(TaskScheduler)进行处理,由任务调度器将任务分发给Executor运行
6.RDD运行过程
(1)创建RDD对象。(2)SparkContext负责计算RDD之间的依赖关系,构建DAG。(3)DAGScheduler负责把DAG图分解成多个阶段,每个阶段中包含了多个任务,每个任务会被任务调度器分发给各个工作节点(Worker Node)上的Executor去执行。
9.4 Spark的部署和应用方式
本节首先介绍Spark支持的三种典型部署方式,即standalone、Spark on Mesos和Spark on YARN
9.5 Spark编程实践
表9-2 常用的几个Action API介绍
表9-3 常用的几个Transformation API介绍
Spark属于MapReduce计算模型,因此也可以实现MapReduce的计算流程,如实现单词统计,可以首先使用 flatMap()将每一行的文本内容通过空格进行划分为单词;然后使用 map()将单词映射为(K,V)的键值对,其中K为单词,V为1;最后使用reduceByKey()将相同单词的计数进行相加,最终得到该单词总的出现次数。具体实现命令如下:scala > val wordCounts = textFile.flatMap(line => line.split(“ “)).map(word => (word, 1)).reduceByKey((a, b) => a + b)scala > wordCounts.collect() // 输出单词统计结果// Array[(String, Int)]= Array((package,1), (For,2), (Programs,1), (processing.,1), (Because,1), (The,1)…)在上面的代码中,flatMap()、map()和reduceByKey()都是属于“转换”操作,由于Spark采用了惰性机制,这些转换操作只是记录了 RDD 之间的依赖关系,并不会真正计算出结果。最后,运行collect(),它属于“行动”类型的操作,这时才会执行真正的计算,Spark会把计算打散成多个任务分发到不同的机器上并行执行。