微信号:dbaplus

介绍:围绕数据库、大数据、PaaS云,顶级大咖、技术干货,运营几个月受众过十万!成为运维圈最专注围绕“数据”的学习交流和专业社群!欢迎投稿,加入探讨.

一文详解大规模数据计算处理原理及操作重点

2018-08-07 07:15 李智慧


作者介绍

李智慧《大型网站技术架构:核心原理与案例分析》作者。曾供职于阿里巴巴与英特尔亚太研发中心,从事大型网站与大数据方面的研发工作,目前在做企业级区块链方面的开发工作。


一、RAID技术


大数据技术主要针对的是大规模数据的计算处理问题,那么要想解决的这一问题,首先要解决的就是大规模数据的存储问题。大规模数据存储要解决的核心问题有三个方面:


  • 数据存储容量的问题,既然大数据要解决的是数以PB计的数据计算问题,而一般的服务器磁盘容量通常1-2TB,那么如何存储这么大规模的数据?

  • 数据读写速度的问题,一般磁盘的连续读写速度为几十MB,以这样的速度,几十PB的数据恐怕要读写到天荒地老……

  • 数据可靠性的问题,磁盘大约是计算机设备中最易损坏的硬件了,在网站一块磁盘使用寿命大概是一年,如果磁盘损坏了,数据怎么办?


在大数据技术出现之前,人们就需要面对这些关于存储的问题,对应的解决方案就是RAID技术。


RAID(独立磁盘冗余阵列)技术主要是为了改善磁盘的存储容量、读写速度,增强磁盘的可用性和容错能力。目前服务器级别的计算机都支持插入多块磁盘(8块或者更多),通过使用RAID技术,实现数据在多块磁盘上的并发读写和数据备份。


常用RAID技术有以下几种,如图所示:


常用RAID技术原理图


假设服务器有N块磁盘:


RAID0

数据在从内存缓冲区写入磁盘时,根据磁盘数量将数据分成N份,这些数据同时并发写入N块磁盘,使得数据整体写入速度是一块磁盘的N倍,读取的时候也一样,因此RAID0具有极快的数据读写速度。但是RAID0不做数据备份,N块磁盘中只要有一块损坏,数据完整性就被破坏,所有磁盘的数据都会损坏。


RAID1

数据在写入磁盘时,将一份数据同时写入两块磁盘,这样任何一块磁盘损坏都不会导致数据丢失,插入一块新磁盘就可以通过复制数据的方式自动修复,具有极高的可靠性。


RAID10

结合RAID0和RAID1两种方案,将所有磁盘平均分成两份,数据同时在两份磁盘写入,相当于RAID1,但是在每一份磁盘里面的N/2块磁盘上,利用RAID0技术并发读写,既提高可靠性又改善性能,不过RAID10的磁盘利用率较低,有一半的磁盘用来写备份数据。


RAID3

一般情况下,一台服务器上不会出现同时损坏两块磁盘的情况,在只损坏一块磁盘的情况下,如果能利用其它磁盘的数据恢复损坏磁盘的数据,就能在保证可靠性和性能的同时,大幅提升磁盘利用率。


在数据写入磁盘的时候,将数据分成N-1份,并发写入N-1块磁盘,并在第N块磁盘记录校验数据,任何一块磁盘损坏(包括校验数据磁盘),都可以利用其它N-1块磁盘的数据修复。


但是在数据修改较多的场景中,任何磁盘修改数据都会导致第N块磁盘重写校验数据,频繁写入的后果是第N块磁盘比其它磁盘容易损坏,需要频繁更换,所以RAID3很少在实践中使用。


RAID5


相比RAID3,更多被使用的方案是RAID5。


RAID5和RAID3很相似,但是校验数据不是写入第N块磁盘,而是螺旋式地写入所有磁盘中。这样校验数据的修改也被平均到所有磁盘上,避免RAID3频繁写坏一块磁盘的情况。


RAID6


