How to Manage Offsets Yourself When Integrating Spark Streaming with Kafka via the Direct Approach

VSole · 2022-08-03 16:33:42

This article explains how to manage offsets yourself when Spark Streaming consumes from Kafka using the Direct Approach.

The entry point for receiving data with the Direct Approach is KafkaUtils.createDirectStream. When that method is called, it first creates a KafkaCluster:

val kc = new KafkaCluster(kafkaParams)

KafkaCluster handles the communication with Kafka: it is used to look up partition metadata and consumer offsets before the DirectKafkaInputDStream is created. Each createDirectStream call produces one DirectKafkaInputDStream for the given topic set, and every DirectKafkaInputDStream holds its own KafkaCluster instance.
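KafkaCluster wraps Kafka's low-level metadata and offset APIs, and its methods return Either values (Left carries the errors, Right the result). Below is a minimal sketch of that calling pattern; the broker address and topic are placeholders, and the file has to live under org.apache.spark.streaming.kafka because KafkaCluster in the 0-8 integration is package-private to Spark:

package org.apache.spark.streaming.kafka

object KafkaClusterQuickCheck {
  def main(args: Array[String]): Unit = {
    // kafkaParams uses the old (0-8) consumer config keys; broker address is a placeholder
    val kc = new KafkaCluster(Map("metadata.broker.list" -> "broker1:9092"))
    // Every KafkaCluster call yields an Either: Left holds the errors, Right the result
    kc.getPartitions(Set("myTopic")) match {
      case Left(err)         => println(s"failed to fetch partitions: $err")
      case Right(partitions) => println(s"partitions: ${partitions.mkString(", ")}")
    }
  }
}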

When a batch interval fires, the DirectKafkaInputDStream's compute method is invoked and performs the following steps (a simplified sketch follows the list):

  1. Determine the untilOffset of each Kafka partition, which fixes the range of data to fetch
  2. Build a KafkaRDD instance; within a single batch, DirectKafkaInputDStream and KafkaRDD are one-to-one
  3. Report the offset information to the InputInfoTracker
  4. Return that RDD
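The sketch below is illustrative only, not Spark's actual source; all names in it are hypothetical. It shows how a per-partition untilOffset can be derived from the current offsets, the leaders' latest offsets, and an optional spark.streaming.kafka.maxRatePerPartition limit:

object UntilOffsetSketch {
  case class TP(topic: String, partition: Int)

  /** Clamp each partition's untilOffset by the leader's latest offset and an optional rate limit. */
  def untilOffsets(currentOffsets: Map[TP, Long],
                   latestLeaderOffsets: Map[TP, Long],
                   maxRatePerPartition: Option[Long],
                   batchDurationSec: Long): Map[TP, Long] =
    currentOffsets.map { case (tp, fromOffset) =>
      val latest = latestLeaderOffsets(tp)
      val until = maxRatePerPartition match {
        case Some(rate) => math.min(latest, fromOffset + rate * batchDurationSec) // rate-limited batch
        case None       => latest                                                 // take everything available
      }
      tp -> until
    }

  def main(args: Array[String]): Unit = {
    val tp = TP("pv", 0)
    // With maxRatePerPartition = 10 and a 6-second batch, at most 60 messages per partition per batch:
    println(untilOffsets(Map(tp -> 100L), Map(tp -> 1000000L), Some(10L), 6L)) // Map(TP(pv,0) -> 160)
  }
}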

For how KafkaRDD partitions map to Kafka partitions, see the article 《重要 | Spark分区并行度决定机制》.

Below is a code walkthrough of integrating Spark Streaming with Kafka via the Direct approach while managing the offsets yourself:

1. Business logic

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaManager

object SparkStreamingKafkaDirect {
  def main(args: Array[String]) {
    if (args.length < 3) {
      System.err.println(
        s"""
           |Usage: SparkStreamingKafkaDirect <brokers> <topics> <groupid>
           |  <brokers> is a list of one or more Kafka brokers
           |  <topics> is a list of one or more kafka topics to consume from
           |  <groupid> is a consume group
           |
        """.stripMargin)
      System.exit(1)
    }
    val Array(brokers, topics, groupId) = args
    val sparkConf = new SparkConf().setAppName("DirectKafka")
    sparkConf.setMaster("local[*]")
    sparkConf.set("spark.streaming.kafka.maxRatePerPartition", "10")
    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val ssc = new StreamingContext(sparkConf, Seconds(6))
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> brokers,
      "group.id" -> groupId,
      "auto.offset.reset" -> "smallest"
    )
    val km = new KafkaManager(kafkaParams)
    val streams = km.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet)
    streams.foreachRDD(rdd => {
      if (!rdd.isEmpty()) {
        // process the messages first
        rdd.foreach(println) // placeholder for the real business logic
        // then update the offsets in ZooKeeper
        km.updateZKOffsets(rdd)
      }
    })
    ssc.start()
    ssc.awaitTermination()
  }
}
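Note the ordering inside foreachRDD: the batch is processed first and the offsets are written back only afterwards. If the job dies between those two steps, the batch is replayed on restart, so this pattern gives at-least-once semantics; the downstream writes should be idempotent (or deduplicated) if duplicates matter.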

2. Core offset-management logic

2.1 Using ZooKeeper

Note: the custom KafkaManager must be placed under the package org.apache.spark.streaming.kafka, because KafkaCluster in the 0-8 integration is package-private to Spark.

package org.apache.spark.streaming.kafka

import scala.collection.mutable
import scala.reflect.ClassTag

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.Decoder

import org.apache.spark.SparkException
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.KafkaCluster.{Err, LeaderOffset}

/**
 * @Author: WeChat public account 大数据学习与分享
 * Spark Streaming + Kafka direct approach: manage the offsets yourself
 */
class KafkaManager(val kafkaParams: Map[String, String]) extends Serializable {

  private val kc = new KafkaCluster(kafkaParams)
  def createDirectStream[
    K: ClassTag,
    V: ClassTag,
    KD <: Decoder[K]: ClassTag,
    VD <: Decoder[V]: ClassTag](ssc: StreamingContext,
                                kafkaParams: Map[String, String],
                                topics: Set[String]): InputDStream[(K, V)] = {
    val groupId = kafkaParams("group.id")
    // Before reading offsets from ZooKeeper, reconcile them with what Kafka actually holds
    setOrUpdateOffsets(topics, groupId)
    // Then start consuming messages from the offsets stored in ZooKeeper
    val messages = {
      // Fetch the partitions. Either carries the outcome: Left is an error, Right the result
      val partitionsE: Either[Err, Set[TopicAndPartition]] = kc.getPartitions(topics)
      if (partitionsE.isLeft) throw new SparkException(s"get kafka partitions failed: ${partitionsE.left.get}")
      val partitions = partitionsE.right.get
      val consumerOffsetsE = kc.getConsumerOffsets(groupId, partitions)
      if (consumerOffsetsE.isLeft) throw new SparkException(s"get kafka consumer offsets failed: ${consumerOffsetsE.left.get}")
      val consumerOffsets = consumerOffsetsE.right.get
      KafkaUtils.createDirectStream[K, V, KD, VD, (K, V)](
        ssc, kafkaParams, consumerOffsets, (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message))
    }
    messages
  }
  /** Before creating the stream, reconcile the stored consumer offsets with what Kafka actually retains */
  def setOrUpdateOffsets(topics: Set[String], groupId: String): Unit = {
    topics.foreach { topic =>
      var hasConsumed = true
      // Fetch the partitions of this topic
      val partitionsE = kc.getPartitions(Set(topic))
      if (partitionsE.isLeft) throw new SparkException(s"get kafka partitions failed: ${partitionsE.left.get}")
      val partitions = partitionsE.right.get
      // Fetch the consumer offsets; a Left result means this group has never consumed the topic
      val consumerOffsetsE = kc.getConsumerOffsets(groupId, partitions)
      if (consumerOffsetsE.isLeft) hasConsumed = false
      if (hasConsumed) {
        val earliestLeaderOffsetsE = kc.getEarliestLeaderOffsets(partitions)
        if (earliestLeaderOffsetsE.isLeft) throw new SparkException(s"get earliest leader offsets failed: ${earliestLeaderOffsetsE.left.get}")
        val earliestLeaderOffsets: Map[TopicAndPartition, LeaderOffset] = earliestLeaderOffsetsE.right.get
        val consumerOffsets: Map[TopicAndPartition, Long] = consumerOffsetsE.right.get
        val offsets = mutable.HashMap[TopicAndPartition, Long]()
        consumerOffsets.foreach { case (tp, n) =>
          val earliestLeaderOffset = earliestLeaderOffsets(tp).offset
          // If the stored offset has fallen out of Kafka's retention window, move it up to the earliest available offset
          if (n < earliestLeaderOffset) {
            println(s"consumer group: $groupId, topic: ${tp.topic}, partition: ${tp.partition} " +
              s"offset is out of range, resetting to $earliestLeaderOffset")
            offsets += (tp -> earliestLeaderOffset)
          }
        }
        if (offsets.nonEmpty) kc.setConsumerOffsets(groupId, offsets.toMap)
        //        Alternative approach: compare against both the latest and earliest leader offsets
        //        val cs = consumerOffsetsE.right.get
        //        val latest = kc.getLatestLeaderOffsets(partitions).right.get
        //        val earliest = kc.getEarliestLeaderOffsets(partitions).right.get
        //        var newCS: Map[TopicAndPartition, Long] = Map[TopicAndPartition, Long]()
        //        cs.foreach { f =>
        //          val max = latest.get(f._1).get.offset
        //          val min = earliest.get(f._1).get.offset
        //          newCS += (f._1 -> f._2)
        //          // If the offset recorded in ZooKeeper no longer exists in Kafka (expired), start from the smallest offset Kafka still holds
        //          if (f._2 < min) {
        //            newCS += (f._1 -> min)
        //          }
        //          println(max + "-----" + f._2 + "--------" + min)
        //        }
        //        if (newCS.nonEmpty) kc.setConsumerOffsets(groupId, newCS)
      } else {
        println(s"consumer group $groupId has not consumed topic $topic yet...")
        val reset = kafkaParams.get("auto.offset.reset").map(_.toLowerCase)
        val leaderOffsets: Map[TopicAndPartition, LeaderOffset] = if (reset == Some("smallest")) {
          val leaderOffsetsE = kc.getEarliestLeaderOffsets(partitions)
          if (leaderOffsetsE.isLeft) throw new SparkException(s"get earliest leader offsets failed: ${leaderOffsetsE.left.get}")
          leaderOffsetsE.right.get
        } else {
          // "largest" (the default): start from the latest offsets
          val leaderOffsetsE = kc.getLatestLeaderOffsets(partitions)
          if (leaderOffsetsE.isLeft) throw new SparkException(s"get latest leader offsets failed: ${leaderOffsetsE.left.get}")
          leaderOffsetsE.right.get
        }
        val offsets = leaderOffsets.map { case (tp, lo) => (tp, lo.offset) }
        kc.setConsumerOffsets(groupId, offsets)
        /*
        val reset = kafkaParams.get("auto.offset.reset").map(_.toLowerCase)
        val result = for {
          topicPartitions <- kc.getPartitions(topics).right
          leaderOffsets <- (if (reset == Some("smallest")) {
            kc.getEarliestLeaderOffsets(topicPartitions)
          } else {
            kc.getLatestLeaderOffsets(topicPartitions)
          }).right
        } yield {
          leaderOffsets.map { case (tp, lo) =>
            (tp, lo.offset)
          }
        }
        */
      }
    }
  }
  /** Update the consumed offsets stored in ZooKeeper */
  def updateZKOffsets(rdd: RDD[(String, String)]): Unit = {
    val groupId = kafkaParams("group.id")
    val offsetList = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    offsetList.foreach { offset =>
      val topicAndPartition = TopicAndPartition(offset.topic, offset.partition)
      val o = kc.setConsumerOffsets(groupId, Map((topicAndPartition, offset.untilOffset)))
      if (o.isLeft) println(s"Error updating the offset to Kafka cluster: ${o.left.get}")
    }
  }
}
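A note on the design: setOrUpdateOffsets compares the offsets stored in ZooKeeper with each partition's earliest offset still retained by Kafka and bumps any stale ones forward before the stream is created. Without this check, an offset that has already fallen out of Kafka's retention window would cause the direct stream to fail with an offset-out-of-range error at startup.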

2.2 Without ZooKeeper

import java.sql.Timestamp

import scala.collection.immutable.HashMap

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import net.sf.json.{JSONArray, JSONObject}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}

/**
 * @author 大数据学习与分享
 * Spark Streaming + Kafka 0.8.2: maintaining offsets in MySQL
 */
object SaveOffset2Mysql {

  /** Read the most recently stored offsets back from MySQL (MysqlUtil is the author's own connection helper). */
  def getLastOffsets(database: String, sql: String): HashMap[TopicAndPartition, Long] = {
    val conn = MysqlUtil.getConnection(database)
    val pst = conn.prepareStatement(sql)
    val res = pst.executeQuery()
    var map: HashMap[TopicAndPartition, Long] = HashMap()
    while (res.next()) {
      val o = res.getString(1)
      val jSONArray = JSONArray.fromObject(o)
      jSONArray.toArray.foreach { offset =>
        val json = JSONObject.fromObject(offset)
        val topicAndPartition = TopicAndPartition(json.getString("topic"), json.getInt("partition"))
        map += topicAndPartition -> json.getLong("untilOffset")
      }
    }
    pst.close()
    conn.close()
    map
  }
  def offsetRanges2Json(offsetRanges: Array[OffsetRange]): JSONArray = {
    val jSONArray = new JSONArray
    offsetRanges.foreach { offsetRange =>
      val jSONObject = new JSONObject()
      jSONObject.accumulate("topic", offsetRange.topic)
      jSONObject.accumulate("partition", offsetRange.partition)
      jSONObject.accumulate("fromOffset", offsetRange.fromOffset)
      jSONObject.accumulate("untilOffset", offsetRange.untilOffset)
      jSONArray.add(jSONObject)
    }
    jSONArray
  }
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("test").setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(5))
    val kafkaParams = Map(
      "metadata.broker.list" -> SystemProperties.BROKERS,
      "zookeeper.connect" -> SystemProperties.ZK_SERVERS,
      "zookeeper.connection.timeout.ms" -> "10000")
    val topics = Set("pv")
    // Read the offsets saved by the previous run; an empty map means nothing has been stored yet
    val tpMap = getLastOffsets("test", "select offset from res where id = (select max(id) from res)")
    var messages: InputDStream[(String, String)] = null
    if (tpMap.nonEmpty) {
      messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
        ssc, kafkaParams, tpMap, (mmd: MessageAndMetadata[String, String]) => (mmd.key(), mmd.message()))
    } else {
      // No stored offsets yet: start according to auto.offset.reset = largest
      messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
        ssc, kafkaParams + ("auto.offset.reset" -> "largest"), topics)
    }
    //    var oRanges = Array[OffsetRange]()
    //    messages.transform { rdd =>
    //      oRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    //      rdd
    //    }.foreachRDD { rdd =>
    //      val offset = offsetRanges2Json(oRanges).toString
    //    }
    messages.foreachRDD { rdd =>
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      // Serialize this batch's offset ranges so they can be stored together with the results
      val offsetsJson = offsetRanges2Json(offsetRanges).toString
      rdd.map(_._2).flatMap(_.split(" ")).map((_, 1L)).reduceByKey(_ + _).repartition(1)
        .foreachPartition { par =>
          if (par.nonEmpty) {
            val conn = MysqlUtil.getConnection("test")
            conn.setAutoCommit(false)
            val pst = conn.prepareStatement("INSERT INTO res (word,count,offset,time) VALUES (?,?,?,?)")
            par.foreach { case (word, count) =>
              pst.setString(1, word)
              pst.setLong(2, count)
              pst.setString(3, offsetsJson)
              pst.setTimestamp(4, new Timestamp(System.currentTimeMillis()))
              pst.addBatch()
            }
            // The word counts and the offsets are committed in a single transaction
            pst.executeBatch()
            conn.commit()
            pst.close()
            conn.close()
          }
        }
    }
    ssc.start()
    ssc.awaitTermination()
  }
}
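In the code above, SystemProperties and MysqlUtil are the author's own helpers and are not shown. A minimal, hypothetical stand-in for MysqlUtil could look like the following (the JDBC URL and credentials are placeholders):

// Hypothetical stand-in for the author's MysqlUtil helper; URL and credentials are placeholders.
import java.sql.{Connection, DriverManager}

object MysqlUtil {
  def getConnection(database: String): Connection =
    DriverManager.getConnection(s"jdbc:mysql://localhost:3306/$database?useSSL=false", "user", "password")
}

Note the design choice in foreachRDD: the word counts and the batch's offset ranges are inserted in the same MySQL transaction (setAutoCommit(false) followed by commit), so the results and the offsets either land together or not at all, which is what makes restarting from the last stored offset safe.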
// Spark Streaming + Kafka 0.10 integration: maintaining offsets with commitAsync
// (ssc and topicSet are assumed to be defined elsewhere, as in the earlier examples)
import java.time.Duration
import java.util.Properties

import scala.collection.JavaConverters._

import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.TaskContext
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies, OffsetRange}

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> SystemProperties.BROKERS,
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "g1",
  "auto.offset.reset" -> "earliest",
  "enable.auto.commit" -> (false: java.lang.Boolean))

val messages = KafkaUtils.createDirectStream[String, String](ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](topicSet, kafkaParams, getLastOffsets(kafkaParams, topicSet)))

messages.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd.foreachPartition { iter =>
    val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)
    println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
    iter.foreach { each =>
      println(s"Do something with $each") // placeholder for the real record handling
    }
  }
  // Commit the offsets back to Kafka only after the batch has been processed
  messages.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
   
/** Determine the starting offsets: subscribe, poll without consuming, and read this group's current positions. */
def getLastOffsets(kafkaParams: Map[String, Object], topicSet: Set[String]): Map[TopicPartition, Long] = {
  val props = new Properties()
  props.putAll(kafkaParams.asJava)
  val consumer = new KafkaConsumer[String, String](props)
  consumer.subscribe(topicSet.asJavaCollection)
  paranoidPoll(consumer)
  val consumerAssign = consumer.assignment().asScala.map(tp => tp -> consumer.position(tp)).toMap
  consumer.close()
  consumerAssign
}
/** Food for thought: how would you correct the offsets when the messages were consumed but the offset commit failed? */
def paranoidPoll(consumer: KafkaConsumer[String, String]): Unit = {
  val msg = consumer.poll(Duration.ZERO)
  if (!msg.isEmpty) {
    // If the poll returned records, rewind: the position should be the minimum offset seen per topic partition
    msg.asScala.foldLeft(Map[TopicPartition, Long]()) { (acc, m) =>
      val tp = new TopicPartition(m.topic(), m.partition())
      val off = acc.get(tp).map(o => Math.min(o, m.offset())).getOrElse(m.offset())
      acc + (tp -> off)
    }.foreach { case (tp, off) =>
      consumer.seek(tp, off)
    }
  }
}
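The snippet above uses the simple commitAsync(offsetRanges) overload. The 0-10 integration also accepts an org.apache.kafka.clients.consumer.OffsetCommitCallback, which at least makes failed commits visible in the logs. A sketch of that variant, replacing the commitAsync line inside foreachRDD above:

// Replaces messages.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges) above.
import org.apache.kafka.clients.consumer.{OffsetAndMetadata, OffsetCommitCallback}

messages.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges, new OffsetCommitCallback {
  override def onComplete(offsets: java.util.Map[TopicPartition, OffsetAndMetadata],
                          exception: Exception): Unit =
    if (exception != null) println(s"async offset commit failed: $exception")
    else println(s"offsets committed: $offsets")
})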

The code above is only a demo to illustrate the approach. In production you still need to adapt it to your concrete business scenario and handle the various corner cases accordingly.

This work is licensed under the CC License; reposts must credit the author and link to this article.