阿里云消息队列 Kafka-消息检索实践

一颗小胡椒2022-07-21 11:23:10

场景痛点介绍

Cloud Native

在消息队列的使用过程中,由于其分布式特性难免会遇到消息丢失、消息重传等问题。

  • 例如在日志聚合场景中,通常是多个异构数据源生产数据到 Kafka 中以提供给下游的 Spark 等计算引擎消费。而当某些日志缺失时,由于消息数据的发送方式、数据结构等种类繁杂,导致难以直接从客户端的日志来排查。
  • 再例如消息转发的过程中,消费端可能会重复消费到同样的数据,这就需要根据内容从消息队列中检索数据以判断消息是否重复生产,而消息队列通常只能按照分区和消费位点遍历扫描,并不能灵活的实现消息检索。

业内现有的消息队列产品都没有较好的工具和方式来实现对消息内容的检索,这将使得排查难度和投入成本大大增加。

Kafka 消息检索组件

Cloud Native

检索组件介绍

消息队列 Kafka 「检索组件」是一个全托管、高弹性、交互式的检索组件,具备万亿级消息内容检索的秒级响应能力,旨在解决业内消息产品不支持检索消息内容的难题。消息队列 Kafka 「检索组件」是通过 Kafka Connector 将 Topic 中的消息数据转存到表格存储(Tablestore)中,基于表格存储的多元索引功能提供消息检索能力。能够支持通过消息的分区、位点、发送的时间范围等一个或多个条件组合检索,还支持根据消息 Key、Value 全文检索消息。

案例实践

Cloud Native

案例背景

假设某运维团队需要监控线上集群的运行情况,采集进程级别的日志导入到 Kafka 中,下游使用 Flink 消费,实时计算各进程资源消耗情况。当在 Flink 中发现某个进程的某个时间段的日志数据丢失时,需要使用消息队列 Kafka 「检索组件」,基于消息 Value 和时间范围检索消息数据,判断日志是否已经成功推送到了消息队列 Kafka 中。

例如采集的日志数据为 JSON 结构,某一条日志数据格式为:

key   =  276value =  {"PID":"276","COMMAND":"Google Chrom","CPU_USE":"7.2","TIME":"00:01:44","MEM":"8836K","STATE":"sleeping","UID":"0","IP":"164.29.0.1"}

开通消息检索

1. 首先需要登录到阿里云消息队列 Kafka 控制台中,选择对应的 topic,开通消息检索服务。

  1. 消息检索服务开通后,将自动创建一个 Tablestore 实例,之后将消息数据转存到 Tablestore,并创建索引提供消息检索能力。每一个 topic 对应了 Tablestore 中的一张数据表。可以在消息队列 Kafka 控制台上查看每个 topic 的消息检索组件详情。

消息检索实践

  1. 消息检索服务开通后,就可以使用消息中的多个搜索项检索消息,实现上述案例。例如指定一个时间范围,并且检索消息 Value 中包含 PID = 276 的消息。

  1. 返回结果示例

能力扩展

Cloud Native

表格存储 Tablestore 介绍

表格存储 Tablestore 是基于底层飞天平台构建的结构化数据存储,能够提供千亿级规模数据存储、毫秒级数据检索的服务能力。消息队列 Kafka 转存消息到 Tablestore 后,支持通过 Tablestore 原生的数据访问方式来检索消息,Tablestore 支持更复杂的检索逻辑,同时支持通过 SQL 语法检索消息。下面列举两种消息检索方式:

多元索引搜索

  1. 登录到表格存储 Tablestore 控制台中,进入 Kafka 消息数据转存对应的 Tablestore 实例和数据表中,在索引管理页面选择多元索引搜索消息。

  1. 例如需要检索消息 Value 中包含 PID=276 或者 PID=277 的消息。

  1. 返回结果

SQL 检索消息

  1. 表格存储 Tablestore 支持基于 SQL 语法来检索消息,首先需要在消息转存的数据表上创建一张 SQL 映射表。

  1. 基于 Tablestore SQL 检索 PID=276 的消息。