如果数据需要很高的可靠性,在出现同时损坏两块磁盘的情况下(或者运维管理水平比较落后,坏了一块磁盘但是迟迟没有更换,导致又坏了一块磁盘),仍然需要修复数据,这时候可以使用RAID6。


RAID6和RAID5类似,但是数据只写入N-2块磁盘,并螺旋式地在两块磁盘中写入校验信息(使用不同算法生成)。


在相同磁盘数目(N)的情况下,各种RAID技术的比较如下表所示:


几种RAID技术比较


RAID技术有硬件实现,比如专用的RAID卡或者主板直接支持,也可以通过软件实现,在操作系统层面将多块磁盘组成RAID,在逻辑视作一个访问目录。RAID技术在传统关系数据库及文件系统中应用比较广泛,是改善计算机存储特性的重要手段。


RAID技术只是在单台服务器的多块磁盘上组成阵列,大数据需要更大规模的存储空间和访问速度。将RAID技术原理应用到分布式服务器集群上,就形成了Hadoop分布式文件系统HDFS的架构思想。


二、HDFS架构思想


1、HDFS架构原理


和RAID在多个磁盘上进行文件存储及并行读写一样思路,HDFS在一个大规模分布式服务器集群上,对数据进行并行读写及冗余存储。因为HDFS可以部署在一个比较大的服务器集群上,集群中所有服务器的磁盘都可以供HDFS使用,所以整个HDFS的存储空间可以达到PB级容量。HDFS架构如图:


HDFS架构


HDFS中关键组件有两个,一个是NameNode,一个是DataNode。


DataNode负责文件数据的存储和读写操作,HDFS将文件数据分割成若干块(block),每个DataNode存储一部分block,这样文件就分布存储在整个HDFS服务器集群中。应用程序客户端(Client)可以并行对这些数据块进行访问,从而使得HDFS可以在服务器集群规模上实现数据并行访问,极大地提高访问速度。实践中HDFS集群的DataNode服务器会有很多台,一般在几百台到几千台这样的规模,每台服务器配有数块磁盘,整个集群的存储容量大概在几PB到数百PB。


NameNode负责整个分布式文件系统的元数据(MetaData)管理,也就是文件路径名,数据block的ID以及存储位置等信息,承担着操作系统中文件分配表(FAT)的角色。HDFS为了保证数据的高可用,会将一个block复制为多份(缺省情况为3份),并将三份相同的block存储在不同的服务器上。这样当有磁盘损坏或者某个DataNode服务器宕机导致其存储的block不能访问的时候,Client会查找其备份的block进行访问。


block多份复制存储如下图所示:


HDFS的block复制备份策略


对于文件/users/sameerp/data/part-0,其复制备份数设置为2,存储的block ID为1,3,block1的两个备份存储在DataNode0和DataNode2两个服务器上,block3的两个备份存储DataNode4和DataNode6两个服务器上,上述任何一台服务器宕机后,每个block都至少还有一个备份存在,不会影响对文件/users/sameerp/data/part-0的访问。


事实上,DataNode会通过心跳和NameNode保持通信,如果DataNode超时未发送心跳,NameNode就会认为这个DataNode已经失效,立即查找这个DataNode上存储的block有哪些,以及这些block还存储在哪些服务器上,随后通知这些服务器再复制一份block到其它服务器上,保证HDFS存储的block备份数符合用户设置的数目,即使再有服务器宕机,也不会丢失数据。


2、HDFS应用


Hadoop分布式文件系统可以像一般的文件系统那样进行访问:使用命令行或者编程语言API进行文件读写操作。我们以HDFS写文件为例看HDFS处理过程,如下图:


