当前位置: 首页 > news >正文

canal rocketmq

上篇文章canal 消费进度说到直接使用ClusterCanalConnector并发消费是有问题的,可以先用单点将canal事件发送到mq中,再由mq并发处理,另外mq还可以做到削峰的作用,让canal数据不至于阻塞。

使用队列,可以自己起一个单实例服务使用ClusterCanalConnector将消息丢队列里,也可以直接使用canal server, canal server原生支持几种队列:Kafka, RocketMQ ,RabbitMQ, PulsarMQ, 下面了解一下canal sever具体的处理过程。

canal server将消息投递到mq中

在canal server中,如果检测到配置了mq, 就会启动线程来读取bin log事件,并投递到mq中:
CanalMQStarter

while (running && destinationRunning.get()) {Message message;if (getTimeout != null && getTimeout > 0) {message = canalServer.getWithoutAck(clientIdentity,getBatchSize,getTimeout.longValue(),TimeUnit.MILLISECONDS);} else {message = canalServer.getWithoutAck(clientIdentity, getBatchSize);}final long batchId = message.getId();int size = message.isRaw() ? message.getRawEntries().size() : message.getEntries().size();if (batchId != -1 && size != 0) {canalMQProducer.send(canalDestination, message, new Callback() {@Overridepublic void commit() {canalServer.ack(clientIdentity, batchId); // 提交确认}@Overridepublic void rollback() {canalServer.rollback(clientIdentity, batchId);}}); // 发送message到topic} else {try {Thread.sleep(100);} catch (InterruptedException e) {// ignore}}}

从代码可以看到,首先调用getWithoutAck从实例获取事件,然后调用canalMQProducer.send将消息投递到队列中,如果投递成功就执行ack,否则执行rollback, 因为投递消息到队列是非常快的操作,所以这就降低了阻塞的风险。

最终发送mq消息的代码如下(CanalRocketMQProducer):

    private void sendMessage(Message message, int partition) {//...SendResult sendResult = this.defaultMQProducer.send(message, (mqs, msg, arg) -> {if (partition >= mqs.size()) {return mqs.get(partition % mqs.size());} else {return mqs.get(partition);}}, null);//...}

这里有个分区的概念,对于RocketMQ来说就是队列选择,这关系到顺序消费。

业务代码使用RocketMQCanalConnector消费数据

    while (running) {try {connector.connect();connector.subscribe();while (running) {List<Message> messages = connector.getListWithoutAck(1000L, TimeUnit.MILLISECONDS); // 获取messagefor (Message message : messages) {long batchId = message.getId();int size = message.getEntries().size();if (batchId == -1 || size == 0) {// try {// Thread.sleep(1000);// } catch (InterruptedException e) {// }} else {printSummary(message, batchId, size);printEntry(message.getEntries());// logger.info(message.toString());}}connector.ack(); // 提交确认}} catch (Exception e) {logger.error(e.getMessage(), e);}}connector.unsubscribe();// connector.stopRunning();
}

可以看到这和之前ClusterCanalConnector一样的处理方法,只是底层实现不一样,在subscribe的时候,调用了mq的subscribe:

    public synchronized void subscribe(String filter) throws CanalClientException {//...rocketMQConsumer.subscribe(this.topic, "*");rocketMQConsumer.registerMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> messageExts, ConsumeOrderlyContext context) {context.setAutoCommit(true);boolean isSuccess = process(messageExts);if (isSuccess) {return ConsumeOrderlyStatus.SUCCESS;} else {return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;}}});rocketMQConsumer.start();//...}

可以看到这里使用了MessageListenerOrderly来进行顺序消费, 使用process来处理消息

private boolean process(List<MessageExt> messageExts) {//...for (MessageExt messageExt : messageExts) {//...if (!flatMessage) {Message message = CanalMessageDeserializer.deserializer(data);messageList.add(message);} else {FlatMessage flatMessage = JSON.parseObject(data, FlatMessage.class);messageList.add(flatMessage);}ConsumerBatchMessage batchMessage;if (!flatMessage) {batchMessage = new ConsumerBatchMessage<Message>(messageList);} else {batchMessage = new ConsumerBatchMessage<FlatMessage>(messageList);}try {messageBlockingQueue.put(batchMessage);} catch (InterruptedException e) {logger.error("Put message to queue error", e);throw new RuntimeException(e);}boolean isCompleted;try {isCompleted = batchMessage.waitFinish(batchProcessTimeout);} catch (InterruptedException e) {logger.error("Interrupted when waiting messages to be finished.", e);throw new RuntimeException(e);}boolean isSuccess = batchMessage.isSuccess();return isCompleted && isSuccess;}

这里将数据放到了messageBlockingQueue中,然后等待消息执行完成, ConsumerBatchMessage内置了一个CountDownLatch, batchMessage.waitFinish会阻塞在这里。
客户端使用getFlatList/getFlatListWithoutAck取数据时,就是从messageBlockingQueue取出数据,调用ack时,会释放ConsumerBatchMessage中的CountDownLatch, 这样mq消费者就可以继续从队列中拿数据了。

    @Overridepublic List<Message> getListWithoutAck(Long timeout, TimeUnit unit) throws CanalClientException {if (this.lastGetBatchMessage != null) {throw new CanalClientException("mq get/ack not support concurrent & async ack");}ConsumerBatchMessage batchMessage = messageBlockingQueue.poll(timeout, unit);//...}@Overridepublic void ack() throws CanalClientException {if (this.lastGetBatchMessage != null) {this.lastGetBatchMessage.ack();}//...}

对于MessageListenerOrderly来说,是一个消费线程对应一个mq队列的,从而实现多线程消费,而这里把不同mq队列的消息在messageBlockingQueue中排队,并且使用getListWithoutAck/ack也不支持并发,又变成了单线程模式,这可能对性能造成影响,建议生产环境对性能有要求时,采用自己写代码来实现mq的消费。

配置

mq相关参数说明

相关文章:

canal rocketmq

上篇文章canal 消费进度说到直接使用ClusterCanalConnector并发消费是有问题的&#xff0c;可以先用单点将canal事件发送到mq中&#xff0c;再由mq并发处理&#xff0c;另外mq还可以做到削峰的作用&#xff0c;让canal数据不至于阻塞。 使用队列&#xff0c;可以自己起一个单实…...

【数据库系统概论】第九章关系查询处理何查询优化

9.1查询处理 一&#xff1a;查询处理步骤 关系数据库管理系统查询处理可以分为4个阶段&#xff1a; 查询分析查询检查查询优化查询执行 &#xff08;1&#xff09;查询分析 任务&#xff1a;对查询语句进行扫描&#xff0c;分析词法、语法是否符合SQL语法规则 如果没有语…...

bp盐丘模型波场数值模拟matlab

波场数值模拟是地震勘探和地震学研究中常用的工具&#xff0c;而BP&#xff08;Backpropagation&#xff09;盐丘模型是一种用于地下介质成像的方法。如果您想在MATLAB中进行波场数值模拟&#xff0c;并结合BP盐丘模型进行地下成像&#xff0c;可以按照以下步骤进行&#xff1a…...

结构体对齐规则

1.第一个成员在结构体变量偏移量为0的地址处。 2.其他成员变量对齐到某个数字(对齐数)的整数倍的地址处。(对齐数编译器默认的一个对齐数与该成员大小的较小值&#xff09;注意&#xff1a;目前有且只有VS编译器有默认为8. 3.结构体总大小为最大对齐数的整数倍。 4.如果嵌套…...

css 如何让元素内部文本和外部文本 一块显示省略号

实际上还是有这样的需求的 <div class"container"><span>啊啊啊啊啊啊啊啊</span>你好啊撒撒啊撒撒撒撒啊撒撒撒撒撒说</div>还是有这样的需求的哦。 div.container {width: 200px;white-space: nowrap;text-overflow: ellipsis;overflow:…...

SQL语句-中级

一、Mysql软件使用 1.启动/停止Mysql服务器 任务管理器 cmd命令&#xff1a;以管理员的身份打开cmd命令行 net start mysql80//开启net stop mysql80//停止 2.连接与断开Mysql服务器 注意要在bin目录下执行:-u用户名root&#xff0c;-p密码 mysql -u root -p 可能出现的…...

巧用h2-database.jar连接数据库

文章目录 一 、概述二、实践三、解决办法 一 、概述 H2 Database是一个开源的嵌入式数据库引擎&#xff0c;采用java语言编写&#xff0c;不受平台的限制&#xff0c;同时H2 Database提供了一个十分方便的web控制台用于操作和管理数据库内容。H2 Database还提供兼容模式&#…...

136.只出现一次的数字

136. 只出现一次的数字 - 力扣&#xff08;LeetCode&#xff09; 给你一个 非空 整数数组 nums &#xff0c;除了某个元素只出现一次以外&#xff0c;其余每个元素均出现两次。找出那个只出现了一次的元素。 你必须设计并实现线性时间复杂度的算法来解决此问题&#xff0c;且…...

mysql中遇到查询字段的别名与函数冲突问题

比如以下哎&#xff0c;我查询城市行业数量排名 select City, DENSE_RANK() over(ORDER BY COUNT(Id) DESC) rank, COUNT(Id) num,IndustrySubGroupName from base_companyinfo WHERE IndustrySubGroupName工业机器人 GROUP BY City 上面使用 DENSE_RANK() 函数来计算排名&am…...

直播获奖

题目描述 NOI2130 即将举行。为了增加观赏性&#xff0c; CCF 决定逐一评出每个选手的成 绩&#xff0c;并直播即时的获奖分数线。本次竞赛的获奖率为 &#x1d464;% &#xff0c;即当前排名前 &#x1d464;% 的选手的最低成绩就是即时的分数线。 更具体地&#xff0c…...

选择适合自身业务的HTTP代理有哪些因素决定?

相信对很多爬虫工作者和数据采集的企业来说&#xff0c;如何选购适合自己业务的HTTP代理是一个特别特别困扰的选题&#xff0c;市面上那么多HTTP代理厂商&#xff0c;好像这家有这些缺点&#xff0c;转头又看到另外一家的缺点&#xff0c;要找一家心仪的仿佛大海捞针。今天我们…...

1.3 do...while实现1+...100 for实现1+...100

思路&#xff1a;两个变量&#xff0c;一个变量存储数据之和&#xff0c;一个变量实现自增就行 do...while int i, s;i 1;s 0;do{s 1;i;} while (i < 100);cout << s << endl; for int i, j0;for (i 1; i < 100; i){j 1;}cout << j << …...

react数据管理之setState与Props

react数据管理之setState与Props setState调用原理 setState 是 React 中用于更新组件状态&#xff08;state&#xff09;的方法。它的调用原理可以分为以下几个步骤&#xff1a; 状态的改变&#xff1a;当调用 setState 时&#xff0c;React 会将新的状态对象与当前状态对象…...

如何保护我们的网络安全

保护网络安全是至关重要的&#xff0c;尤其是在今天的数字化时代。以下是一些保护网络安全的基本步骤&#xff1a; 1、使用强密码&#xff1a;使用包含字母、数字和特殊字符的复杂密码。不要在多个网站上重复使用相同的密码。定期更改密码。 2、启用双因素认证 (2FA)&#xff…...

springboot 制造装备物联及生产管理ERP系统

springboot 制造装备物联及生产管理ERP系统 liu1113625581...

Google zxing 生成带logo的二维码图片

环境准备 开发环境 JDK 1.8SpringBoot2.2.1Maven 3.2 开发工具 IntelliJ IDEAsmartGitNavicat15 添加maven配置 <dependency><groupId>com.google.zxing</groupId><artifactId>core</artifactId><version>3.4.0</version> </…...

使用Python计算平面多边形间最短距离

要计算平面多边形间的最短距离&#xff0c;首先需要导入Excel表格中的多边形数据&#xff0c;然后使用GJK&#xff08;Gilbert-Johnson-Keerthi&#xff09;算法来判断两个多边形是否重叠。如果两个多边形不重叠&#xff0c;可以计算它们之间的最短距离。 以下是一个基本的Pyt…...

【Python】Python语言基础(中)

第十章 Python的数据类型 基本数据类型 数字 整数 整数就是整数 浮点数 在编程中&#xff0c;小数都称之为浮点数 浮点数的精度问题 print(0.1 0.2) --------------- 0.30000000000000004 ​​1.可以通过round()函数来控制小数点后位数 round(a b)&#xff0c;则表示…...

观察者模式、订阅者发布者模式、vtk中的观察者模式

文章目录 什么是观察者模式vtk是如何实现的观察者模式.AddObserver什么时候使用观察者模式&#xff1f;什么使用订阅发布者模式?观察者模式的实现订阅发布者的实现总结知识补充: 什么是观察者模式 用于在对象之间建立一对多的依赖关系&#xff0c;当一个对象的状态发生变化时…...

关于element-ui中,页面上有多个el-table并通过v-if、v-else等控制是否显示时,type=selection勾选框失效或不显示的问题

刚开始是勾选框那一列直接空了什么都不显示&#xff0c;搜索了一下说是给el-table标签增加id&#xff0c;加了之后是显示了&#xff0c;但是点击任何选框都会直接取消全部选中效果&#xff0c;翻了半天源码也没发现到底是哪里事件冲突了还是怎么回事&#xff0c;烦了&#xff0…...

如何用applera1n免费绕过iOS激活锁:完整指南与操作教程

如何用applera1n免费绕过iOS激活锁&#xff1a;完整指南与操作教程 【免费下载链接】applera1n icloud bypass for ios 15-16 项目地址: https://gitcode.com/gh_mirrors/ap/applera1n 你是否购买了一部二手iPhone或iPad&#xff0c;却发现设备被原主人的Apple ID锁定&a…...

保姆级教程:在Ubuntu 20.04上从源码编译aarch64-linux-gnu交叉工具链(GCC 9.2.0 + Glibc 2.30)

深度实践&#xff1a;从源码构建aarch64-linux-gnu交叉工具链全指南 在嵌入式开发领域&#xff0c;交叉编译工具链的构建能力是区分普通开发者与资深工程师的重要标志。当现成的预编译工具链无法满足特定需求时&#xff0c;从源码手动构建工具链不仅能解决兼容性问题&#xff0…...

深入Transformer内部:LoRA到底改动了哪部分权重才让模型“学会”新任务?

深入Transformer内部&#xff1a;LoRA如何通过低秩更新重塑大模型能力 在自然语言处理领域&#xff0c;大型预训练模型的微调一直是个计算密集型任务。传统全参数微调需要更新数十亿甚至数千亿参数&#xff0c;这对大多数研究者和企业来说都是难以承受的负担。低秩适应(LoRA)技…...

量子私有信息检索(QPIR)技术解析与应用前景

1. 量子私有信息检索技术概述量子私有信息检索&#xff08;Quantum Private Information Retrieval, QPIR&#xff09;是密码学领域的一项突破性技术&#xff0c;它允许用户从数据库中检索特定条目而不泄露被查询的是哪个条目。这项技术的核心价值在于解决了隐私保护与数据获取…...

怎么找到一个行业的源头工厂、绕开中间商?一套五步识别流程

你下了单&#xff0c;货到了&#xff0c;质量也还行。但心里一直有个疙瘩&#xff1a;这家供应商到底是自己在生产&#xff0c;还是从别处转手赚了你一道差价&#xff1f; 这个问题对采购方和跨境卖家不是洁癖&#xff0c;是真金白银。同一款产品&#xff0c;源头工厂和中间商的…...

SVG与CSS变量驱动的自动化品牌视觉生成技术实践

1. 项目概述&#xff1a;一分钟品牌塑造的实践宝库在品牌营销和创意设计领域&#xff0c;一个常见的痛点是如何快速、高效地生成高质量的视觉品牌资产。无论是初创公司需要一个临时的Logo&#xff0c;还是内容创作者想为新的系列视频设计一个统一的片头&#xff0c;传统的品牌设…...

数据流编排与异步任务调度中间件kelivo部署与实战指南

1. 项目概述与核心价值最近在折腾一个挺有意思的项目&#xff0c;叫“Chevey339/kelivo”。乍一看这个标题&#xff0c;可能有点摸不着头脑&#xff0c;它不像那些直接告诉你“XX管理系统”或“XX工具库”的项目名那么直白。但恰恰是这种看似神秘的命名&#xff0c;背后往往隐藏…...

基于Taotoken统一API开发支持多模型切换的智能对话应用

&#x1f680; 告别海外账号与网络限制&#xff01;稳定直连全球优质大模型&#xff0c;限时半价接入中。 &#x1f449; 点击领取海量免费额度 基于Taotoken统一API开发支持多模型切换的智能对话应用 应用场景类&#xff0c;场景是开发一个需要支持用户自由选择或系统自动切换…...

ElevenLabs情绪驱动API实战手册(2024企业级部署全链路):从F0曲线调制到微表情时序对齐

更多请点击&#xff1a; https://intelliparadigm.com 第一章&#xff1a;ElevenLabs情绪驱动API核心架构与演进脉络 ElevenLabs 的情绪驱动 API 并非简单叠加情感标签的语音合成增强层&#xff0c;而是构建在多模态表征学习与实时声学参数调控双引擎之上的闭环系统。其核心架…...

基于RAG与向量数据库的智能信息管理系统(IIMS)架构与实现

1. 项目概述&#xff1a;当AI成为你的“第二大脑”最近在折腾一个挺有意思的项目&#xff0c;叫“IIMS-By-AI”。乍一看这个标题&#xff0c;可能有点摸不着头脑&#xff0c;但拆解一下就能明白它的野心&#xff1a;IntelligentInformationManagementSystem&#xff0c; By AI。…...