过去一段时间很长的一段时间内都在写用 spark streaming 来做一些规则引擎的工作,工作第一阶段暂时告一段落,这里简单做一下总结。
streaming 不言而喻,也就是实时流式处理。严格来说 spark streaming 并不能算实时流式处理,它的工作原理是一种 micro batch 的方式,也就是说它会将很多 record 放在一起组成一个 batch 然后当成一个批处理作业进行处理。这也是它和 storm, flink 最本质的区别。
micro batch 的好处在于吞吐更大,延迟取决于 batch interval,如果对于实时要求不是特别的高,同时也在使用 Spark 的其他功能,Spark Streaming 往往是一个不错的选择。
DStream,也就是 discretized stream,是 Spark Streaming 提供了一种 high level 的抽象,用来表示数据流,数据一般从 Receiver(比如 Kafka, Flume等) 中获取。在内部,DStream 由一系列的 RDD 组成。RDD 是 Spark 中定义的一种数据模型,全称是 Resilent Distributed Dataset,可以简单理解为一个不可变的分布式数据集合。这些写代码的时候就可以像下面这么写:
DStream.foreachRDD(rdd => {rdd 处理})
大括号里面的 rdd 处理和普通的 Spark 程序处理基本没有区别,主要是通过一系列的 RDD 算子构造一个 DAG。这样其实就是把 Spark Streaming 转化成了一个个 Spark 作业了。
StreamingContext 是 Spark Streaming 程序的入口,我们一般先初始化一个 SparkConf,然后 StreamingContext 初始化的时候使用这个 SparkConf,代码如下。
val conf = new SparkConf().setMaster("local[2]").setAppName("Example App")val ssc = new StreamingContext(conf, batchInterval)// create DStream with ssc// Dstream processssc.start()ssc.awaitTermination()
local[2] 表示 local 模式使用 2 个线程运行 Spark Streaming 程序,注意如果是 local 模式一定要多初始化几个线程,因为 receiver 会独占一个线程,也就是 n > receiver_num。
通过上面初始化的 ssc 就可以构造 DStream 了,比如 Spark 自带的 WordCount 示例代码。
// tcp as receiverval lines = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER)// kafka as receiverval lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)// flume as receiver// Create a flume streamval stream = FlumeUtils.createStream(ssc, host, port, StorageLevel.MEMORY_ONLY_SER_2)
Spark Streaming 支持的 receiver 有 Kafka, Kinesis, Flume, Tcp socket,已经通过其他算子产生的流。除此之外还支持 custom Receiver,customReceiver 继承类 org.apache.spark.streaming.receiver.Receiver
,然后实现特定的方法即可。示例代码如下。
// Create an input stream with the custom receiver on target ip:port and count the// words in input stream of \n delimited text (eg. generated by 'nc')val lines = ssc.receiverStream(new CustomReceiver(args(0), args(1).toInt))class CustomReceiver(host: String, port: Int) extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging { def onStart() { // Start the thread that receives data over a connection new Thread("Socket Receiver") { override def run() { receive() } }.start() } def onStop() { // There is nothing much to do as the thread calling receive() // is designed to stop by itself isStopped() returns false } /** Create a socket connection and receive data until receiver is stopped */ private def receive() { var socket: Socket = null var userInput: String = null try { logInfo("Connecting to " + host + ":" + port) socket = new Socket(host, port) logInfo("Connected to " + host + ":" + port) val reader = new BufferedReader( new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8)) userInput = reader.readLine() while(!isStopped && userInput != null) { store(userInput) userInput = reader.readLine() } reader.close() socket.close() logInfo("Stopped receiving") restart("Trying to connect again") } catch { case e: java.net.ConnectException => restart("Error connecting to " + host + ":" + port, e) case t: Throwable => restart("Error receiving data", t) } }}
如果在一个 Spark Streaming 程序里面要处理多个 DStream 怎么办呢?DStream Join
val stream1: DStream[String, String] = ...val stream2: DStream[String, String] = ...val joinedStream = stream1.join(stream2)
Spark RDD 支持的算子基本都可以应用到 DStream 上。