HDFS写文件操作


  • 应用程序Client调用HDFS API,请求创建文件,HDFS API包含在Client进程中;

  • HDFS API将请求参数发送给NameNode服务器,NameNode在meta信息中创建文件路径,并查找DataNode中空闲的block,然后将空闲block的id、对应的DataNode服务器信息返回给Client。因为数据块需要多个备份,所以即使Client只需要一个block的数据量,NameNode也会返回多个NameNode信息;

  • Client调用HDFS API,请求将数据流写出;

  • HDFS API连接第一个DataNode服务器,将Client数据流发送给DataNode,该DataNode一边将数据写入本地磁盘,一边发送给第二个DataNode,同理第二个DataNode记录数据并发送给第三个DataNode;

  • Client通知NameNode文件写入完成,NameNode将文件标记为正常,可以进行读操作了。


HDFS虽然提供了API,但是在实践中,我们很少自己编程直接去读取HDFS中的数据,原因正如开篇提到,在大数据场景下,移动计算比移动数据更划算。

参考链接:https://zhuanlan.zhihu.com/p/34436007


与其写程序去读取分布在这么多DataNode上的数据,不如将程序分发到DataNode上去访问其上的block数据。但是如何对程序进行分发?分发出去的程序又如何访问HDFS上的数据?计算的结果如何处理,如果结果需要合并,该如何合并?


Hadoop提供了对存储在HDFS上的大规模数据进行并行计算的框架,就是MapReduce。


三、MapReduce


Hadoop解决大规模数据分布式计算的方案是MapReduce。MapReduce既是一个编程模型,又是一个计算框架。也就是说,开发人员必须基于MapReduce编程模型进行编程开发,然后将程序通过MapReduce计算框架分发到Hadoop集群中运行。我们先看一下作为编程模型的MapReduce。


1、MapReduce编程模型


MapReduce是一种非常简单又非常强大的编程模型。


简单在于其编程模型只包含map和reduce两个过程,map的主要输入是一对<key , value>值,经过map计算后输出一对<key , value>值;然后将相同key合并,形成<key , value集合>;再将这个<key , value集合>输入reduce,经过计算输出零个或多个<key , value>对。


但是MapReduce同时又是非常强大的,不管是关系代数运算(SQL计算),还是矩阵运算(图计算),大数据领域几乎所有的计算需求都可以通过MapReduce编程来实现。


我们以WordCount程序为例。WordCount主要解决文本处理中的词频统计问题,就是统计文本中每一个单词出现的次数。如果只是统计一篇文章的词频,几十K到几M的数据,那么写一个程序,将数据读入内存,建一个Hash表记录每个词出现的次数就可以了,如下图:


小数据量的词频统计


但是如果想统计全世界互联网所有网页(数万亿计)的词频数(这正是google这样的搜索引擎典型需求),你不可能写一个程序把全世界的网页都读入内存,这时候就需要用MapReduce编程来解决。


WordCount的MapReduce程序如下:


public class WordCount {

  public static class TokenizerMapper

       extends Mapper<Object, Text, Text, IntWritable>{

    private final static IntWritable one = new IntWritable(1);

    private Text word = new Text();

    public void map(Object key, Text value, Context context

                    ) throws IOException, InterruptedException {

      StringTokenizer itr = new StringTokenizer(value.toString());

      while (itr.hasMoreTokens()) {

        word.set(itr.nextToken());

        context.write(word, one);

      }

    }

  }

  public static class IntSumReducer

       extends Reducer<Text,IntWritable,Text,IntWritable> {

    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values,

                       Context context

                       ) throws IOException, InterruptedException {

      int sum = 0;

      for (IntWritable val : values) {

        sum += val.get();

      }

      result.set(sum);

      context.write(key, result);

    }

  }}


其核心是一个map函数,一个reduce函数。


map函数的输入主要是一个<key , value>对,在这个例子里,value是要统计的所有文本中的一行数据,key在这里不重要,我们忽略。


public void map(Object key, Text value, Context context

                    )


map函数的计算过程就是,将这行文本中的单词提取出来,针对每个单词输出一个<word , 1>这样的<key , value>对。


MapReduce计算框架会将这些<word , 1>收集起来,将相同的word放在一起,形成<word , <1,1,1,1,1,1,1.....>>这样的<key , value集合>数据,然后将其输入给reduce函数。


   public void reduce(Text key, Iterable<IntWritable> values,

                       Context context

                       )


这里的reduce的输入参数values就是由很多个1组成的集合,而key就是具体的单词word。


reduce函数的计算过程就是,将这个集合里的1求和,再将单词(word)和这个和(sum)组成一个<key , value>(<word , sum>)输出。每一个输出就是一个单词和它的词频统计总和。


假设有两个block的文本数据需要进行词频统计,MapReduce计算过程如下图:


MapReduce计算过程


一个map函数可以针对一部分数据进行运算,这样就可以将一个大数据切分成很多块(这也正是HDFS所做的),MapReduce计算框架为每个块分配一个map函数去计算,从而实现大数据的分布式计算。


2、MapReduce计算框架架构原理


前面提到MapReduce编程模型将大数据计算过程切分为map和reduce两个阶段,在map阶段为每个数据块分配一个map计算任务,然后将所有map输出的key进行合并,相同的key及其对应的value发送给同一个reduce任务去处理。


这个过程有两个关键问题需要处理:


  • 如何为每个数据块分配一个map计算任务,代码是如何发送数据块所在服务器的,发送过去是如何启动的,启动以后又如何知道自己需要计算的数据在文件什么位置(数据块id是什么)?

  • 处于不同服务器的map输出的<key , value> ,如何把相同的key聚合在一起发送给reduce任务?


这两个关键问题正好对应前面文章中“MapReduce计算过程”一图中两处“MapReduce框架处理”:


MapReduce计算过程中两处MapReduce框架处理


我们先看下MapReduce是如何启动处理一个大数据计算应用作业的:


MapReduce作业启动和运行机制


我们以Hadoop1为例,MapReduce运行过程涉及以下几类关键进程:


  • 大数据应用进程:启动用户MapReduce程序的主入口,主要指定Map和Reduce类、输入输出文件路径等,并提交作业给Hadoop集群。

  • JobTracker进程:根据要处理的输入数据量启动相应数量的map和reduce进程任务,并管理整个作业生命周期的任务调度和监控。JobTracker进程在整个Hadoop集群全局唯一。

  • TaskTracker进程:负责启动和管理map进程以及reduce进程。因为需要每个数据块都有对应的map函数,TaskTracker进程通常和HDFS的DataNode进程启动在同一个服务器,也就是说,Hadoop集群中绝大多数服务器同时运行DataNode进程和TaskTacker进程。


如下图所示:


MapReduce作业启动和运行机制


具体作业启动和计算过程如下:


  • 应用进程将用户作业jar包存储在HDFS中,将来这些jar包会分发给Hadoop集群中的服务器执行MapReduce计算;

  • 应用程序提交job作业给JobTracker;

  • JobTacker根据作业调度策略创建JobInProcess树,每个作业都会有一个自己的JobInProcess树;

  • JobInProcess根据输入数据分片数目(通常情况就是数据块的数目)和设置的reduce数目创建相应数量的TaskInProcess;

  • TaskTracker进程和JobTracker进程进行定时通信;

  • 如果TaskTracker有空闲的计算资源(空闲CPU核),JobTracker就会给它分配任务。分配任务的时候会根据TaskTracker的服务器名字匹配在同一台机器上的数据块计算任务给它,使启动的计算任务正好处理本机上的数据,以实现我们一开始就提到的“移动计算比移动数据更划算”;

  • TaskRunner收到任务后根据任务类型(map还是reduce),任务参数(作业jar包路径,输入数据文件路径,要处理的数据在文件中的起始位置和偏移量,数据块多个备份的DataNode主机名等)启动相应的map或者reduce进程;

  • map或者reduce程序启动后,检查本地是否有要执行任务的jar包文件,如果没有,就去HDFS上下载,然后加载map或者reduce代码开始执行;

  • 如果是map进程,从HDFS读取数据(通常要读取的数据块正好存储在本机);如果是reduce进程,将结果数据写出到HDFS。


