`
kavy
  • 浏览: 866807 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

Structured Streaming + Kafka 集成 + Redis管理Offset(Kafka broker version 0.10.0 or h

阅读更多

Google一下发现 Structured Streaming + Kafka集成,记录Offset的文章挺少的,我自己摸索一下,写了一个DEMO。Github地址

 

1. 准备

配置起始和结束的offset值(默认)

 

Schema信息

读取后的数据的Schema是固定的,包含的列如下:

 

ColumnType说明

keybinary信息的key

valuebinary信息的value(我们自己的数据)

topicstring主题

partitionint分区

offsetlong偏移值

timestamplong时间戳

timestampTypeint类型

example:

 

val df = spark

  .read

  .format("kafka")

  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")

  .option("subscribe", "topic1,topic2")

  .option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""")

  .option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""")

  .load()

 

df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

  .as[(String, String)]

1

2

3

4

5

6

7

8

9

10

11

批处理还是流式处理都必须为Kafka Source设置如下选项:

 

选项值意义

assignjson值{“topicA”:[0,1],“topicB”:[2,4]}指定消费的TopicPartition,Kafka Source只能指定"assign",“subscribe”,"subscribePattern"选项中的一个

subscribe一个以逗号隔开的topic列表订阅的topic列表,Kafka Source只能指定"assign",“subscribe”,"subscribePattern"选项中的一个

subscribePatternJava正则表达式订阅的topic列表的正则式,Kafka Source只能指定"assign",“subscribe”,"subscribePattern"选项中的一个

kafka.bootstrap.servers以逗号隔开的host:port列表Kafka的"bootstrap.servers"配置

startingOffsets 与 endingOffsets 说明:

 

选项值默认值支持的查询类型意义

startingOffsets“earliest”,“lates”(仅streaming支持);或者json 字符"""{“topicA”:{“0”:23,“1”:-1},“TopicB”:{“0”:-2}}"""对于流式处理来说是"latest",对于批处理来说是"earliest"streaming和batch查询开始的位置可以是"earliest"(从最早的位置开始),“latest”(从最新的位置开始),或者通过一个json为每个TopicPartition指定开始的offset。通过Json指定的话,json中-2可以用于表示earliest,-1可以用于表示latest。注意:对于批处理而言,latest值不允许使用的。

endingOffsetslatest or json string{“topicA”:{“0”:23,“1”:-1},“topicB”:{“0”:-1}}latestbatch一个批查询的结束位置,可以是"latest",即最近的offset,或者通过json来为每个TopicPartition指定一个结束位置,在json中,-1表示latest,而-2是不允许使用的

2. 主要代码

/**

  * StructuredStreaming

  * 记录kafka上一次的Offset,从之前的Offset继续消费

  */

object StructuredStreamingOffset {

 

  val LOGGER: Logger = LogManager.getLogger("StructuredStreamingOffset")

 

  //topic

  val SUBSCRIBE = "log"

  

  case class readLogs(context: String, offset: String)

 

  def main(args: Array[String]): Unit = {

    val spark = SparkSession

      .builder()

      .master("local[*]")

      .appName("StructuredStreamingOffset")

      .getOrCreate()

 

    //开始 offset

    var startOffset = -1

 

    //init

    val redisSingle: RedisSingle = new RedisSingle()

    redisSingle.init(Constants.IP, Constants.PORT)

    //get redis

    if (redisSingle.exists(Constants.REDIDS_KEY) && redisSingle.getTime(Constants.REDIDS_KEY) != -1) {

      startOffset = redisSingle.get(Constants.REDIDS_KEY).toInt

    }

 

    //sink

    val df = spark

      .readStream

      .format("kafka")

      .option("kafka.bootstrap.servers", "localhost:9092")

      .option("subscribe", SUBSCRIBE)

      .option("startingOffsets", "{\"" + SUBSCRIBE + "\":{\"0\":" + startOffset + "}}")

      .load()

 

    import spark.implicits._

 

    //row 包含: key、value 、topic、 partition、offset、timestamp、timestampType

    val lines = df.selectExpr("CAST(value AS STRING)", "CAST(offset AS LONG)").as[(String, Long)]

 

    val content = lines.map(x => readLogs(x._1, x._2.toString))

 

    val count = content.toDF("context", "offset")

 

    //sink foreach 记录offset

    val query = count

      .writeStream

      .foreach(new RedisWriteKafkaOffset)

      .outputMode("update")

      .trigger(Trigger.ProcessingTime("5 seconds"))

      .format("console")

      .start()

 

    query.awaitTermination()

  }

}

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

Spark Structured streaming API支持的输出源有:Console、Memory、File和Foreach, 这里用到Foreach,用Redis存储value

 

class RedisWriteKafkaOffset extends ForeachWriter[Row] {

  var redisSingle: RedisSingle = _

 

  override def open(partitionId: Long, version: Long): Boolean = {

    redisSingle = new RedisSingle()

    redisSingle.init(Constants.IP, Constants.PORT)

    true

  }

  

  override def process(value: Row): Unit = {

    val offset = value.getAs[String]("offset")

    redisSingle.set(Constants.REDIDS_KEY, offset)

  }

 

  override def close(errorOrNull: Throwable): Unit = {

    redisSingle.getJedis().close()

    redisSingle.getPool().close()

  }

}

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

3. 总结

Structured Streaming 是一个基于 Spark SQL 引擎的、可扩展的且支持容错的流处理引擎。你可以像表达静态数据上的批处理计算一样表达流计算。Spark SQL 引擎将随着流式数据的持续到达而持续运行,并不断更新结果。性能上,结构上比Spark Streaming的有一定优势。

 

点赞

————————————————

版权声明:本文为CSDN博主「vitahao」的原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接及本声明。

原文链接:https://blog.csdn.net/vitahao/article/details/88994480

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics