Google一下发现 Structured Streaming + Kafka集成,记录Offset的文章挺少的,我自己摸索一下,写了一个DEMO。Github地址
1. 准备
配置起始和结束的offset值(默认)
Schema信息
读取后的数据的Schema是固定的,包含的列如下:
ColumnType说明
keybinary信息的key
valuebinary信息的value(我们自己的数据)
topicstring主题
partitionint分区
offsetlong偏移值
timestamplong时间戳
timestampTypeint类型
example:
val df = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1,topic2")
.option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""")
.option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
1
2
3
4
5
6
7
8
9
10
11
批处理还是流式处理都必须为Kafka Source设置如下选项:
选项值意义
assignjson值{“topicA”:[0,1],“topicB”:[2,4]}指定消费的TopicPartition,Kafka Source只能指定"assign",“subscribe”,"subscribePattern"选项中的一个
subscribe一个以逗号隔开的topic列表订阅的topic列表,Kafka Source只能指定"assign",“subscribe”,"subscribePattern"选项中的一个
subscribePatternJava正则表达式订阅的topic列表的正则式,Kafka Source只能指定"assign",“subscribe”,"subscribePattern"选项中的一个
kafka.bootstrap.servers以逗号隔开的host:port列表Kafka的"bootstrap.servers"配置
startingOffsets 与 endingOffsets 说明:
选项值默认值支持的查询类型意义
startingOffsets“earliest”,“lates”(仅streaming支持);或者json 字符"""{“topicA”:{“0”:23,“1”:-1},“TopicB”:{“0”:-2}}"""对于流式处理来说是"latest",对于批处理来说是"earliest"streaming和batch查询开始的位置可以是"earliest"(从最早的位置开始),“latest”(从最新的位置开始),或者通过一个json为每个TopicPartition指定开始的offset。通过Json指定的话,json中-2可以用于表示earliest,-1可以用于表示latest。注意:对于批处理而言,latest值不允许使用的。
endingOffsetslatest or json string{“topicA”:{“0”:23,“1”:-1},“topicB”:{“0”:-1}}latestbatch一个批查询的结束位置,可以是"latest",即最近的offset,或者通过json来为每个TopicPartition指定一个结束位置,在json中,-1表示latest,而-2是不允许使用的
2. 主要代码
/**
* StructuredStreaming
* 记录kafka上一次的Offset,从之前的Offset继续消费
*/
object StructuredStreamingOffset {
val LOGGER: Logger = LogManager.getLogger("StructuredStreamingOffset")
//topic
val SUBSCRIBE = "log"
case class readLogs(context: String, offset: String)
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder()
.master("local[*]")
.appName("StructuredStreamingOffset")
.getOrCreate()
//开始 offset
var startOffset = -1
//init
val redisSingle: RedisSingle = new RedisSingle()
redisSingle.init(Constants.IP, Constants.PORT)
//get redis
if (redisSingle.exists(Constants.REDIDS_KEY) && redisSingle.getTime(Constants.REDIDS_KEY) != -1) {
startOffset = redisSingle.get(Constants.REDIDS_KEY).toInt
}
//sink
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", SUBSCRIBE)
.option("startingOffsets", "{\"" + SUBSCRIBE + "\":{\"0\":" + startOffset + "}}")
.load()
import spark.implicits._
//row 包含: key、value 、topic、 partition、offset、timestamp、timestampType
val lines = df.selectExpr("CAST(value AS STRING)", "CAST(offset AS LONG)").as[(String, Long)]
val content = lines.map(x => readLogs(x._1, x._2.toString))
val count = content.toDF("context", "offset")
//sink foreach 记录offset
val query = count
.writeStream
.foreach(new RedisWriteKafkaOffset)
.outputMode("update")
.trigger(Trigger.ProcessingTime("5 seconds"))
.format("console")
.start()
query.awaitTermination()
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
Spark Structured streaming API支持的输出源有:Console、Memory、File和Foreach, 这里用到Foreach,用Redis存储value
class RedisWriteKafkaOffset extends ForeachWriter[Row] {
var redisSingle: RedisSingle = _
override def open(partitionId: Long, version: Long): Boolean = {
redisSingle = new RedisSingle()
redisSingle.init(Constants.IP, Constants.PORT)
true
}
override def process(value: Row): Unit = {
val offset = value.getAs[String]("offset")
redisSingle.set(Constants.REDIDS_KEY, offset)
}
override def close(errorOrNull: Throwable): Unit = {
redisSingle.getJedis().close()
redisSingle.getPool().close()
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
3. 总结
Structured Streaming 是一个基于 Spark SQL 引擎的、可扩展的且支持容错的流处理引擎。你可以像表达静态数据上的批处理计算一样表达流计算。Spark SQL 引擎将随着流式数据的持续到达而持续运行,并不断更新结果。性能上,结构上比Spark Streaming的有一定优势。
点赞
————————————————
版权声明:本文为CSDN博主「vitahao」的原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/vitahao/article/details/88994480
相关推荐
The Delta Architecture Delta Lake + Apache Spark Structured Streaming
structured streaming 简介 1. Spark Streaming 不足 2. Structured Streaming 介绍 3. Structured Streaming 核心设计 Structured Streaming 编程模型 ...
3,综合运用HttpClient+Jsoup+Kafka+SparkStreaming+StructuredStreaming+SpringBoot+Echarts等多种实用技术 适用人群 1、对大数据感兴趣的在校生及应届毕业生。 2、对目前职业有进一步提升要求,希望从事大数据...
Structured Streaming was the 2nd (and the latest) major streaming effort in Spark. Its design decouples the frontend (user-facing APIs) and backend (execution), and allows us to change the execution ...
A Deep Dive into Stateful Stream Processing in Structured Streaming A Deep Dive into Stateful Stream Processing in Structured Streaming
机器学习(Machine Learning, ML)是一门多领域交叉学科,涉及概率论、统计学、逼近论、凸分析、算法复杂度理论等多门学科。专门研究计算机怎样模拟或实现人类的学习行为,以获取新的知识或技能,重新组织已有的知识...
数据工程师的纠结与运维的凌乱 • Delta Lake基本原理 • Delta 架构 • Delta 架构的特性 • Delta 架构的经典案例 & Demo • Delta Lake 社区
藏经阁-From Spark Streaming to Structured Streaming.pdf
藏经阁-Building Structured Streaming.pdf
藏经阁-Structured Streaming for Colum.pdf
spark2018欧洲峰会中关于StructuredStreaming中stateful stream processing的ppt
藏经阁-Building Structured Streaming Connector for Continous Applic
藏经阁-Online Learning with Structured Streaming.pdf
28:Spark2.3.x StructuredStreaming实时计算
深入Apache Spark流计算引擎:Structured Streaming
高级Java人才培训专家-3-Structured Streaming.doc
29:Spark2.3.x StructuredStreaming项目实时分析
深入Apache Spark流计算引擎:Structured Streaming.pdf
Spark Structured Streaming 一、概述 http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html 简单来说Spark Structured Streaming提供了流数据的快速、可靠、容错、端对端的精确一次...
Easy, Scalable, Fault-tolerant stream processing with Structured Streaming-TD