通过以上过程,MapReduce可以将大数据作业计算任务分布在整个Hadoop集群中运行,每个map计算任务要处理的数据通常都能从本地磁盘上读取到,而用户要做的仅仅是编写一个map函数和一个reduce函数就可以了,根本不用关心这两个函数是如何被分布启动到集群上的,数据块又是如何分配给计算任务的。这一切都由MapReduce计算框架完成。


MapReduce数据合并与连接机制


在WordCount例子中,要统计相同单词在所有输入数据中出现的次数,而一个map只能处理一部分数据,一个热门单词几乎会出现在所有的map中,这些单词必须要合并到一起进行统计才能得到正确的结果。


事实上,几乎所有的大数据计算场景都需要处理数据关联的问题,简单如WordCount只要对key进行合并就可以了,复杂如数据库的join操作,需要对两种类型(或者更多类型)的数据根据key进行连接。


MapReduce计算框架处理数据合并与连接的操作就在map输出与reduce输入之间,这个过程有个专门的词汇来描述,叫做shuffle。


MapReduce shuffle过程


每个map任务的计算结果都会写入到本地文件系统,等map任务快要计算完成的时候,MapReduce计算框架会启动shuffle过程,在map端调用一个Partitioner接口,对map产生的每个<key , value>进行reduce分区选择,然后通过http通信发送给对应的reduce进程。这样不管map位于哪个服务器节点,相同的key一定会被发送给相同的reduce进程。reduce端对收到的<key , value>进行排序和合并,相同的key放在一起,组成一个<key , value集合>传递给reduce执行。


MapReduce框架缺省的Partitioner用key的哈希值对reduce任务数量取模,相同的key一定会落在相同的reduce任务id上,实现上,这样的Partitioner代码只需要一行,如下所示:


/** Use {@link Object#hashCode()} to partition. */

public int getPartition(K2 key, V2 value, int numReduceTasks) {

    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;

 }


shuffle是大数据计算过程中发生奇迹的地方,不管是MapReduce还是Spark,只要是大数据批处理计算,一定会有shuffle过程,让数据关联起来,数据的内在关系和价值才会呈现出来。不理解shuffle,就会在map和reduce编程中产生困惑,不知道该如何正确设计map的输出和reduce的输入。shuffle也是整个MapReduce过程中最难最消耗性能的地方,在MapReduce早期代码中,一半代码都是关于shuffle处理的。


3、工具——Hive


既然MapReduce计算模型可以解决绝大多数的数据分析与数据挖掘任务,那么对于如下我们常见的一条SQL分析语句,MapReduce如何编程实现?


SELECT pageid, age, count(1) FROM pv_users GROUP BY pageid, age;


这是一条非常常见的SQL统计分析语句,统计不同年龄的用户访问不同网页的兴趣偏好,对于产品运营和设计很有价值。具体数据输入和执行结果如下图示例:


group by输入输出示例


左边是要分析的数据表,右边是分析结果。实际上把左边表相同的行累计求和,就得到右边的表了,看起来跟WordCount的计算很一样。确实也是这样,我们看下这条SQL语句的MapReduce的计算过程,map和reduce函数的输入输出以及函数处理过程分别是什么样。


首先,看下map函数的输入key和value,key不重要,忽略掉,value就是左边表中每一行的数据,<1, 25>这样。map函数的输出就是以输入的value作为key,value统一设为1,<<1, 25>, 1>这样。


map函数的输出经过shuffle以后,相同的key及其对应的value被放在一起组成一个<key,value集合>,作为输入交给reduce函数处理。如<<2, 25>, 1>被map函数输出两次,那么到了reduce这里,就变成输入<<2, 25>, <1, 1>>,key是<2, 25>, value集合是<1, 1>。在reduce函数内部,value集合里所有的数字被相加,然后输出。reduce的输出就是<<2, 25>, 2>。


计算过程如下图示例:


group by的MapReduce计算过程示例


这样一条很有实用价值的SQL就这样被很简单的MapReduce计算过程处理好了。在数据仓库中,SQL是最常用的分析工具,那么有没有能够自动将SQL生成MapReduce代码的工具呢?这个工具就是Hadoop大数据仓库Hive。


