当前位置: 首页 > news >正文

RocketMQ - 常见问题

RocketMQ常见问题

文章目录

  • RocketMQ常见问题
    • 一:消息幂等问题
      • 1:什么是消费幂等
      • 2:消息重复的场景分析
        • 2.1:发送时消息重复
        • 2.2:消费时消息重复
        • 2.3:Rebalance时消息重复
      • 3:通用解决方案
        • 3.1:两要素(幂等令牌 & 唯一性处理)
        • 3.2:通用解决方案
        • 3.3:解决方案举例
        • 3.4:消息幂等的实现
    • 二:消息堆积问题
      • 1:产生原因分析
        • 1.1:消息拉取
        • 1.2:消息消费
      • 2:消费耗时
      • 3:消费并发度
      • 4:单机线程的计算
      • 5:如何避免消息堆积和消费延迟
        • 5.1:梳理消息的消费耗时
        • 5.2:设置消费并发度
    • 三:消息清理问题
    • 四:消息过滤问题
      • 1:Tag方式过滤-简单
      • 2:SQL过滤-功能强大
      • 3:代码举例
        • 3.1:Tag方式进行过滤例子
        • 3.2:SQL方式进行过滤例子
    • 五:消息重试问题
      • 1:消息发送的重试机制
        • 1.1:同步发送失败策略
        • 1.2:异步发送失败策略
        • 1.3:消息刷盘失败策略
      • 2:消费重试
        • 2.1:顺序消息的消费重试
        • 2.2:无序消息的消费重试
        • 2.3:消息重试次数和间隔
        • 2.4:重试队列
        • 2.5:消息重试/不重试配置方式
          • 2.5.1:重试配置
          • 2.5.2:不重试配置
        • 2.6:死信队列
          • 2.6.1:死信队列特征
          • 2.6.2:查询死信消息
          • 2.6.3:死信队列处理
    • 六:如何保证消息不丢失
      • 1:Producer如何保证消息不丢失
      • 2:事务消息保证消息不丢失
      • 3:broker保证消息不丢失
      • 4:Broker主从同步如何保证不丢失
      • 5:Consumer如何保证消费不丢失
      • 6:如果MQ服务全挂了,如何保证不丢失

一:消息幂等问题

1:什么是消费幂等

重复消费的结果与消费一次的结果是相同的,并且多次消费并未对业务系统产生任何负面影响,那么这个消费过程就是消费幂等的。

幂等:若某操作执行多次与执行一次对系统产生的影响是相同的,则称该操作是幂等的。

在互联网应用中,尤其在网络不稳定的情况下,消息很有可能会出现重复发送或重复消费。

如果重复的消息可能会影响业务处理,那么就应该对消息做幂等处理。

2:消息重复的场景分析

什么情况下可能会出现消息被重复消费呢?最常见的有以下三种情况:

2.1:发送时消息重复

当一条消息已被成功发送到Broker并完成持久化,此时出现了网络闪断,从而导致Broker对Producer应答失败。

如果此时Producer意识到消息发送失败并尝试再次发送消息,此时Broker中就可能会出现两条内容相同并且Message ID也相同的消息,那么后续Consumer就一定会消费两次该消息。

2.2:消费时消息重复

消息已投递到Consumer并完成业务处理,当Consumer给Broker反馈应答时网络闪断,Broker没有接收到消费成功响应。

为了保证消息至少被消费一次的原则,Broker将在网络恢复后再次尝试投递之前已被处理过的消息。

此时消费者就会收到与之前处理过的内容相同、Message ID也相同的消息。

2.3:Rebalance时消息重复

当Consumer Group中的Consumer数量发生变化时,或其订阅的Topic的Queue数量发生变化时,会触发Rebalance

此时Consumer可能会收到曾经被消费过的消息。

3:通用解决方案

重复消费没有办法剔除,但是要保证重复的消费幂等 <- 所以解决方案都是围绕保证幂等性的

3.1:两要素(幂等令牌 & 唯一性处理)

幂等解决方案的设计中涉及到两项要素:幂等令牌 & 唯一性处理。只要充分利用好这两要素,就可以设计出好的幂等解决方案。

  • 幂等令牌:是生产者和消费者两者中的既定协议,通常指具备唯一业务标识的字符串。
    • 例如,订单号、流水号。一般由Producer随着消息一同发送来的。
  • 唯一性处理:服务端通过采用一定的算法策略,保证同一个业务逻辑不会被重复执行成功多次。
    • 例如,对同一笔订单的多次支付操作,只会成功一次。
3.2:通用解决方案

对于常见的系统,幂等性操作的通用性解决方案是:

  • 首先通过缓存去重。在缓存中如果已经存在了某幂等令牌,则说明本次操作是重复性操作;若缓存没有命中,则进入下一步
  • 在唯一性处理之前,先在数据库中查询幂等令牌作为索引的数据是否存在。若存在,则说明本次操作为重复性操作;若不存在,则进入下一步。
  • 在同一事务中完成三项操作:唯一性处理后,将幂等令牌写入到缓存,并将幂等令牌作为唯一索引的数据写入到DB中。

在这里插入图片描述

第 1 步已经判断过是否是重复性操作了,为什么第 2 步还要再次判断?

一般缓存中的数据是具有有效期的。缓存中数据的有效期一旦过期,就是发生缓存穿透,使请求直接就到达了DBMS。

3.3:解决方案举例

以支付场景为例:

  • 当支付请求到达后,首先在Redis缓存中却获取key为支付流水号的缓存value
    • 若value不空,则说明本次支付是重复操作,业务系统直接返回调用侧重复支付标识;
    • 若value为空,则进入下一步操作
  • 到DBMS中根据支付流水号查询是否存在相应实例
    • 若存在,则说明本次支付是重复操作,业务系统直接返回调用侧重复支付标识;
    • 若不存在,则说明本次操作是首次操作,进入下一步完成唯一性处理
  • 在分布式事务中完成三项操作:
    • 完成支付任务
    • 将当前支付流水号作为key,任意字符串作为value,通过set(key, value, expireTime)将数据写入到Redis缓存
    • 将当前支付流水号作为主键,与其它相关数据共同写入到DBMS
3.4:消息幂等的实现

