RocketMQ核心特性与最佳实践
目录
1. 引言
2. RocketMQ核心特性
2.1 架构演进
2.2 核心组件
2.3 消息模型
2.4 高级特性
3. RocketMQ与其他MQ产品选型对比
3.1 功能特性对比
3.2 适用场景对比
3.3 选型建议
4. RocketMQ部署最佳实践
4.1 部署模式选择
4.2 硬件配置建议
4.3 操作系统优化
4.4 JVM参数优化
5. RocketMQ开发最佳实践
5.1 生产者最佳实践
5.2 消费者最佳实践
5.3 事务消息实践
5.4 顺序消息实践
5.5 延时消息实践
6. RocketMQ运维最佳实践
6.1 监控与告警
6.2 日志管理
6.3 容量规划
6.4 常见问题处理
7. RocketMQ应用场景最佳实践
7.1 异步解耦
7.2 削峰填谷
7.3 分布式事务
7.4 日志收集
7.5 分布式限流
8. 总结
1. 引言
Apache RocketMQ是一个分布式消息和流平台,具有低延迟、高性能和高可靠性,广泛应用于互联网、大数据、移动互联网、物联网等领域。本文将介绍RocketMQ的核心特性、数据结构、应用场景和最佳实践,并提供与其他消息队列产品的选型对比。
2. RocketMQ核心特性
2.1 架构演进
RocketMQ从3.0版本发展至今,经历了多次重大架构升级。5.0版本是一次重大革新,引入了Proxy组件,支持Local模式和Cluster模式两种部署方式:
- Local模式:Broker和Proxy部署在同一进程中,只需在原有Broker配置基础上添加Proxy配置即可运行
- Cluster模式:Broker和Proxy分开部署,可以在现有集群基础上单独部署Proxy
2.2 核心组件
RocketMQ由以下核心组件构成:
- NameServer:轻量级的服务注册与发现中心,支持Broker的注册与发现
- Broker:负责消息的存储、投递和查询以及服务高可用保证
- Proxy:5.0版本新增组件,负责客户端请求的接入和处理
- Producer:消息生产者,负责产生消息
- Consumer:消息消费者,负责消费消息
- Topic:消息主题,一级消息类型,是消息订阅的基本单位
- Message Queue:消息队列,用于存储消息
2.3 消息模型
RocketMQ采用异步通信模型和发布/订阅的消息传输模型,具有简单的系统拓扑和弱上下游耦合特性,适用于异步解耦和负载转移场景。
2.4 高级特性
-
消息类型
- 普通消息:单向、同步、异步三种发送方式
- 顺序消息:保证消息的顺序性,支持全局顺序和分区顺序
- 延时消息:支持定时发送
- 事务消息:支持分布式事务,确保"exactly-once"语义
-
高可用机制
- 多节点集群:支持多Master模式、多Master多Slave模式
- 同步双写:支持同步复制和异步复制
- 自动故障转移:Master宕机后,消费者可以从Slave读取数据
- 5.0版本HA:提供更灵活的HA机制,平衡成本、服务可用性和数据可靠性
-
消息可靠性
- 消息持久化:高性能低延迟的文件存储
- 消息回溯:支持按时间戳和偏移量两种方式回溯
- 消息重试:支持消费失败重试机制
- 死信队列:处理多次重试失败的消息
-
性能与扩展性
- 高吞吐量:单机支持十万级TPS
- 低延迟:毫秒级消息延迟
- 线性扩展:支持水平扩展,动态增加节点
- 多语言客户端:支持Java、C++、Go等多种语言
-
运维与监控
- Dashboard:可视化管理控制台
- Prometheus Exporter:提供监控指标
- 丰富的命令行工具:支持消息查询、主题管理等
3. RocketMQ与其他MQ产品选型对比
3.1 功能特性对比
消息产品 | 客户端SDK | 协议与规范 | 顺序消息 | 定时消息 | 批量消息 | 广播消息 | 消息过滤 | 服务端触发重新投递 | 消息存储 | 消息回溯 | 消息优先级 | 高可用与故障转移 | 消息追踪 | 配置 | 管理与运维工具 |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
ActiveMQ | Java, .NET, C++等 | Push模型,支持OpenWire, STOMP, AMQP, MQTT, JMS | 独占消费者或独占队列可确保顺序 | 支持 | 不支持 | 支持 | 支持 | 不支持 | 支持使用JDBC的快速持久化和高性能日志,如levelDB, kahaDB | 支持 | 支持 | 支持,取决于存储,使用levelDB需要ZooKeeper服务器 | 不支持 | 默认配置级别较低,用户需要优化配置参数 | 支持 |
Kafka | Java, Scala等 | Pull模型,支持TCP | 确保分区内消息顺序 | 不支持 | 支持,使用异步生产者 | 不支持 | 支持,可使用Kafka Streams过滤消息 | 不支持 | 高性能文件存储 | 支持偏移量指示 | 不支持 | 支持,需要ZooKeeper服务器 | 不支持 | Kafka使用键值对格式进行配置,可以从文件或以编程方式提供 | 支持,使用终端命令公开核心指标 |
RocketMQ | Java, C++, Go | Pull模型,支持TCP, JMS, OpenMessaging | 确保严格的消息顺序,可以优雅地扩展 | 支持 | 支持,使用同步模式避免消息丢失 | 支持 | 支持,基于SQL92的属性过滤表达式 | 支持 | 高性能低延迟文件存储 | 支持时间戳和偏移量两种指示 | 不支持 | 支持,主从模型,无需其他工具 | 支持 | 开箱即用,用户只需关注少量配置 | 支持,丰富的Web和终端命令公开核心指标 |
3.2 适用场景对比
-
ActiveMQ
- 适用场景:传统企业应用集成,支持多种协议的场景
- 优势:协议丰富,支持消息优先级
- 劣势:性能相对较低,扩展性有限
-
Kafka
- 适用场景:大数据处理,日志收集,流处理
- 优势:超高吞吐量,良好的水平扩展性,适合数据管道
- 劣势:不支持定时消息,消息可靠性保证相对较弱
-
RocketMQ
- 适用场景:金融级可靠业务消息,电商交易系统,实时计算
- 优势:金融级可靠性,丰富的消息类型,低延迟,无外部依赖
- 劣势:社区相对较小,不支持消息优先级
3.3 选型建议
-
业务场景导向
- 需要金融级可靠性:选择RocketMQ
- 需要超高吞吐量和流处理:选择Kafka
- 需要多协议支持和传统JMS特性:选择ActiveMQ
-
技术栈考量
- Java生态系统:三者均可
- .NET或多语言环境:ActiveMQ支持更多语言客户端
- 无外部依赖要求:RocketMQ不依赖ZooKeeper等外部系统
-
运维复杂度
- 简单运维:RocketMQ配置简单,开箱即用
- 已有ZooKeeper集群:Kafka可以复用现有基础设施
- 需要消息追踪能力:RocketMQ内置支持
4. RocketMQ部署最佳实践
4.1 部署模式选择
RocketMQ支持多种部署模式,根据业务需求和可用资源选择合适的模式:
-
单节点单副本模式
- 适用场景:本地测试和开发环境
- 风险:单点故障风险高,Broker重启或宕机会导致整个服务不可用
- 部署命令:
bash
# 启动NameServer $ nohup sh bin/mqnamesrv &# 启动Broker+Proxy $ nohup sh bin/mqbroker -n localhost:9876 --enable-proxy &
-
多节点(集群)单副本模式
- 适用场景:对性能要求高,数据可靠性要求一般的场景
- 优势:配置简单,单个Master宕机或重启对应用无影响,性能最高
- 劣势:单机宕机期间,该机器上未消费的消息在机器恢复前无法被订阅
- 部署命令(以2个Master为例):
bash
# 启动NameServer $ nohup sh bin/mqnamesrv &# 启动第一个Broker+Proxy $ nohup sh bin/mqbroker -n localhost:9876 -c conf/2m-noslave/broker-a.properties --enable-proxy &# 启动第二个Broker+Proxy $ nohup sh bin/mqbroker -n localhost:9876 -c conf/2m-noslave/broker-b.properties --enable-proxy &
-
多节点(集群)多副本模式 - 异步复制
- 适用场景:对性能和可用性都有较高要求的场景
- 优势:磁盘损坏时消息丢失很少,Master宕机后消费者可以从Slave消费
- 劣势:Master宕机或磁盘损坏时会丢失少量消息
- 部署命令(以2主2从为例):
bash
# 启动NameServer $ nohup sh bin/mqnamesrv &# 启动第一个Master Broker+Proxy $ nohup sh bin/mqbroker -n localhost:9876 -c conf/2m-2s-async/broker-a.properties --enable-proxy &# 启动第一个Slave Broker+Proxy $ nohup sh bin/mqbroker -n localhost:9876 -c conf/2m-2s-async/broker-a-s.properties --enable-proxy &# 启动第二个Master Broker+Proxy $ nohup sh bin/mqbroker -n localhost:9876 -c conf/2m-2s-async/broker-b.properties --enable-proxy &# 启动第二个Slave Broker+Proxy $ nohup sh bin/mqbroker -n localhost:9876 -c conf/2m-2s-async/broker-b-s.properties --enable-proxy &
-
多节点(集群)多副本模式 - 同步双写
- 适用场景:对数据可靠性要求极高的金融级应用
- 优势:数据和服务都没有单点故障,Master宕机时消息无延迟
- 劣势:性能比异步复制模式略低(约10%),发送单条消息的RT略高
- 部署命令(以2主2从为例):
bash
# 启动NameServer $ nohup sh bin/mqnamesrv &# 启动第一个Master Broker+Proxy $ nohup sh bin/mqbroker -n localhost:9876 -c conf/2m-2s-sync/broker-a.properties --enable-proxy &# 启动第一个Slave Broker+Proxy $ nohup sh bin/mqbroker -n localhost:9876 -c conf/2m-2s-sync/broker-a-s.properties --enable-proxy &# 启动第二个Master Broker+Proxy $ nohup sh bin/mqbroker -n localhost:9876 -c conf/2m-2s-sync/broker-b.properties --enable-proxy &# 启动第二个Slave Broker+Proxy $ nohup sh bin/mqbroker -n localhost:9876 -c conf/2m-2s-sync/broker-b-s.properties --enable-proxy &
4.2 硬件配置建议
-
磁盘选择
- 推荐使用SSD,特别是对于CommitLog存储
- 生产环境建议使用RAID10,兼顾性能和可靠性
- 避免使用网络存储NAS
-
内存配置
- 为JVM分配足够内存,建议最少4GB
- 保留足够的系统内存用于页面缓存,提高读性能
-
CPU配置
- 多核CPU,建议至少4核
- 对于高吞吐量场景,建议8核以上
-
网络配置
- 使用万兆网卡,特别是对于高吞吐量场景
- 集群内部通信使用独立网络,避免与业务网络共用
4.3 操作系统优化
-
文件描述符限制
bash
# 在/etc/security/limits.conf中添加 * soft nofile 65536 * hard nofile 65536
-
内核参数优化
bash
# 在/etc/sysctl.conf中添加 net.ipv4.tcp_max_tw_buckets=300000 net.ipv4.tcp_tw_reuse=1 net.ipv4.tcp_tw_recycle=1 net.ipv4.tcp_fin_timeout=30 net.ipv4.tcp_keepalive_time=1200 net.ipv4.tcp_max_syn_backlog=8192 net.ipv4.tcp_mem=94500000 915000000 927000000 net.ipv4.tcp_rmem=4096 87380 4194304 net.ipv4.tcp_wmem=4096 16384 4194304 net.ipv4.tcp_window_scaling=1 net.core.wmem_default=8388608 net.core.rmem_default=8388608 net.core.rmem_max=16777216 net.core.wmem_max=16777216 net.core.netdev_max_backlog=32768 net.core.somaxconn=32768 vm.overcommit_memory=1 vm.swappiness=1
-
磁盘调度算法
bash
# 对于SSD,使用noop或deadline调度器 echo noop > /sys/block/sda/queue/scheduler
4.4 JVM参数优化
bash
JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g"
JAVA_OPT="${JAVA_OPT} -XX:+UseG1GC -XX:G1HeapRegionSize=16m -XX:G1ReservePercent=25"
JAVA_OPT="${JAVA_OPT} -XX:InitiatingHeapOccupancyPercent=30"
JAVA_OPT="${JAVA_OPT} -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime"
JAVA_OPT="${JAVA_OPT} -XX:+PrintAdaptiveSizePolicy -XX:+UseGCLogFileRotation"
JAVA_OPT="${JAVA_OPT} -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m"
JAVA_OPT="${JAVA_OPT} -XX:-OmitStackTraceInFastThrow"
JAVA_OPT="${JAVA_OPT} -XX:+AlwaysPreTouch"
JAVA_OPT="${JAVA_OPT} -XX:MaxDirectMemorySize=15g"
JAVA_OPT="${JAVA_OPT} -XX:-UseLargePages -XX:-UseBiasedLocking"
5. RocketMQ开发最佳实践
5.1 生产者最佳实践
-
消息发送注意事项
a. 使用标签(Tag)
应用可以设置自由的标签,消费者可以通过标签过滤消息:
java
// 5.x SDK设置标签 Message message = messageBuilder.setTopic("TopicTest").setTag("TagA").setBody("Hello RocketMQ".getBytes()).build();
b. 使用键(Key)
建议每条消息都设置业务键,用于定位消息丢失问题:
java
// 5.x SDK设置业务键 Message message = messageBuilder.setTopic("TopicTest").setTag("TagA").setKeys("OrderId12345").setBody("Hello RocketMQ".getBytes()).build();
c. 打印日志
无论消息发送成功或失败,都需要打印消息日志用于故障排查:
java
try {SendReceipt sendReceipt = producer.send(message);// 发送成功,记录日志logger.info("Message sent successfully, messageId: {}", sendReceipt.getMessageId()); } catch (Exception e) {// 发送失败,记录日志logger.error("Failed to send message, topic: {}, keys: {}", message.getTopic(), message.getKeys(), e); }
-
消息发送失败处理方法
a. 内部重试策略
Producer的send方法支持内部重试,5.x版本重试逻辑参考:
java
// 配置发送超时和重试次数 ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder().setEndpoints("localhost:9876").setRequestTimeout(Duration.ofSeconds(10)).build();// 创建生产者并设置重试策略 Producer producer = Provider.newProducerBuilder().setClientConfiguration(clientConfiguration).setMaxAttempts(3) // 设置最大重试次数.build();
b. 应用层重试
对于业务要求消息必须发送成功的场景,可以实现DB存储+定时重试机制:
java
public void sendMessageWithRetry(Message message) {try {// 尝试发送消息SendReceipt receipt = producer.send(message);// 发送成功,更新DB状态messageRepository.markAsSent(message.getKeys(), receipt.getMessageId());} catch (Exception e) {// 发送失败,存储到DBmessageRepository.saveForRetry(message);logger.error("Message send failed, saved for retry. keys: {}", message.getKeys(), e);} }// 定时任务,定期重试发送失败的消息 @Scheduled(fixedRate = 60000) public void retryFailedMessages() {List<MessageEntity> failedMessages = messageRepository.findPendingMessages();for (MessageEntity entity : failedMessages) {try {// 重建消息并发送Message message = convertToMessage(entity);SendReceipt receipt = producer.send(message);// 发送成功,更新状态messageRepository.markAsSent(entity.getMessageKey(), receipt.getMessageId());} catch (Exception e) {// 更新重试次数和下次重试时间messageRepository.updateRetryCount(entity.getId());logger.error("Retry failed for message: {}", entity.getMessageKey(), e);}} }
-
提高生产者性能的技巧
a. 批量发送
对于吞吐量要求高的场景,可以使用批量发送提高性能:
java
List<Message> messages = new ArrayList<>(); for (int i = 0; i < 10; i++) {Message msg = messageBuilder.setTopic("TopicTest").setTag("TagA").setKeys("Key" + i).setBody(("Hello RocketMQ " + i).getBytes()).build();messages.add(msg); }// 批量发送 try {producer.send(messages); } catch (Exception e) {logger.error("Failed to send batch messages", e); }
b. 异步发送
对于延迟敏感的应用,可以使用异步发送:
java
producer.sendAsync(message).thenAccept(receipt -> logger.info("Message sent successfully, messageId: {}", receipt.getMessageId())).exceptionally(e -> {logger.error("Failed to send message", e);return null;});
c. 合理设置生产者线程池
java
ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder().setEndpoints("localhost:9876").setRequestTimeout(Duration.ofSeconds(10)).setThreadPoolCoreSize(16) // 设置核心线程数.setThreadPoolMaximumSize(64) // 设置最大线程数.setThreadPoolKeepAliveTime(Duration.ofMinutes(1)) // 设置线程保活时间.build();
5.2 消费者最佳实践
-
消费过程幂等性
RocketMQ不保证消息的"Exactly Once"语义,可能出现重复消费,因此消费者需要实现幂等处理:
java
public class IdempotentConsumer {private final JdbcTemplate jdbcTemplate;public void consumeMessage(MessageView messageView) {String messageId = messageView.getMessageId().toString();String businessKey = messageView.getKeys();// 使用消息ID或业务键检查是否处理过if (isProcessed(messageId)) {logger.info("Message already processed, skipping. messageId: {}", messageId);return;}try {// 执行业务逻辑processBusinessLogic(messageView);// 标记消息为已处理markAsProcessed(messageId, businessKey);} catch (Exception e) {logger.error("Failed to process message: {}", messageId, e);throw e; // 抛出异常触发重试}}private boolean isProcessed(String messageId) {Integer count = jdbcTemplate.queryForObject("SELECT COUNT(1) FROM processed_messages WHERE message_id = ?",Integer.class, messageId);return count != null && count > 0;}private void markAsProcessed(String messageId, String businessKey) {jdbcTemplate.update("INSERT INTO processed_messages (message_id, business_key, process_time) VALUES (?, ?, ?)",messageId, businessKey, new Date());}private void processBusinessLogic(MessageView messageView) {// 实际业务逻辑} }
-
消费过程慢的处理方法
a. 增加消费并行度
通过增加消费线程数提高吞吐量:
java
// 5.x SDK设置消费线程数 SimpleConsumer consumer = provider.newSimpleConsumerBuilder().setClientConfiguration(clientConfiguration).setConsumerGroup("YourConsumerGroup").setAwaitDuration(Duration.ofSeconds(10)).setSubscriptionExpressions(Collections.singletonMap("TopicTest", "TagA")).setThreadCount(20) // 设置消费线程数.build();
b. 批量消费
一次获取多条消息进行处理,减少网络交互:
java
// 5.x SDK批量消费 SimpleConsumer consumer = provider.newSimpleConsumerBuilder().setClientConfiguration(clientConfiguration).setConsumerGroup("YourConsumerGroup").setAwaitDuration(Duration.ofSeconds(10)).setSubscriptionExpressions(Collections.singletonMap("TopicTest", "TagA")).setMaxMessageNum(32) // 设置批量消费最大消息数.build();// 批量消费处理 while (true) {List<MessageView> messages = consumer.receive(32, Duration.ofSeconds(10));if (messages.isEmpty()) {continue;}try {// 批量处理业务逻辑processBatchMessages(messages);// 批量确认消息consumer.ack(messages);} catch (Exception e) {// 处理失败,稍后重试logger.error("Failed to process batch messages", e);} }
c. 异步处理
将耗时操作异步处理,快速确认消息:
java
// 创建线程池处理耗时任务 ExecutorService executorService = Executors.newFixedThreadPool(20);// 消费消息 while (true) {List<MessageView> messages = consumer.receive(10, Duration.ofSeconds(10));if (messages.isEmpty()) {continue;}for (MessageView message : messages) {// 先确认消息consumer.ack(message);// 异步处理业务逻辑executorService.submit(() -> {try {processBusinessLogic(message);} catch (Exception e) {logger.error("Failed to process message asynchronously", e);// 处理失败,可以发送到另一个主题或记录到DB中后续处理handleFailedMessage(message);}});} }
-
消费者负载均衡
RocketMQ提供两种负载均衡策略:基于消息的负载均衡和基于队列的负载均衡。
a. 基于队列的负载均衡(默认)
java
// 5.x SDK默认使用基于队列的负载均衡 SimpleConsumer consumer = provider.newSimpleConsumerBuilder().setClientConfiguration(clientConfiguration).setConsumerGroup("YourConsumerGroup").setSubscriptionExpressions(Collections.singletonMap("TopicTest", "TagA")).setLoadBalanceStrategy(LoadBalanceStrategy.QUEUE_BASED) // 显式设置为基于队列的负载均衡.build();
b. 基于消息的负载均衡
java
// 5.x SDK设置基于消息的负载均衡 SimpleConsumer consumer = provider.newSimpleConsumerBuilder().setClientConfiguration(clientConfiguration).setConsumerGroup("YourConsumerGroup").setSubscriptionExpressions(Collections.singletonMap("TopicTest", "TagA")).setLoadBalanceStrategy(LoadBalanceStrategy.MESSAGE_BASED) // 设置为基于消息的负载均衡.build();
5.3 事务消息实践
RocketMQ的事务消息可以解决分布式事务问题,确保本地事务和消息发送的原子性:
java
// 创建事务生产者
TransactionChecker checker = new MyTransactionChecker();
TransactionProducer producer = Provider.newTransactionProducerBuilder().setClientConfiguration(clientConfiguration).setChecker(checker).build();// 发送事务消息
Message message = messageBuilder.setTopic("TransactionTopic").setTag("TagA").setKeys("OrderId12345").setBody("Transaction Message".getBytes()).build();try {// 发送事务消息并执行本地事务TransactionResolution resolution = producer.sendMessageInTransaction(message, null).thenApply(receipt -> {try {// 执行本地事务boolean success = executeLocalTransaction(receipt, message);return success ? TransactionResolution.COMMIT : TransactionResolution.ROLLBACK;} catch (Exception e) {logger.error("Local transaction execution failed", e);return TransactionResolution.ROLLBACK;}}).get();logger.info("Transaction completed with resolution: {}", resolution);
} catch (Exception e) {logger.error("Failed to send transaction message", e);
}// 事务状态检查器
class MyTransactionChecker implements TransactionChecker {@Overridepublic TransactionResolution check(MessageView messageView) {String transactionId = messageView.getMessageId().toString();String orderKey = messageView.getKeys();// 查询本地事务状态TransactionStatus status = queryTransactionStatus(transactionId, orderKey);switch (status) {case COMMITTED:return TransactionResolution.COMMIT;case ROLLBACKED:return TransactionResolution.ROLLBACK;case UNKNOWN:default:// 如果无法确定状态,可以再次回查return TransactionResolution.UNKNOWN;}}private TransactionStatus queryTransactionStatus(String transactionId, String orderKey) {// 实际查询本地事务状态的逻辑return TransactionStatus.COMMITTED;}
}
5.4 顺序消息实践
RocketMQ支持分区顺序消息,确保同一分区内的消息按照发送顺序被消费:
java
// 生产者发送顺序消息
public void sendOrderedMessages() {// 订单ID作为分区键String orderId = "OrderId12345";// 计算分区IDint hashCode = orderId.hashCode();int queueId = Math.abs(hashCode) % 4; // 假设有4个队列// 创建订单消息List<Message> orderMessages = new ArrayList<>();// 1. 创建订单Message createOrder = messageBuilder.setTopic("OrderTopic").setTag("Create").setKeys(orderId).setMessageGroup(orderId) // 5.x SDK使用messageGroup指定分区键.setBody("Create Order".getBytes()).build();orderMessages.add(createOrder);// 2. 支付订单Message payOrder = messageBuilder.setTopic("OrderTopic").setTag("Pay").setKeys(orderId).setMessageGroup(orderId).setBody("Pay Order".getBytes()).build();orderMessages.add(payOrder);// 3. 发货Message shipOrder = messageBuilder.setTopic("OrderTopic").setTag("Ship").setKeys(orderId).setMessageGroup(orderId).setBody("Ship Order".getBytes()).build();orderMessages.add(shipOrder);// 4. 完成订单Message completeOrder = messageBuilder.setTopic("OrderTopic").setTag("Complete").setKeys(orderId).setMessageGroup(orderId).setBody("Complete Order".getBytes()).build();orderMessages.add(completeOrder);// 按顺序发送消息for (Message msg : orderMessages) {try {SendReceipt receipt = producer.send(msg);logger.info("Ordered message sent successfully, messageId: {}", receipt.getMessageId());} catch (Exception e) {logger.error("Failed to send ordered message", e);}}
}// 消费者处理顺序消息
public void consumeOrderedMessages() {// 创建消费者SimpleConsumer consumer = provider.newSimpleConsumerBuilder().setClientConfiguration(clientConfiguration).setConsumerGroup("OrderConsumerGroup").setSubscriptionExpressions(Collections.singletonMap("OrderTopic", "*")).build();// 消费消息while (true) {List<MessageView> messages = consumer.receive(10, Duration.ofSeconds(10));if (messages.isEmpty()) {continue;}// 按订单ID分组处理Map<String, List<MessageView>> orderGroups = messages.stream().collect(Collectors.groupingBy(MessageView::getMessageGroup));for (Map.Entry<String, List<MessageView>> entry : orderGroups.entrySet()) {String orderId = entry.getKey();List<MessageView> orderMessages = entry.getValue();try {// 按顺序处理同一订单的消息processOrderMessages(orderId, orderMessages);// 确认消息consumer.ack(orderMessages);} catch (Exception e) {logger.error("Failed to process ordered messages for order: {}", orderId, e);// 处理失败,不确认消息,等待重试}}}
}private void processOrderMessages(String orderId, List<MessageView> messages) {// 确保消息按Tag顺序处理:Create -> Pay -> Ship -> Completefor (MessageView message : messages) {String tag = message.getTag();switch (tag) {case "Create":createOrder(orderId, message);break;case "Pay":payOrder(orderId, message);break;case "Ship":shipOrder(orderId, message);break;case "Complete":completeOrder(orderId, message);break;default:logger.warn("Unknown message tag: {}", tag);}}
}
5.5 延时消息实践
RocketMQ支持定时消息,可以在指定时间后投递:
java
// 发送延时消息
public void sendDelayedMessage() {// 创建延时消息,10秒后投递Message message = messageBuilder.setTopic("DelayTopic").setTag("Delay").setKeys("DelayedMessage123").setDeliveryTimestamp(System.currentTimeMillis() + 10000) // 10秒后投递.setBody("This is a delayed message".getBytes()).build();try {SendReceipt receipt = producer.send(message);logger.info("Delayed message sent successfully, messageId: {}, will be delivered at: {}", receipt.getMessageId(), new Date(System.currentTimeMillis() + 10000));} catch (Exception e) {logger.error("Failed to send delayed message", e);}
}// 消费延时消息
public void consumeDelayedMessages() {// 创建消费者SimpleConsumer consumer = provider.newSimpleConsumerBuilder().setClientConfiguration(clientConfiguration).setConsumerGroup("DelayConsumerGroup").setSubscriptionExpressions(Collections.singletonMap("DelayTopic", "Delay")).build();// 消费消息while (true) {List<MessageView> messages = consumer.receive(10, Duration.ofSeconds(10));if (messages.isEmpty()) {continue;}for (MessageView message : messages) {try {// 处理延时消息processDelayedMessage(message);// 确认消息consumer.ack(message);logger.info("Delayed message consumed successfully, messageId: {}, deliveryTimestamp: {}", message.getMessageId(), new Date(message.getDeliveryTimestamp()));} catch (Exception e) {logger.error("Failed to process delayed message", e);// 处理失败,不确认消息,等待重试}}}
}
6. RocketMQ运维最佳实践
6.1 监控与告警
-
使用RocketMQ Dashboard
RocketMQ Dashboard是官方提供的可视化管理工具,可以监控集群状态、消息堆积、消费进度等:
bash
# 启动RocketMQ Dashboard java -jar rocketmq-dashboard-1.0.0.jar --server.port=8080 --rocketmq.config.namesrvAddr=localhost:9876
-
使用Prometheus + Grafana监控
a. 启动RocketMQ Exporter:
bash
java -jar rocketmq-exporter-0.0.1-SNAPSHOT.jar --rocketmq.config.namesrvAddr=localhost:9876
b. Prometheus配置:
yaml
scrape_configs:- job_name: 'rocketmq'static_configs:- targets: ['localhost:5557']
c. 导入Grafana Dashboard模板
-
关键监控指标
- 消息TPS(生产和消费)
- 消息堆积量
- 消息延迟
- 磁盘使用率
- GC情况
- 系统资源使用率(CPU、内存、网络、磁盘IO)
-
告警设置
设置以下关键指标的告警阈值:
- 消息堆积超过阈值(如10000条)
- 消费延迟超过阈值(如60秒)
- 磁盘使用率超过阈值(如85%)
- 生产或消费TPS异常下降
- Broker状态变更(如主从切换)
6.2 日志管理
-
日志配置
修改
conf/logback.xml
配置文件,优化日志输出:xml
<appender name="DefaultAppender" class="ch.qos.logback.core.rolling.RollingFileAppender"><file>${user.home}/logs/rocketmqlogs/broker.log</file><rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"><fileNamePattern>${user.home}/logs/rocketmqlogs/broker.%d{yyyy-MM-dd}.log</fileNamePattern><maxHistory>7</maxHistory></rollingPolicy><encoder><pattern>%d{yyy-MM-dd HH:mm:ss,GMT+8} %p %t - %m%n</pattern><charset class="java.nio.charset.Charset">UTF-8</charset></encoder> </appender>
-
日志分析工具
使用ELK(Elasticsearch, Logstash, Kibana)或Graylog收集和分析日志:
bash
# Logstash配置示例 input {file {path => "/home/rocketmq/logs/rocketmqlogs/*.log"type => "rocketmq"} }filter {if [type] == "rocketmq" {grok {match => { "message" => "%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{DATA:thread} - %{GREEDYDATA:content}" }}date {match => [ "timestamp", "yyyy-MM-dd HH:mm:ss,ZZZ" ]target => "@timestamp"}} }output {elasticsearch {hosts => ["localhost:9200"]index => "rocketmq-%{+YYYY.MM.dd}"} }
6.3 容量规划
-
磁盘容量规划
磁盘容量 = 每日消息量 × 平均消息大小 × 消息保留天数 × (1 + 副本数) × 安全系数
示例计算:
- 每日消息量:1亿条
- 平均消息大小:1KB
- 消息保留天数:3天
- 副本数:1(1主1从)
- 安全系数:1.5
磁盘容量 = 1亿 × 1KB × 3天 × (1 + 1) × 1.5 = 900GB
-
Topic和Queue规划
- Topic数量:建议控制在100以内
- 单个Topic的Queue数量:建议4-8个,与消费者数量相匹配
- 总Queue数量:建议控制在1000以内
-
集群规模规划
- 小规模:2-4台服务器,2主2从
- 中等规模:4-8台服务器,4主4从
- 大规模:8台以上服务器,多主多从
6.4 常见问题处理
-
消息堆积问题
a. 临时提高消费并行度:
java
// 增加消费线程数 consumer.setConsumeThreadMax(50);
b. 临时扩容消费者实例
c. 跳过非重要消息:
bash
# 使用mqadmin工具重置消费位点 sh bin/mqadmin resetOffsetByTime -n localhost:9876 -g ConsumerGroupName -t TopicName -s now
-
磁盘空间不足
a. 临时调整消息保留时间:
bash
# 修改broker配置 fileReservedTime=24
b. 清理CommitLog:
bash
# 手动触发清理 sh bin/mqadmin cleanExpiredCQ -n localhost:9876 -b BrokerName
-
Broker宕机处理
a. 主从自动切换(需要配置DLedger)
b. 手动切换:
bash
# 停止宕机的Broker sh bin/mqshutdown broker# 将Slave提升为Master # 修改配置文件中的brokerId=0,然后启动 sh bin/mqbroker -c conf/broker.conf
-
消息发送失败处理
a. 检查网络连接
b. 检查Topic权限
c. 检查磁盘空间
d. 检查Broker状态
7. RocketMQ应用场景最佳实践
7.1 异步解耦
使用RocketMQ实现系统间的异步解耦,提高系统的可扩展性和可维护性:
java
// 订单系统:下单后发送消息
public void createOrder(Order order) {// 1. 保存订单到数据库orderRepository.save(order);// 2. 发送订单创建消息Message message = messageBuilder.setTopic("OrderTopic").setTag("Created").setKeys(order.getOrderId()).setBody(JSON.toJSONString(order).getBytes()).build();try {SendReceipt receipt = producer.send(message);logger.info("Order created message sent successfully, orderId: {}, messageId: {}", order.getOrderId(), receipt.getMessageId());} catch (Exception e) {logger.error("Failed to send order created message, orderId: {}", order.getOrderId(), e);// 可以将消息存储到本地,定时重试}
}// 库存系统:消费订单消息,扣减库存
public void consumeOrderMessages() {SimpleConsumer consumer = provider.newSimpleConsumerBuilder().setClientConfiguration(clientConfiguration).setConsumerGroup("InventoryConsumerGroup").setSubscriptionExpressions(Collections.singletonMap("OrderTopic", "Created")).build();while (true) {List<MessageView> messages = consumer.receive(10, Duration.ofSeconds(10));if (messages.isEmpty()) {continue;}for (MessageView message : messages) {try {// 解析订单信息String orderJson = new String(message.getBody());Order order = JSON.parseObject(orderJson, Order.class);// 扣减库存boolean success = inventoryService.deductInventory(order);if (success) {// 确认消息consumer.ack(message);logger.info("Inventory deducted for order: {}", order.getOrderId());} else {// 库存不足,稍后重试logger.warn("Insufficient inventory for order: {}, will retry later", order.getOrderId());}} catch (Exception e) {logger.error("Failed to process order message", e);// 处理失败,不确认消息,等待重试}}}
}
7.2 削峰填谷
使用RocketMQ缓冲突发流量,保护后端系统:
java
// 接收用户请求,发送到消息队列
@RestController
public class OrderController {@Autowiredprivate Producer producer;@PostMapping("/orders")public ResponseEntity<String> createOrder(@RequestBody OrderRequest request) {// 生成订单IDString orderId = UUID.randomUUID().toString();// 构建消息Message message = messageBuilder.setTopic("OrderTopic").setTag("Created").setKeys(orderId).setBody(JSON.toJSONString(request).getBytes()).build();try {// 发送消息到队列SendReceipt receipt = producer.send(message);// 返回订单ID给用户return ResponseEntity.ok(orderId);} catch (Exception e) {logger.error("Failed to send order message", e);return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("Order creation failed");}}
}// 后台服务:按照处理能力消费消息
@Service
public class OrderProcessingService {@Autowiredprivate OrderRepository orderRepository;@PostConstructpublic void init() {// 创建消费者SimpleConsumer consumer = provider.newSimpleConsumerBuilder().setClientConfiguration(clientConfiguration).setConsumerGroup("OrderProcessingGroup").setSubscriptionExpressions(Collections.singletonMap("OrderTopic", "Created")).setThreadCount(10) // 控制并发处理数量.build();// 启动消费线程new Thread(() -> {while (true) {try {// 批量获取消息,控制消费速率List<MessageView> messages = consumer.receive(20, Duration.ofSeconds(5));if (!messages.isEmpty()) {// 批量处理订单processOrders(messages);// 确认消息consumer.ack(messages);}} catch (Exception e) {logger.error("Error processing orders", e);}}}).start();}private void processOrders(List<MessageView> messages) {for (MessageView message : messages) {try {// 解析订单请求String orderJson = new String(message.getBody());OrderRequest request = JSON.parseObject(orderJson, OrderRequest.class);// 创建订单Order order = new Order();order.setOrderId(message.getKeys());order.setUserId(request.getUserId());order.setProductId(request.getProductId());order.setQuantity(request.getQuantity());order.setAmount(request.getAmount());order.setStatus("CREATED");// 保存订单orderRepository.save(order);logger.info("Order processed successfully: {}", order.getOrderId());} catch (Exception e) {logger.error("Failed to process order: {}", message.getKeys(), e);}}}
}
7.3 分布式事务
使用RocketMQ事务消息实现分布式事务,确保跨系统操作的一致性:
java
// 订单服务:创建订单并发送事务消息
@Service
public class OrderTransactionService {@Autowiredprivate OrderRepository orderRepository;@Autowiredprivate TransactionProducer producer;@Transactionalpublic String createOrder(OrderRequest request) {// 生成订单IDString orderId = UUID.randomUUID().toString();// 构建事务消息Message message = messageBuilder.setTopic("OrderTransactionTopic").setTag("Created").setKeys(orderId).setBody(JSON.toJSONString(request).getBytes()).build();try {// 发送事务消息TransactionResolution resolution = producer.sendMessageInTransaction(message, orderId).thenApply(receipt -> {try {// 执行本地事务:创建订单Order order = new Order();order.setOrderId(orderId);order.setUserId(request.getUserId());order.setProductId(request.getProductId());order.setQuantity(request.getQuantity());order.setAmount(request.getAmount());order.setStatus("PENDING");order.setTransactionId(receipt.getMessageId().toString());// 保存订单orderRepository.save(order);// 记录事务状态transactionRepository.saveTransactionStatus(receipt.getMessageId().toString(), orderId, TransactionStatus.COMMITTED);return TransactionResolution.COMMIT;} catch (Exception e) {logger.error("Local transaction execution failed", e);// 记录事务状态transactionRepository.saveTransactionStatus(receipt.getMessageId().toString(), orderId, TransactionStatus.ROLLBACKED);return TransactionResolution.ROLLBACK;}}).get();if (resolution == TransactionResolution.COMMIT) {return orderId;} else {throw new RuntimeException("Order creation failed, transaction rolled back");}} catch (Exception e) {logger.error("Failed to create order with transaction", e);throw new RuntimeException("Order creation failed", e);}}// 事务状态检查器@Componentpublic class OrderTransactionChecker implements TransactionChecker {@Autowiredprivate TransactionRepository transactionRepository;@Overridepublic TransactionResolution check(MessageView messageView) {String transactionId = messageView.getMessageId().toString();String orderId = messageView.getKeys();// 查询本地事务状态TransactionStatus status = transactionRepository.getTransactionStatus(transactionId, orderId);switch (status) {case COMMITTED:return TransactionResolution.COMMIT;case ROLLBACKED:return TransactionResolution.ROLLBACK;case UNKNOWN:default:// 如果无法确定状态,可以查询订单表Optional<Order> orderOpt = orderRepository.findById(orderId);if (orderOpt.isPresent()) {return TransactionResolution.COMMIT;}// 如果订单不存在,可能是事务失败或尚未执行// 这里可以根据业务逻辑决定是回滚还是继续等待return TransactionResolution.UNKNOWN;}}}
}// 库存服务:消费事务消息,扣减库存
@Service
public class InventoryTransactionService {@Autowiredprivate InventoryRepository inventoryRepository;@PostConstructpublic void init() {// 创建消费者SimpleConsumer consumer = provider.newSimpleConsumerBuilder().setClientConfiguration(clientConfiguration).setConsumerGroup("InventoryTransactionGroup").setSubscriptionExpressions(Collections.singletonMap("OrderTransactionTopic", "Created")).build();// 启动消费线程new Thread(() -> {while (true) {try {List<MessageView> messages = consumer.receive(10, Duration.ofSeconds(10));for (MessageView message : messages) {try {// 解析订单信息String orderJson = new String(message.getBody());OrderRequest request = JSON.parseObject(orderJson, OrderRequest.class);// 在事务中扣减库存deductInventoryInTransaction(message.getKeys(), request);// 确认消息consumer.ack(message);} catch (Exception e) {logger.error("Failed to process inventory transaction", e);// 处理失败,不确认消息,等待重试}}} catch (Exception e) {logger.error("Error receiving messages", e);}}}).start();}@Transactionalpublic void deductInventoryInTransaction(String orderId, OrderRequest request) {// 查询库存Inventory inventory = inventoryRepository.findByProductId(request.getProductId());if (inventory == null || inventory.getQuantity() < request.getQuantity()) {throw new RuntimeException("Insufficient inventory for product: " + request.getProductId());}// 扣减库存inventory.setQuantity(inventory.getQuantity() - request.getQuantity());inventoryRepository.save(inventory);// 记录库存变动InventoryLog log = new InventoryLog();log.setOrderId(orderId);log.setProductId(request.getProductId());log.setQuantity(-request.getQuantity());log.setOperationType("DEDUCT");inventoryLogRepository.save(log);logger.info("Inventory deducted for order: {}, product: {}, quantity: {}", orderId, request.getProductId(), request.getQuantity());}
}
7.4 日志收集
使用RocketMQ作为日志收集管道,实现高吞吐、可靠的日志传输:
java
// 应用服务:发送日志
@Component
public class LogProducer {@Autowiredprivate Producer producer;private static final String LOG_TOPIC = "LogTopic";public void sendLog(LogEntry logEntry) {// 构建日志消息Message message = messageBuilder.setTopic(LOG_TOPIC).setTag(logEntry.getLevel()).setKeys(logEntry.getTraceId()).setBody(JSON.toJSONString(logEntry).getBytes()).build();try {// 异步发送日志,不阻塞业务流程producer.sendAsync(message).thenAccept(receipt -> {if (logger.isDebugEnabled()) {logger.debug("Log sent successfully, traceId: {}", logEntry.getTraceId());}}).exceptionally(e -> {logger.warn("Failed to send log, traceId: {}", logEntry.getTraceId(), e);return null;});} catch (Exception e) {logger.warn("Failed to send log, traceId: {}", logEntry.getTraceId(), e);}}
}// 日志服务:消费日志并存储
@Service
public class LogConsumerService {@Autowiredprivate ElasticsearchClient esClient;@PostConstructpublic void init() {// 创建消费者SimpleConsumer consumer = provider.newSimpleConsumerBuilder().setClientConfiguration(clientConfiguration).setConsumerGroup("LogConsumerGroup").setSubscriptionExpressions(Collections.singletonMap("LogTopic", "*")).setMaxMessageNum(32) // 批量消费.setThreadCount(20) // 多线程处理.build();// 启动消费线程new Thread(() -> {while (true) {try {// 批量获取日志List<MessageView> messages = consumer.receive(32, Duration.ofSeconds(5));if (!messages.isEmpty()) {// 批量处理日志processLogs(messages);// 确认消息consumer.ack(messages);}} catch (Exception e) {logger.error("Error processing logs", e);}}}).start();}private void processLogs(List<MessageView> messages) {// 批量构建ES索引请求BulkRequest.Builder bulkRequest = new BulkRequest.Builder();for (MessageView message : messages) {try {// 解析日志String logJson = new String(message.getBody());LogEntry logEntry = JSON.parseObject(logJson, LogEntry.class);// 添加到批量请求bulkRequest.operations(op -> op.index(idx -> idx.index("logs-" + LocalDate.now().format(DateTimeFormatter.ISO_DATE)).id(logEntry.getTraceId()).document(logEntry)));} catch (Exception e) {logger.error("Failed to process log message", e);}}try {// 执行批量索引BulkResponse response = esClient.bulk(bulkRequest.build());if (response.errors()) {logger.warn("Some logs failed to index: {}", response.items().stream().filter(item -> item.error() != null).map(item -> item.id()).collect(Collectors.joining(", ")));}} catch (Exception e) {logger.error("Failed to index logs to Elasticsearch", e);}}
}
7.5 分布式限流
使用RocketMQ实现分布式限流,控制系统访问速率:
java
// 限流服务
@Service
public class RateLimiterService {@Autowiredprivate Producer producer;private static final String RATE_LIMIT_TOPIC = "RateLimitTopic";// 令牌桶限流器private final ConcurrentHashMap<String, RateLimiter> rateLimiters = new ConcurrentHashMap<>();// 初始化限流器public RateLimiter getRateLimiter(String resource, double permitsPerSecond) {return rateLimiters.computeIfAbsent(resource, k -> RateLimiter.create(permitsPerSecond));}// 尝试获取令牌public boolean tryAcquire(String resource, double permitsPerSecond) {RateLimiter limiter = getRateLimiter(resource, permitsPerSecond);return limiter.tryAcquire();}// 分布式限流:将限流请求发送到消息队列public boolean distributedTryAcquire(String resource, String key, double permitsPerSecond) {// 先尝试本地限流if (!tryAcquire(resource, permitsPerSecond)) {return false;}// 构建限流消息RateLimitRequest request = new RateLimitRequest();request.setResource(resource);request.setKey(key);request.setTimestamp(System.currentTimeMillis());Message message = messageBuilder.setTopic(RATE_LIMIT_TOPIC).setTag(resource).setKeys(key).setBody(JSON.toJSONString(request).getBytes()).build();try {// 发送限流消息SendReceipt receipt = producer.send(message);return true;} catch (Exception e) {logger.error("Failed to send rate limit message", e);return false;}}// 启动限流消费者@PostConstructpublic void init() {// 创建消费者SimpleConsumer consumer = provider.newSimpleConsumerBuilder().setClientConfiguration(clientConfiguration).setConsumerGroup("RateLimitConsumerGroup").setSubscriptionExpressions(Collections.singletonMap(RATE_LIMIT_TOPIC, "*")).build();// 启动消费线程new Thread(() -> {while (true) {try {List<MessageView> messages = consumer.receive(10, Duration.ofSeconds(1));for (MessageView message : messages) {try {// 解析限流请求String requestJson = new String(message.getBody());RateLimitRequest request = JSON.parseObject(requestJson, RateLimitRequest.class);// 记录限流信息到RedisrecordRateLimit(request);// 确认消息consumer.ack(message);} catch (Exception e) {logger.error("Failed to process rate limit message", e);}}} catch (Exception e) {logger.error("Error receiving rate limit messages", e);}}}).start();}// 记录限流信息到Redisprivate void recordRateLimit(RateLimitRequest request) {String key = "rate_limit:" + request.getResource() + ":" + request.getKey();String countKey = key + ":count";String timestampKey = key + ":timestamp";// 使用Redis记录访问次数和时间戳redisTemplate.opsForValue().increment(countKey);redisTemplate.opsForValue().set(timestampKey, String.valueOf(request.getTimestamp()));redisTemplate.expire(countKey, 1, TimeUnit.MINUTES);redisTemplate.expire(timestampKey, 1, TimeUnit.MINUTES);}
}// 在API网关或服务入口使用限流服务
@RestController
public class ApiGatewayController {@Autowiredprivate RateLimiterService rateLimiterService;@GetMapping("/api/{resource}")public ResponseEntity<?> accessApi(@PathVariable String resource, @RequestParam String userId,HttpServletRequest request) {// 对特定资源进行限流if (!rateLimiterService.distributedTryAcquire(resource, userId, 10.0)) {return ResponseEntity.status(HttpStatus.TOO_MANY_REQUESTS).body("Rate limit exceeded for resource: " + resource);}// 继续处理请求return ResponseEntity.ok("Access granted to resource: " + resource);}
}
8. 总结
RocketMQ作为一款高性能、高可靠性的分布式消息和流平台,具有以下核心优势:
-
丰富的消息类型:支持普通消息、顺序消息、延时消息、事务消息等多种消息类型,满足不同业务场景需求。
-
高可靠性:提供多种高可用部署模式,支持同步双写和异步复制,确保消息不丢失。
-
高性能:单机支持十万级TPS,低延迟,适合高吞吐量场景。
-
良好的扩展性:支持水平扩展,动态增加节点,适应业务增长。
-
运维友好:提供丰富的监控和管理工具,简化运维工作。
在选择消息中间件时,需要根据业务场景、性能需求、可靠性要求等因素综合考虑。RocketMQ特别适合对消息可靠性要求高、业务复杂度高的场景,如金融交易、电商订单等核心业务系统。
通过本文介绍的最佳实践,希望能帮助读者更好地理解和使用RocketMQ,充分发挥其在分布式系统中的价值。
相关文章:
RocketMQ核心特性与最佳实践
目录 1. 引言 2. RocketMQ核心特性 2.1 架构演进 2.2 核心组件 2.3 消息模型 2.4 高级特性 3. RocketMQ与其他MQ产品选型对比 3.1 功能特性对比 3.2 适用场景对比 3.3 选型建议 4. RocketMQ部署最佳实践 4.1 部署模式选择 4.2 硬件配置建议 4.3 操作系统优化 4.4…...
springboot配置redis lettuce连接池,以及连接池参数解释
文章目录 前置基本配置参数解释 前置 javaspringbootredislettuce 连接池 有很多连接池,比如 jedis,lettuce,redission,springboot 默认使用 lettuce 连接池 lettuce 连接池的特点是:一个 lettuce 连接可以被多个线…...

基于aspnet,微信小程序,mysql数据库,在线微信小程序汽车故障预约系统
详细视频:【基于aspnet,微信小程序,mysql数据库,在线微信小程序汽车故障预约系统。-哔哩哔哩】 https://b23.tv/zfqLWPV...

如何使用AI搭建WordPress网站
人工智能正迅速成为包括网页设计在内的许多行业在其功能设置中添加的一种工具。在数字设计和营销领域,许多成熟的工具都在其产品中添加了人工智能功能。WordPress 也是如此。作为目前最流行的网站建设工具之一,WordPress 的人工智能插件越来越多也就不足…...
打破双亲委派模型的实践:JDBC与Tomcat的深度解析
一、JDBC如何打破双亲委派模型 1. JDBC SPI机制的核心矛盾 Java数据库连接(JDBC)是打破双亲委派模型的经典案例,其根本原因在于基础类库需要加载实现类的矛盾: 核心接口位置:java.sql.Driver等接口位于rt.jar中,由启动类加载器…...
《打破枷锁:Python多线程GIL困境突围指南》
GIL,这个Python解释器层面的独特机制,虽在一定程度上守护了内存管理的秩序,却也成为了多线程并行的紧箍咒,限制了Python在多核处理器上的性能发挥。今天,让我们深入剖析GIL的本质,探寻突破这一枷锁的有效策…...

Java并发编程:全面解析锁策略、CAS与synchronized优化机制
一、六种锁策略场景化解析 1. 乐观锁 vs 悲观锁:图书馆借书的两种策略 核心差异:对资源是否会被抢占的预期不同。 乐观锁(假设冲突概率低) → 行为:直接去书架上拿书(围绕加锁要做的工作更少)…...

2025第三届黄河流域网络安全技能挑战赛--Crypto--WriteUp
2025第三届黄河流域网络安全技能挑战赛–Crypto–WriteUp Crypto sandwitch task from Crypto.Util.number import * import gmpy2 flag bflag{fake_flag} assert len(flag) 39 p getPrime(512) q getPrime(512) n p * q e 0x3 pad1 beasy_problem pad2 bHow_to_so…...

[爬虫知识] IP代理
相关实战案例:[爬虫实战] 代理爬取:小白也能看懂怎么用代理 相关爬虫专栏:JS逆向爬虫实战 爬虫知识点合集 爬虫实战案例 引言:爬虫与IP封锁的攻防战 对网络爬虫而言,遇到的一个较棘手的问题就是封IP:请…...

6个月Python学习计划 Day 1 - Python 基础入门 开发环境搭建
6个月Python学习计划:从入门到AI实战(前端开发者进阶指南) 🎯 今日目标 理解 Python 的背景和用途安装 Python 开发环境熟悉基本语法:变量、数据类型、打印输出动手编写第一个 Python 程序 🧠 学习内容详…...

GraphPad Prism工作表的基本操作
《2025新书现货 GraphPad Prism图表可视化与统计数据分析(视频教学版)雍杨 康巧昆 清华大学出版社教材书籍 9787302686460 GraphPadPrism图表可视化 无规格》【摘要 书评 试读】- 京东图书 GraphPad Prism中包含5种工作表,每种工作表的基本操…...
Maven插件之docker-maven-plugin
介绍 在持续集成过程中,项目工程一般使用 Maven 编译打包,然后生成镜像,通过镜像上线,能够大大提供上线效率,同时能够快速动态扩容,快速回滚,着实很方便。docker-maven-plugin 插件就是为了实现…...

成年后还能学习多少知识,由大脑的这个数量决定
撰文|Anne Trafton 编译|郑添惺 审校|clefable 麻省理工学院(MIT)的一些神经科学家发现,成年的大脑中含有数百万个“静默突触”(silent synapses)。它们是神经元之间未成熟的神经突…...
Flask 会话管理:从原理到实战,深度解析 session 机制
1、Flask中session 的实现原理:服务器与客户端的协作 HTTP 协议是无状态的——服务器无法区分两次请求是否来自同一用户。这意味着,用户登录后跳转到其他页面时,服务器会“忘记”用户身份。 为解决这一问题,Web 开发中引入了会话…...

MySQL连接错误解决方案:Can‘t connect to MySQL server on ‘localhost‘ (10038)
错误描述 当您尝试连接MySQL数据库时,可能会遇到以下错误提示: 这个错误表明客户端无法连接到本地MySQL服务器。 可能的原因 MySQL服务未启动 MySQL配置问题 防火墙或安全软件阻止连接 端口被占用或未正确配置 网络连接问题 解决方案 方法一&am…...
【跨端框架检测】使用adb logcat检测Android APP使用的跨端框架方法总结
目录 Weex 跨端框架使用了uni-app的情况区分使用了uni-app还是Weex 判断使用了Xamarin判断使用了KMM框架判断使用了 Ionic 框架判断使用了Cordova框架判断使用了Capacitor 框架使用了React Native框架使用了QT框架使用了Cocos框架使用了Electron 框架使用了flutter 框架使用…...
lua脚本实战—— Redis并发原子性陷阱
需求分析 对于内容类网站,比如用户浏览题目的答案,需要先登录才能追溯,那么可以统计用户访问频率来限制数据的爬取。 可采用分级反爬虫策略,先告警、再采取强制措施: 如果每分钟超过 10 道题,给管理员发…...
【MySQL】第10节|MySQL全局优化与Mysql 8.0新增特性详解
全局优化 mysql server参数 1. max_connections(最大连接数) 含义:MySQL 服务允许的最大并发连接数(包括正在使用和空闲的连接)。超过此限制时,新连接会被拒绝(报错 Too many connections&am…...

CSS相关知识
1.清除浮动的方法 2.定位 静态定位相当于标准流 相对定位不脱离文档流,仍然占据原来的位置(最频繁的作用是给绝对定位当爹) 绝对定位脱离文档标准流,不再占有原来位置 3.BFC 1. 解决浮动元素导致的父容器高度塌陷 2. 阻止相邻元…...

AI扫描王APP:高效便捷的手机扫描工具,让生活更智能
AI扫描王APP是一款功能强大的手机扫描软件,专为追求高效、便捷的用户设计。它不仅支持文字提取和扫描翻译,还能进行测量,满足用户在不同场景下的需求。无论是办公、学习还是日常使用,AI扫描王都能帮助你快速完成任务,节…...

《仿盒马》app开发技术分享-- 原生地图展示(端云一体)
开发准备 上一节我们实现了获取当前用户的位置,并且成功的拿到了经纬度,这一节我们就要根据拿到的经纬度,结合我们其他的知识点来实现地图的展示。 功能分析 地图的展示,我们需要在管理中心先给我们对应的应用开启地图api功能&…...
Linux 操作文本文件列数据的常用命令
文章目录 Linux 操作文本文件列数据的常用命令基本列处理命令高级列处理列数据转换和排序列数据统计和分析 Linux 操作文本文件列数据的常用命令 Linux 提供了多种强大的命令来处理文本文件中的列数据,以下是一些最常用的命令和工具: 基本列处理命令 c…...

IP、子网掩码、默认网关、DNS
IP、子网掩码、默认网关、DNS 1. 概述1.1 windows配置处 2.IP 地址(Internet Protocol Address)2.1 公网ip2.2 内网ip2.3 🌐 公网 IP 与内网 IP 的关系(NAT) 3. 子网掩码(Subnet Mask)4. 默认网…...

华为OD机试真题——字符串加密 (2025B卷:100分)Java/python/JavaScript/C/C++/GO最佳实现
2025 B卷 100分 题型 本专栏内全部题目均提供Java、python、JavaScript、C、C++、GO六种语言的最佳实现方式; 并且每种语言均涵盖详细的问题分析、解题思路、代码实现、代码详解、3个测试用例以及综合分析; 本文收录于专栏:《2025华为OD真题目录+全流程解析+备考攻略+经验分…...

角度回归——八参数检测四边形Gliding Vertex
文章目录 一、介绍(一)五参数检测方法( 基于角度)(二)八参数检测方法(point-based)的边界 二、方案分析(一)问题定义(二)方案…...
JVM 高质量面试题
📌 文章目录 一、JVM 内存结构与运行时模型1. JVM 内存结构分区及作用2. 栈帧结构及方法调用链维护3. 逃逸分析及其对对象分配策略的影响4. TLAB 的作用及提升对象创建效率的机制 二、垃圾回收器与 GC 调优1. CMS 与 G1 垃圾收集器的设计区别及适用场景2. Full GC 频…...

AI助力,制作视频裁剪软件
1. 视频裁剪软件套路多 最近再做一些测试,经常需要录屏什么的,有时候录制的时长视频,需要裁剪,比如去掉开头一些帧或者结尾的一些帧,就想保留关键点。但是网上下的一些软件,打开一用都是要付费的。所以想着…...
SQL注入基础
普通sql注入:后台能提供有价值的错误信息,显示在页面 手动注入 1. 寻找sql注入点 get注入 ?idxx url后加测试是否存在注入漏洞,报错则存在 post注入 把参数封装…...
使用 A2A Python SDK 实现 CurrencyAgent
谷歌官方的a2a-python SDK最近频繁的更新,我们的教程也需要跟着更新,这篇文章,我们通过 a2a-python sdk的 0.2.3 版本,实现一个简单的CurrencyAgent。 https://a2aprotocol.ai/blog/a2a-sdk-currency-agent-tutorial-zh 目录 源码准备详细过程 创建项目创建虚拟环境添加依…...
qt浏览文件支持惯性
#include <QApplication> #include <QListWidget> #include <QScroller> #include <QScrollerProperties>int main(int argc, char *argv[]) {QApplication app(argc, argv);// 创建列表控件并添加示例项QListWidget listWidget;for (int i 0; i <…...