自动将SQL生成MapReduce代码的工具——Hive


Hive能够直接处理用户输入的SQL语句(Hive的SQL语法和数据库标准SQL略有不同),调用MapReduce计算框架完成数据分析操作。具体架构如下图:


Hive架构


用户通过Hive的Client(Hive的命令行工具,JDBC等)向Hive提交SQL命令。如果是创建数据表的DDL语句,Hive就会通过执行引擎Driver将数据表的信息记录在Metastore组件中,这个组件通常用一个关系数据库实现,记录表名、字段名、字段类型、关联HDFS文件路径等这些数据库的meta信息(元信息)。


如果用户提交的是查询分析数据的DQL语句,Driver就会将该语句提交给自己的编译器Compiler进行语法分析、语法解析、语法优化等一系列操作,最后生成一个MapReduce执行计划。然后根据该执行计划生成一个MapReduce的作业,提交给Hadoop MapReduce计算框架处理。


对于一个较简单的SQL命令,比如:


SELECT * FROM status_updates WHERE status LIKE ‘michael jackson’;


其对应的Hive执行计划如下图:


Hive执行计划示例


Hive内部预置了很多函数,Hive的执行计划就是根据SQL语句生成这些函数的DAG(有向无环图),然后封装进MapReduce的map和reduce函数中。这个例子中,map函数调用了三个Hive内置函数TableScanOpoerator、FilterOperator、FileOutputOperator,就完成了map计算,而且无需reduce函数。


除了上面这些简单的聚合(group by)、过滤(where)操作,Hive还能执行连接(join on)操作。上面例子中,pv_users表的数据在实际中是无法直接得到的,因为pageid数据来自用户访问日志,每个用户进行一次页面浏览,就会生成一条访问记录,保存在page_view表中。而年龄age信息则记录在用户表user中。如下图:


page_view表和user表示例


这两张表都有一个相同的字段userid,根据这个字段可以将两张表连接起来,生成前面的pv_users表,SQL命令如下:


SELECT pv.pageid, u.age FROM page_view pv JOIN user u ON (pv.userid = u.userid);


同样,这个SQL命令也可以转化为MapReduce计算,如下图:


join的MapReduce计算过程示例


join的MapReduce计算过程和前面的group by稍有不同,因为join涉及两张表,来自两个文件(夹),所以需要在map输出的时候进行标记,比如来自第一张表的输出value就记录为<1, X>,这里的1表示数据来自第一张表。这样经过shuffle以后,相同的key被输入到同一个reduce函数,就可以根据表的标记对value数据求笛卡尔积,输出就join的结果。


在实践中,工程师并不需要经常编写MapReduce程序,因为网站最主要的大数据处理就是SQL分析,在Facebook,据说90%以上的MapReduce任务都是Hive产生的。Hive在大数据应用中的作用非常重要。




更多数据领域的先进理念与实战经验,尽在2018 Gdevops全球敏捷运维峰会北京站!峰会议题覆盖AIOps与DevOps落地、数据库选型、SQL优化、技术管理等多方面实战,全方位为你充电!


↓↓↓ 点击链接了解更多详情 ↓↓↓

2018 Gdevops全球敏捷运维峰会-北京站

 
DBAplus社群 更多文章 58速运架构实战:拆分服务与DB,突破“中心化”瓶颈 29岁成为阿里P8,怎么在五年内完成晋升三连跳? 从分库分表后遗症,总结数据库表拆分策略 华为软件BSS老枪的离职感言:山重水复疑无路,期待柳岸花明又一村 实时数据平台设计:解决从OLTP到OLAP实时流转缺失
猜您喜欢 湾区一月见闻 DevOps转型成功之路 - 误区、实践和实施路径 马化腾:更开放的腾讯,要做互联网的连接器 ZStack v1.0 正式版发布 一个优秀的运营应该具备这四方面数据分析的能力