消费幂等的解决方案很简单:为消息指定不会重复的唯一标识。因为Message ID有可能出现重复的情况,所以真正安全的幂等处理,不建议以Message ID作为处理依据。

最好的方式是以业务唯一标识作为幂等处理的关键依据,而业务的唯一标识可以通过消息Key设置。

以支付场景为例,可以将消息的Key设置为订单号,作为幂等处理的依据。具体代码示例如下:

Message message = new Message();
message.setKey("ORDERID_100");
SendResult sendResult = producer.send(message);

消费者收到消息时可以根据消息的Key即订单号来实现消费幂等:

consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context ) {for(MessageExt msg : msgs) {String key = msg.getKeys();// 根据业务唯一标识Key做幂等处理// ......}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}
});

RocketMQ能够保证消息不丢失,但不能保证消息不重复

二:消息堆积问题

在这里插入图片描述
消息处理流程中,如果Consumer的消费速度跟不上Producer的发送速度,MQ中未处理的消息会越来越多,这部分消息就被称为堆积消息

消息出现堆积进而会造成消息的消费延迟。以下场景需要重点关注消息堆积和消费延迟问题:

  • 业务系统上下游能力不匹配造成的持续堆积,且无法自行恢复。
  • 业务系统对消息的消费实时性要求较高,即使是短暂的堆积造成的消费延迟也无法接受。

1:产生原因分析

在这里插入图片描述
Consumer使用长轮询Pull模式消费消息时,分为以下两个阶段:

1.1:消息拉取

consumer通过长轮询Pull模式批量拉取的方式从服务端获取消息,将拉取的消息缓存到本地缓冲队列中

对于拉取式消费,在内网环境下会有很高的吞吐量,所以这一阶段一般不会是消息堆积的瓶颈

一个单线程分区的低规格主机其可达到几万的TPS,如果是多个分区的多个线程,可以轻松达到几十万的TPS

1.2:消息消费

Consumer将本地缓存的消息提交到消费线程中,使用业务消费逻辑对消息进行处理,处理完毕后获取到一个结果。这是真正的消息消费过程。

此时Consumer的消费能力就完全依赖于消息的消费耗时消费并发度了。

如果由于业务处理逻辑复杂等原因,导致处理单条消息的耗时较长,则整体的消息吞吐量肯定不会高,此时就会导致Consumer本地缓冲队列达到上限,停止从服务端拉取消息。

所以,消息堆积的主要瓶颈在于客户端的消费能力,而消费能力由消费耗时消费并发度决定。

注意,消费耗时的优先级要高于消费并发度。即在保证了消费耗时的合理性前提下,再考虑消费并发度问题。

2:消费耗时

影响消息处理时长的主要因素是代码逻辑。而代码逻辑中可能会影响处理时长代码主要有两种类型:CPU内部计算型代码外部I/O操作型代码

通常情况下代码中如果没有复杂的递归和循环的话,内部计算耗时相对外部I/O操作来说几乎可以忽略。所以外部IO型代码是影响消息处理时长的主要症结所在。

外部IO操作型代码举例:

  • 读写外部数据库,例如对远程MySQL的访问
  • 读写外部缓存系统,例如对远程Redis的访问
  • 下游系统调用,例如Dubbo的RPC远程调用,Spring Cloud的对下游系统的Http接口调用

关于下游系统调用逻辑需要进行提前梳理,掌握每个调用操作预期的耗时,这样做是为了能够判断消费逻辑中IO操作的耗时是否合理。

通常消息堆积是由于下游系统出现了服务异常达到了DBMS容量限制,导致消费耗时增加。

服务异常,并不仅仅是系统中出现的类似 500 这样的代码错误,而可能是更加隐蔽的问题。例如,网络带宽问题。

达到了DBMS容量限制,其也会引发消息的消费耗时增加。

3:消费并发度

一般情况下,消费者端的消费并发度由单节点线程数和节点数量共同决定,其值为value = 单节点线程数*节点数量

通常需要优先调整单节点的线程数,若单机硬件资源达到了上限,则需要通过横向扩展来提高消费并发度。

单节点线程数,即单个Consumer所包含的线程数量

节点数量,即Consumer Group所包含的Consumer数量

对于普通消息、延时消息及事务消息,并发度计算都是单节点线程数*节点数量。但对于顺序消息则是不同的。

顺序消息的消费并发度等于Topic的Queue分区数量。

全局顺序消息:

  • 该类型消息的Topic只有一个Queue分区。其可以保证该Topic的所有消息被顺序消费。
  • 为了保证这个全局顺序性,Consumer Group中在同一时刻只能有一个Consumer的一个线程进行消费。所以其并发度为 1 。

分区顺序消息:

  • 该类型消息的Topic有多个Queue分区。其仅可以保证该Topic的每个Queue分区中的消息被顺序消费,不能保证整个Topic中消息的顺序消费。
  • 为了保证这个分区顺序性,每个Queue分区中的消息在Consumer Group中的同一时刻只能有一个Consumer的一个线程进行消费。
  • 在同一时刻最多会出现多个Queue分蘖有多个Consumer的多个线程并行消费。所以其并发度为Topic的分区数量。

4:单机线程的计算

对于一台主机中线程池中线程数的设置需要谨慎,不能盲目直接调大线程数,设置过大的线程数反而会带来大量的线程切换的开销。

理想环境下单节点的最优线程数计算模型为:C *(T1 + T2)/ T1。

  • C:CPU内核数
  • T1:CPU内部逻辑计算耗时
  • T2:外部IO操作耗时

最优线程数 = C *(T1 + T2)/ T1 = C * T1/T1 + C * T2/T1 = C + C * T2/T1

注意,该计算出的数值是理想状态下的理论数据,在生产环境中,不建议直接使用。而是根据当前环境,先设置一个比该值小的数值然后观察其压测效果,然后再根据效果逐步调大线程数,直至找到在该环境中性能最佳时的值。

5:如何避免消息堆积和消费延迟

为了避免在业务使用时出现非预期的消息堆积和消费延迟问题,需要在前期设计阶段对整个业务逻辑进行完善的排查和梳理。

其中最重要的就是梳理消息的消费耗时设置消息消费的并发度

5.1:梳理消息的消费耗时