总结

Cloud Native

阿里云消息队列 Kafka 「检索组件」是消息队列领域率先支持交互式消息内容检索的组件,基于数据转存表格存储 Tablestore 提供消息检索服务能力,支持根据 Key、Value、分区等任意个条件自由组合检索消息,同时支持 Key、Value 全文检索消息,具备免开发、免运维、高弹性的特点。同时也可以直接通过表格存储 Tablestore 索引或者 SQL 来检索消息,极大地提高了日常排查消息存在或正确性的速度。

消息队列kafka
本作品采用《CC 协议》,转载必须注明作者和本文链接
场景痛点介绍Cloud Native在消息队列的使用过程中,由于其分布式特性难免会遇到消息丢失、消息重传等问题。
近年来,随着大数据技术产品的不断发展和多样化,各个应用系统也会依据不同的业务场景选择多个不同的技术组件,数据也随之散落在各个存储平台之中。这种状况给后续数据分析师在不同数据源之上进行数据的即席关联查询和分析带来了新的难题,本文介绍了在数据不移动的前提下进行多源数据即席访问的具体探索与实践。
这是关于如何使用Spring for Apache Kafka在跨多个微服务的MongoDB中管理分布式数据模型。
安全设备在企业安全防护中起到攻击监测告警和攻击拦截作用,是安全日志的直接输出者和防护策略生效者。统一日志平台在Elastic Stack技术栈的基础上,加入Kafka消息队列,实现对安全设备告警日志的统一管理,系统架构如图3所示。日志采集主要由Syslog、Beats、Kafka三部分组成,完成了从安全设备源端将告警日志采集至消息队列的过程。
添加消息的任务我们称为producer,而取出并使用消息的任务,我们称之为consumer。kafka应运而生,它是专门设计用来做消息中间件的系统。这两点也是kafka要解决的核心问题。为此,kafka提出了partition的概念。由于消息不会被删除,因此可以等消费者明确告知kafka这条消息消费成功以后,再去更新游标。对于同一个topic,不同的消费组有各自的游标。
之前,针对以下我们调研目前的开源队列方案:beanstalkdbeanstalkd?消费者,通过 reserve/release/bury/delete 来获取 job 或改变 job 的状态;很幸运的是官方提供了 go client:https://github.com/beanstalkd/go-beanstalk。但是这对不熟悉 beanstalkd 操作的 go 开发者而言,需要学习成本
分布式流平台Kafka
2022-08-02 10:13:27
无论消息是否被消费,Kafka集群都会持久的保存所有发布的消息,直到过期。Kafka中采用分区的设计主要有两个目的:第一,当日志大小超过了单台服务器的限制,允许日志进行扩展。在Kafka中实现消费的方式是将日志中的分区划分到每一个消费者实例上,以便在任何时间,每个实例都是分区唯一的消费者。
如果是在消费端丢失数据,那么多次消费结果完全一模一样的几率很低。这时已经fetch的数据还没有处理完成但已经被commit掉,因此没有机会再次被处理,数据丢失。网络负载很高或者磁盘很忙写入失败的情况下,没有自动重试重发消息。
Serverless应用安全浅谈
2022-06-02 14:08:43
我是火线安全的曾垚,今天分享的议题是Serverless应用安全浅谈,我们发现近年来主流的云厂商,或者是像K8S、CNCF生态出现了非常多的Serverless Faas的相关技术,像backend也是非常流行的。 整个Serverless产生或者是容器的产生,都是为了大幅度提高软件的开发效率和降低后续的维护成本。 希望可以通过这次分享,可以让相关Serverless开发者了解在Serverl
Spring框架是一个开放源代码的J2EE应用程序框架,是针对bean的生命周期进行管理的轻量级容器。Spring可以单独应用于构筑应用程序,也可以和Struts、Webwork、Tapestry等众多Web框架组合使用,并且可以与 Swing等桌面应用程序AP组合。 Spring框架主要由七部分组成,分别是 Spring Core、 Spring AOP、 Spring ORM、 Spring
一颗小胡椒
暂无描述