微信号:infoqchina

介绍:有内容的技术社区媒体

保持简单:Uber流处理架构演进的四字箴言 | 附124页PPT下载

2016-06-17 08:00 袁泳
作为一款即时用车软件,为了保障城市交通健康运行,Uber需要实时了解:哪里是不是打不到车?市场未来可能出现哪些问题?如何应对问题?系统是不是存在哪些bug?Uber从场景出发,抽象出「数据」和「函数」,并通过足够简单的架构(运算、存储层)支撑,而保持简单、恰恰体现了Uber技术团队的功力。

本文整理自Uber软件工程师袁泳在ArchSummit全球架构师峰会的分享。

整理:Tony & Betty

后台回复关键词:【Uber】,获取PPT下载链接。

视频观看戳这里:Uber的流处理系统和实践有什么特别的?丨视频

老司机介绍

袁泳Uber软件工程师。袁泳目前负责Uber物流平台的流处理系统。在加入Uber之前,袁泳曾参与Netflix的云平台构建。工作领域包括动态扩容、分布式跟踪服务、每天千亿级别事务量的实时数据流处理以及Netflix的低延迟加密服务。

今天给大家的分享的是Uber的流处理应用,让我们一起看看,一个新生的系统应该怎么样搭建?怎么样为我们的产品服务?

Uber不仅是一款打车软件,Uber的目标是让交通变得非常方便,司机、乘客、车辆与城市融为一体。就像自来水一样,你打开你的水龙头,水就流了出来。或者你打开你的手机,你就应该有车,你就可以打车。

1
城市的脉搏:我们需要怎样的流数据?

大街小巷、车和乘客是城市的脉搏,而城市的脉搏实际上是由我们的流数据来提供的。现在的问题就是什么样的数据,我们能真正感兴趣?

1.1 市场动态

我们需要知道的是,这个打车市场是不是健康,我们需要知道的是不是某一个角落也许用户不能打车,那如果我们的市场不够健康的话,我们能做什么事情?所以 我们第一想知道的是市场动态,到底现在这个市场是不是平衡、是不是有足够的供求,我们的系统是有Bug、或者我们的系统是不是有没有想到的情况。

具体到市场动态我想给大家举几个例子,看看有什么样的市场动态。比如说我们 想知道当前全球有多少车可用,这样的信息能不能够马上得到呢?实际上是可以的。

请看上图,这是个动画,是从我们的生产系统直接Capture下来的,点了按纽以后,马上就可以得到数据,可以知道全球每一个区域到底有多少车,它们的分布是怎么样的。

但是如果光知道这个也没有什么用,因为只是一个非常大略的粗略的情况,如果我们能够再细化一点,问一些非常具体的问题就复杂多了,比如说:

上图这句话中,红色的字体展现的是不同的维度,我们现在有更多的维度和更精细的问题了,我们能不能回答这样的问题呢?我们也是可以的。

我们把城市分成了几个比较大的区域,那这些大的区域里面发出一条请求,在不到一秒钟的时间内,我们可以得到整个城市交通状况的分布。

但是如果光是这样,其实还是没有多少用,因为你拿到的是非常粗略的这种分布,一个大的区域大概有几平方公里,那几平方公里里面也许只是一个社区有问题,那我们能不能够再进一步的细化呢?

还是可以,Uber把整个城市分成非常小的六边形。一个城市大概有成千上万个六边形,那现在我们想问的是过去10分钟里面旧金山每个六边形里面有多少车载客呢?也是不到一秒,于是我们知道整个城市我们把它分成成千上万个小格子,每个小格子里面交通状况怎么样,我们也是知道的。

但是到了这一步,我只知道现在发生了什么事情,如果我们发现现在发生的情况或者市场动态不是我们预计的,我们怎么知道到底什么原因造成的这种情况呢?所以我们还需要知道它的历史的变化。

1.2 历史变化

同样用这个工具,我们不仅知道我们交通状况,即车流的分布图,我们同时也可以了解各种各样的数据指标,各种各样的流出来的时序数据。这些数据也都是通过我们的后台系统,通过流处理以后拿到的,也是实时的。但目前,只能说我们在跟踪这个市场的变化。

