.Net之延迟队列

一颗小胡椒2022-07-27 17:54:49

介绍

具有队列的特性,再给它附加一个延迟消费队列消息的功能,也就是说可以指定队列中的消息在哪个时间点被消费。

使用场景

延迟队列在项目中的应用还是比较多的,尤其像电商类平台:

  1. 订单成功后,在30分钟内没有支付,自动取消订单
  2. 外卖平台发送订餐通知,下单成功后60s给用户推送短信。
  3. 如果订单一直处于某一个未完结状态时,及时处理关单,并退还库存
  4. 淘宝新建商户一个月内还没上传商品信息,将冻结商铺等
该介绍来自其他文章

方案

下面的例子没有进行封装,所以代码仅供参考

Redis过期事件

注意:
不保证在设定的过期时间立即删除并发送通知,数据量大的时候会延迟比较大
不保证一定送达
发送即忘策略,不包含持久化
但是比如有些场景,对这个时间不是那么看重,并且有其他措施双层保障,该实现方案是比较简单。

redis自2.8.0之后版本提供Keyspace Notifications功能,允许客户订阅Pub / Sub频道,以便以某种方式接收影响Redis数据集事件。

配置

需要修改配置启用过期事件,比如在windows客户端中,需要修改redis.windows.conf文件,在linux中需要修改redis.conf,修改内容是:

-- 取消注释
notify-keyspace-events Ex
-- 注释
#notify-keyspace-events ""

然后重新启动服务器,比如windows

 .\redis-server.exe  .\redis.windows.conf

或者linux中使用docker-compose重新部署redis

  redis:
    container_name: redis
    image: redis
    hostname: redis
    restart: always
    ports: 
      - "6379:6379"
    volumes: 
      - $PWD/redis/redis.conf:/etc/redis.conf
      - /root/common-docker-compose/redis/data:/data
    command: 
      /bin/bash -c "redis-server /etc/redis.conf" #启动执行指定的redis.conf文件

然后使用客户端订阅事件

-- windows
.\redis-cli
 
-- linux
docker exec -it 容器标识 redis-cli
 
psubscribe __keyevent@0__:expired

控制台订阅

使用StackExchange.Redis组件订阅过期事件

var connectionMultiplexer = ConnectionMultiplexer.Connect(_redisConnection);
var db = connectionMultiplexer.GetDatabase(0);
db.StringSet("orderno:123456", "订单创建", TimeSpan.FromSeconds(10));
Console.WriteLine("开始订阅");
var subscriber = connectionMultiplexer.GetSubscriber();
//订阅库0的过期通知事件
subscriber.Subscribe("__keyevent@0__:expired", (channel, key) =>
{
    Console.WriteLine($"key过期 channel:{channel} key:{key}");
});
Console.ReadLine();

输出结果:

key过期 channel:keyevent@0:expired key:orderno:123456

如果启动多个客户端监听,那么多个客户端都可以收到过期事件。

WebApi中订阅

创建RedisListenService继承自:BackgroundService

public class RedisListenService : BackgroundService
{
    private readonly ISubscriber _subscriber;
    public RedisListenService(IServiceScopeFactory serviceScopeFactory)
    {
        using var scope = serviceScopeFactory.CreateScope();
        var configuration = scope.ServiceProvider.GetRequiredService<IConfiguration>();
        var connectionMultiplexer = ConnectionMultiplexer.Connect(configuration["redis"]);
        var db = connectionMultiplexer.GetDatabase(0);
        _subscriber = connectionMultiplexer.GetSubscriber();
    }
    protected override Task ExecuteAsync(CancellationToken stoppingToken)
    {
        //订阅库0的过期通知事件
        _subscriber.Subscribe("__keyevent@0__:expired", (channel, key) =>
        {
            Console.WriteLine($"key过期 channel:{channel} key:{key}");
        });
        return Task.CompletedTask;
    }
}

注册该后台服务

services.AddHostedService<RedisListenService>();

启用项目,给redis指定库设置值,等过期后会接收到过期通知事件。

RabbitMq延迟队列

版本信息 Rabbitmq版本:3.10.5 Erlang版本:24.3.4.2

要使用rabbitmq做延迟是需要安装插件(rabbitmq_delayed_message_exchange)的

插件介绍:https://blog.rabbitmq.com/posts/2015/04/scheduling-messages-with-rabbitmq

下载地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

将下载好的插件(d:/Download/rabbitmq_delayed_message_exchange-3.10.2.ez)映射到容器的plugins目录下:

docker run -d --name myrabbit -p 9005:15672 -p 5672:5672  -e RABBITMQ_DEFAULT_VHOST=customer -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=123456 -v d:/Download/rabbitmq_delayed_message_exchange-3.10.2.ez:/plugins/rabbitmq_delayed_message_exchange-3.10.2.ez  rabbitmq:3-management-alpine

