第四篇|Spark Streaming编程指南(1)
Spark Streaming是构建在Spark Core基础之上的流处理框架,是Spark非常重要的组成部分。Spark Streaming于2013年2月在Spark0.7.0版本中引入,发展至今已经成为了在企业中广泛使用的流处理平台。在2016年7月,Spark2.0版本中引入了Structured Streaming,并在Spark2.2版本中达到了生产级别,Structured Streaming是构建在Spark SQL之上的流处理引擎,用户可以使用DataSet/DataFreame API进行流处理,目前Structured Streaming在不同的版本中发展速度很快。值得注意的是,本文不会对Structured Streaming做过多讲解,主要针对Spark Streaming进行讨论,包括以下内容:
- Spark Streaming介绍
- Transformations与Output Operations
- Spark Streaming数据源(Sources)
- Spark Streaming 数据汇(Sinks)
Spark Streaming介绍
什么是DStream
Spark Streaming是构建在Spark Core的RDD基础之上的,与此同时Spark Streaming引入了一个新的概念:DStream(Discretized Stream,离散化数据流),表示连续不断的数据流。DStream抽象是Spark Streaming的流处理模型,在内部实现上,Spark Streaming会对输入数据按照时间间隔(如1秒)分段,每一段数据转换为Spark中的RDD,这些分段就是Dstream,并且对DStream的操作都最终转变为对相应的RDD的操作。如下图所示:
如上图,这些底层的RDD转换操作是由Spark引擎来完成的,DStream的操作屏蔽了许多底层的细节,为用户提供了比较方便使用的高级API。
计算模型
在Flink中,批处理是流处理的特例,所以Flink是天然的流处理引擎。而Spark Streaming则不然,Spark Streaming认为流处理是批处理的特例,即Spark Streaming并不是纯实时的流处理引擎,在其内部使用的是microBatch
模型,即将流处理看做是在较小时间间隔内(batch interval)的一些列的批处理。关于时间间隔的设定,需要结合具体的业务延迟需求,可以实现秒级或者分钟级的间隔。
Spark Streaming会将每个短时间间隔内接收的数据存储在集群中,然后对其作用一系列的算子操作(map,reduce, groupBy等)。执行过程见下图:
如上图:Spark Streaming会将输入的数据流分割成一个个小的batch,每一个batch都代表这一些列的RDD,然后将这些batch存储在内存中。通过启动Spark作业来处理这些batch数据,从而实现一个流处理应用。
Spark Streaming的工作机制
概览
- 在Spark Streaming中,会有一个组件Receiver,作为一个长期运行的task跑在一个Executor上
- 每个Receiver都会负责一个input DStream(比如从文件中读取数据的文件流,比如套接字流,或者从Kafka中读取的一个输入流等等)
- Spark Streaming通过input DStream与外部数据源进行连接,读取相关数据
执行细节
- 1.启动StreamingContext
- 2.StreamingContext启动receiver,该receiver会一直运行在Executor的task中。用于连续不断地接收数据源,有两种主要的reciver,一种是可靠的reciver,当数据被接收并且存储到spark,发送回执确认,另一种是不可靠的reciver,对于数据源不发送回执确认。接收的数据会被缓存到work节点内存中,也会被复制到其他executor的所在的节点内存中,用于容错处理。
- 3.Streaming context周期触发job(根据batch-interval时间间隔)进行数据处理。
- 4.将数据输出。
Spark Streaming编程步骤
经过上面的分析,对Spark Streaming有了初步的认识。那么该如何编写一个Spark Streaming应用程序呢?一个Spark Streaming一般包括一下几个步骤:
1.创建
StreamingContext
2.创建输入
DStream
来定义输入源3.通过对DStream应用转换操作和输出操作来定义处理逻辑
4.用streamingContext.start()来开始接收数据和处理流程
5.streamingContext.awaitTermination()方法来等待处理结束
object StartSparkStreaming { |
Transformations与Output Operations
DStream是不可变的, 这意味着不能直接改变它们的内容,而是通过对DStream进行一系列转换(Transformation)来实现预期的应用程序逻辑。 每次转换都会创建一个新的DStream,该DStream表示来自父DStream的转换后的数据。 DStream转换是惰性(lazy)的,这意味只有执行output操作之后,才会去执行转换操作,这些触发执行的操作称之为output operation
。
Transformations
Spark Streaming提供了丰富的transformation操作,这些transformation又分为了有状态的transformation和无状态的transformation。除此之外,Spark Streaming也提供了一些window操作,值得注意的是window操作也是有状态的。具体细节如下:
无状态的transformation
无状态的transformation是指每一个micro-batch的处理是相互独立的,即当前的计算结果不受之前计算结果的影响,Spark Streaming的大部分算子都是无状态的,比如常见的map(),flatMap(),reduceByKey()等等。
- map(func)
对源DStream的每个元素,采用func函数进行转换,得到一个新的Dstream
/** Return a new DStream by applying a function to all elements of this DStream. */ |
- flatMap(func)
与map相似,但是每个输入项可用被映射为0个或者多个输出项
/** |
- filter(func)
返回一个新的DStream,仅包含源DStream中满足函数func的项
/** Return a new DStream containing only the elements that satisfy a predicate. */ |
- repartition(numPartitions)
通过创建更多或者更少的分区改变DStream的并行程度
/** |
- reduce(func)
利用函数func聚集源DStream中每个RDD的元素,返回一个包含单元素RDDs的新DStream
/** |
- count()
统计源DStream中每个RDD的元素数量
/** |
- union(otherStream)
返回一个新的DStream,包含源DStream和其他DStream的元素
/** |
- countByValue()
应用于元素类型为K的DStream上,返回一个(K,V)键值对类型的新DStream,每个键的值是在原DStream的每个RDD中的出现次数,比如lines.flatMap(_.split(" ")).countByValue().print()
,对于输入:spark spark flink
,将输出:(spark,2),(flink,1)
,即按照元素值进行分组,然后统计每个分组的元素个数。
从源码可以看出:底层实现为map((_,1L)).reduceByKey((x: Long, y: Long) => x + y, numPartitions),即先按当前的元素映射为一个tuple,其中key即为当前元素的值,然后再按照key做汇总。
/** |
- reduceByKey(func, [numTasks])
当在一个由(K,V)键值对组成的DStream上执行该操作时,返回一个新的由(K,V)键值对组成的DStream,每一个key的值均由给定的recuce函数(func)聚集起来
比如:lines.flatMap(_.split(" ")).map((_,1)).reduceByKey(_ + _).print()
对于输入:spark spark flink,将输出:(spark,2),(flink,1)
/** |
- join(otherStream, [numTasks])
当应用于两个DStream(一个包含(K,V)键值对,一个包含(K,W)键值对),返回一个包含(K, (V, W))键值对的新Dstream
/** |
- cogroup(otherStream, [numTasks])
当应用于两个DStream(一个包含(K,V)键值对,一个包含(K,W)键值对),返回一个包含(K, Seq[V], Seq[W])的元组
> // 输入:spark
> // 输出:(spark,(CompactBuffer(1),CompactBuffer(1)))
> val DS1 = lines.flatMap(_.split(" ")).map((_,1))
> val DS2 = lines.flatMap(_.split(" ")).map((_,1))
> DS1.cogroup(DS2).print()
>
/** |
- transform(func)
通过对源DStream的每个RDD应用RDD-to-RDD函数,创建一个新的DStream。支持在新的DStream中做任何RDD操作
>// 输入:spark spark flink
>// 输出:(spark,2)、(flink,1)
>val lines = ssc.socketTextStream("localhost", 9999)
>val resultDStream = lines.transform(rdd => {
> rdd.flatMap(_.split("\\W")).map((_, 1)).reduceByKey(_ + _)
>})
>resultDStream.print()
>
/** |
有状态的transformation
有状态的transformation是指每个micro-batch的处理不是相互独立的,即当前的micro-batch处理依赖于之前的micro-batch计算结果。常见的有状态的transformation主要有countByValueAndWindow, reduceByKeyAndWindow , mapWithState, updateStateByKey等等。其实所有的基于window的操作都是有状态的,因为追踪整个窗口内的数据。
关于有状态的transformation和Window Operations,参见下文。
Output Operations
使用Output operations可以将DStream写入多外部存储设备或打印到控制台。上文提到,Spark Streaming的transformation是lazy的,因此需要Output Operation进行触发计算,其功能类似于RDD的action操作。具体详见下文Spark Streaming 数据汇(Sinks)。
Spark Streaming数据源
Spark Streaming的目的是成为一个通用的流处理框架,为了实现这一目标,Spark Streaming使用Receiver来集成各种各样的数据源。但是,对于有些数据源(如kafka),Spark Streaming支持使用Direct的方式去接收数据,这种方式比Receiver方式性能要好。
基于Receiver的方式
Receiver的作用是从数据源收集数据,然后将数据传送给Spark Streaming。基本原理是:随着数据的不断到来,在相对应的batch interval时间间隔内,这些数据会被收集并且打包成block,只要等到batch interval时间完成了,收集的数据block会被发送给spark进行处理。
如上图:当Spark Streaming启动时,receiver开始收集数据。在t0
的batch interval结束时(即收集完了该时间段内的数据),收集到的block #0会被发送到Spark进行处理。在t2
时刻,Spark会处理t1
的batch interval的数据block,与此同时会不停地收集t2
的batch interval对应的block#2。
常见的基于Receiver的数据源包括:Kafka, Kinesis, Flume,Twitter。除此之外,用户也可以通过继承 Receiver抽象类,实现onStart()
与onStop()
两个方法,进行自定义Receiver。本文不会对基于Receiver的数据源做过多讨论,主要针对基于Direct的Kafka数据源进行详细解释。
基于Direct的方式
Spark 1.3中引入了这种新的无Receiver的Direct方法,以确保更强的端到端保证。该方法不是使用Receiver来接收数据,而是定期查询Kafka每个topic+partition中的最新偏移量,并相应地定义要在每个批次中处理的偏移量范围。启动用于处理数据的作业时,Kafka的简单consumer API用于读取Kafka定义的偏移量范围(类似于从文件系统读取文件)。请注意,此功能是在Scala和Java API的Spark 1.3引入的,在Python API的Spark 1.4中引入的。
基于Direct的方式具有以下优点:
- 简化并行读取
如果要读取多个partition,不需要创建多个输入DStream然后对他们进行union操作。Spark会创建跟Kafka partition一样多的RDD partition,并且会并行从kafka中读取数据。所以在kafka partition和RDD partition之间,有一一对应的关系。
- 高性能
如果要保证数据零丢失,在基于Receiver的方式中,需要开启WAL机制。这种方式其实效率很低,因为数据实际被复制了两份,kafka自己本身就有高可靠的机制,会对数据复制一份,而这里又会复制一份到WAL中。而基于Direct的方式,不依赖于Receiver,不需要开启WAL机制,只要kafka中做了数据的复制,那么就可以通过kafka的副本进行恢复。
- Exactly-once语义
基于Receiver的方式,使用kafka的高阶API来在Zookeeper中保存消费过的offset。这是消费kafka数据的传统方式。这种方式配合WAL机制,可以保证数据零丢失的高可靠性,但是却无法保证Exactly-once语义(Spark和Zookeeper之间可能是不同步的)。基于Direct的方式,使用kafka的简单API,Spark Streaming自己就负责追踪消费的offset,并保存在checkpoint中。Spark自己一定是同步的,因此可以保证数据时消费一次且仅消费一次。
Spark Streaming集成kafka
使用方式
使用KafkaUtils添加Kafka数据源,源码如下:
def createDirectStream[K, V]( |
具体参数解释:
K:Kafka消息key的类型
V:Kafka消息value的类型
ssc:StreamingContext
locationStrategy: LocationStrategy,根据Executor中的主题的分区来调度consumer,即尽可能地让consumer靠近leader partition。该配置可以提升性能,但对于location的选择只是一种参考,并不是绝对的。可以选择如下方式:
- PreferBrokers:Spark和Kafka运行在同一个节点上,可以使用此种方式
- PreferConsistent:大部分情况使用此方式,它将一致地在所有Executor之间分配分区
- PreferFixed:将特定的主题分区放置到特定的主机上,在数据负载不均衡时使用
注意:多数情况下使用PreferConsisten,其他两种方式只是在特定的场景使用。这种配置只是一种参考,具体的情况还是会根据集群的资源自动调整。
consumerStrategy:消费策略,主要有下面三种方式:
- Subscribe:订阅指定主题名称的主题集合
- SubscribePattern:通过正则匹配,订阅相匹配的主题数据
- Assign:订阅一个主题+分区的集合
注意:大多数情况下使用Subscribe方式。
使用案例
object TolerateWCTest { |
Spark Streaming 数据汇(Sinks)
Output Operation介绍
Spark Streaming提供了下面内置的Output Operation,如下:
- print()
打印数据数据到标准输出,如果不传递参数,默认打印前10个元素
- saveAsTextFiles(prefix, [suffix])
将DStream内容存储到文件系统,每个batch interval的文件名称为`prefix-TIME_IN_MS[.suffix]
- saveAsObjectFiles(prefix, [suffix])
将DStream的内容保存为序列化的java对象的SequenceFile,每个batch interval的文件名称为prefix-TIME_IN_MS[.suffix]
,Python API不支持此方法。
- saveAsHadoopFiles(prefix, [suffix])
将DStream内容保存为Hadoop文件,每个batch interval的文件名称为prefix-TIME_IN_MS[.suffix]
,Python API不支持此方法。
- foreachRDD(func)
通用的数据输出算子,func函数将每个RDD的数据输出到外部存储设备,比如将RDD写入到文件或者数据库。
/** |
foreachRDD是一个非常重要的操作,用户可以使用它将处理的数据输出到外部存储设备。关于foreachRDD的使用,需要特点别注意一些细节问题。具体分析如下:
如果将数据写入到MySQL,需要获取连接Connection。用户可能不经意的在Spark Driver中创建一个连接对象,然后在Work中使用它将数据写入外部设备,代码如下:
dstream.foreachRDD { rdd => |
尖叫提示:上面的使用方式是错误的,因为需要将connection对象进行序列化,然后发送到driver节点,而这种connection对象是不能被序列化,所以不能跨节点传输。上面代码会报序列化错误,正确的使用方式是在worker节点创建connection,即在
rdd.foreach
内部创建connection。方式如下:
dstream.foreachRDD { rdd => |
上面的方式解决了不能序列化的问题,但是会为每个RDD的record创建一个connection,通常创建一个connection对象是会存在一定性能开销的,所以频繁创建和销毁connection对象会造成整体的吞吐量降低。一个比较好的做法是将rdd.foreach
替换为``rdd.foreachPartition,这样就不用频繁为每个record创建connection,而是为RDD的partition创建connection,大大减少了创建connection带来的开销。
dstream.foreachRDD { rdd => |
其实上面的使用方式还可以进一步优化,可以通过在多个RDD或者批数据间重用连接对象。用户可以维护一个静态的连接对象池,重复使用池中的对象将多批次的RDD推送到外部系统,以进一步节省开销:
dstream.foreachRDD { rdd => |
使用案例
- 模拟数据库连接池
/** |
- 实时统计写入MySQL
object WordCount { |
总结
由于篇幅限制,本文主要对Spark Streaming执行机制、Transformations与Output Operations、Spark Streaming数据源(Sources)、Spark Streaming 数据汇(Sinks)进行了讨论。下一篇将分享基于时间的窗口操作、有状态的计算、检查点Checkpoint、性能调优等内容。
公众号『大数据技术与数仓』,回复『资料』领取大数据资料包
- 本文链接:https://jiamaoxiang.top/2020/07/27/第四篇-Spark-Streaming编程指南/
- 版权声明:本文为博主原创文章,遵循CC BY-SA 4.0版权协议,转载请附上原文出处链接和本声明
分享