1.3 状态跟踪

下一个应用的场景是这样的,我们想跟踪状态。如果你看着Uber的系统,实际上Uber系统是一个分布式的状态机,不管是乘客也好,还是司机也好,他们都有大量的状态,那这些状态的话,就会产生很多这种所谓的状态分布图,我们能不能够一下拿到所有状态,然后看他们的分布是怎么样的?

我们也可以,上图的数据也是通过仅仅是一条查询就可以拿到。而这个状态,实际上是通过所谓的Graph得到的,这个是旧金山所有的司机,所有的乘客,所有的状态分布在过去十分钟里面出现的情况,我们可以用一条数据把它拿到,而得到数据的速度,就像你们看到的不到一秒钟。

那我们还可以继续细化,我们甚至可以知道一位司机在过去,比如说几天的时间内,他们经历了什么样的状态,你也可以马上拿到。同时如果我们点击这条图的边、或者这个图的点,我们可以知道这个图和边相关的一些数据,我们也可以把它综合起来。

2
如何搭建系统?本质是分布式状态机

那现在问题就是,这些都是我们的应用场景,我们应该怎样搭建一个系统,能够很快的去把这些数据拿到?

优步平台的本质实际是一个分布式的状态机。我们有不同的服务,这些服务去跟踪用户的状态,用户就是司机和乘客。我暂时用这两个作为例子,我们不仅跟踪他们状态,我们也可以把它们捕捉下来,并且把它们通过事件的形式发送出去,所以各类应用就生成各种各样的事件。这些事件必须要能保证延迟不能太多,因为我们需要做实时的处理,并且事件不应该丢失,虽然说偶尔我们可以丢失一两条数据,但是如果丢失太多,会影响我们分析的精度。

3
数据挑战:多维度、细粒度、查询模式不固定...

因为我们有大量的事件,每天有上亿、甚至几十亿、上百亿,这些事件的收集必须要是低开销、易扩展的。

因为每一条消息如果你花很多钱去收集的话,那就得不偿失了,并且随着我们的流量的迅速的增长,我们要保证我们的系统也可以扩展开来,不至于因为太多的数据而导致我们系统的宕机。所以,我们数据收集的部分是Apache KafKa。

KafKa实际上是低延迟、高增量,并且是分布式的队列系统。你把数据放进去,数据会自动的分布到不同的所谓broker上面。放到broker上面后consumer很快可以把数据重新读出来,是稳定的系统。那我们现在有了KafKa,收集的数据的挑战在哪里?

有数据了,我们马上搭建一个非常简单的一个服务,然后把数据汇总起来,内存里面处理一下,然后放到一个webpage上面,好像非常的简单,那为什么对我们来说这样的系统仍然是一个挑战呢?

首先,每一条数据都是多维度的,每个事件包含着数十个字段。比如车的状态,包括它到底在哪里,它在什么时间发生的,它在哪个城市,这个车的类型是什么,还有包括我们内部去跟踪的一系列的、与我们的商务逻辑有关的这些字段。

其次,数据是细粒度的。有时我们想任意组合这些字段,从某一个角度去看这些数据,让它们组合起来到底是怎么样的。如果你单看这样的数据,其实它的挑战并不是那么大,一个城市大概有二三十个区域,那一条简单的SQL都可以完成我们需要的查询。但是,我们的数据是非常细粒度的,如果我们把我们的城市分成成千上万个小的单元,这时如果要占用一个SQL,也许这个计算量就变得非常的大了。

比如说每个城市,它至少有一万个六边形,因为我们把城市分成非常小的六边形,那我们有大概有7种车型,平均来说,一天有1440分钟,并且我们有大概13种司机的状态,300座城市,当然现在不止了,那一天的数据量满打满算,大概是三千多亿个可能组合。虽然实际不会那么多,但随着我们业务量的扩展,我们需要考虑到也许将来会扩展到这种情况。

