当前位置: 首页 > news >正文

【开发】中间件——RocketMQ

分布式消息系统

  • RocketMQ概念,用途,特性
  • 安装RocketMQ
  • 掌握RocketMQ的api使用
  • 对producer、consumer进行详解
  • 了解RocketMQ的存储特点

  • 简介及相关概念
  • JavaAPI
  • SpringBoot整合RocketMQ
  • 消息的顺序收发
  • 消息系统的事务、存储、重试策略
  • 消息系统的集群

RocketMQ

RocketMQ简介

采用java开发的分布式消息系统,由阿里开发

地址:http://rocketmq.apache.org/

在这里插入图片描述

历史发展

  • 阿里中间件,Notify用于交易核心信息的流转
  • 2010年,B2B开始大规模使用ActiveMQ作为消息内核,急需支持顺序消息拥有海量消息堆积能力消息中间件——MetaQ 1.0 2011诞生
  • 2012 年MetaQ发展到了3.0版本,抽象除了通用的消息引擎RorcketMQ
  • 2015年,RocketMQ进过双十一,在可用性,可靠性和稳定性等方面都有出色表现。阿里消息中间件基于RocketMQ退出Aliware MQ1.0,开始为阿里云上的企业提供消息服务
  • 2016年,RocketMQ进入Apache孵化

在这里插入图片描述

概念

在这里插入图片描述

  1. Producer

    • 消息生产者:生产消息,一般由业务系统负责产生消息
    • Producer Group:一类Producer的集合名称,这类Producer通常发送同一类消息,且发送逻辑一致
  2. Consumer

    • 消费者:负责消费消息,一般由后台系统负责异步消费

    • 分类:

      • Push Consumer:消费端被动接收由服务端Push的消息
      • Pull Consumer:消费端主动向服务端定时拉取消息
    • Consmer Group:一类Consumer的集合名称,这类Producer通常发送同一类消息,且发送逻辑一致

  3. Broker

    • RocketMQ的核心消息的发送、接收、高可用等
    • 需要定时发送自身情况到NameServer,默认10s发送一次,超过2分钟会认为该broker失效
  4. NameServer

    • 集群中的组织协调
    • 收集broker的工作情况
    • 不负责消息的处理
  5. Topic【逻辑概念】

    • 不同类型的消息以不同的Topic名称进行区分,如User、Order等
    • Message Queue
      • 消息队列,用于存储消息

下载部署

非docker

下载地址:https://archive.apache.org/dist/rocketmq/4.3.2/rocketmq-all-4.3.2-bin-release.zip

cd /opt
unzip rocketmq-all-4.3.2-bin-release.zip
cd rocketmq-all-4.3.2-bin-release/# 启动nameserver
bin/mqnamesrv
#The Name Server boot success. serializeType=JSON
# 看到这个说明nameserver启动成功#启动broker
bin/mqbroker -n 8.140.130.91:9876 #-n指定nameserver地址和端口
Java HotSpot(TM) 64-Bit Server VM warning: INFO: os::commit_memory(0x00000005c0000000, 8589934592, 0) failed; error='Cannot allocate memory' (errno=12)

启动错误,因为RocketMQ的配置默认是生产环境的配置,设置jvm的内存值比较大,需要调整默认值

#调整默认的内存大小参数
cd bin/
JAVA_OPT="${JAVA_OPT} -server -Xms128m -Xmx128m -Xmn128m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=128m"vim runbroker.sh
JAVA_OPT="${JAVA_OPT} -server -Xms128m -Xmx128m -Xmn128m"#重新启动测试
bin/mqbroker -n 8.140.130.91:9876
The broker[iZ2zeg4pktzjhp9h7wt6doZ, 172.17.0.1:10911] boot success. serializeType=JSON and name server is 8.140.130.91:9876#启动成功

发送消息测试:

export NAMESRV_ADDR=127.0.0.1:9876
cd /opt/rocketmq-all-4.3.2-bin-release/bin
sh tools.sh org.apache.rocketmq.example.quickstart.Producer

接收消息测试:

sh tools.sh org.apache.rocketmq.example.quickstart.Consumer

java api测试

依赖

<dependencies><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.3.2</version></dependency>
</dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.2</version><configuration><source>1.8</source><target>1.8</target><encoding>UTF-8</encoding></configuration></plugin></plugins>
</build>

测试代码