通过压测获取消息的消费耗时,并对耗时较高的操作的代码逻辑进行分析。梳理消息的消费耗时需要关注以下信息:

  • 消息消费逻辑的计算复杂度是否过高,代码是否存在无限循环和递归等缺陷。
  • 消息消费逻辑中的I/O操作是否是必须的,能否用本地缓存等方案规避。
  • 消费逻辑中的复杂耗时的操作是否可以做异步化处理。如果可以,是否会造成逻辑错乱。
5.2:设置消费并发度

对于消息消费并发度的计算,可以通过以下两步实施:

  • 逐步调大单个Consumer节点的线程数,并观测节点的系统指标,得到单个节点最优的消费线程数和消息吞吐量。
  • 根据上下游链路的流量峰值计算出需要设置的节点数

节点数 = 流量峰值 / 单个节点消息吞吐量

三:消息清理问题

消息被消费过后会被清理掉吗?不会的。

消息是被顺序存储在commitlog文件的,且消息大小不定长,所以消息的清理是不可能以消息为单位进行清理的

消息是以commitlog文件为单位进行清理的。否则会急剧下降清理效率,并实现逻辑复杂。

commitlog文件存在一个过期时间,默认 72 小时

除了用户手动清理外,在以下情况下也会被自动清理,无论文件中的消息是否被消费过:

  • 文件过期,且到达清理时间点(默认为凌晨 4 点)后,自动清理过期文件
  • 文件过期,且磁盘空间占用率已达过期清理警戒线(默认75%)后,无论是否达到清理时间点,都会自动清理过期文件
  • 磁盘占用率达到清理警戒线(默认85%)后,开始按照设定好的规则清理文件,无论是否过期。默认会从最老的文件开始清理
  • 磁盘占用率达到系统危险警戒线(默认90%)后,Broker将拒绝消息写入

需要注意以下几点:

  • 对于RocketMQ系统来说,删除一个1G大小的文件,是一个压力巨大的IO操作。
    • 在删除过程中,系统性能会骤然下降。所以,其默认清理时间点为凌晨 4 点,访问量最小的时间。
    • 也正因如果,我们要保障磁盘空间的空闲率,不要使系统出现在其它时间点删除commitlog文件的情况。
  • 官方建议RocketMQ服务的Linux文件系统采用ext4。因为对于文件删除操作,ext4要比ext3性能更好

四:消息过滤问题

消息者在进行消息订阅时,除了可以指定要订阅消息的Topic外,还可以对指定Topic中的消息根据指定条件进行过滤,即可以订阅比Topic更加细粒度的消息类型。

对于指定Topic消息的过滤有两种过滤方式:Tag过滤与SQL过滤。

1:Tag方式过滤-简单

通过consumer的subscribe()方法指定要订阅消息的Tag。如果订阅多个Tag的消息,Tag间使用或运算符(双竖线||)连接。

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_EXAMPLE");
consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");

2:SQL过滤-功能强大

SQL过滤是一种通过特定表达式对事先埋入到消息中的用户属性进行筛选过滤的方式。

通过SQL过滤,可以实现对消息的复杂过滤。不过,只有使用PUSH模式的消费者才能使用SQL过滤。

SQL过滤表达式中支持多种常量类型与运算符。

支持的常量类型:

  • 数值:比如: 123 ,3.1415
  • 字符:必须用单引号包裹起来,比如:‘abc’
  • 布尔:TRUE 或 FALSE
  • NULL:特殊的常量,表示空

支持的运算符有:

  • 数值比较:>,>=,<,<=,BETWEEN,=
  • 字符比较:=,<>,IN
  • 逻辑运算 :AND,OR,NOT
  • NULL判断:IS NULL 或者 IS NOT NULL

默认情况下Broker没有开启消息的SQL过滤功能,需要在Broker加载的配置文件中添加如下属性,以开启该功能:

enablePropertyFilter = true

在启动Broker时需要指定这个修改过的配置文件。

例如对于单机Broker的启动,其修改的配置文件是conf/broker.conf,启动时使用如下命令:

sh bin/mqbroker -n localhost:9876 -c conf/broker.conf &

3:代码举例

3.1:Tag方式进行过滤例子

定义Tag过滤Producer

public class FilterByTagProducer {public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("pg");producer.setNamesrvAddr("rocketmqOS:9876");producer.start();String[] tags = {"myTagA","myTagB","myTagC"};for (int i = 0 ; i < 10 ; i++) {byte[] body = ("Hi," + i).getBytes();String tag = tags[i % tags.length];// 指定tagMessage msg = new Message("myTopic", tag, body);SendResult sendResult = producer.send(msg);System.out.println(sendResult);}producer.shutdown();}
}

定义Tag过滤Consumer

public class FilterByTagConsumer {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("pg");consumer.setNamesrvAddr("rocketmqOS:9876"); // 设置NameServerconsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);// 消费者订阅指定的tag, 多个tag用||分割consumer.subscribe("myTopic", "myTagA || myTagB"); // 订阅主题和tag,只有tagA或者tagB才进行消费// 定义异步并发监听,准备消费consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {for (MessageExt me:msgs){System.out.println(me);}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.println("Consumer Started");}
}
3.2:SQL方式进行过滤例子

定义SQL过滤Producer

public class FilterBySQLProducer {public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("pg");producer.setNamesrvAddr("rocketmqOS:9876");producer.start();for (int i = 0 ; i < 10 ; i++) {try {byte[] body = ("Hi," + i).getBytes();Message msg = new Message("myTopic", "myTag", body);// 自己声明一个属性,并赋值,到时候sqmsg.putUserProperty("age", i + "");SendResult sendResult = producer.send(msg);System.out.println(sendResult);} catch (Exception e) {e.printStackTrace();}}producer.shutdown();}
}

定义SQL过滤Consumer

public class FilterBySQLConsumer {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("pg");consumer.setNamesrvAddr("rocketmqOS:9876");consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);// 注意下面这句consumer.subscribe("myTopic", MessageSelector.bySql("age between 0 and 6"));// 监听并consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {for (MessageExt me:msgs){System.out.println(me);}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.println("Consumer Started");}
}

五:消息重试问题

1:消息发送的重试机制

Producer对发送失败的消息进行重新发送的机制,称为消息发送重试机制,也称为消息重投机制。

注意事项