更重要的是我们的查询模式不固定。当我们需要过滤一条消息的时候,相当于我们在SQL里面,where clause的时候,我们并不知道到底是什么样的条件,我们可以任意选取任意维度的组合。就像刚才的问题一样,也许我想知道是旧金山的Uber X的某一种、某一条数据,又也许我想知道的是北京的人民优步的正在载客、或者正在拒载的司机的信息。

那么,我们怎么样才能够非常快的切入这些数据?得到我们想要的答案呢?并且我们需要支持多种的聚合,我们可以支持那个Heat map,我们需要支持头十条、比如最繁忙的十个区域,我们需要拿到他们的直方图,我们需要去数某些司机的数量、或者求平均值、求总和......诸如此类。

并且,我们有多变的地理位置的聚合查询。这里有三张图:

在我还没有加入Uber的时候,我看到的是最左边的那个,是基于我们自己的地理划分。但是当我加入Uber的时候,我们开始用,Google Earth,类似于那个geo hash,但是它用的算法是不一样的。但我开始搭建这个系统大概是五个月之前的时候,我们开始用右边的六边形。

所以公司的业务在飞快发展,我们在不断的尝试新的东西,怎么才能找到一个系统,能够去适应不同的这种所谓Business Requirements?才能找到一个系统去支持各种各样的地理信息的聚合呢?

同时我们有一定的高流量,但并不是高到Google那种地步需要我们花大量的时间去做一个别人没有做过的系统,我们大概的流量是每秒钟大概是数十万条消息,一天大概是几百亿,并且每条消息包含几十个字段。

同时我们需要非常短的时间交货。Uber仍然是个年轻的公司,那这个年轻的公司就充满了活力,充满了活力的公司不轮是管理层,还是下面的员工都愿意尽快的完成自己的任务。

经常管理层会说,你能不能够三天之后,或者说星期五之前把什么东西交给我。那我们怎么样才能够尽快的上线一个可用的系统,帮助我们去探索我们的数据?然后在这个过程中不断进化?

大家都见过这种所谓的三角图,你只能这三个条件只能选两个,要么快、并且质量好,要么质量好、并且花销大。那我们怎么样才能够达到一定的平和和取舍,能够尽快地做出足够好的、并且足够便宜的系统呢?

4
如何应对挑战?关键:问题一般化

对我们来说,唯一的关键是把问题一般化。刚才我们举了各种各样的数据、各种各样的查询、甚至各种各样的聚合,所以我们需要找到一个抽象层,这个抽象层能够尽可能一般化的描述我们的需求。

我们 先看数据类型,在我们看来这些数据可以被抽象成所谓的多维时序数据,它有多个纬度。它是跟时间有关的,比如,上图左边是维度:状态、车的类型、时间、时间戳;右边是它的不同的值。

再看数据查询,我们决定把它看成是基于单表的、就是一张表的时空数据的在线分析平台。为什么是单表呢?因为你会发现,一旦我们引入了表的连接的话,整个计算量,或者说整个系统的复杂度会大大的增高,所以说我们宁愿把我们的情况简化一下,只支持单表。

所以我们的查询也就简化成了一个简单的英文select statemen ,唯一需要做到的是,我们必须要支持group by,必须支持Having,我们需要支持order by和Limit。并且我们需要再做一些所谓的事后的处理,那我们引入了一个扩展叫做Do,那我们现在有了自己的抽象的数据,自己的抽象层,并且有了自己的这个消息的收集系统。

5
我们为什么选择Elasticsearch做数据索引?

5.1 存储系统的最低要求

首先,我们看一下存储系统的最低要求是什么?

就像刚才的状态图里有,我们查到的某一个司机的单个的状态,所以显然那个司机的所有状态变化的原始数据都必须被存储。

5.2 键值数据库的局限

那我们先看一下能不能用所谓的KVStore或者键值数据库呢。我不知道在座的试过,支持这种查询、并且使用Redies之类的东西来做的吗?我们的结论是我们不能用这样的东西,即使KVStore,或者键值数据库非常的流行,并且它们的读写也是非常的快速,但不能用。为什么呢?

