微信号:we21cto

介绍:21CTO(21CTO.com)是中国项级技术专家的学习与服务平台.我们为CTO、技术总监、架构师等技术专家提供高质量的资讯、问答、活动等产品,同时与企业连接,提供技术咨询、研发、运维、技术支持、培训及人才招聘等服...

数据流是如何工作的?

2018-07-12 00:04 21CTO

21CTO导读:在本文中,我们将介绍Node.js和Java Streams以及Apache Kafka和Amazon Kinesis等工具,并简述每个工具的用法。



流是一系列元素的合集,就如同数组是存储值序列的数据结构。那么,一个流就是一个数组?呃,不是的 - 让我们看一下流到底是什么,来看它是如何工作的。


首先,数据流不存储元素,而数组存储元素。所以流并不是数组。此外,虽然集合和数组是有限的大小,但流是无限制。但是,如果流不存储元素,它如何成为一系列元素?


流实际上是一系列数据从一个点移动到另一个点,但它们是按需计算的。因此,流至少要有一个源,如数组,列表,I/O资源等。让我们以一个文件为例:当一个文件被打开进行编辑时,它的全部或部分会保留在内存中,允许进行更改,所以只有当它关闭时,才能保证不会丢失或损坏任何的数据。


幸运的是,流可以按块读取/写入数据块,而不会立即缓冲整个文件。您知道,缓冲区是物理内存存储区域(通常是RAM),用于在数据从一个位置移动到另一个位置时临时存储数据。


在Node.js中,存在四种数据流类型,如下:


可写:可以写入数据的流。例如,写入文件,发送HTTP请求/响应。

可读:可从中读取数据的流。例如,从文件读取,接收HTTP请求/响应。

双工:可读和可写的流。例如,TCP套接字。

转换:可以在写入和读取数据时修改或转换数据的双工数据流(例如,zlib压缩文件)。


在流上运行并生成另一个流的函数称为过滤器,可以在管道中连接,如下代码所示:


Arrays.asList(10,3,13,4,1,52)

  .stream()

      .filter(number -> number % 2 == 0) //10,4,52

      .sorted() //4,10,52

      .skip(1) //10,52

      .forEach(System.out::println); //prints 10 and prints 52


谈到Java Streams,Java提供给开发者较轻松处理的API。 JavaDoc中这样定义:


流操作分为中间(流生成)操作和终端(生成值)操作。


所以,如果我这样做时:


List<Integer> numbers = Arrays.asList(10,3,13,4,1,52);

Stream<Integer> numberStream = numbers.stream()

      .filter(number -> number % 2 == 0) //10,4,52

      .sorted() //4,10,52

      .skip(1) //10,52

      .peek(System.out::println); //used to execute something while stream is processing



流尚未执行,因为它足够聪明地等待终端操作被调用,如forEach,reduce,anyMatch等。 除了具有声明样式之外,一旦满足终端操作,它也足够智能地停止。 请看如下代码:


Integer integer = Arrays.asList(10,3,13,4,1,52)

    .stream()

      .filter(number -> number % 2 == 0)

      .sorted()

      .skip(1)

      .peek(System.out::println) //it prints only 10 instead of 10 and 52

      .findFirst().get();



以上的数据流上有sorted(),filter方法将在其流上运行,但skip不会在整个过滤和排序的流上运行。再看如下代码例子:



Integer integer = Arrays.asList(10,3,13,4,1,52)

    .stream()

      .filter(number -> number % 2 == 0)

      .findFirst().get();





有些人可能认为过滤器会在每个元素上运行然后再做查找第一个操作。但我们前面说过,Java的数据流处理足够智能。


Java Streams的另一个有趣的事情是并行流。如下代码:



Arrays.asList(10,3,13,4,1,52,2,6,8)

    .parallelStream()

      .filter(number -> number % 2 == 0)

      .forEach(number -> System.out.println(Thread.currentThread())); //prints which thread is being executed



当流并行执行时,Java运行时将流分区为多个子数据流。聚合操作迭代并且并行处理这些子流,然后再组合结果。




以上,我们已经理解了流的工作原理,再看一些工具。


Apache Kafka


Kafka是一个分布式流媒体平台,具有三个主要功能:


1、发布和订阅记录流,类似于消息队列或企业消息传递系统。

2、以容错,持久的方式存储记录流。

3、记录处理流的发生时间。


其目的是实现流的实时处理,使用Kafka Connect支持许多数据源(例如JDBC,ActiveMQ,REST API等)。 Kafka一些用例包括:消息传递,网站活动跟踪,度量标准,日志聚合,流处理,事件源和提交日志。


以下是使用Kafka Streams API 应用的内部架构。 它提供了包含多个流线程的Kafka Streams应用的逻辑视图,每个线程包含多个流任务




亚马逊Kinesis


Amazon Kinesis是完全托管的Amazon Web Service(AWS)内的产品,用来实时收集,处理和分析视频和数据流。 Kinesis有以下四种功能:


1、Kinesis视频流 - 捕获,处理和存储视频流。

2、Kinesis数据流 - 捕获,处理和存储数据流。

3、Kinesis Data Firehose - 将数据流加载到AWS数据存储中。

4、Kinesis Data Analytics - 使用标准SQL分析数据流。


其目的还在于实现流的实时处理以及一些实际用例:构建视频分析应用程序,从批处理演变为实时分析,构建实时应用以及分析IoT物联网设备数据等。


以下是Kinesis数据流的工作原理:



小结


介绍了流的工作原理。我们一起看到了关于Node.js流和Java Streams以及Apache Kafka和Amazon Kinesis等工具的优点和缺点。内容介绍完毕,希望您能喜欢本文,欢迎点赞转发。


作者:Raphael Amoedo

译者:海力布 

来源:https://dzone.com/articles/how-a-stream-works



 
21CTO 更多文章 物联网技术如何赋能教育培训行业? 八个面向开发人员的机器学习平台 忘记整洁代码吧,来开发有同情心的代码! 互联网后端基础设施 扎克伯格身家超越巴菲特,硅谷首次霸占全球富豪前三
猜您喜欢 同行评审之惑 Kotlin 1.0 正式发布: JVM 和 Android 上更好用的语言 《绝地求生》角色、场景、载具及武器设计赏析 是性格决定命运,还是命运造就性格?