  • 生产者在发送消息时,若采用同步或异步发送方式,发送失败会重试,但oneway消息发送方式发送失败是没有重试机制的
  • 只有普通消息具有发送重试机制,顺序消息是没有的
  • 消息重投机制可以保证消息尽可能发送成功、不丢失,但可能会造成消息重复。消息重复在RocketMQ中是无法避免的问题
  • 消息发送重试有三种策略可以选择:同步发送失败策略、异步发送失败策略、消息刷盘失败策略
1.1:同步发送失败策略

对于普通消息,消息发送默认采用round-robin策略来选择所发送到的队列。

如果发送失败,默认重试 2 次。

  • 重试时是不会选择上次发送失败的Broker,而是选择其它Broker。
  • 只有一个Broker其也只能发送到该Broker,但其会尽量发送到该Broker上的其它Queue。
// 创建一个producer,参数为Producer Group名称
DefaultMQProducer producer = new DefaultMQProducer("pg");
// 指定nameServer地址
producer.setNamesrvAddr("rocketmqOS:9876");
// 设置同步发送失败时重试发送的次数,默认为 2 次
producer.setRetryTimesWhenSendFailed( 3 );
// 设置发送超时时限为5s,默认3s
producer.setSendMsgTimeout( 5000 );

同时,Broker还具有失败隔离功能,使Producer尽量选择未发生过发送失败的Broker作为目标Broker。

这样做可以保证其它消息尽量不发送到问题Broker,为了提升消息发送效率,降低消息发送耗时。

思考:让我们自己实现失败隔离功能,如何来做?

  • 方案一:Producer中维护某JUC的Map集合,其key是发生失败的时间戳,value为Broker实例。Producer中还维护着一个Set集合,其中存放着所有未发生发送异常的Broker实例。选择目标Broker是从该Set集合中选择的。再定义一个定时任务,定期从Map集合中将长期未发生发送异常的Broker清理出去,并添加到Set集合。
  • 方案二:为Producer中的Broker实例添加一个标识,例如是一个AtomicBoolean属性。只要该Broker上发生过发送异常,就将其置为true。选择目标Broker就是选择该属性值为false的Broker。再定义一个定时任务,定期将Broker的该属性置为false。
  • 方案三:为Producer中的Broker实例添加一个标识,例如是一个AtomicLong属性。只要该Broker上发生过发送异常,就使其值增一。选择目标Broker就是选择该属性值最小的Broker。若该值相同,采用轮询方式选择。

如果超过重试次数,则抛出异常,由Producer去保证消息不丢。

当然当生产者出现RemotingException、MQClientException和MQBrokerException时,Producer会自动重投消息。

1.2:异步发送失败策略

异步发送失败重试时,异步重试不会选择其他broker,仅在同一个broker上做重试,所以该策略无法保证消息不丢

DefaultMQProducer producer = new DefaultMQProducer("pg");
producer.setNamesrvAddr("rocketmqOS:9876");
// 指定异步发送失败后不进行重试发送
producer.setRetryTimesWhenSendAsyncFailed( 0 );
1.3:消息刷盘失败策略

消息刷盘超时或slave不可用(slave在做数据同步时向master返回状态不是SEND_OK)时,默认是不会将消息尝试发送到其他Broker的。

对于重要消息可以通过在Broker的配置文件设置retryAnotherBrokerWhenNotStoreOK = true来开启。

2:消费重试

2.1:顺序消息的消费重试

对于顺序消息,当Consumer消费消息失败后,为了保证消息的顺序性,其会自动不断地进行消息重试,直到消费成功。

消费重试默认间隔时间为 1000 毫秒。重试期间应用会出现消息消费被阻塞的情况。

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg");
// 顺序消息消费失败的消费重试时间间隔,单位毫秒,默认为 1000 ,其取值范围为[10,30000]
consumer.setSuspendCurrentQueueTimeMillis( 100 );

由于对顺序消息的重试是无休止的,不间断的,直至消费成功

⚠️ 所以,对于顺序消息的消费,务必要保证应用能够及时监控并处理消费失败的情况,避免消费被永久性阻塞。

⚠️ 顺序消息没有发送失败重试机制,但具有消费失败重试机制

2.2:无序消息的消费重试

对于无序消息(普通消息、延时消息、事务消息),当Consumer消费消息失败时,可以通过设置返回状态达到消息重试的效果。

不过需要注意,无序消息的重试只对集群消费方式生效,广播消费方式不提供失败重试特性。

对于广播消费,消费失败后,失败消息不再重试,继续消费后续消息

2.3:消息重试次数和间隔

对于无序消息集群消费下的重试消费,每条消息默认最多重试 16 次,但每次重试的间隔时间是不同的,会逐渐变长。

每次重试的间隔时间如下表。

重试次数与上次重试的间隔时间重试次数与上次重试的间隔时间
110秒97分钟
230108 分钟
31分钟119 分钟
42分钟1210分钟
53分钟1320分钟
64分钟1430分钟
75分钟151小时
86分钟162 小时

若一条消息在一直消费失败的前提下,将会在正常消费后的第 4 小时 46 分后进行第 16 次重试。

若仍然失败,则将消息投递到死信队列

修改消费重试次数

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg");
// 修改消费重试次数
consumer.setMaxReconsumeTimes( 10 );

对于修改过的重试次数,将按照以下策略执行:

  • 若修改值 < 16 ,则按照指定间隔进行重试
  • 若修改值 > 16 ,则超过 16 次的重试时间间隔均为 2 小时

对于Consumer Group

  • 若仅修改了一个Consumer的消费重试次数,则会应用到该Group中所有其它Consumer实例。

  • 若出现多个Consumer均做了修改的情况,则采用覆盖方式生效。即最后被修改的值会覆盖前面设置的值。

2.4:重试队列

对于需要重试消费的消息,并不是Consumer在等待了指定时长后再次去拉取原来的消息进行消费,而是将这些需要重试消费的消息放入到了一个特殊Topic的队列中,而后进行再次消费的。这个特殊的队列就是重试队列。

当出现需要进行重试消费的消息时,Broker会为每个消费组都设置一个Topic名称为%RETRY%consumerGroup@consumerGroup的重试队列。