关键就是,如果我们用一个键值数据库是一键一值,换句话说,我们得知道所有的组合,每当我们查询一个聚合,一个gropu by的时候,我们得知道,你用的这个组合是什么样子的。如果我们所有的组合都是临时想起来的,而不是说事先定义好的,那不得以,我们就必须要预算所有的组合。

那再举一个例子,比如说我一个非常简单的表,只有两个维度,A和B,我们要支持各种各样的布尔操作,AND  OR  NOT,那这样的话,实际上我们就有四种组合,A但不是B,B但不是A,或者是A,或者B,或者说既不是A,也不是B,这就是四种组合。那这样实际上是对应的找到所有的子集,这个是在计算机科学里面所谓著名的幂集问题。

而这个问题的复杂度是多少呢?这个复杂度是指数级别的,那你知道既然是指数级别的,一旦我们的数据量多了,一旦我们的的维度多了,那我们的系统就会迅速的就会降级。

5.3 关系数据库的局限

那既然这样,我们下一个可能用关系数据库,但关系数据库其实也是不行的,为什么?首先是关系数据库自己有它的局限,我们非常难以管理它的索引。

比如说大家比较熟悉的MySQL Data Base,你如果有多个索引想联合起来使用,那它的性能是不怎么样的。但你要说那我就创建这种索引的话,你得事先知道所有的组合,那你又回到刚才的问题,即指数级别的组合。

并且它的扫描速度也不快。上图是一张简化的表,这个是绝大多数关系数据库用的内部的数据结构:「B-Tree  Structure」。B+Tree  Structure并不是二维的树,而是二元的树,有多个分支。并且每个分支,大概每一个结点是一个Page,就算是这样,它一层、或两层,大概可以容纳比如说上百万,甚至是上千万的数据量。但是我们要扫描的不是上百万上千万,而是上亿的数据量,在这种情况下,我们就不得已会扫描多层。

那扫描多层的问题就是,这个东西是放在磁盘里的,你知道磁盘里面扫描的代价是非常高的,最后我们实验的结果发现,如果我们用关系数据库,那这个查询的速度是不够理想的。

其实我们也经历了很多争论,其中一个争论是,有人说我的数据库真是很快,我的键值数据库,比如说Readies,每秒钟一简简单单几十个实例cluster里面我几秒钟的读写上百万次,上百万次还不够你用的,但是光快是不行的。

5.4 光快是不行的

让我们来看一下,假设一个城市里面每个六边形的车的数目是我们想要的查询,因为是一键一值,那我们如果要知道所有每个六边形的情况,那不得已我们要做一万八千次的查询。

假设这个城市有一万个六边形,假设每一次查询的平均延迟是一毫秒,这个百分位,99.99%百分位的延迟是两秒,这已经是非常宽松了,绝大部分的数据,或者当你们去监控这些数据的时候,你们需要看到是99%个分位,而不是99.99%。

假设这个失败率也是非常低的,0.001%,每十万次请求里面才有一次失败。那么当你发出一万八千个并行的请求,然后你得等着,等所有的请求拿结果回来,你再把它总合起来,那一次查询的延迟超过99.99%的概率是多少呢?这是非常简单的概率的计算,83%。

几乎每一次、每两次查询,你就会有一次失败。而一次查询的成功的概率是多少?84%。也是不到两次的查询里面就会有一次彻底的失败,换句话说,这个系统是不行的。

5.5 那到底用什么样的系统呢?

我们先来重新回顾一下我们最低的要求:快速扫描,布尔查询,原始数据,各种各样的聚合。

最后我们决定用一个不是数据库的数据库,Elasticsearch。

为什么?因为如果你看这个Search engine,它解决了我们的维度问题,Search engine就是用来处理高维度的,不管你有多少个维度,你都可以把它看成一个索引,所以它是一个基于高效倒排索引的布尔查询的一个engine。

并且它已经内建了分布式的查询。你不需要说我只有一台机器,因为Elasticsearch用的是Lucene只是一个Libary,那你也不需要说我只有一台机器,我不能够处理这么大的流量,因为它已经帮你分布式做好了。如果你有大量的查询,他可以自动的把这个查询分布到不同的机器上面让它们同步执行。并且它的扫描是非常快速的,它的聚合也是非常灵活的,实际上Elasticsearch提供各种各样的聚合函数让我们使用。