进入容器

docker exec -it 容器名称/标识 bash

启用插件

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

查看是否启用

rabbitmq-plugins list

[E*]和[e*]表示启用,然后重启服务

rabbitmq-server restart

然后在管理界面添加交换机都可以看到

生产消息

发送的消息类型是:x-delayed-message

[HttpGet("send/delay")]
public string SendDelayedMessage()
{
    var factory = new ConnectionFactory()
    {
        HostName = "localhost",//IP地址
        Port = 5672,//端口号
        UserName = "admin",//用户账号
        Password = "123456",//用户密码
        VirtualHost = "customer"
    };
    using var connection = factory.CreateConnection();
    using var channel = connection.CreateModel();
    var exchangeName = "delay-exchange";
    var routingkey = "delay.delay";
    var queueName = "delay_queueName";
    //设置Exchange队列类型
    var argMaps = new Dictionary<string, object>()
    {
        {"x-delayed-type", "topic"}
    };
    //设置当前消息为延时队列
    channel.ExchangeDeclare(exchange: exchangeName, type: "x-delayed-message", true, false, argMaps);
    channel.QueueDeclare(queueName, true, false, false, argMaps);
    channel.QueueBind(queueName, exchangeName, routingkey);
    var time = 1000 * 5;
    var message = $"发送时间为 {DateTime.Now:yyyy-MM-dd HH:mm:ss} 延时时间为:{time}";
    var body = Encoding.UTF8.GetBytes(message);
    var props = channel.CreateBasicProperties();
    //设置消息的过期时间
    props.Headers = new Dictionary<string, object>()
            {
                {  "x-delay", time }
            };
    channel.BasicPublish(exchange: exchangeName, routingKey: routingkey, basicProperties: props, body: body);
    Console.WriteLine("成功发送消息:" + message);
    return "success";
}

消费消息

消费消息我是弄了一个后台任务(RabbitmqDelayedHostService)在处理

public class RabbitmqDelayedHostService : BackgroundService
{
    private readonly IModel _channel;
    private readonly IConnection _connection;
    public RabbitmqDelayedHostService()
    {
        var connFactory = new ConnectionFactory//创建连接工厂对象
        {
            HostName = "localhost",//IP地址
            Port = 5672,//端口号
            UserName = "admin",//用户账号
            Password = "123456",//用户密码
            VirtualHost = "customer"
        };
        _connection = connFactory.CreateConnection();
        _channel = _connection.CreateModel();
        //交换机名称
        var exchangeName = "exchangeDelayed";
        var queueName = "delay_queueName";
        var routingkey = "delay.delay";
        var argMaps = new Dictionary<string, object>()
        {
            {"x-delayed-type", "topic"}
        };
        _channel.ExchangeDeclare(exchange: exchangeName, type: "x-delayed-message", true, false, argMaps);
        _channel.QueueDeclare(queueName, true, false, false, argMaps);
        _channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: routingkey);
        //声明为手动确认
        _channel.BasicQos(0, 1, false);
    }
    protected override Task ExecuteAsync(CancellationToken stoppingToken)
    {
        var queueName = "delay_queueName";
        var consumer = new EventingBasicConsumer(_channel);
        consumer.Received += (model, ea) =>
        {
            var message = Encoding.UTF8.GetString(ea.Body.ToArray());
            var routingKey = ea.RoutingKey;
            Console.WriteLine($"接受到消息的时间为 {DateTime.Now:yyyy-MM-dd HH:mm:ss},routingKey:{routingKey} message:{message} ");
            //手动确认
            _channel.BasicAck(ea.DeliveryTag, true);
        };
        _channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
        return Task.CompletedTask;
    }
    public override void Dispose()
    {
        _connection.Dispose();
        _channel.Dispose();
        base.Dispose();
    }
}

注册该后台任务

services.AddHostedService<RabbitmqDelayedHostService>();

输出结果

成功发送消息:发送时间为 2022-07-02 18:54:22 延时时间为:5000

成功发送消息:发送时间为 2022-07-02 18:54:22 延时时间为:5000

成功发送消息:发送时间为 2022-07-02 18:54:22 延时时间为:5000

成功发送消息:发送时间为 2022-07-02 18:54:23 延时时间为:5000

成功发送消息:发送时间为 2022-07-02 18:54:23 延时时间为:5000

成功发送消息:发送时间为 2022-07-02 18:54:23 延时时间为:5000

接受到消息的时间为 2022-07-02 18:54:27,routingKey:delay.delay message:发送时间为 2022-07-02 18:54:22 延时时间为:5000

接受到消息的时间为 2022-07-02 18:54:27,routingKey:delay.delay message:发送时间为 2022-07-02 18:54:22 延时时间为:5000

