Kafka+PostgreSql,构建一个总线服务
之前开发的系统,用到了RabbitMQ和SQL Server作为总线服务的传输层和存储层,最近一直在看Kafka和PostgreSql相关的知识,想着是不是可以把服务总线的技术栈切换到这个上面。今天花了点时间试了试,过程还是比较顺利的,后续就是搭建基础服务的事情了。这里简单分享一下。
环境安装
安装Kafka
官方文档:Apache Kafka,可以直接参考,我这里简单介绍下我在本地搭建开发环境的过程,还是遇到了一个小坑。
我这里是在本地WSL 2环境下进行的安装,安装过程就参考官方文档的推荐流程即可
下载安装包
注意,这里要下载编译后的包,不嫌麻烦的话,可以下载源代码,编译后再使用。

wget -c https://downloads.apache.org/kafka/3.8.0/kafka_2.12-3.8.0.tgz
安装
tar -xzf kafka_2.13-3.8.0.tgz
cd kafka_2.13-3.8.0
这里安装完成后的路径是这样子的

重点关注的就是bin,config和logs这3个目录。
启动服务
官方提供了2中启动策略,一个是KRaft,一个是Zookeeper,我这里用的zookeeper
先启动zookeeper服务
bin/zookeeper-server-start.sh config/zookeeper.properties
在启动kafka服务
bin/kafka-server-start.sh config/server.properties
后面的zookeeper.properties和server.properties是配置文件,后续有配置需求的时候可以修改,比如监听地址,brokerid等等,长这样👇

启动后控制台的输出是这样

这样,一个kafka的服务节点就启动了。
对了,kafka是依赖java环境的,安装之前本地要安装jdk,我这里使用的是openjdk,也是ok的。
*端口转发(仅WSL2环境)
在WSL2环境下,需要配置下端口转发,不然宿主机连接不到broker,
netsh interface portproxy add v4tov4 listenport=9092 listenaddress=0.0.0.0 connectport=9092 connectaddress=172.28.240.79
后面那个ip地址就写宿主机给WSL环境下发的地址

此外,宿主机和wsl环境都放开9092(或者你设置的)端口
链接测试
这里有很多客户端的ui工具或者插件可以连接Kafka,官方本身也提供了测试命令,比如官方文档里给的测试案例就是用这几个命令

本地开发的话,我这里用的vs code的tools for apache kafka@ 这个插件,在插件市场用关键字搜索完成,安装即可

至此,一个本地的Kafka节点就基本配置完成了
安装PostgreSql
这个我老早就装好了,一些安装过程没有截图,就忽略吧,大家有需求的可以问一下各种GPT
也可以用docker,快速部署一个节点做本地的测试。
docker run --name postgres -p 5432:5432 -e POSTGRES_PASSWORD=mysecretpassword -d postgres
开发测试
新建项目
这里因为我是用的IDE做开发,所以直接创建个web项目就好,也可以用命令行来创建。
总之创建完成后,我的项目长这样

安装依赖
我这里是用的是dotnet.cap这个系列组件,然后为了测试方便,数据库的orm适用的是dapper,主要是图快,大家实际项目中可以用习惯的orm就好。
这里我的项目文件长这样
<Project Sdk="Microsoft.NET.Sdk.Web"><PropertyGroup><TargetFramework>net8.0</TargetFramework><Nullable>enable</Nullable><ImplicitUsings>enable</ImplicitUsings></PropertyGroup><ItemGroup><PackageReference Include="Dapper" Version="2.1.35" /><PackageReference Include="DotNetCore.CAP" Version="8.2.0" /><PackageReference Include="DotNetCore.CAP.Dashboard" Version="8.2.0" /><PackageReference Include="DotNetCore.CAP.Kafka" Version="8.2.0" /><PackageReference Include="DotNetCore.CAP.PostgreSql" Version="8.2.0" /></ItemGroup></Project>
注入服务
这里主要注入pg和Kafka
builder.Services.AddCap(x =>
{x.UsePostgreSql("User ID={pg用户名};Password={pg密码};Host={pg地址};Port=5432;Database=maigcTestDb;");x.UseKafka("localhost:9092");x.UseDashboard();
});
测试的业务代码
在常规的controller中注入服务
public class ValuesController(ICapPublisher producer) : Controller, ICapSubscribe
{/*业务代码*/
}
//上面这是最新的写法,以前那种构造函数的写法也是ok的
public class Values2Controller : Controller
{private ICapPublisher _capPublisher;public Values2Controller(ICapPublisher capPublisher){_capPublisher = capPublisher;}
}
写一个生产者接口
public async Task<IActionResult> Producer()
{Console.WriteLine("生产者发布消息: " + DateTime.Now);await producer.PublishAsync("sample.kafka.postgrsql", DateTime.Now);return Ok();
}
再写一个延时发送消息的生产者接口
public async Task<IActionResult> ProducerDelay()
{Console.WriteLine("生产者发布延时消息: " + DateTime.Now);await producer.PublishDelayAsync(TimeSpan.FromSeconds(delaySeconds), "sample.kafka.postgrsql", DateTime.Now);return Ok();
}
创建消费者
[CapSubscribe("sample.kafka.postgrsql")]
public void Test2(DateTime value)
{Console.WriteLine("订阅到消息: " + value);
}
我们访问下接口看下控制台的打印效果