最后,这就是我们得到的架构:用KafKa来收集数据,并且用Elasticsearch来做数据的索引。

并且我们把Elasticsearch分成两个不同的cluster,我们叫做Warm storage,因为有些数据刚刚采集的数据往往也是大家马上想用的数据,所以我们把它全部放在内存里面,并且用SSD作为那个长久的介质。如果是几个月前的数据,那这些数据也许没人需要,但是我们可以等,所以我们就把它放到所谓的cold storage里面。

6
数据转换:为什么选择Samza?

那我们做完了吗?还没有,因为如果你看我们需要的查询,还涉及到有很多各种各样的数据的转换。比如说我们拿到的原始的数据总是拿到它的经纬度,拿经纬度的话,并不能提供英文,并不能提供邮编、也不能提供六边形所在的位置,所以我们需要有一个系统执行这些基本的转换。

我们有新的应用场景,比如需要做动态定价、需要做趋势预测,我们需要根据我们历史数据知道一小时后会发生什么情况、两小时后会发什么情况、甚至一天之后会发生什么样的情况?

我们还需要做一些所谓供求分布的分析,就是我们的那个可视化组做的一些可视化动画。它给我们体现的是,他们拿到我们的数据,然后动态的展示出来,到底哪些地方,哪些地方有很多打车的人?也就是黄色那些点。哪些地方有很多车?也就是蓝色点。有时候你看到蓝色点和黄色点并不是互相重合的。但是我们为什么要这些数据呢?

我们并不是说要这些数据看着好看,它一定要有具体的用处,那从技术上说呢,我们需要知道哪些地方有所谓的热点,或者说有大量的供求的分布。并且我们想知道,它们之间对司机来说会不会有一些概率上的分布,比如说如果我们给定一个司机,并且给定一个他的起始位置和接受位置,我们想知道这个司机他有多大的概率,从给定的起始位置开到给定的结束的位置。换句话说,我们有新的场景,我们也有了新的需求,那根据需求,我们需要进一步的改善我们的系统。

第一步需要改善的就是所谓的预处理。我们有大量的事件、大量的数据,这个数据需要我们马上就把它们改变一下,让我们的查询变得更加的方便。

比如说,记得我们是单表,万一我们的确需要查询两个表呢?那我们就用预处理的这个系统,把不同的表、或者数据流先把它们连接起来,那同时我们需要做一个Sessionization的工作。比如你怎么知道这个司机什么时候上线的,这个司机到底什么时候下线的,或者说这个司机的状态的改变之间它们的延迟是多少,有时候我们通过我们的数据直接的看出来,但是有些时候,我们的确需要通过某一些特征来把这个所谓的这个Session,或者这段绘画给推断出来,这个工作也需要通过一个系统来完成。

有时候,我们需要做多级的处理,也许这个数据的处理并不是简简单单的做一次简单的转化,也不一定仅仅是做一次的简单的索引,我们有时候需要把这些数据先分派开来,根据某种原则把它们汇总起来,再分到另外一个地方,又根据另外一套原则再重新做一次处理。那怎么样去处理这种多级的处理呢?我们还需要做状态管理,如果我们做Sessionization我们总得需要维持一段时间的所谓的time window。再比如,在过去的半个小时之内我们需要看这半个小时里面发生了什么事情,要维护一个半个小时的滚动的窗口,这样的工作,就叫做状态管理。

那我们有这么多的需求,用什么样的系统呢?最后我们决定用Apache  Samza。

Apache  Samza其实是一个非常聪明的系统。我觉得很牛的地方就是,大家都很熟悉所谓的Hadoop  System,尤其是下一代的Hadoop  System,那Hadoop它的计算模型其实是非常普世的,或者说非常简单的,你无非就是用Map  Reduce,你把你的数据通过一定的键值给shuffle一下,放到不同的Map上面去处理,最后又放到不同的Reduce上面给组合起来。但为什么,我们不用Hadoop呢?因为Hadoop不够实时。