接受到消息的时间为 2022-07-02 18:54:27,routingKey:delay.delay message:发送时间为 2022-07-02 18:54:22 延时时间为:5000

接受到消息的时间为 2022-07-02 18:54:28,routingKey:delay.delay message:发送时间为 2022-07-02 18:54:23 延时时间为:5000

接受到消息的时间为 2022-07-02 18:54:28,routingKey:delay.delay message:发送时间为 2022-07-02 18:54:23 延时时间为:5000

接受到消息的时间为 2022-07-02 18:54:28,routingKey:delay.delay message:发送时间为 2022-07-02 18:54:23 延时时间为:5000

其他方案

  • Hangfire延迟队列
BackgroundJob.Schedule(
  () => Console.WriteLine("Delayed!"),
   TimeSpan.FromDays(7));
  • 时间轮
  • Redisson DelayQueue
  • 计时管理器


redis消息队列
本作品采用《CC 协议》,转载必须注明作者和本文链接
添加消息的任务我们称为producer,而取出并使用消息的任务,我们称之为consumer。kafka应运而生,它是专门设计用来做消息中间件的系统。这两点也是kafka要解决的核心问题。为此,kafka提出了partition的概念。由于消息不会被删除,因此可以等消费者明确告知kafka这条消息消费成功以后,再去更新游标。对于同一个topic,不同的消费组有各自的游标。
之前,针对以下我们调研目前的开源队列方案:beanstalkdbeanstalkd?消费者,通过 reserve/release/bury/delete 来获取 job 或改变 job 的状态;很幸运的是官方提供了 go client:https://github.com/beanstalkd/go-beanstalk。但是这对不熟悉 beanstalkd 操作的 go 开发者而言,需要学习成本
代表的a的二进制位的修改。对应的ASCII码是97,转换为二进制数据是01100001. 因为bit非常节省空间,可以用来做大数据量的统计。BITOPNOTdestkeykey ,对给定 key 求逻辑非,并将结果保存到 destkey 。获取今天点击最多的15条:zrevrange hotNews:20190926 0 15 withscores
前言日前拜读阿牛老师的大作《领导:谁再用定时任务实现关闭订单,立马滚蛋!》发现其方案有若干瑕疵,特此抛砖引玉讨论一二。在使用 Redisson DelayQueue 等定时任务中间件时可以同时使用扫描数据库的方法作为补偿机制,避免中间件故障造成任务丢失。
时间在2020年,当时使用xray,发现它的的反射型xss扫描很好用,于是想知道原理,好奇探索了下大概的xss扫描规则。当时自己的机器都是2H1G的小机器,想提高效率,于是学习用分布式,但是又由此带来了很多第三方的数据库,队列什么的,更加压迫了我机器的性能..做了这么多,成果也很喜人,各大src,微软都有,运气好也获得了微软1000多刀的赏金。
Spring框架是一个开放源代码的J2EE应用程序框架,是针对bean的生命周期进行管理的轻量级容器。Spring可以单独应用于构筑应用程序,也可以和Struts、Webwork、Tapestry等众多Web框架组合使用,并且可以与 Swing等桌面应用程序AP组合。 Spring框架主要由七部分组成,分别是 Spring Core、 Spring AOP、 Spring ORM、 Spring
痛苦的纯文本日志管理日子一去不复返了。虽然纯文本数据在某些情况下仍然很有用,但是在进行扩展分析以收集有洞察力的基础设施数据并改进代码质量时,寻找一个可靠的日志管理解决方案是值得的,该解决方案可以增强业务工作流的能力。 日志不是一件容易处理的事情,但无论如何都是任何生产系统的一个重要方面。当您面临一个困难的问题时,使用日志管理解决方案要比在遍布系统环境的无休止的文本文件循环中穿梭容易得多。
一文读懂HW护网行动
2022-07-26 12:00:00
随着《网络安全法》和《等级保护制度条例2.0》的颁布,国内企业的网络安全建设需与时俱进,要更加注重业务场景的安全性并合理部署网络安全硬件产品,严防死守“网络安全”底线。“HW行动”大幕开启,国联易安誓为政府、企事业单位网络安全护航!
Filebeat监视您指定的日志文件或位置,收集日志事件,并将它们转发到Elasticsearch或 Logstash进行索引。使用Kibana,可以通过各种图表进行高级数据分析及展示。
钓鱼常用手法总结
2022-03-24 13:48:29
雷神众测拥有对此文章的修改和解释权。如欲转载或传播此文章,必须保证此文章的完整性,包括版权声明等全部内容。未经雷神众测允许,不得任意修改或者增减此文章内容,不得以任何方式将其用于商业目的。
一颗小胡椒
暂无描述