  • 这个重试队列是针对消息才组的,而不是针对每个Topic设置的
  • 只有当出现需要进行重试消费的消息时,才会为该消费者组创建重试队列

在这里插入图片描述

注意,消费重试的时间间隔与延时消费延时等级十分相似

除了没有延时等级的前两个时间外,其它的时间都是相同的

Broker对于重试消息的处理是通过延时消息实现的。

  1. 先将消息保存到SCHEDULE_TOPIC_XXXX延迟队列中
  2. 延迟时间到后,会将消息投递到%RETRY%consumerGroup@consumerGroup重试队列中。
2.5:消息重试/不重试配置方式
2.5.1:重试配置

集群消费方式下,消息消费失败后若希望消费重试,则需要在消息监听器接口的实现中明确进行如下三种方式之一的配置

  • 返回ConsumeConcurrentlyStatus.RECONSUME_LATER(推荐)
  • 返回Null
  • 抛出异常
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {try {...} catch (Throwable e) {// 以下三种方式都可以引发消息重试return ConsumeConcurrentlyStatus.RECONSUME_LATER;// return null;// throw new RuntimeException("消费异常");}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
2.5.2:不重试配置

在捕获到异常后同样也返回与消费成功后的相同的结果,即ConsumeConcurrentlyStatus.CONSUME_SUCCESS,则不进行消费重试。

@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {try {...} catch (Throwable e) {// 也return成功,就不进行重试return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
2.6:死信队列

当一条消息初次消费失败,消息队列会自动进行消费重试;达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,消息队列不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中。这个队列就是死信队列(Dead-Letter Queue,DLQ)

死信队列其中的消息称为死信消息(Dead-Letter Message,DLM)。

死信队列是用于处理无法被正常消费的消息的。

2.6.1:死信队列特征
  • 死信队列中的消息不会再被消费者正常消费,即DLQ对于消费者是不可见的
  • 死信存储有效期与正常消息相同,均为 3 天(commitlog文件的过期时间), 3 天后会被自动删除
  • 死信队列就是一个特殊的Topic,名称为%DLQ%consumerGroup@consumerGroup,即每个消费者组都有一个死信队列
  • 如果一个消费者组未产生死信消息,则不会为其创建相应的死信队列
  • 一个死信队列包含了对应Group ID产生的所有死信消息,不论该消息属于哪个Topic。
2.6.2:查询死信消息

消息队列RocketMQ版提供的查询死信消息的方式对比如下表所示。

查询方式查询条件查询类别说明
按Group ID查询Group ID + 时间段范围查询根据Group ID和时间范围,批量获取符合条件的所有消息;
查询量大,不易匹配。
按Message ID查询Group ID + Message ID精确查询根据Group ID和Message ID可以精确定位任意一条消息。
  1. 登录消息队列RocketMQ版控制台,在左侧导航栏单击实例列表

  2. 在顶部菜单栏选择地域,如华东1(杭州),然后在实例列表中,单击目标实例名称。

  3. 在左侧导航栏单击死信队列,然后在死信队列页面,选择以下任一方式查询死信消息:

  • 按 Group 查询 -> 根据Group ID和死信消息产生的时间范围,批量查询该Group ID在某段时间内产生的所有死信消息。页面中会显示所有符合筛选条件的死信消息

在这里插入图片描述

  • 按Message ID查询消息属于精确查询。您可以根据Group ID与Message ID精确查询到任意一条消息。页面中会显示所有符合筛选条件的死信消息

在这里插入图片描述

2.6.3:死信队列处理

实际上,当一条消息进入死信队列,就意味着系统中某些地方出现了问题,从而导致消费者无法正常消费该消息,比如代码中原本就存在Bug。

因此需要开发人员进行特殊处理。最关键的步骤是要排查可疑因素,解决代码中可能存在的Bug,然后再将原来的死信消息再次进行投递消费

六:如何保证消息不丢失

1:Producer如何保证消息不丢失


生产者发送消息之后,给生产者一个确定的通知,这个消息在Broker端是否写入完成了。

//异步发送,不需要broker确认,效率高,但是会有丢消息的可能。
producer.sendOneway(msg);
//同步发送,生产者等待Broker确认。消息最安全,但效率最低
SendResult sendResult = producer.send(msg,20*1000);
//异步发送,生产者另起一个线程等待broker确认,收到Broker确认之后直接触发回调方法。消息安全和效率之间比较均衡,但是会加大客户端的负担。
producer.send(msg,new SendCallback(){@Overridepublic void onSuccess(SendResult sendResult){//do something}@Overridepublic void onException(Throwable e){//do something}
})

​与之类似的,Kafka同样也提供了这种同步和异步的发送消息机制

//直接send发送消息,返回的是一个Future。这就相当于是异步调用
Future<RecordMetadata> future = producer.send(record);
//调用future的get方法才会世纪获取到发送的结果,生产者收到这个结果后,就可以知道消息是否成功发送到broker了。这个过程就变成了一个同步的过程
RecordMetadata recordMetadata = producer.send(record).get();

​而在RabbitMQ中,则是提供了一个Publisher Confirms生产者确认机制。

其思路也是Publisher收到Broker的响应后再发出对应的回调方法。

//获取channel
Channel ch = ...;
//添加两个回调,一个处理ack响应,一个处理nack响应
ch.addConfirmListener(ConfirmCallback ackCallback,ConfirmCallback nackCallback)

2:事务消息保证消息不丢失

在这里插入图片描述
先定义本地事务监听器

package com.mytest.mqdemo.producer;import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;/*** <p>* 功能描述:* </p>** @author cui haida* @date 2024/03/24/17:52*/
public class TransactionListener implements org.apache.rocketmq.client.producer.TransactionListener {/*** 执行本地事务* @param message 消息体* @param o 参数* @return 执行本地事务的结果*/@Overridepublic LocalTransactionState executeLocalTransaction(Message message, Object o) {// todo: 执行本地事务,这里这是模拟String tags = message.getTags();if (StringUtils.contains("A", tags)) {return LocalTransactionState.COMMIT_MESSAGE;}if (StringUtils.contains("B", tags)) {return LocalTransactionState.ROLLBACK_MESSAGE;} else {return LocalTransactionState.UNKNOW;}}/*** 回查本地事务* @param message 待回查的本地事务信息* @return 回查结果*/@Overridepublic LocalTransactionState checkLocalTransaction(MessageExt message) {String tags = message.getTags();if (StringUtils.contains("C", tags)) {return LocalTransactionState.COMMIT_MESSAGE;}if (StringUtils.contains("D", tags)) {return LocalTransactionState.ROLLBACK_MESSAGE;} else {return LocalTransactionState.UNKNOW;}}
}

生产者指定本地事务监听器,并进行消息的发送

package com.mytest.mqdemo.producer;import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.common.message.Message;import java.nio.charset.StandardCharsets;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;/*** <p>* 功能描述:* </p>** @author cui haida* @date 2024/03/24/17:47*/
public class TransactionProducer {public static void main(String[] args) throws Exception {// 注意这里使用的是事务消息生产者TransactionMQProducer transactionProducer = new TransactionMQProducer("transaction_producer");transactionProducer.setNamesrvAddr("192.168.111.128:9876");// 异步提交,提高性能ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, // 4大基本参数new ArrayBlockingQueue<>(2000), // 队列类型和大小new ThreadFactory() { // 线程工厂指定@Overridepublic Thread newThread(Runnable r) {Thread thread = new Thread(r);thread.setName("test-for-trans");return thread;}});transactionProducer.setExecutorService(threadPoolExecutor);// 创建一个本地事务的监听器transactionProducer.setTransactionListener(new TransactionListener());transactionProducer.start();String[] tags = new String[]{"A", "B", "C", "D", "E"};for (int i = 0; i < 10; i++) {String body = (tags[i % tags.length] + "trans_message");Message message = new Message("trans", tags[i % tags.length], body.getBytes(StandardCharsets.UTF_8));// 发送TransactionSendResult transactionSendResult = transactionProducer.sendMessageInTransaction(message, null);System.out.println("消息发送成功_" + transactionSendResult);Thread.sleep(10);}Thread.sleep(10000);transactionProducer.shutdown();}
}

定义消费者信息

因为半消息和回查对于消费者无感知,所以消费者就是最基本的实现

package com.mytest.mqdemo.customer;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;import java.nio.charset.StandardCharsets;/*** @author cuihaida* <p>*     简单消费* </p>*/
public class PushConsumer {public static void main(String[] args) throws MQClientException {// 消费的推拉模式// 拉模式:pull -> 消费者主动去Broker上拉取消息// 推模式:push -> 消费者等待Broker将消息推送过来// 定义一个push消费者String consumerGroup = "cg";DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);// 指定nameServerconsumer.setNamesrvAddr("192.168.111.129:9876");// 指定消费topic与tagString consumerTopic = "someTopic";String subTag = "*"; // 当前主题下的任意标签消息consumer.subscribe(consumerTopic, subTag);// 指定采用“广播模式”进行消费,默认为“集群模式”// consumer.setMessageModel(MessageModel.BROADCASTING);// 注册消息监听器// 一旦broker中有了其订阅的消息就会触发该方法的执行,其返回值为当前consumer消费的状态// MessageListener有两个实现类:// --> MessageListenerConcurrently -> 并发消费// --> MessageListenerOrderly -> 顺序消费// 第一个参数是消息列表,第二个参数是上下文consumer.registerMessageListener((MessageListenerConcurrently) (messages, context) -> {// 逐条消费消息for (MessageExt msg : messages) {try {String msgBody = new String(msg.getBody(), StandardCharsets.UTF_8);System.out.println(msgBody);// todo: processMessage(msgBody);// 手动ACK,返回消费状态:消费成功return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;} catch (Exception e) {System.out.println("消息消费异常");// todo: 打印堆栈信息,异常处理// 手动ACK,返回消费状态:需要重新消费return ConsumeConcurrentlyStatus.RECONSUME_LATER;}}// 返回消费状态:消费成功return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});// 开启消费者消费consumer.start();System.out.println("=========> Consumer Started <=============");}
}

3:broker保证消息不丢失

Producer把消息发送到Broker上了之后,Broker是不是能保证消息不丢失呢?这里有一个核心问题——PageCache缓存

​ 数据会优先写入缓存,然后过一段时间再写图到磁盘。但是缓存中的数据有个特点,就是断点即丢失,所以,如果服务器发生非正常断电,内存中的数据还没有写入磁盘,这是就会造成消息丢失。

以Linux为例,用户态就是应用程序,不管是什么应用程序,想要写入磁盘文件时,都只能调用操作系统提供的write系统调用,申请写磁盘。至于消息如何经过PageCache再写入到磁盘中,这个过程就是在内核态执行的,也就是操作系统自己执行的,应用程序无法干预。这个过程中,应用系统唯一能够干预的,就是调用操作系统提供的sync系统调用,申请一次刷盘操作,主动将PageCache中的数据写入到磁盘。

4:Broker主从同步如何保证不丢失

对于Broker来说,通常Slaver的作用就是做一个数据备份,当Broker服务宕机时,甚至是磁盘坏了时,可以从Slaver上获取数据记录。

但是,如果主从同步失败了,那么Broker的这一层保证就会失效。因此,主从同步也有可能造成消息的丢失。

  • 普通集群 - 指定角色,各司其职
  • Dledger高可用集群,自行选举,多数同意

5:Consumer如何保证消费不丢失

几乎所有的MQ产品都设置了消费者消费者状态确认机制。也就是消费者处理完消息之后,需要给Broker一个响应,表示消息被正常处理了。

如果Broker没有拿到这个响应,不管是因为Consumer没有拿到还是Consumer处理完消息后咩有给出响应,Broker都会认为消息没有处理成功。之后,Broker就会向Consumer重复投递这些没有处理成功的消息。

6:如果MQ服务全挂了,如何保证不丢失

​ 针对这种情况,通常做法是设计一个降级缓存。Producer往MQ发消息失败了,就往降级缓存中写,然后,依然正常去进行后续的业务。

​ 此时,再启动一个线程,不断尝试将降级缓存中的数据往MQ中发送。这样,至少当MQ服务恢复之后,这些消息可以尽快进入到MQ中,继续往下游Consumer推送,而不至于造成消息丢失。

在这里插入图片描述

相关文章:

RocketMQ - 常见问题

RocketMQ常见问题 文章目录 RocketMQ常见问题一&#xff1a;消息幂等问题1&#xff1a;什么是消费幂等2&#xff1a;消息重复的场景分析2.1&#xff1a;发送时消息重复2.2&#xff1a;消费时消息重复2.3&#xff1a;Rebalance时消息重复 3&#xff1a;通用解决方案3.1&#xff…...

kafka消费能力压测:使用官方工具

背景 在之前的业务场景中&#xff0c;我们发现Kafka的实际消费能力远低于预期。尽管我们使用了kafka-go组件并进行了相关测试&#xff0c;测试情况见《kafka-go:性能测试》这篇文章。但并未能准确找出消费能力低下的原因。 我们曾怀疑这可能是由我的电脑网络带宽问题或Kafka部…...

基于Spring Boot的社区居民健康管理平台的设计与实现

目录 1 绪论 1.1 研究现状 1.2 研究意义 1.3 组织结构 2 技术介绍 2.1 平台开发工具和环境 2.2 Vue介绍 2.3 Spring Boot 2.4 MyBatis 2.5 环境搭建 3 系统需求分析 3.1 可行性分析 3.2 功能需求分析 3.3 系统用例图 3.4 系统功能图 4 系统设计 4.1 系统总体描…...

网络安全架构战略 网络安全体系结构

本节书摘来自异步社区《网络安全体系结构》一书中的第1章&#xff0c;第1.4节&#xff0c;作者【美】Sean Convery 1.4 一切皆为目标 网络安全体系结构 当前的大型网络存在着惊人的相互依赖性&#xff0c;作为一名网络安全设计师&#xff0c;对这一点必须心知肚明。Internet就…...

【Spring+MyBatis】_图书管理系统(中篇)

【SpringMyBatis】_图书管理系统&#xff08;上篇&#xff09;-CSDN博客文章浏览阅读654次&#xff0c;点赞4次&#xff0c;收藏7次。&#xff08;1&#xff09;当前页的内容records&#xff08;类型为List&#xff09;&#xff1b;参数&#xff1a;userNameadmin&&pas…...

Python - 爬虫利器 - BeautifulSoup4常用 API

文章目录 前言BeautifulSoup4 简介主要特点&#xff1a;安装方式: 常用 API1. 创建 BeautifulSoup 对象2. 查找标签find(): 返回匹配的第一个元素find_all(): 返回所有匹配的元素列表select_one() & select(): CSS 选择器 3. 访问标签内容text 属性: 获取标签内纯文本get_t…...

宝塔面板开始ssl后,使用域名访问不了后台管理

宝塔面板后台开启ssl访问后&#xff0c;用的证书是其他第三方颁发的证书 再使用 域名/xxx 的形式&#xff1a;https://域名:xxx/xxx 访问后台&#xff0c;结果出现如下&#xff0c;不管使用 http 还是 https 的路径访问都进不后台管理 这个时候可以使用 https://ip/xxx 的方式来…...

大一计算机的自学总结:前缀树(字典树、Trie树)

前言 前缀树&#xff0c;又称字典树&#xff0c;Trie树&#xff0c;是一种方便查找前缀信息的数据结构。 一、字典树的实现 1.类描述实现 #include <bits/stdc.h> using namespace std;class TrieNode { public:int pass0;int end0;TrieNode* nexts[26]{NULL}; };Tri…...

docker 安装的open-webui链接ollama出现网络错误

# 故事背景 部署完ollama以后&#xff0c;使用谷歌浏览器的插件Page Assist - 本地 AI 模型的 Web UI 可以比较流畅的使用DeepSeek&#xff0c;但是只局限于个人使用&#xff0c;想分享给更多的小伙伴使用&#xff0c;于是打算使用open-webui 来管理用户&#xff0c;经官网推荐…...

未来游戏:当人工智能重构虚拟世界的底层逻辑

未来游戏&#xff1a;当人工智能重构虚拟世界的底层逻辑 在《赛博朋克2077》夜之城的霓虹灯下&#xff0c;玩家或许已经注意到酒吧里NPC开始出现微表情变化&#xff1b;在《艾尔登法环》的开放世界中&#xff0c;敌人的战术包抄逐渐显露出类人智慧。这些细节预示着游戏产业正站…...

Redis集群主从切换源码解读

一切的开始 打开Redis5.0.5的源码中server.c&#xff0c;找到如下代码&#xff0c;这里运行了一个定时任务&#xff0c;每隔100毫秒执行一次。 /* Run the Redis Cluster cron. *//** 每隔100毫秒执行一次* 要求开启集群模式*/run_with_period(100) {if (server.cluster_enabl…...

javacv将mp4视频切分为m3u8视频并播放

学习链接 ffmpeg-demo 当前对应的 gitee代码 Spring boot视频播放(解决MP4大文件无法播放)&#xff0c;整合ffmpeg,用m3u8切片播放。 springboot 通过javaCV 实现mp4转m3u8 上传oss 如何保护会员或付费视频&#xff1f;优酷是怎么做的&#xff1f; - HLS 流媒体加密 ffmpe…...

Golang学习笔记_33——桥接模式

Golang学习笔记_30——建造者模式 Golang学习笔记_31——原型模式 Golang学习笔记_32——适配器模式 文章目录 桥接模式详解一、桥接模式核心概念1. 定义2. 解决的问题3. 核心角色4. 类图 二、桥接模式的特点三、适用场景1. 多维度变化2. 跨平台开发3. 动态切换实现 四、与其他…...

蜂鸟视图发布AI智能导购产品:用生成式AI重构空间服务新范式

在人工智能技术飞速发展的今天&#xff0c;北京蜂鸟视图正式宣布推出基于深度求索&#xff08;DeepSeek&#xff09;等大模型的《AI智能导购产品》&#xff0c;通过生成式AI与室内三维地图的深度融合&#xff0c;重新定义空间场景的智能服务体验。 这一创新产品将率先应用于购物…...

AI服务器散热黑科技:让芯片“冷静”提速

AI 服务器为何需要散热黑科技 在人工智能飞速发展的当下&#xff0c;AI 服务器作为核心支撑&#xff0c;作用重大。从互联网智能推荐&#xff0c;到医疗疾病诊断辅助&#xff0c;从金融风险预测&#xff0c;到教育个性化学习&#xff0c;AI 服务器广泛应用&#xff0c;为各类复…...

数据结构-栈、队列、哈希表

1栈 1.栈的概念 1.1栈:在表尾插入和删除操作受限的线性表 1.2栈逻辑结构: 线性结构(一对一) 1.3栈的存储结构:顺序存储(顺序栈)、链表存储(链栈) 1.4栈的特点: 先进后出(fisrt in last out FILO表)&#xff0c;后进先出 //创建栈 Stacklist create_stack() {Stacklist lis…...

安装海康威视相机SDK后,catkin_make其他项目时,出现“libusb_set_option”错误的解决方法

硬件&#xff1a;雷神MIX G139H047LD 工控机 系统&#xff1a;ubuntu20.04 之前运行某项目时&#xff0c;处于正常状态。后来由于要使用海康威视工业相机&#xff08;型号&#xff1a;MV-CA013-21UC&#xff09;&#xff0c;便下载了并安装了该相机的SDK&#xff0c;之后运行…...

【鸿蒙】ArkUI-X跨平台问题集锦

系列文章目录 【鸿蒙】ArkUI-X跨平台问题集锦 文章目录 系列文章目录前言问题集锦1、HSP,HAR模块中 无法引入import bridge from arkui-x.bridge;2、CustomDialog 自定义弹窗中的点击事件在Android 中无任何响应&#xff1b;3、调用 buildRouterMode() 路由跳转页面前&#xf…...

大模型驱动的业务自动化

大模型输出token的速度太低且为统计输出&#xff0c;所以目前大模型主要应用在toP&#xff08;人&#xff09;的相关领域&#xff1b;但其智能方面的优势又是如此的强大&#xff0c;自然就需要尝试如何将其应用到更加广泛的toM&#xff08;物理系统、生产系统&#xff09;领域中…...

ocr智能票据识别系统|自动化票据识别集成方案

在企业日常运营中&#xff0c;对大量票据实现数字化管理是一项耗时且容易出错的任务。随着技术的进步&#xff0c;OCR&#xff08;光学字符识别&#xff09;智能票据识别系统的出现为企业提供了一个高效、准确的解决方案&#xff0c;不仅简化了财务流程&#xff0c;还大幅提升了…...

C++_核心编程_多态案例二-制作饮品

#include <iostream> #include <string> using namespace std;/*制作饮品的大致流程为&#xff1a;煮水 - 冲泡 - 倒入杯中 - 加入辅料 利用多态技术实现本案例&#xff0c;提供抽象制作饮品基类&#xff0c;提供子类制作咖啡和茶叶*//*基类*/ class AbstractDr…...

练习(含atoi的模拟实现,自定义类型等练习)

一、结构体大小的计算及位段 &#xff08;结构体大小计算及位段 详解请看&#xff1a;自定义类型&#xff1a;结构体进阶-CSDN博客&#xff09; 1.在32位系统环境&#xff0c;编译选项为4字节对齐&#xff0c;那么sizeof(A)和sizeof(B)是多少&#xff1f; #pragma pack(4)st…...

均衡后的SNRSINR

本文主要摘自参考文献中的前两篇&#xff0c;相关文献中经常会出现MIMO检测后的SINR不过一直没有找到相关数学推到过程&#xff0c;其中文献[1]中给出了相关原理在此仅做记录。 1. 系统模型 复信道模型 n t n_t nt​ 根发送天线&#xff0c; n r n_r nr​ 根接收天线的 MIMO 系…...

python报错No module named ‘tensorflow.keras‘

是由于不同版本的tensorflow下的keras所在的路径不同&#xff0c;结合所安装的tensorflow的目录结构修改from语句即可。 原语句&#xff1a; from tensorflow.keras.layers import Conv1D, MaxPooling1D, LSTM, Dense 修改后&#xff1a; from tensorflow.python.keras.lay…...

C++.OpenGL (14/64)多光源(Multiple Lights)

多光源(Multiple Lights) 多光源渲染技术概览 #mermaid-svg-3L5e5gGn76TNh7Lq {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-3L5e5gGn76TNh7Lq .error-icon{fill:#552222;}#mermaid-svg-3L5e5gGn76TNh7Lq .erro…...

Scrapy-Redis分布式爬虫架构的可扩展性与容错性增强:基于微服务与容器化的解决方案

在大数据时代&#xff0c;海量数据的采集与处理成为企业和研究机构获取信息的关键环节。Scrapy-Redis作为一种经典的分布式爬虫架构&#xff0c;在处理大规模数据抓取任务时展现出强大的能力。然而&#xff0c;随着业务规模的不断扩大和数据抓取需求的日益复杂&#xff0c;传统…...

如何配置一个sql server使得其它用户可以通过excel odbc获取数据

要让其他用户通过 Excel 使用 ODBC 连接到 SQL Server 获取数据&#xff0c;你需要完成以下配置步骤&#xff1a; ✅ 一、在 SQL Server 端配置&#xff08;服务器设置&#xff09; 1. 启用 TCP/IP 协议 打开 “SQL Server 配置管理器”。导航到&#xff1a;SQL Server 网络配…...

嵌入式学习之系统编程(九)OSI模型、TCP/IP模型、UDP协议网络相关编程(6.3)

目录 一、网络编程--OSI模型 二、网络编程--TCP/IP模型 三、网络接口 四、UDP网络相关编程及主要函数 ​编辑​编辑 UDP的特征 socke函数 bind函数 recvfrom函数&#xff08;接收函数&#xff09; sendto函数&#xff08;发送函数&#xff09; 五、网络编程之 UDP 用…...

加密通信 + 行为分析:运营商行业安全防御体系重构

在数字经济蓬勃发展的时代&#xff0c;运营商作为信息通信网络的核心枢纽&#xff0c;承载着海量用户数据与关键业务传输&#xff0c;其安全防御体系的可靠性直接关乎国家安全、社会稳定与企业发展。随着网络攻击手段的不断升级&#xff0c;传统安全防护体系逐渐暴露出局限性&a…...

前端工具库lodash与lodash-es区别详解

lodash 和 lodash-es 是同一工具库的两个不同版本&#xff0c;核心功能完全一致&#xff0c;主要区别在于模块化格式和优化方式&#xff0c;适合不同的开发环境。以下是详细对比&#xff1a; 1. 模块化格式 lodash 使用 CommonJS 模块格式&#xff08;require/module.exports&a…...