如果你仔细看它不够实时的原因,主要就是因为它的这个持久化太花时间了,在某一方面,至少在我们要做的事情里面,持久化,你每一次都会需要先把你的数据先写到你的HDFS里面,中间的,再从HDFS里面读出来,那这个过程实际上是非常花时间的。

LinkedIn的工程师就发现,既然你的计算模型是非常好的,你唯一的瓶颈,或者说在这种应用场景下,你的瓶颈是HDFS,那就把HDFS就换了,换成什么呢?换成一个高速的,单向的读写系统,也就是KafKa。换句话说,这个就是,我们把它叫做DAG on KafKa,DAG on KafKa是有向图,无环有向图,所以说你把计算抽象成一个无环的有向图,放在KafKa上面一下子,你就有了一个高速的时时的预处理系统,并且他给Samza提供一流的KafKa整合。

如果大家熟悉Kafka系统,会知道大的KafKa读数据的时候并不是非常稳定的,尤其是你用它的high level consumer,经常会遇到一些奇怪的问题,而在Samza里面,他们重新的去实现了这套系统,让这个KafKa message这种读写变得非常的方便,并且还有内置的检查点。如果一个Samza任务突然宕掉了,没关系,回到上一个检查点,然后从那个地方开始,不需要从头去完成你的任务,并且内置的状态管理,你的状态不光是放在内存里面,也可以放在你的磁盘上面,它有一个内置的数据库,并且这数据库本身也可以被复制到不同的节点里面,去保证可以容灾容错。

现在的话,我们的架构就变成了这样的架构,仍然比较简单,我们有一个处理系统,我们有一个存储系统。

7
批处理系统:Spark

现在的问题就是如果存储层宕掉怎么办?Elasticsearch会宕掉的,甚至你会在一些罕见的情况下甚至会丢失数据。还有一个问题,如果这个实时的域处理耗时太久怎么办?甚至你根本不可能得到实时的数据,但是里仍然想执行实时的查询怎么办?那我们就加上一个批处理系统,也就是下图红框标出来的Spark。

我们有了Spark,我把那些数据放在了一个更可靠的地方。如果我们的储存系统宕掉了,我们可以做这种back fill,就是把历史的数据重新计算一遍,放回我们的储存系统,如果有些批处理或者域处理耗时太长的话,那我们就把它放在批处理里面来做。

查询结果转换和平滑处理

这下终于搞定了,我们有收集系统,我们有预处理系统,我们有储存系统,一个系统好像已经成型了。但是,我们仍然有更多的需求!

这是刚才出现的粗线图:右边是我们的热点分布,左边是各种各样的时序数据。你会发现在右边还有一个白边框出来的一个更大的区域,我们想知道的并不是单个的点它的时序数据的分布到底是怎么样的。我们想知道的是给定任意区域,那相关的数据是怎么样分布的,那这样的话,就需要做后期处理,比如说查询结果的转换和平滑处理,那平滑处理你刚才看到那张图,你们会觉得那张图像什么?

拿上图做示例,这是一张低分辨率的图片,于是我们会做一些图案处理,比如「平滑处理」。即用算法把每个单个的六边形里面的数据、平滑地分到相邻的六边形里面。原因很简单,假设在一个六边形里有一百辆车,而旁边的邻居的六边形里一辆车也没有,那么你能说这个邻居打不到车吗?

显然是能打到车,他只是需要多等几秒钟。在这种情况下最好的办法是,把那100辆车的六边形的数据分一点给它的邻居。这样的话,这个整个数据的分布会变得更加现实一点。我们看一下计算的规模:一个城市至少十万个六边形,每个六边形至少需要331个邻居需要处理,其实不只331个,但是大致是这样的,在比较好的情况下面,所以说一次的查询,我们需要310万个六边形。

这个数据看起来比较大,因为每个六边形的处理其实我们需要做一些比较复杂的计算。如果把它看成一张图片,计算量不是那么大的,关键是看你用什么样的算法,用什么样的架构,最后我们能够做到的Poformance是99个百分位的是70毫秒,这个地方我需要强调的是,一定要用简单的架构,不要用复杂的架构。