package com.rocketmq;import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;import java.io.UnsupportedEncodingException;public class SyncProducer {public static void main(String[] args) throws Exception {//Instantiate with a producer group name.DefaultMQProducer producer = new DefaultMQProducer("test-group");//specify name server addressproducer.setNamesrvAddr("8.140.130.91:9876");//Lanuch the instanceproducer.start();for (int i = 0; i < 100; i++) {//create message instance ,specify topic,tag and message bodyMessage msg = new Message("TopicTest1",/*topic*/"TAGA",/*tag*/("Hello RocketMQ" + i).getBytes(RemotingHelper.DEFAULT_CHARSET)/*message body*/);//Call send message to deliver message to one of brokers.SendResult sendResult = producer.send(msg);System.out.printf("%s%n", sendResult);}//Shut down once the producer instance is not longer in use.producer.shutdown();}
}

发现报错

在这里插入图片描述

原因:

broker的ip地址是172.17.0.1,为私有ip,所以不可访问

在这里插入图片描述

解决:修改broker配置文件,指定broker 的ip地址

cd /opt/rocketmq-all-4.3.2-bin-release/conf
vim broker.confbrokerIP1=8.140.130.91
namesrvAddr=8.140.130.91:9876
brokerName=broker_haoke_im#启动broker,通过 -c 指定配置文件
cd /opt/rocketmq-all-4.3.2-bin-release/
bin/mqbroker -c /opt/rocketmq-all-4.3.2-bin-release/conf/broker.conf

在这里插入图片描述

API测试成功

在这里插入图片描述

通过docker部署

#拉取镜像
docker pull foxiswho/rocketmq:server-4.3.2
docker pull foxiswho/rocketmq:broker-4.3.2#创建nameserver容器
docker create -p 9876:9876 --name rmqserver \
-e "JAVA_OPT_EXT=-server -Xms128m -Xmx128m -Xmn128m" \
-e "JAVA_OPTS=-Duser.home=/opt" \
-v /data/rmq-data/rmqserver/logs:/opt/logs \
-v /data/rmq-data/rmqserver/store:/opt/store \
foxiswho/rocketmq:server-4.3.2#创建broker容器
#10911 生产者,消费者端口
#10909 搭建集群主从端口
docker create -p 10911:10911 -p 10909:10909 --name rmqbroker \
-e "JAVA_OPTS=-Duser.home=/opt" \
-e "JAVA_OPT_EXT=-server -Xms128m -Xmx128m -Xmn128m" \
-v /data/rmq-data/rmqbroker/conf/broker.conf:/etc/rocketmq/broker.conf \
-v /data/rmq-data/rmqbroker/logs:/opt/logs \
-v /data/rmq-data/rmqbroker/store:/opt/store \
foxiswho/rocketmq:broker-4.3.2#启动容器
docker start rmqserver rmqbroker#停止删除容器
docker stop rmqbroker rmqserver
docker rm rmqbroker rmqserver

broker配置文件

#broker名
brokerName=broker_haoke_im
#broker IP
brokerIP1=8.140.130.91
#当前broker托管的NameServer地址
namesrvAddr=8.140.130.91:9876
#开启自定义属性支持
enablePropertyFilter=true

在这里插入图片描述
在这里插入图片描述

部署RocketMQ的管理工具

UI管理工具,rocketmq-console,项目地址https://github.com/apache/rocketmq-externals/tree/master/rocketmq-console

#拉取镜像
docker pull apacherocketmq/rocketmq-console:2.0.0#创建并启动容器
docker run -e "JAVA_OPTS=-Drocketmq.config.namesrvAddr=8.140.130.91:9876 -Drocketmq.config.isVIPChannel=false" -p 8082:8080 -t apacherocketmq/rocketmq-console:2.0.0

访问:http://8.140.130.91:8082/

在这里插入图片描述
在这里插入图片描述

Java API基本使用

创建topic

package com.rocketmq;import org.apache.rocketmq.client.producer.DefaultMQProducer;public class TopicDemo {public static void main(String[] args) throws Exception{//设置NameServer地址DefaultMQProducer producer = new DefaultMQProducer("test-group");//设置producer 的NameServerAddressproducer.setNamesrvAddr("8.140.130.91:9876");//启动NameServerproducer.start();/** 创建topic* @param key broker name* @param newTopic topic name* @param queueNum topic's queue number* */producer.createTopic("broker_haoke_im","test_topic",8);System.out.println("topic创建成功");producer.shutdown();}
}

在这里插入图片描述

发送消息

消息的属性

字段名默认 值说明
Topicnull必填,线下环境不需要申请,线上环境需要申请后才能使用
Bodynull必填,二进制形式,序列化由应用决定,Producer 与 Consumer 要协商好 序列化形式。
Tagsnull选填,类似于 Gmail 为每封邮件设置的标签,方便服务器过滤使用。目前只 支持每个消息设置一个 tag,所以也可以类比为 Notify 的 MessageType 概 念
Keysnull选填,代表这条消息的业务关键词,服务器会根据 keys 创建哈希索引,设置 后,可以在 Console 系统根据 Topic、Keys 来查询消息,由于是哈希索引, 请尽可能保证 key 唯一,例如订单号,商品 Id 等。
Flag0选填,完全由应用来设置,RocketMQ 不做干预
DelayTimeLevel0选填,消息延时级别,0 表示不延时,大于 0 会延时特定的时间才会被消费
WaitStoreMsgOKTRUE选填,表示消息是否在服务器落盘后才返回应答。

同步

package com.rocketmq.message;import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;public class SyncMessage {public static void main(String[] args) throws Exception{DefaultMQProducer producer = new DefaultMQProducer("test-group");producer.setNamesrvAddr("8.140.130.91:9876");producer.start();String msgStr = "测试消息1";/** String topic, String tags, byte[] body* */Message message = new Message("test_topic","test",msgStr.getBytes("UTF-8"));SendResult result = producer.send(message);System.out.println(result);System.out.println("消息状态:" + result.getSendStatus());System.out.println("消息id:" + result.getMsgId());System.out.println("消息queue:" + result.getMessageQueue());System.out.println("消息offset:" + result.getQueueOffset());producer.shutdown();}
}

在这里插入图片描述

异步

与同步区别在于,回调函数的执行是滞后的,主程序是顺序执行的

package com.rocketmq.message;import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;public class AsyncMessage {public static void main(String[] args) throws Exception{DefaultMQProducer producer = new DefaultMQProducer("test-group");producer.setNamesrvAddr("8.140.130.91:9876");producer.start();String msgStr = "异步消息发送测试";/** String topic, String tags, byte[] body* */Message message = new Message("test_topic","test",msgStr.getBytes("UTF-8"));producer.send(message, new SendCallback() {@Overridepublic void onSuccess(SendResult result) {System.out.println(result);System.out.println("消息状态:" + result.getSendStatus());System.out.println("消息id:" + result.getMsgId());System.out.println("消息queue:" + result.getMessageQueue());System.out.println("消息offset:" + result.getQueueOffset());}@Overridepublic void onException(Throwable e) {System.out.println("消息发送失败");}});// producer.shutdown()要注释掉,否则发送失败。原因是,异步发送,还未来得及发送就被关闭了//producer.shutdown();}
}

在这里插入图片描述
在这里插入图片描述

消费信息

package com.rocketmq.consumer;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;import java.io.UnsupportedEncodingException;
import java.util.List;public class ConsumerDemo {public static void main(String[] args) throws Exception{/** push类型的消费者,被动接收从broker推送的消息* */DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test-group");consumer.setNamesrvAddr("8.140.130.91:9876");//订阅topic,接收此topic下的所有消息consumer.subscribe("test_topic","*");consumer.registerMessageListener(new MessageListenerConcurrently() {//并发读取消息@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs) {try {System.out.println(new String(msg.getBody(),"UTF-8"));} catch (UnsupportedEncodingException e) {e.printStackTrace();}}System.out.println("收到消息->"+msgs);/** 返回给broker消费者的接收情况* CONSUME_SUCCESS  接收成功* RECONSUME_LATER  延时重发* */return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();}
}

测试接收历史消息:

在这里插入图片描述

测试接收实时消息:

在这里插入图片描述

消息的订阅方式

可以通过tag区分不同类型

#生产者
Message message = new Message("test_topic","add",msgStr.getBytes("UTF-8"));#消费者
//完整匹配
consumer.subscribe("test_topic","add");
//或匹配
consumer.subscribe("test_topic","add || delete");

消息过滤器

RocketMQ支持根据用户自定义属性进行过滤 ,类似与SQL

MessageSelector.bySql(“age>=20 AND sex=‘女’”));

消息发送方

package com.rocketmq.filter;import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;/*** @author Auspice Tian* @time 2021-04-04 15:10* @current example-roketmq-com.rocketmq.filter*/
public class SyncProducer {public static void main(String[] args) throws Exception{DefaultMQProducer producer = new DefaultMQProducer("test-group");producer.setNamesrvAddr("8.140.130.91:9876");producer.start();String msgStr = "发送测试";Message msg = new Message("test_topic","test",msgStr.getBytes("UTF-8"));msg.putUserProperty("age","18");msg.putUserProperty("sex","女");SendResult result = producer.send(msg);System.out.println("消息状态"+result.getSendStatus());System.out.println("消息id"+ result.getMsgId());System.out.println("消息queue"+result.getMessageQueue());System.out.println("消息offset"+result.getQueueOffset());producer.shutdown();}
}

消息接收方

package com.rocketmq.filter;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;import java.io.UnsupportedEncodingException;
import java.util.List;public class ConsumerFilter {public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test-group");consumer.setNamesrvAddr("8.140.130.91:9876");consumer.subscribe("test_topic", MessageSelector.bySql("age>=20 AND sex='女'"));consumer.registerMessageListener(new MessageListenerConcurrently() {//并发读取消息@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs) {try {System.out.println(new String(msg.getBody(),"UTF-8"));} catch (UnsupportedEncodingException e) {e.printStackTrace();}}System.out.println("收到消息->"+msgs);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();}
}

在这里插入图片描述

测试

消息发送成功,但是由于不满足条件,被过滤器过滤,消费者未接收到

在这里插入图片描述

修改生产者自定义属性

Message msg = new Message("test_topic","test",msgStr.getBytes("UTF-8"));
msg.putUserProperty("age","21");
msg.putUserProperty("sex","女");

可以接收到消息

在这里插入图片描述

消息的顺序发送与接收

原理

消息的顺序收发,需要消费者与生产者二者配合

  • 生产者发送的顺序消息都要放在同一消息队列中,才能保证被顺序取出
  • 消费者接收的顺序消息,需要从同一队列中获取

生产者

package com.rocketmq.order;import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;public class OrderProducer {public static void main(String[] args) throws Exception{DefaultMQProducer producer = new DefaultMQProducer("test-group");producer.setNamesrvAddr("8.140.130.91:9876");producer.start();for (int i = 0; i < 100; i++) {int orderId = i % 10;//生产10个订单的消息,每个订单10条消息String msgStr = "order-->"+i + " orderId-->" + orderId;Message message = new Message("test_topic","ORDER_MSG",msgStr.getBytes("UTF-8"));/** public SendResult send(Message msg, MessageQueueSelector selector, Object arg)* MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);* */SendResult sendResult = producer.send(message,(mqs,msg,arg)->{//匿名函数的作用为选择消息队列的idInteger id = (Integer) arg;int index = id % mqs.size();return mqs.get(index);},//arg与orderId对应,orderId);System.out.println(sendResult);}producer.shutdown();}
}

消费者

public class OrderConsumer {public static void main(String[] args) throws Exception{DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test-order-group");consumer.setNamesrvAddr("8.140.130.91:9876");consumer.subscribe("test_order_topic","*");consumer.registerMessageListener(new MessageListenerOrderly() {//顺序读取消息@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {for (MessageExt msg : msgs) {try {System.out.println(Thread.currentThread().getName() + " "+ msg.getQueueId() + " "+ new String(msg.getBody(),"UTF-8"));} catch (UnsupportedEncodingException e) {e.printStackTrace();}}return ConsumeOrderlyStatus.SUCCESS;}});consumer.start();}
}

在这里插入图片描述

可见,订单id为3的消息,会存入同一消息队列,故在同一消息队列的消息可被同一消费线程监听

消息系统的事务

分布式事务分类:

  • 基于单个JVM,数据库分库分表
  • 基于多个JVM,服务拆分
  • 基于多JVM,服务拆分且数据库分库分表

原理

Half(Prepare) Message

消息系统暂时不能投递的消息:发送方将消息发送到了MQ服务端。MQ服务端未收到生产者对消息的二次确认,此时该消息被标记为 暂不能投递状态 处于该状态的消息称为 半消息

Message Status Check

由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确认丢失,MQ服务端发现某条消息长期处于 半消息,需要主动向消息生产者询问该消息的状态

在这里插入图片描述

  1. 发送方向MQ服务端发送消息

  2. MQ Server将消息持久化成功后,向发送方ACK确认消息已经发送成功,此时消息为 半消息

  3. 发送方开始执行本地事务逻辑

    在这里插入图片描述

  4. 发送方根据本地事务执行结果向MQ Server提交二次确认(Commit或Rollback),MQ Server 收到 Commit 则将半消息标记为 可投递,订阅方最终收到该消息;MQ Server收到 Rollback ,则删除该半消息,订阅方不会收到该消息

  5. 在断网或应用重启情况下,上述4提交的二次确认最终未到达MQ Server,经过固定时间后,MQ Server将对该消息发起消息回查

  6. 发送方收到消息回查,需要检查对应消息的本地事务执行的最终结果

  7. 发送方根据检查得到的本地事务的最终状态再次提交二次确认,MQ Server仍按4对半消息进行确认

生产者

package com.rocketmq.trancation;public class TrancationProducer {public static void main(String[] args) throws Exception{TransactionMQProducer producer = new TransactionMQProducer("test_transaction_producer");producer.setNamesrvAddr("8.140.130.91:9876");//设置事务监听器producer.setTransactionListener(new TransactionImpl());producer.start();//发送消息Message message = new Message("pay_topic","用户A给用户B转钱".getBytes("UTF-8"));producer.sendMessageInTransaction(message,null);Thread.sleep(99999);producer.shutdown();}
}

本地事务处理

package com.rocketmq.trancation;public class TransactionImpl implements TransactionListener {private static Map<String, LocalTransactionState> STATE_MAP = new HashMap<>();/*** 本地执行业务具体的逻辑* @param msg* @param arg* @return*/@Overridepublic LocalTransactionState executeLocalTransaction(Message msg, Object arg) {try {Thread.sleep(500);System.out.println("用户A账户减500");// System.out.println(1/0);System.out.println("用户B账户加500元.");Thread.sleep(800);//二次提交确认STATE_MAP.put(msg.getTransactionId(),LocalTransactionState.COMMIT_MESSAGE);return LocalTransactionState.COMMIT_MESSAGE;} catch (InterruptedException e) {e.printStackTrace();}//回滚STATE_MAP.put(msg.getTransactionId(), LocalTransactionState.ROLLBACK_MESSAGE);return LocalTransactionState.ROLLBACK_MESSAGE;}/*** 消息回查* @param msg* @return*/@Overridepublic LocalTransactionState checkLocalTransaction(MessageExt msg) {return STATE_MAP.get(msg.getTransactionId());}
}

消费者

package com.rocketmq.trancation;public class TransactionConsumer {public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_transaction_consumer");consumer.setNamesrvAddr("8.140.130.91:9876");//订阅topic,接收消息consumer.subscribe("pay_topic","*");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs) {try {System.out.println(new String(msg.getBody(),"UTF-8"));} catch (UnsupportedEncodingException e) {e.printStackTrace();}}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();}
}

测试

  • 返回 commit 状态时,消费者能够接收消息

    在这里插入图片描述

  • 返回 rollback 状态时,消费者接收不到消息

    在这里插入图片描述
    在这里插入图片描述

  • 消息回查测试

    public class TransactionImpl implements TransactionListener {private static Map<String, LocalTransactionState> STATE_MAP = new HashMap<>();/*** 本地执行业务具体的逻辑* @param msg* @param arg* @return*/@Overridepublic LocalTransactionState executeLocalTransaction(Message msg, Object arg) {try {System.out.println("用户A账户减500");Thread.sleep(500);//            System.out.println(1/0);System.out.println("用户B账户加500元.");Thread.sleep(800);//二次提交确认STATE_MAP.put(msg.getTransactionId(),LocalTransactionState.COMMIT_MESSAGE);return LocalTransactionState.UNKNOW;
    //            return LocalTransactionState.COMMIT_MESSAGE;} catch (Exception e) {e.printStackTrace();}//回滚STATE_MAP.put(msg.getTransactionId(), LocalTransactionState.ROLLBACK_MESSAGE);return LocalTransactionState.ROLLBACK_MESSAGE;}/*** 消息回查* @param msg* @return*/@Overridepublic LocalTransactionState checkLocalTransaction(MessageExt msg) {System.out.println("状态回查-->"+ msg.getTransactionId() + " "+ STATE_MAP.get(msg.getTransactionId()));return STATE_MAP.get(msg.getTransactionId());}
    }
    

在这里插入图片描述

Consumer

Push和Pull模式

  • push模式:客户端与服务端建立连接后,当服务端有消息,将消息推送到客户端
  • pull模式:客户端不断的轮询请求服务端,来获取新的而消息

push模式需要消息系统与消费端之间建立长连接,对消息系统是很大的负担,所以在具体实现时,都采用消费端主动拉取的方式,即consumer轮询从broker拉取消息

在RocketMQ中,push与pull的区别

Push:DefaultPushConsumer 将轮询过程都封装了,并注册MessageListener监听器,取到消息后,唤醒MessageListener监听器的consumeMessage()来消费,对用户而言,感觉消息是被推送来的。

Pull:取消息过程需要自己写:首先从目标topic中拿到MessageQueue集合并遍历,然后针对每个MessageQueue批量取消息。一次Pull,都要记录该队列的offset,知道去完MessageQueue,再换另一个

长轮询保证Pull的实时性

长轮询(长连接+轮询),客户端像传统轮询一样从服务端请求数据,服务端会阻塞请求不会立刻返回,直到有数据或超时才返回给客户端,然后关闭连接,客户端处理完响应信息后再向服务器发送新的请求

image-20210405103624028

消息模式

DefaultMQPushConsumer实现了自动保存offset值及多个consumer的负载均衡

//设置组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("HAOKE_IM");

通过 groupname 将多个consumer组合在一起,会存在消息的分配问题(消息是发送到组还是每个消费者)

  • 集群模式(默认)

    同一个ConsumerGroup里的每个Consumer只消费所订阅消息的一部分内容,同一个ConsumerGroup里所有消费的内容合起来才是所订阅Topic内容的整体,从而达到负载均衡的目的

  • 广播模式

    同一个ConsumerGroup里的每个Consumer都能消费到所订阅Topic的全部消息,一个消息会被分发多次,被多个Consumer消费

// 集群模式
consumer.setMessageModel(MessageModel.CLUSTERING);
// 广播模式
consumer.setMessageModel(MessageModel.BROADCASTING);

重复消息的解决方案

重复消息的产生情况:

  • 生产者不断发送重复消息到消息系统

  • 网络不可达 :只要通过网络交换数据,就无法避免这个问题

由于接收到重复消息不可避免,问题变为 消费端收到重复消息,怎么处理

  1. 消费端处理消息的业务逻辑保持幂等性

    幂等性:无论执行多少次,结果都一样

    eg:while s!=1;在执行sql语句

  2. 保证每条消息都有唯一编号且保证消息处理成功与去重的日志同时出现

    利用一张日志表来记录已经处理成功的消息的ID,如果新到的消息ID已经在日志表中,那么就不再处理这条消息

    如果由消息系统来实现的话,肯定会对消息系统的吞吐量和高可用有影响,所以最好还是由业务端自己处理消息重复的问题,这也是 RocketMQ不解决消息重复的问题 的原因

RocketMQ存储

RocketMQ中的消息数据存储,采用了零拷贝技术(mmap + write方式),文件系统采用 Linux Ext4文件系统进行存储。

消息数据的存储

在RocketMQ中,消息数据是保存在磁盘文件中的,使用RocketMQ尽可能保证顺序写入,比随机写入效率高很多

  • ConsumeQueue:索引文件,存储数据指向物理文件的位置

  • CommitLog是真正存储数据的文件

    在这里插入图片描述

  • 消息主体及元数据都存储在CommitLog中

  • Consume Queue 是一个逻辑队列,存储了这个Queue在CommitLog中的其实offset、log大小和MessageTag的hashcode

  • 每次读取消息队列先读取ConsumerQueue,然后再通过consumerQueue中拿到消息主体

同步刷盘和异步刷盘

RocketMQ为提高性能,会尽可能保证磁盘的顺序读写。消息通过Producer写入RocketMQ的时候,有两种写磁盘方式,分别是同步刷盘与异步刷盘

  • 同步刷盘——安全性
    • 在返回写成功状态时,消息已经写入磁盘
    • 执行流程:消息写入内存的PAGECACHE后,立刻通知刷盘线程刷盘,等待刷盘完成,刷盘线程执行完成后唤醒等待的线程,返回消息写成功的状态
  • 异步刷盘——效率
    • 在返回写成功状态时,消息可能只是被写入内存的PAGECACHE,写操作的返回快,吞吐量大
    • 当内存里的消息积累到一定程度,统一触发写磁盘动作,快速写入

修改刷盘方式

broker.conf

flushDiskType=ASYNC_FLUSH——异步

flushDiskType=SYNC_FLUSH——同步

在这里插入图片描述

重试策略

重试情况分析

在消息的发送和消费过程中,都有可能出现错误,如网络异常等,出现了错误就需要进行错误重试,这种消息的重试分为 producer端的重试consumer端重试

在这里插入图片描述

producer端重试

  • 指定重试次数
  • 指定超时时间
//消息发送失败时,重试3次
producer.setRetryTimesWhenSendFailed(3);// 发送消息,并且指定超时时间
SendResult sendResult = producer.send(msg, 1000);
  • 只有同步生产者才会进行错误重试。
  • 只有特定异常才会重试;
  • 设置的超时时间小于实际执行时间,则不会进行重试
#DefaultMQProducerImpl
//设置发送总次数    
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;for (; times < timesTotal; times++) {try{if (timeout < costTime) {callTimeout = true;break;}}catch (RemotingException e) {...continue;}catch (MQClientException e) {...continue;}catch (MQBrokerException e){switch (e.getResponseCode()) {case ResponseCode.TOPIC_NOT_EXIST:case ResponseCode.SERVICE_NOT_AVAILABLE:case ResponseCode.SYSTEM_ERROR:case ResponseCode.NO_PERMISSION:case ResponseCode.NO_BUYER_ID:case ResponseCode.NOT_IN_CURRENT_UNIT:continue;}}
}

consumer端重试

  • 消息处理的异常失败
  • 消息超时接收的超时失败

异常重试

消息正常到了消费者端,处理失败,发生异常。eg:反序列化失败,消息数据本身无法处理

消息状态

package org.apache.rocketmq.client.consumer.listener;public enum ConsumeConcurrentlyStatus {/*** Success consumption*/CONSUME_SUCCESS,/*** Failure consumption,later try to consume*/RECONSUME_LATER;
}

broker的启动日志

INFO main - messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

如果消息消费失败即broker收到 RECONSUME_LATER ,则broker会对消息进行重试发送,直至2h

演示:

public class ConsumerDemo {public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_consumer_group");consumer.setNamesrvAddr("8.140.130.91:9876");// 订阅topic,接收此Topic下的所有消息consumer.subscribe("test_error_topic", "*");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs) {try {System.out.println(new String(msg.getBody(), "UTF-8"));} catch (UnsupportedEncodingException e) {e.printStackTrace();}}System.out.println("收到消息->" + msgs);if(msgs.get(0).getReconsumeTimes() >= 3){// 重试3次后,不再进行重试return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}return ConsumeConcurrentlyStatus.RECONSUME_LATER;}});consumer.start();}
}

重试消息和原始发送消息不是同一条

在这里插入图片描述

timeout

由于消息没有从MQ发送到消费者上,那么在MQ Server内部会不断的尝试发送这条消息,直至发送成功位置

也就是,服务端没有接收到消费端发来的消息的反馈,定义为超时

RocketMQ的集群

集群模式

单个Master

  • 风险较大,一旦Broker重启或者宕机,会导致整个服务不可用,只做开发环境

多Master

  • 一个集群无Slave,全是Master,例如2个Master或者3个Master
  • 单台机器宕机,这台机器上未被消费的消息在机器恢复之前不可订阅,消息的实时性受到影响

多Master多Slave,异步复制

  • 每个Master配置一个Slave,有多个Master-Slave对,HA(双机集群系统)采用异步复制方式,主备有短暂消息延迟,毫秒级
  • 优点:即使磁盘损坏,丢失的消息非常少,实时性不会收到影响,消费者仍可从Slave消费,此过程对应用透明,不需人工干预,性能同多Master模式一样
  • 缺点:Master宕机或磁盘损坏,会丢失少量消息

多Master多Slave,同步双写

  • 每个Master配置一个Slave,有多个Master-Slave对,HA(双机集群系统)采用同步双写方式,主备都写成功,向应用返回成功
  • 优点:数据与服务无单点,Master宕机情况下,消息无延迟,服务可用性和数据可用性非常高
  • 缺点:性能比异步复制模式低

搭建2m2s集群

创建2个NameServer(master)

#nameserver1
docker create -p 9876:9876 --name rmqserver01 \
-e "JAVA_OPT_EXT=-server -Xms128m -Xmx128m -Xmn128m" \
-e "JAVA_OPTS=-Duser.home=/opt" \
-v /data/rmq-data/rmqserver01/logs:/opt/logs \
-v /data/rmq-data/rmqserver01/store:/opt/store \
foxiswho/rocketmq:server-4.3.2#nameserver2
docker create -p 9877:9876 --name rmqserver02 \
-e "JAVA_OPT_EXT=-server -Xms128m -Xmx128m -Xmn128m" \
-e "JAVA_OPTS=-Duser.home=/opt" \
-v /data/rmq-data/rmqserver02/logs:/opt/logs \
-v /data/rmq-data/rmqserver02/store:/opt/store \
foxiswho/rocketmq:server-4.3.2

搭建broker(2master)

#broker01配置文件 
namesrvAddr=8.140.130.91:9876;8.140.130.91:9877
brokerClusterName=HaokeCluster
brokerName=broker01
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=SYNC_MASTER
flushDiskType=ASYNC_FLUSH
brokerIP1=8.140.130.91
brokerIp2=8.140.130.91
listenPort=11911#master broker01
docker create --net host --name rmqbroker01 \
-e "JAVA_OPTS=-Duser.home=/opt" \
-e "JAVA_OPT_EXT=-server -Xms128m -Xmx128m -Xmn128m" \
-v /data/rmq-data/rmqbroker01/conf/broker.conf:/etc/rocketmq/broker.conf \
-v /data/rmq-data/rmqbroker01/logs:/opt/logs \
-v /data/rmq-data/rmqbroker01/store:/opt/store \
foxiswho/rocketmq:broker-4.3.2
  • brokerId:0表示主,>0表示Slave

  • fileReservedTime:消息保存时间 单位——h

  • deleteWhen:什么是时候对过期消息清理 24小时制

  • brokerRole:[同步双写|异步双写]_[主] | [从]

    [SYNC|ASYNC_MASTER] | [SLAVE]

  • flushDiskType:刷盘方式 [同步|异步]_FLUSH

    [SYNC|ASYNC_FLUSH]

  • brokerIP1:访问broker的ip地址

  • brokerIP2:主从同步的ip

  • listenPort:与客户端交互的端口(+1,-2)

在这里插入图片描述

#broker02配置文件
namesrvAddr=8.140.130.91:9876;8.140.130.91:9877
brokerClusterName=HaokeCluster
brokerName=broker02
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=SYNC_MASTER
flushDiskType=ASYNC_FLUSH
brokerIP1=8.140.130.91
brokerIp2=8.140.130.91
listenPort=11811#master broker02
docker create --net host --name rmqbroker02 \
-e "JAVA_OPTS=-Duser.home=/opt" \
-e "JAVA_OPT_EXT=-server -Xms128m -Xmx128m -Xmn128m" \
-v /data/rmq-data/rmqbroker02/conf/broker.conf:/etc/rocketmq/broker.conf \
-v /data/rmq-data/rmqbroker02/logs:/opt/logs \
-v /data/rmq-data/rmqbroker02/store:/opt/store \
foxiswho/rocketmq:broker-4.3.2

搭建从broker(slave)

#slave broker01配置文件 
namesrvAddr=8.140.130.91:9876;8.140.130.91:9877
brokerClusterName=HaokeCluster
brokerName=broker01
brokerId=1
deleteWhen=04
fileReservedTime=48
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH
brokerIP1=8.140.130.91
brokerIp2=8.140.130.91
listenPort=11711#slave broker01
docker create --net host --name rmqbroker03 \
-e "JAVA_OPTS=-Duser.home=/opt" \
-e "JAVA_OPT_EXT=-server -Xms128m -Xmx128m -Xmn128m" \
-v /data/rmq-data/rmqbroker03/conf/broker.conf:/etc/rocketmq/broker.conf \
-v /data/rmq-data/rmqbroker03/logs:/opt/logs \
-v /data/rmq-data/rmqbroker03/store:/opt/store \
foxiswho/rocketmq:broker-4.3.2
#slave broker02配置文件 
namesrvAddr=8.140.130.91:9876;8.140.130.91:9877
brokerClusterName=HaokeCluster
brokerName=broker02
brokerId=1
deleteWhen=04
fileReservedTime=48
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH
brokerIP1=8.140.130.91
brokerIp2=8.140.130.91
listenPort=11611#slave broker02
docker create --net host --name rmqbroker04 \
-e "JAVA_OPTS=-Duser.home=/opt" \
-e "JAVA_OPT_EXT=-server -Xms128m -Xmx128m -Xmn128m" \
-v /data/rmq-data/rmqbroker04/conf/broker.conf:/etc/rocketmq/broker.conf \
-v /data/rmq-data/rmqbroker04/logs:/opt/logs \
-v /data/rmq-data/rmqbroker04/store:/opt/store \
foxiswho/rocketmq:broker-4.3.2
#启动容器
docker start rmqserver01 rmqserver02
docker start rmqbroker01 rmqbroker02 rmqbroker03 rmqbroker04

测试

生产者

public class SyncMessage {public static void main(String[] args) throws Exception{DefaultMQProducer producer = new DefaultMQProducer("test_cluster_group");producer.setNamesrvAddr("8.140.130.91:9876;8.140.130.91:9877");producer.start();String msgStr = "Cluster测试消息";/** String topic, String tags, byte[] body* */Message message = new Message("test_cluster_topic","CLUSTER",msgStr.getBytes("UTF-8"));SendResult result = producer.send(message);System.out.println(result);System.out.println("消息状态:" + result.getSendStatus());System.out.println("消息id:" + result.getMsgId());System.out.println("消息queue:" + result.getMessageQueue());System.out.println("消息offset:" + result.getQueueOffset());producer.shutdown();}
}

消费者

public class ConsumerDemo {public static void main(String[] args) throws Exception{/** push类型的消费者,被动接收从broker推送的消息* */DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_cluster_group");consumer.setNamesrvAddr("8.140.130.91:9876;8.140.130.91:9877");//订阅topopic,接收此topic下的所有消息consumer.subscribe("test_cluster_topic","*");consumer.registerMessageListener(new MessageListenerConcurrently() {//并发读取消息@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs) {try {System.out.println(new String(msg.getBody(),"UTF-8"));} catch (UnsupportedEncodingException e) {e.printStackTrace();}}System.out.println("收到消息->"+msgs);/** 返回给broker消费者的接收情况* CONSUME_SUCCESS  接收成功* RECONSUME_LATER  延时重发* */return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();}
}

在这里插入图片描述

SpringBoot整合RocketMQ

下载依赖

由于rocketMQ没有发布到Mven中央仓库,需要自行下载源码,并载入到本地Maven仓库

#源码地址
https://hub.fastgit.org/apache/rocketmq-spring#进入源码目录,执行
mvn clean install

导入依赖

<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.4.3</version>
</parent><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.0.0</version></dependency><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.3.2</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><scope>test</scope></dependency>
</dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.2</version><configuration><source>1.8</source><target>1.8</target><encoding>UTF-8</encoding></configuration></plugin></plugins>
</build>

application.properties

#Spring boot application
spring.application.name = test-rocketmq
spring.rocketmq.nameServer=8.140.130.91:9876
spring.rocketmq.producer.group=test_spring_producer_group

基本使用

生产者发送消息

package com.rocketmq.spring;@Component
public class SpringProducer {//注入rocketmq模板@Autowiredprivate RocketMQTemplate rocketMQTemplate;/*** 发送消息** @param topic* @param msg*/public void sendMsg(String topic,String msg){this.rocketMQTemplate.convertAndSend(topic,msg);}
}

启动类

package com.rocketmq;@SpringBootApplication
public class MyApplication {public static void main(String[] args) {SpringApplication.run(MyApplication.class,args);}
}

测试生产消息

package com.rocketmq.spring;import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;@RunWith(SpringRunner.class)
@SpringBootTest
public class TestSpringRocketMQ {@AutowiredSpringProducer producer;@Testpublic void testSendMsg(){String msg = "第二个Spring RocketMq 消息";this.producer.sendMsg("test_spring_topic",msg);System.out.println("发送成功!");}}

在这里插入图片描述

消费者消费消息

package com.rocketmq.spring;import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;@Component
@RocketMQMessageListener(topic = "test_spring_topic",consumerGroup = "test_spring_consumer_group",selectorExpression = "*",consumeMode = ConsumeMode.CONCURRENTLY
)
public class SpringConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String msg) {System.out.println("收到消息->"+msg);}
}

在这里插入图片描述

事务消息

生产者

package com.rocketmq.spring.transaction;@Component
public class TransactionProducer {@Autowiredprivate RocketMQTemplate rocketMQTemplate;/*** 发送消息** @param topic* @param msg*/public void sendMsg(String topic,String msg){Message message = (Message) MessageBuilder.withPayload(msg).build();//此处的txProducerGroup与事务监听器的@RocketMQTransactionListener(txProducerGroup = "")一致this.rocketMQTemplate.sendMessageInTransaction("test_tx_producer_group",topic,message,null);System.out.println("消息发送成功");}
}

生产者监听器

package com.rocketmq.spring.transaction;@RocketMQTransactionListener(txProducerGroup = "test_tx_producer_group")
public class TransactionListenerImpl implements RocketMQLocalTransactionListener {private static Map<String,RocketMQLocalTransactionState> STATE_MAP = new HashMap<>();/*** 执行本地事务** @param message* @param o* @return*/@Overridepublic RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {String transactionId = (String) message.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);try {System.out.println("执行操作1");Thread.sleep(500L);System.out.println("执行操作2");Thread.sleep(500L);STATE_MAP.put(transactionId,RocketMQLocalTransactionState.COMMIT);return RocketMQLocalTransactionState.COMMIT;}catch (Exception e){e.printStackTrace();}STATE_MAP.put(transactionId,RocketMQLocalTransactionState.ROLLBACK);return RocketMQLocalTransactionState.ROLLBACK;}/*** 消息回查** @param message* @return*/@Overridepublic RocketMQLocalTransactionState checkLocalTransaction(Message message) {String transactionId = (String) message.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);System.out.println("回查消息->transactionId = "+transactionId+",state = "+STATE_MAP.get(transactionId));return STATE_MAP.get(transactionId);}
}

消息生产测试

@Test
public void testSendTransactionMsg(){String msg = "事务消息测试!";this.transactionProducer.sendMsg("test_spring_transaction_topic",msg);System.out.println("发送成功");
}

在这里插入图片描述

消费者测试

package com.rocketmq.spring.transaction;@Component
@RocketMQMessageListener(topic = "test_spring_transaction_topic",consumeMode = ConsumeMode.CONCURRENTLY,selectorExpression = "*",consumerGroup = "test_tx_consumer_group"
)
public class TransactionConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String s) {System.out.println("收到消息->"+s);}
}

在这里插入图片描述

消息回查测试

@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {String transactionId = (String) message.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);try {System.out.println("执行操作1");Thread.sleep(500L);System.out.println("执行操作2");Thread.sleep(500L);STATE_MAP.put(transactionId,RocketMQLocalTransactionState.COMMIT);return RocketMQLocalTransactionState.UNKNOWN;}catch (Exception e){e.printStackTrace();}STATE_MAP.put(transactionId,RocketMQLocalTransactionState.ROLLBACK);return RocketMQLocalTransactionState.ROLLBACK;
}

在这里插入图片描述

相关文章:

【开发】中间件——RocketMQ

分布式消息系统 RocketMQ概念&#xff0c;用途&#xff0c;特性安装RocketMQ掌握RocketMQ的api使用对producer、consumer进行详解了解RocketMQ的存储特点 简介及相关概念JavaAPISpringBoot整合RocketMQ消息的顺序收发消息系统的事务、存储、重试策略消息系统的集群 RocketMQ R…...

36 UnitTest框架 - 参数化

目录 一、参数化环境准备 1、方式一&#xff1a;在终端&#xff08;cmd&#xff09;安装parameterized 2、方式二&#xff1a;在Pycharm中安装parameterized 二、参数化 1、什么事参数化&#xff1f; 2、参数化引入案例 &#xff08;1&#xff09;需求 &#xff08;2&a…...

Qt源码阅读(四) 事件循环

事件系统 文章为本人理解&#xff0c;如有理解不到位之处&#xff0c;烦请各位指正。 文章目录事件系统什么是事件循环&#xff1f;事件是如何产生的&#xff1f;sendEventpostEvent事件是如何处理的&#xff1f;事件循环是怎么遍历的&#xff1f;事件过滤器event夹带私货时间Q…...

银行数字化转型导师坚鹏:银行数字化领导力提升之道

银行数字化领导力提升之道 ——融合中西智慧&#xff0c;践行知行合一思想&#xff0c;实现知行果合一 课程背景&#xff1a; 很多银行存在以下问题&#xff1a;不知道如何领导数字员工&#xff1f;不清楚银行数字化领导力模型的内涵&#xff1f;不知道如何开展银行数字化…...

Vue2 -- 自定义单选内容的单选框组件

自定义单选内容的单选框组件 之前做的一个项目&#xff0c;在项目中有一个关于人员权限分配的功能&#xff0c;给人员指定各个模块的权限信息&#xff0c;分为 write 可写权限read 可读权限none 没有权限 项目要求画面中只显示 W R 两个按钮控制指定权限信息&#xff0c;都不…...

让PyTorch训练速度更快,你需要掌握这17种方法

掌握这 17 种方法&#xff0c;用最省力的方式&#xff0c;加速你的 Pytorch 深度学习训练。近日&#xff0c;Reddit 上一个帖子热度爆表。主题内容是关于怎样加速 PyTorch 训练。原文作者是来自苏黎世联邦理工学院的计算机科学硕士生 LORENZ KUHN&#xff0c;文章向我们介绍了在…...

LeetCode-309. 最佳买卖股票时机含冷冻期

目录题目思路动态规划题目来源 309. 最佳买卖股票时机含冷冻期 题目思路 每天最多只可能有三种状态中的一种 0表示当前处于买入状态(持有股票) 1表示当前处于卖出状态(不持有股票) 2表示当前处于冷冻状态 设dp[i][j]表示i - 1天状态为j时所拥有的最大现金 dp[i][0] Math.ma…...

AUTOSAR知识点Com(七):CANSM初认知

目录 1、概述 2、CanSM主要做什么 2.1、CAN控制器状态管理 2.2、CAN收发器状态管理 2.3、Busoff检测 1、概述 CANSM&#xff08;Controller Area Network State Manager&#xff09;是AUTOSAR&#xff08;Automotive Open System Architecture&#xff09;标准中的一个模块…...

递归:斐波那契数列、递归实现指数型枚举、递归实现排列型枚举

递归&#xff1a;O(2^n) 调用自己 例题及代码模板&#xff1a; 斐波那契数列 输入一个整数 n &#xff0c;求斐波那契数列的第 n 项。 假定从 0 开始&#xff0c;第 0 项为 0。 数据范围 0≤n≤39 样例 输入整数 n5 返回 5 #include <iostream> #include <cstring&g…...

oracle模糊查询时字段内容包含下划线的解决办法

最近项目中遇到一个关于模糊查询问题。表tabA中的字段name的值有下划线的情况&#xff0c;在模糊查询时发现查询的记录不对。 表的结构 表名&#xff1a;tabA id name sex 1 test_601 1 2 test_602 2 3 test16 1 4 t…...

C++:explicit关键字

C中的explicit关键字只能用于修饰只有一个参数的类构造函数&#xff0c;它的作用是表明该构造函数是显示的&#xff0c;而非隐式的&#xff0c;跟它相对应的另一个关键字是implicit&#xff0c;意思是隐藏的&#xff0c;类构造函数默认情况下即声明为implicit(隐式)。那么显示声…...

【C5】bmc wtd,post

文章目录1.bmc_wtd_cpld&#xff1a;syscpld.c中wd_en和wd_kick节点对应寄存器&#xff0c;crontab&#xff0c;FUNCNAME2.AST芯片WDT切换主备&#xff1a;BMC用WDT2作为主备切换的控制器2.1 AC后读取&#xff1a;bmc处于主primary flash&#xff08;设完后&#xff1a;实际主&…...

200.Spark(七):SparkSQL项目实战

一、启动环境 需要启动mysql,hadoop,hive,spark。并且能让spark连接上hive(上一章有讲) #启动mysql,并登录,密码123456 sudo systemctl start mysqld mysql -uroot -p#启动hive cd /opt/module/ myhadoop.sh start#查看启动情况 jpsall#启动hive cd /opt/module/hive/…...

区块链系统:挖矿原理

在比特币的P2P网络中&#xff0c;有一类节点&#xff0c;它们时刻不停地进行计算&#xff0c;试图把新的交易打包成新的区块并附加到区块链上&#xff0c;这类节点就是矿工。因为每打包一个新的区块&#xff0c;打包该区块的矿工就可以获得一笔比特币作为奖励。所以&#xff0c…...

【博弈】【清华冬令营2018模拟】取石子

写完敢说全网没有这么详细的题解了。 注意&#xff1a;题解长是为了方便理解&#xff0c;所以读起来速度应该很快。 题目描述 有 nnn 堆石子&#xff0c;第 iii 堆有 xix_ixi​ 个。 AliceAliceAlice 和 BobBobBob 轮流去石子&#xff08;先后手未定&#xff09;&#xff0c; …...

嵌入式:BSP的理解

BSP概念总结BSP定义BSP的特点BSP的主要工作BSP在嵌入式系统和Windowsx系统中的不同BSP和PC机主板上的BIOS区别BSP与 HAL关系嵌入式计算机系统主要由 硬件层&#xff0c;中间层&#xff0c;系统软件层和应用软件层四层组成。硬件层&#xff1a;包含CPU&#xff0c;存储器(SDRAM&…...

Linux主机Tcpdump使用-centos实例

1、安装前系统信息 ifconfig查看系统网络接口情况。这里可以看到3个interface&#xff0c;ens160是正常使用的网口&#xff0c;lo是主机的loopback地址127.0.0.1。另外&#xff0c;由于centos安装在虚拟主机上&#xff0c;virbr0是KVM默认创建的一个Bridge,其作用是为连接其上的…...

线性DP——AcWing 898. 数字三角形、AcWing 895. 最长上升子序列

AcWing 898. 数字三角形 1.题目 898. 数字三角形 2.思路 DP问题首先考虑状态转移方程&#xff0c;定义一个集合f ( i , j) &#xff0c;表示从第一个数字&#xff08;1,1&#xff09;走到第 i行&#xff0c;第 j列&#xff08;i , j&#xff09;的所有方案的集合&#xff0c…...

SpringMVC

SpringMVC配置 引入Maven依赖 &#xff08;springmvc&#xff09;web.xml配置DispatcherServlet配置 applicationContext 的 MVC 标记开发Controller控制器 几点注意事项&#xff1a; 在web.xml中 配置<load-on-startup> 0 </load-on-startup> 会自动创建Spring…...

C++模板基础(二)

函数模板&#xff08;二&#xff09; ● 模板实参的类型推导 – 如果函数模板在实例化时没有显式指定模板实参&#xff0c;那么系统会尝试进行推导 template<typename T> void fun(T input, T input2) {std::cout << input << \t << input2 << …...

什么是linux内核态、用户态?

目录标题为什么需要区分内核空间与用户空间内核态与用户态如何从用户空间进入内核空间整体结构为什么需要区分内核空间与用户空间 在 CPU 的所有指令中&#xff0c;有些指令是非常危险的&#xff0c;如果错用&#xff0c;将导致系统崩溃&#xff0c;比如清内存、设置时钟等。如…...

day8—选择题

文章目录1.Test.main() 函数执行后的输出是&#xff08;D&#xff09;2. JUnit主要用来完成什么&#xff08;D&#xff09;3.下列选项中关于Java中super关键字的说法正确的是&#xff08;A&#xff09;1.Test.main() 函数执行后的输出是&#xff08;D&#xff09; public clas…...

ngx错误日志error_log配置

ngx之error_log 日志配置格式&#xff1a; 常见的错误日志级别 错误日志可配置位置 关闭error_log配置 设置debug 日志级别的前提&#xff1a; ngx之error_log 日志配置格式&#xff1a; error_log 存放路径 日志级别 例&#xff1a; error_log /usr/local/log…...

1.11、自动化

自动化 一、java 手机自动化 首先new DesertCapabilities&#xff08;这是一个类&#xff09; setCapability – 设置信息 获取appium的驱动对象 new AppiumDriver – 本机IP地址:端口号/wd/hub,前面的设置值信息 driver.findElementById() – 通过id找位置 click() – 点击 &…...

函数的定义与使用及七段数码管绘制

函数的定义 函数是一段代码的表示 函数是一段具有特定功能的、可重用的语句组 函数是一种功能的抽象&#xff0c;一般函数表达特定功能 两个作用&#xff1a;降低编程难度 和 代码复用 求一个阶乘 fact就是 函数名 n就是参数 return就是输出部分即返回值 而函数的调用就是…...

怎么压缩pdf文件大小?pdf文件太大如何压缩?

喜爱看小说的小伙伴们都会在网上下载很多的pdf格式电子书以方便随时阅览&#xff0c;但是pdf的电子书一般都过于的冗长&#xff0c;下载后的储存也是一个问题&#xff0c;怎么pdf压缩大小呢&#xff1f;可以试试今天介绍的这款pdf在线压缩工具来进行pdf压缩&#xff08;https:/…...

阿里云Linux服务器登录名ecs-user和root选择问题

阿里云服务器Linux系统登录名可以选择root或ecs-user&#xff0c;root具有操作系统的最高权限&#xff0c;但是root会导致的安全风险比较大&#xff0c;ecs-user比较安全&#xff0c;但是如果系统后续依赖root权限就会比较麻烦&#xff0c;从安全的角度&#xff0c;建议选择ecs…...

【云原生】 初体验阿里云Serverless应用引擎SAE(三),挂载配置文件使应用的配置和运行的镜像解耦

目录 一、前言二、SAE配置1、创建配置项2、配置SAE Nginx服务效果1、【云原生】 初体验阿里云Serverless应用引擎SAE(一),部署Nginx服务 2、【云原生】 初体验阿里云Serverless应用引擎SAE(二),前端Nginx静态文件持久化到对象存储OSS 本篇 3、【云原生】 初体验阿里云Se…...

Oracle用户密码过期,修改永不过期

修改密码有效过期时间&#xff0c;可以通过以下四步设置&#xff0c;如果再第一步发现本身的密码过期时间为无限期的&#xff0c;那就请各位小伙伴绕过&#xff0c;如果发现不是无期限的&#xff0c;那么必须设置第四步&#xff0c;才会生效。 目录 第一步&#xff1a;查询密码…...

welearn 视听说1-4

词汇题&#xff08;55道&#xff09; 1. You should carefully think over_____ the manager said at the meeting. A. that B. which C. what D. whose 1.选C,考察宾语从句连接词&#xff0c;主句谓语动词think over后面缺宾语&#xff0c;后面的宾语从句谓语动…...