解析SparkStreaming和Kafka集成的两种方式

VSole2022-08-04 16:32:08

spark streaming是基于微批处理的流式计算引擎,通常是利用spark core或者spark core与spark sql一起来处理数据。在企业实时处理架构中,通常将spark streaming和kafka集成作为整个大数据处理架构的核心环节之一。

针对不同的spark、kafka版本,集成处理数据的方式分为两种:Receiver based Approach和Direct Approach,不同集成版本处理方式的支持,可参考下图:

Receiver based Approach

基于receiver的方式是使用kafka消费者高阶API实现的。

对于所有的receiver,它通过kafka接收的数据会被存储于spark的executors上,底层是写入BlockManager中,默认200ms生成一个block(通过配置参数spark.streaming.blockInterval决定)。然后由spark streaming提交的job构建BlockRdd,最终以spark core任务的形式运行。

关于receiver方式,有以下几点需要注意:

  1. receiver作为一个常驻线程调度到executor上运行,占用一个cpu
  2. receiver个数由KafkaUtils.createStream调用次数决定,一次一个receiver
  3. kafka中的topic分区并不能关联产生在spark streaming中的rdd分区
  4. 增加在KafkaUtils.createStream()中的指定的topic分区数,仅仅增加了单个receiver消费的topic的线程数,它不会增加处理数据中的并行的spark的数量
  5. 【topicMap[topic,num_threads]map的value对应的数值是每个topic对应的消费线程数】
  6. receiver默认200ms生成一个block,建议根据数据量大小调整block生成周期
  7. receiver接收的数据会放入到BlockManager,每个executor都会有一个BlockManager实例,由于数据本地性,那些存在receiver的executor会被调度执行更多的task,就会导致某些executor比较空闲
  8. 建议通过参数spark.locality.wait调整数据本地性。该参数设置的不合理,比如设置为10而任务2s就处理结束,就会导致越来越多的任务调度到数据存在的executor上执行,导致任务执行缓慢甚至失败(要和数据倾斜区分开
  9. 多个kafka输入的DStreams可以使用不同的groups、topics创建,使用多个receivers接收处理数据
  10. 两种receiver
  11. 可靠的receiver:可靠的receiver在接收到数据并通过复制机制存储在spark中时准确的向可靠的数据源发送ack确认
  12. 不可靠的receiver:不可靠的receiver不会向数据源发送数据已接收确认。这适用于用于不支持ack的数据源
  13. 当然,我们也可以自定义receiver。
  14. receiver处理数据可靠性默认情况下,receiver是可能丢失数据的
  15. 可以通过设置spark.streaming.receiver.writeAheadLog.enable为true开启预写日志机制,将数据先写入一个可靠地分布式文件系统如hdfs,确保数据不丢失,但会失去一定性能
  16. 限制消费者消费的最大速率
  17. 涉及三个参数:
  18. spark.streaming.backpressure.enabled:默认是false,设置为true,就开启了背压机制
  19. spark.streaming.backpressure.initialRate:默认没设置初始消费速率,第一次启动时每个receiver接收数据的最大值
  20. spark.streaming.receiver.maxRate:默认值没设置,每个receiver接收数据的最大速率(每秒记录数)。每个流每秒最多将消费此数量的记录,将此配置设置为0或负数将不会对最大速率进行限制
  21. 在产生job时,会将当前job有效范围内的所有block组成一个BlockRDD,一个block对应一个分区
  22. kafka082版本消费者高阶API中,有分组的概念,建议使消费者组内的线程数(消费者个数)和kafka分区数保持一致。如果多于分区数,会有部分消费者处于空闲状态

Direct Approach

direct approach是spark streaming不使用receiver集成kafka的方式,一般在企业生产环境中使用较多。相较于receiver,有以下特点:

  1. 不使用receiver
  2. 不需要创建多个kafka streams并聚合它们
  3. 减少不必要的CPU占用
  4. 减少了receiver接收数据写入BlockManager,然后运行时再通过blockId、网络传输、磁盘读取等来获取数据的整个过程,提升了效率
  5. 无需wal,进一步减少磁盘IO操作
  6. direct方式生的rdd是KafkaRDD,它的分区数与kafka分区数保持一致一样多的rdd分区来消费,更方便我们对并行度进行控制
  7. 注意:在shuffle或者repartition操作后生成的rdd,这种对应关系会失效
  8. 可以手动维护offset,实现exactly once语义
  9. 数据本地性问题。在KafkaRDD在compute函数中,使用SimpleConsumer根据指定的topic、分区、offset去读取kafka数据。
  10. 但在010版本后,又存在假如kafka和spark处于同一集群存在数据本地性的问题
  11. 限制消费者消费的最大速率
  12. spark.streaming.kafka.maxRatePerPartition:从每个kafka分区读取数据的最大速率(每秒记录数)。这是针对每个分区进行限速,需要事先知道kafka分区数,来评估系统的吞吐量
kafkaspark
本作品采用《CC 协议》,转载必须注明作者和本文链接
在企业实时处理架构中,通常将spark streaming和kafka集成作为整个大数据处理架构的核心环节之一。
假设Mysql中canal_test库下有一张表policy_cred,需要统计实时统计policy_status状态为1的mor_rate的的变化趋势,并标注比率的风险预警等级。?本次安装的canal版本为1.1.2,Canal版本最后在1.1.1之后。server端采用MQ模式,MQ选用Kafka。服务器系统为Centos
本文主要介绍,SparkStreaming和Kafka使用Direct Approach方式处理任务时,如何自己管理offset?在调用该方法时,会先创建KafkaCluster:val kc = new KafkaCluster. | is a list of one or more kafka topics to consume from
Log4Shell目前依然是一个广泛且严重的安全威胁。4月26日,安全公司Rezilion发布了一份最新研究报告,显示截至4月底,仍然有40%的易受攻击的Log4j版本被运用。Rezilion 认为导致目前较差的更新状况原因较为复杂,但包括了缺乏适当的漏洞管理流程,以及漏洞的可见性差等因素。因此,专家建议用户扫描并确认系统环境,找到正在使用的版本,如果是过时或易受攻击的Log4j版本,务必尽快制定升级计划。
前言Scala是以JVM为运行环境的面向对象的函数式编程语言,它可以直接访问Java类库并且与Java框架进行交互操作。正如之前所介绍,Spark是用Scala语言编写的,Kafka server端也是,那么深入学习Scala对掌握SparkKafka是必备掌握技能。
Kafka消息积压的典型场景:1.实时/消费任务挂掉比如,我们写的实时应用因为某种原因挂掉了,并且这个任务没有被监控程序监控发现通知相关负责人,负责人又没有写自动拉起任务的脚本进行重启。此外,Kafka分区数是Kafka并行度调优的最小单元,如果Kafka分区数设置的太少,会影响Kafka consumer消费的吞吐量。
场景痛点介绍Cloud Native在消息队列的使用过程中,由于其分布式特性难免会遇到消息丢失、消息重传等问题。
此外,PyDeequ 可以与 Pandas DataFrames 进行流畅的接口,而不是在 Apache Spark DataFrames 内进行限制。Deequ 负责导出要对数据进行计算的所需指标集。Deequ 生成数据质量报告,其中包含约束验证的结果。包装器将命令转换为底层 Deequ 调用并返回它们的响应。
近日,绿盟科技联合广州大学网络空间先进技术研究院发布2021年《APT组织情报研究年鉴》(下文简称“年鉴”)。 年鉴借助网络空间威胁建模知识图谱和大数据复合语义追踪技术,对全球372个APT组织知识进行了知识图谱归因建档,形成APT组织档案馆,并对APT组织活动进行大数据追踪,从而对本年度新增和活跃的攻击组织的攻击活动态势进行分析。目前年鉴所涉及的相关情报和技术已经应用在绿盟科技的威胁情报平台(
VSole
网络安全专家