这篇论文非常有意思,是Frank McSherry在今年年初的Hard  Ways上面的一篇文章:《Scalability  But  at  what  COST?》。他假设的是什么呢?特有意思,我给你们一个例子,你们就知道了。

他考察了很多种常见的分布系统和一些复杂的计算,比如下图他考察了四种系统Spark、Giraph、GraphLab和GraphX,都是图处理系统,或者说是一些通用的处理系统。

那这四个系统动用了128个Core,CPU Core,他们干的事情是PageRanking,就执行这个Page Ranking算法,并且用两种不同的图,一个是Tuitter-RV,一个是Uk-2007-05,最后发现他们大概差不多,花的时间是从两百秒到甚至上千秒。

但是如果你用一个Labtop,用一个Core,只是改动一下你的算法,你会发现你只需要用153到417秒,再稍微的优化一下,15秒就够了。换句话说,当你用能够一台机器处理这个问题的时候,你为什么要用很多台机器呢?Paul  Graham就说了个很有意思的:,你有权利用第二台机器,前提是你得知道怎么样用一台机器。

我们来看一下我们的后期处理。后期处理的每一个处理单元,或者刚才我提到的那些计算,不管是平滑处理、或者预测,本质上都是数学计算。每个处理单元都是纯粹的函数,数据进去,数据出去,没有任何状态的改变,所以说我们完全可以把它们抽象成就是函数,并且我们可以创造各种各样的组合算子。

把这些函数组合起来就是数学的强项,有这些强项我们就何以用。并且我们可以执行非常高度并行化的执行,可以用流水线的方法。流水线是一个非常强大的方法,当你拿到一个数据之后,你拿到的是整个程式的数据,但是你并不一定等到整个程式的数据都就位以后你才去处理整个单元。

相反你可以猜出来,或者说计算出来,到底他们这些计算处理之间有什么样的依赖关系。当这个依赖关系不存在的时候,就并行处理,只有让你需要等的时候你再等一下,光是这一条就可以让我们的速度提高十倍左右。

我们所有这些东西、即一个程式的算法,就是最后在一台机器上做的。几年前,刚开始做架构的时候用Python,那是一个分布式的管理任务的一个平台。你会发现,我们只有比如说上千行的业务逻辑,但会有几千行的平台逻辑,平台逻辑和业务逻辑还交织在一块,让这个代码变得无比的复杂。而且还把这些非常简单运算任务分布到多台机器上去,你分布也要花时间的,你等也要花时间的,你聚合也要花时间,最后的结果就是,一个计算虽然说我们现在可以用一台机器做,当时要用好多台机器,甚至上十台机器。

那么我们再看一下,我们还有一些务实的考量。比如数据发现,我们有了这套系统,但是用户呢?用户会说,你有什么数据,你有什么维度?我可以做什么样的query?所有这些问题的话都需要用一个在数据库领域叫做所谓的cutting log service,你要告诉大家数据在哪里,它们有样什么样的特征,你可以做什么样的操作。

同时,Elasticsearch的查询语句过度复杂,毕竟呢这是做Search engine用的,而我们把它用来作为一个分析的平台。上图是一个例子,这是一个非常简单的程序,这么多行,这么多东西,如果每个用户都要学习的话,实际上是会非常花时间的,但是如果你把它抽象成我们刚才说的简单的单表的OLAB查询,实际上就是一个SQL query,非常简短,也非常直白。

并且Elasticsearch查询自己也可以优化,再一次地我们可以用流水线的优化,我们必须要查询以后做一些验证的工作,比如说有时候一条查询也许会用掉上GB的内存,那这个时候我们需要知道,这个查询的话花的时间和花的资源太多了,我们应该把它切分成非常多的小的查询,最后把它聚合起来。

并且有时候,有些用户也许他不小心,一下会发出,比如说上千个查询,那这个时候时候的话,我们需要对它们做进行限速。这是我们当时做的一个优化,非常简单的优化,就改了一下那个filter的位置,结果我们优化的成果是大概80倍的提速,所以可以看到,虽然说我们有了存储层,我们仍然需要做一些工作让这个存储层变得足够的好用。