可以看到,订阅到的时间和生产者发送的实际是一致的。
再试下延时发送

我们延时了10秒发布,这里生产者执行生产消息后,过了10秒,被消费者订阅到。
我们延时了10秒发布,这里生产者执行生产消息后,过了10秒,被消费者订阅到。
再看下PostG里保存的消息记录
这是生产记录

这是消费记录

注意,在CAP的机制里,这些持久化的消息记录是可以设置过期时间的,也就是如果我们每天的并发量很高,产生的消息非常多,可以设置一个过期时间,比如7天,一个月,到期后,这些持久化的数据就会自动清除掉。
CAP的官方文档里,还有更多案例,大家感兴趣也可以去试试,当然除了CAP还有MediatR,MassTransit这类组件,也可以轻松实现消息总线的机制。
好了,到此我们的测试就结束了,从安装Kafka,到创建这个新项目并跑通这个测试服务,也就2个小时,所以,这个迁移成本应该还是非常高效的。
小总结
实际上,我们的生产环境中,正正常运行的一套总线服务,依赖的是RabbitMQ和SQL Server,RabbitMQ还好,SQL Server在以后应该不会是做项目的首选数据库了,尤其是做一些高并发的项目,不是说它性能不够,而是成本太高,社区版的限制有太多,还是要早做规划,提前准备更加适合未来发展的方案,而PostgreSql是目前最受全球开发者欢迎的关系数据库,社区活跃度非常高,开源协议对企业也十分友好,即便是面对国内高标准的信创要求,也完全没问题,是绝佳的首选。
至于Kafka,这是目前世界上最为流行的消息队列,性能,可用性,可扩展性等各方面都比其他消息队列要好上一点。阿里后来推出的RocketMQ,也是基于Kafka的设计原理做了简化和更加适应国内环境的一些调整,根骨还是来自Kafka。而且就生态环境而言,无论国内还是国外,Kafka都是遥遥领先,对dotnet框架的支持,Kafka也远比RocketMQ更好(RocketMQ更多的还是用在java环境里),所以我们再选型的时候,优先考虑的还是Kafka。
更多关于这些内容的知识,大家感兴趣可以去搜一下或者找个AI问一下。
好了,就这些吧。
相关文章:
Kafka+PostgreSql,构建一个总线服务
之前开发的系统,用到了RabbitMQ和SQL Server作为总线服务的传输层和存储层,最近一直在看Kafka和PostgreSql相关的知识,想着是不是可以把服务总线的技术栈切换到这个上面。今天花了点时间试了试,过程还是比较顺利的,后续…...
电脑怎么录屏?四款录屏工具分享
作为一个刚刚踏入视频创作领域的新手,我一直在寻找一款适合自己的录屏软件。最近,我尝试了四款市面上比较热门的录屏工具。今天,就让我来分享一下我的使用体验,希望能给同样在寻找录屏软件的朋友们一些参考。 一、福昕录屏大师 …...
Java代码审计篇 | ofcms系统审计思路讲解 - 篇4 | XXE漏洞审计
文章目录 Java代码审计篇 | ofcms系统审计思路讲解 - 篇4 | XXE漏洞审计0. 前言1. XXE代码审计【有1处】1.1. 搜索JRXmlLoader1.1.1. JRAntApiWriteTask1.1.2. JRAntUpdateTask1.1.3. TableReportContextXmlRule1.1.4. JasperCompileManager【存在漏洞】 1.2. 搜索XMLReader1.2…...
Leetcode 每日一题:Word Ladder
写在前面: 今天我们来看一道图论的题,这道题目是我做过目前最难与图论联想到的一道题目之一。如果没有提示的话,我们很容易往 dp 等解决 array 问题的方向去解决它,经过我超过 2个小时的思考我觉得这种方向是没前途的~…...
c++ 编辑器 和 编译器 的详细解释
在 C 开发中,编辑器 和 编译器 是两个不同的工具,分别在编写代码和生成可执行文件的过程中起着不同的作用。下面是它们的详细介绍: 1. 编辑器(Editor) 编辑器 是用来编写和编辑代码的工具。C 代码就是通过编辑器编写…...
计算机视觉(二)—— MDPI特刊推荐
特刊征稿 01 期刊名称: Applied Computer Vision and Pattern Recognition: 2nd Volume 截止时间: 摘要提交截止日期:2024年10月30日 投稿截止日期:2024年12月30日 目标及范围: 包括但不限于以下领域:…...
交叉编译工具链的安装及带wiringPi库的交叉编译实现
交叉编译工具链的安装及带wiringPi库的交叉编译实现 交叉编译的概念交叉编译工具链的安装下载交叉编译工具链配置环境遍变量编译程序到ARM平台 带wiringPi库的交叉编译下载编译wiringPi库调用树莓派的wringPi库 交叉编译的概念 交叉编译是在一个平台上生成另一个平台上的可执行…...
java: 程序包org.junit.jupiter.api不存在
明明idea没有报错,引用包也没问题,为啥提示java: 程序包org.junit.jupiter.api不存在? 配置!还TMD是配置! 如果是引用包的版本不对或者其他,直接就是引用报错或者pom里面飘红了。 这个应该是把generat…...
代码随想录刷题day32丨动态规划理论基础,509. 斐波那契数, 70. 爬楼梯, 746. 使用最小花费爬楼梯
代码随想录刷题day32丨动态规划理论基础,509. 斐波那契数, 70. 爬楼梯, 746. 使用最小花费爬楼梯 1.动态规划理论基础 动态规划,英文:Dynamic Programming,简称DP,如果某一问题有很多重叠子问题…...
为什么矩阵特征值之和等于主对角线元素之和,特征值乘积等于行列式值
首先给出特征值和特征向量的定义。 设A是n阶矩阵,如果数λ和n维非零向量x使关系式 Axλx (1) 成…...
学生学籍管理系统可行性分析报告
引言 一、编写目的 随着科学技术的不断提高,计算机科学日渐成熟,其强大的功能已为人们深刻认识,它已进入人类社会的各个领域并发挥着越来越重要的作用。而学籍管理系统软件,可广泛应用于全日制大、中小学及其他各类学校,系统涵盖了小学、初中、高中学籍…...
C#排序算法新境界:深度剖析与高效实现基数排序
基数排序(Radix Sort)是一种非比较型整数排序算法,其原理是将整数按位数切割成不同的数字,然后按每个位数进行比较。具体来说,基数排序有两种方法: 最低位优先(LSD, Least Significant Digit f…...
玩机搞机-----如何简单的使用ADB指令来卸载和冻结系统应用 无需root权限 详细操作图示教程
同类博文: 玩机搞机---卸载内置软件 无root权限卸载不需要的软件 安全卸载_无需root卸载彻底内置软件-CSDN博客 在很多时候我们需要卸载一些系统级的app。但如果直接手机端进行卸载的话。是无法正常卸载的。其实我们可以通过有些成品工具或者完全靠ADB指令来进行卸…...
如何通过 Apache Camel 将数据导入 Elasticsearch
作者:来自 Elastic Andre Luiz 使用 Apache Camel 将数据提取到 Elasticsearch 的过程将搜索引擎的稳健性与集成框架的灵活性相结合。在本文中,我们将探讨 Apache Camel 如何简化和优化将数据提取到 Elasticsearch。为了说明此功能,我们将实…...
打造民国风格炫酷个人网页:用HTML和CSS3传递民国风韵
附源码!!! 感谢支持 小弟不断创作网站demo感兴趣的可以关注支持一下 对了 俺在结尾带上了自己用的 背景 大家可以尝试换一下效果更好哦~~~ 如何创建一个民国风格的炫酷网页 在这篇博客中,我们将展示如何制作一个结合民国风格和…...
豆包MarsCode编程助手:产品功能解析与应用场景探索!
随着现代技术的不断进化升级,人工智能正在逐步改变着我们的日常工作方式。特别是对于复杂的项目,代码编写、优化、调试、测试等环节充满挑战。为了简化这些环节、提高开发效率,许多智能编程工具应运而生,豆包MarsCode 编程助手就是…...
爬虫全网抓取
爬虫全网抓取是指利用网络爬虫技术,通过自动化的方式遍历互联网上各个网站、论坛、博客等,从这些网页中提取所需的数据。它通常涉及以下几个步骤: 目标设定:确定要抓取哪些类型的网页内容,比如新闻、商品信息、用户评论…...
【计算机组成原理】详细解读带符号整数在计算机中的运算
有符号整数的运算 导读一、补码的优势二、补码的加法运算三、补码的减法运算四、原码、反码、补码的特性结语 导读 大家好,很高兴又和大家见面啦!!! 经过前面的介绍,我们已经初步认识了有符号整数的三种表示形式&…...
vue3常见的bug 修复bug
Vue 3 作为 Vue.js 的最新版本,在性能、开发体验以及代码可维护性等方面带来了显著的提升。然而,就像任何软件框架一样,Vue 3 在使用过程中也可能遇到一些典型的bug或问题。以下是一些可能遇到的典型问题: 响应式系统相关的问题&…...
C++课程笔记 类和对象
类概念 结构体:只要属性 类:有属性也有方法 c可以省略struct c不行 #include<iostream> using namespace std;typedef struct queue1 {int a;queue1 q() {queue1 q(2);return q;};queue1(){}queue1(int qa){a qa;} }q1; int main() {queue1 Q1;…...
Vue记事本应用实现教程
文章目录 1. 项目介绍2. 开发环境准备3. 设计应用界面4. 创建Vue实例和数据模型5. 实现记事本功能5.1 添加新记事项5.2 删除记事项5.3 清空所有记事 6. 添加样式7. 功能扩展:显示创建时间8. 功能扩展:记事项搜索9. 完整代码10. Vue知识点解析10.1 数据绑…...
IGP(Interior Gateway Protocol,内部网关协议)
IGP(Interior Gateway Protocol,内部网关协议) 是一种用于在一个自治系统(AS)内部传递路由信息的路由协议,主要用于在一个组织或机构的内部网络中决定数据包的最佳路径。与用于自治系统之间通信的 EGP&…...
基于Uniapp开发HarmonyOS 5.0旅游应用技术实践
一、技术选型背景 1.跨平台优势 Uniapp采用Vue.js框架,支持"一次开发,多端部署",可同步生成HarmonyOS、iOS、Android等多平台应用。 2.鸿蒙特性融合 HarmonyOS 5.0的分布式能力与原子化服务,为旅游应用带来…...
第一篇:Agent2Agent (A2A) 协议——协作式人工智能的黎明
AI 领域的快速发展正在催生一个新时代,智能代理(agents)不再是孤立的个体,而是能够像一个数字团队一样协作。然而,当前 AI 生态系统的碎片化阻碍了这一愿景的实现,导致了“AI 巴别塔问题”——不同代理之间…...
sqlserver 根据指定字符 解析拼接字符串
DECLARE LotNo NVARCHAR(50)A,B,C DECLARE xml XML ( SELECT <x> REPLACE(LotNo, ,, </x><x>) </x> ) DECLARE ErrorCode NVARCHAR(50) -- 提取 XML 中的值 SELECT value x.value(., VARCHAR(MAX))…...
自然语言处理——循环神经网络
自然语言处理——循环神经网络 循环神经网络应用到基于机器学习的自然语言处理任务序列到类别同步的序列到序列模式异步的序列到序列模式 参数学习和长程依赖问题基于门控的循环神经网络门控循环单元(GRU)长短期记忆神经网络(LSTM)…...
vue3+vite项目中使用.env文件环境变量方法
vue3vite项目中使用.env文件环境变量方法 .env文件作用命名规则常用的配置项示例使用方法注意事项在vite.config.js文件中读取环境变量方法 .env文件作用 .env 文件用于定义环境变量,这些变量可以在项目中通过 import.meta.env 进行访问。Vite 会自动加载这些环境变…...
使用Matplotlib创建炫酷的3D散点图:数据可视化的新维度
文章目录 基础实现代码代码解析进阶技巧1. 自定义点的大小和颜色2. 添加图例和样式美化3. 真实数据应用示例实用技巧与注意事项完整示例(带样式)应用场景在数据科学和可视化领域,三维图形能为我们提供更丰富的数据洞察。本文将手把手教你如何使用Python的Matplotlib库创建引…...
vulnyx Blogger writeup
信息收集 arp-scan nmap 获取userFlag 上web看看 一个默认的页面,gobuster扫一下目录 可以看到扫出的目录中得到了一个有价值的目录/wordpress,说明目标所使用的cms是wordpress,访问http://192.168.43.213/wordpress/然后查看源码能看到 这…...
力扣热题100 k个一组反转链表题解
题目: 代码: func reverseKGroup(head *ListNode, k int) *ListNode {cur : headfor i : 0; i < k; i {if cur nil {return head}cur cur.Next}newHead : reverse(head, cur)head.Next reverseKGroup(cur, k)return newHead }func reverse(start, end *ListNode) *ListN…...