Elasticsearch也许可以替换,谁说我们用了这套存储系统就一定要永远的用下去,也许我们的业务高速的发展,我们发现这套系统已经不能再用了,也许我们有了新的应用场景,我们发现我们需要用一套自己的全新的系统。那这个时候,怎么样替换我们的储存系统,而不影响我们终端的用户呢?所以我们说我们就加了一层Query,这一层服务他要做得事情就是,执行后期的处理,比如平滑操作,并且执行Query的优化,诸如此类。

 复杂行为分析

那在这上面其实我们还做了一件事情,如果观察这些数据流你会发现,数据流里面总是有不同的模式,或者不同的规律。我们想发现这些规律,并且我们总是有需要快速的去探索,去试验这些规律,或者摸索这些规律的需要。

我举两个例子,一个例子是,多少司机在5分钟内会连续取消请求五次以上,很显然,这些司机也许有什么难言之隐,或者他们遇到什么样的技术困难,如果我们能够非常快的发现这些司机,也许我们可以做出相应的反映。再比如,哪些乘客在半小时之内,在相距超过一百公里的地方相继叫车,非常夸张,这个在中国就经常发生,所以说我们需要能够迅速的去查找这样的情况,并且把它们做出相应的反映,那这个是我们需要用的叫做Complex  Event  Processing。

这个东西实际上相当于,数据在源源不断的流过我们的系统,我们在中间加了个黑盒子,这个黑盒子允许用户去执行一段非常简单的查询语句,并且语句一定要支持所谓的window function方式,而且我不用看单个的语句,我看的是过去半小时,一个滚动窗口里面的所有的信息,并且发现它有什么样的规律。

有了这个规律以后我们可以做什么事情呢?或者我们发现了这个规律可以做什么事情呢?我想提醒大家看到最后一行,Hipchat是一家澳大利亚公司提供的聊天软件,我们用这个聊天软件做不同组织之间的协作,一个语句发现某种模式,然后把这个消息发布到这个聊天室里面供大家看到,所以你很快就知道这个到底这个司机干了什么事情,到底是哪些司机,诸如此类。

更重要的是,其实这个实现是非常简单的。一旦有前面的系统以后,我们已经做出了,这套系统已经足够的普适了,它可以执行各种各样的计算,它已经解决了分布的问题,它可以把这些数据流分布到不同的处理单元上面。我们只需要解决的是,在每一个处理单元上面加一个CEP的模块就行。

为什么我提这个呢?我想说的是,一旦你的架构足够简单,或者说足够明白的话,加一些额外的功能其实是非常方便的,刚才我讲的就是我们做得流处理的一些应用。谢谢大家!

7
总结

Uber的架构总共只有四层,消息收集、处理、存储和请求,好的架构之所以健壮易扩展恰恰是因为其足够简单,并且选择那些适合的技术栈。当然,前期对业务的抽象也十分关键,找到普遍的需求是做好架构设计的基础。


百度开放云移动游戏和直播技术解读—本期百度技术沙龙将邀请百度开放云内部专家,分享移动游戏和直播在发展中对云的需求痛点,以及百度开放云在移动游戏和直播上的技术优势,并就相关实际案例给出具体讲解。

阅读原文,免费报名!



本文系InfoQ原创首发,未经授权谢绝转载。

 
InfoQ 更多文章 年前挖的坑都填了吗?技术债务偿还计划 程序员VS武林高手:技术为外功,思维乃内力 腾讯游戏大数据服务场景与应用(附PPT) 偷师饿了么:怎样用HTTP/2优化iOS APP网络层次架构? 作为高颜值的女程序员是一种怎样的体验?
猜您喜欢 送好礼啦!只要装管家QQ公仔就是你的!(活动预告) 雪晴直播课堂预告:《利用R语言进行数据处理及建模》 推荐!国外程序员整理的 PHP 资源大全 一个捡破烂的!三年竟然赚了270万 一位俄罗斯程序员移民美国的故事