Kafka+PostgreSql,构建一个总线服务
之前开发的系统,用到了RabbitMQ和SQL Server作为总线服务的传输层和存储层,最近一直在看Kafka和PostgreSql相关的知识,想着是不是可以把服务总线的技术栈切换到这个上面。今天花了点时间试了试,过程还是比较顺利的,后续就是搭建基础服务的事情了。这里简单分享一下。
环境安装
安装Kafka
官方文档:Apache Kafka,可以直接参考,我这里简单介绍下我在本地搭建开发环境的过程,还是遇到了一个小坑。
我这里是在本地WSL 2环境下进行的安装,安装过程就参考官方文档的推荐流程即可
下载安装包
注意,这里要下载编译后的包,不嫌麻烦的话,可以下载源代码,编译后再使用。

wget -c https://downloads.apache.org/kafka/3.8.0/kafka_2.12-3.8.0.tgz
安装
tar -xzf kafka_2.13-3.8.0.tgz
cd kafka_2.13-3.8.0
这里安装完成后的路径是这样子的

重点关注的就是bin,config和logs这3个目录。
启动服务
官方提供了2中启动策略,一个是KRaft,一个是Zookeeper,我这里用的zookeeper
先启动zookeeper服务
bin/zookeeper-server-start.sh config/zookeeper.properties
在启动kafka服务
bin/kafka-server-start.sh config/server.properties
后面的zookeeper.properties和server.properties是配置文件,后续有配置需求的时候可以修改,比如监听地址,brokerid等等,长这样👇

启动后控制台的输出是这样

这样,一个kafka的服务节点就启动了。
对了,kafka是依赖java环境的,安装之前本地要安装jdk,我这里使用的是openjdk,也是ok的。
*端口转发(仅WSL2环境)
在WSL2环境下,需要配置下端口转发,不然宿主机连接不到broker,
netsh interface portproxy add v4tov4 listenport=9092 listenaddress=0.0.0.0 connectport=9092 connectaddress=172.28.240.79
后面那个ip地址就写宿主机给WSL环境下发的地址

此外,宿主机和wsl环境都放开9092(或者你设置的)端口
链接测试
这里有很多客户端的ui工具或者插件可以连接Kafka,官方本身也提供了测试命令,比如官方文档里给的测试案例就是用这几个命令

本地开发的话,我这里用的vs code的tools for apache kafka@ 这个插件,在插件市场用关键字搜索完成,安装即可

至此,一个本地的Kafka节点就基本配置完成了
安装PostgreSql
这个我老早就装好了,一些安装过程没有截图,就忽略吧,大家有需求的可以问一下各种GPT
也可以用docker,快速部署一个节点做本地的测试。
docker run --name postgres -p 5432:5432 -e POSTGRES_PASSWORD=mysecretpassword -d postgres
开发测试
新建项目
这里因为我是用的IDE做开发,所以直接创建个web项目就好,也可以用命令行来创建。
总之创建完成后,我的项目长这样

安装依赖
我这里是用的是dotnet.cap这个系列组件,然后为了测试方便,数据库的orm适用的是dapper,主要是图快,大家实际项目中可以用习惯的orm就好。
这里我的项目文件长这样
<Project Sdk="Microsoft.NET.Sdk.Web"><PropertyGroup><TargetFramework>net8.0</TargetFramework><Nullable>enable</Nullable><ImplicitUsings>enable</ImplicitUsings></PropertyGroup><ItemGroup><PackageReference Include="Dapper" Version="2.1.35" /><PackageReference Include="DotNetCore.CAP" Version="8.2.0" /><PackageReference Include="DotNetCore.CAP.Dashboard" Version="8.2.0" /><PackageReference Include="DotNetCore.CAP.Kafka" Version="8.2.0" /><PackageReference Include="DotNetCore.CAP.PostgreSql" Version="8.2.0" /></ItemGroup></Project>
注入服务
这里主要注入pg和Kafka
builder.Services.AddCap(x =>
{x.UsePostgreSql("User ID={pg用户名};Password={pg密码};Host={pg地址};Port=5432;Database=maigcTestDb;");x.UseKafka("localhost:9092");x.UseDashboard();
});
测试的业务代码
在常规的controller中注入服务
public class ValuesController(ICapPublisher producer) : Controller, ICapSubscribe
{/*业务代码*/
}
//上面这是最新的写法,以前那种构造函数的写法也是ok的
public class Values2Controller : Controller
{private ICapPublisher _capPublisher;public Values2Controller(ICapPublisher capPublisher){_capPublisher = capPublisher;}
}
写一个生产者接口
public async Task<IActionResult> Producer()
{Console.WriteLine("生产者发布消息: " + DateTime.Now);await producer.PublishAsync("sample.kafka.postgrsql", DateTime.Now);return Ok();
}
再写一个延时发送消息的生产者接口
public async Task<IActionResult> ProducerDelay()
{Console.WriteLine("生产者发布延时消息: " + DateTime.Now);await producer.PublishDelayAsync(TimeSpan.FromSeconds(delaySeconds), "sample.kafka.postgrsql", DateTime.Now);return Ok();
}
创建消费者
[CapSubscribe("sample.kafka.postgrsql")]
public void Test2(DateTime value)
{Console.WriteLine("订阅到消息: " + value);
}
我们访问下接口看下控制台的打印效果

可以看到,订阅到的时间和生产者发送的实际是一致的。
再试下延时发送

我们延时了10秒发布,这里生产者执行生产消息后,过了10秒,被消费者订阅到。
我们延时了10秒发布,这里生产者执行生产消息后,过了10秒,被消费者订阅到。
再看下PostG里保存的消息记录
这是生产记录

这是消费记录

注意,在CAP的机制里,这些持久化的消息记录是可以设置过期时间的,也就是如果我们每天的并发量很高,产生的消息非常多,可以设置一个过期时间,比如7天,一个月,到期后,这些持久化的数据就会自动清除掉。
CAP的官方文档里,还有更多案例,大家感兴趣也可以去试试,当然除了CAP还有MediatR,MassTransit这类组件,也可以轻松实现消息总线的机制。
好了,到此我们的测试就结束了,从安装Kafka,到创建这个新项目并跑通这个测试服务,也就2个小时,所以,这个迁移成本应该还是非常高效的。
小总结
实际上,我们的生产环境中,正正常运行的一套总线服务,依赖的是RabbitMQ和SQL Server,RabbitMQ还好,SQL Server在以后应该不会是做项目的首选数据库了,尤其是做一些高并发的项目,不是说它性能不够,而是成本太高,社区版的限制有太多,还是要早做规划,提前准备更加适合未来发展的方案,而PostgreSql是目前最受全球开发者欢迎的关系数据库,社区活跃度非常高,开源协议对企业也十分友好,即便是面对国内高标准的信创要求,也完全没问题,是绝佳的首选。
至于Kafka,这是目前世界上最为流行的消息队列,性能,可用性,可扩展性等各方面都比其他消息队列要好上一点。阿里后来推出的RocketMQ,也是基于Kafka的设计原理做了简化和更加适应国内环境的一些调整,根骨还是来自Kafka。而且就生态环境而言,无论国内还是国外,Kafka都是遥遥领先,对dotnet框架的支持,Kafka也远比RocketMQ更好(RocketMQ更多的还是用在java环境里),所以我们再选型的时候,优先考虑的还是Kafka。
更多关于这些内容的知识,大家感兴趣可以去搜一下或者找个AI问一下。
好了,就这些吧。
相关文章:
Kafka+PostgreSql,构建一个总线服务
之前开发的系统,用到了RabbitMQ和SQL Server作为总线服务的传输层和存储层,最近一直在看Kafka和PostgreSql相关的知识,想着是不是可以把服务总线的技术栈切换到这个上面。今天花了点时间试了试,过程还是比较顺利的,后续…...
电脑怎么录屏?四款录屏工具分享
作为一个刚刚踏入视频创作领域的新手,我一直在寻找一款适合自己的录屏软件。最近,我尝试了四款市面上比较热门的录屏工具。今天,就让我来分享一下我的使用体验,希望能给同样在寻找录屏软件的朋友们一些参考。 一、福昕录屏大师 …...
Java代码审计篇 | ofcms系统审计思路讲解 - 篇4 | XXE漏洞审计
文章目录 Java代码审计篇 | ofcms系统审计思路讲解 - 篇4 | XXE漏洞审计0. 前言1. XXE代码审计【有1处】1.1. 搜索JRXmlLoader1.1.1. JRAntApiWriteTask1.1.2. JRAntUpdateTask1.1.3. TableReportContextXmlRule1.1.4. JasperCompileManager【存在漏洞】 1.2. 搜索XMLReader1.2…...
Leetcode 每日一题:Word Ladder
写在前面: 今天我们来看一道图论的题,这道题目是我做过目前最难与图论联想到的一道题目之一。如果没有提示的话,我们很容易往 dp 等解决 array 问题的方向去解决它,经过我超过 2个小时的思考我觉得这种方向是没前途的~…...
c++ 编辑器 和 编译器 的详细解释
在 C 开发中,编辑器 和 编译器 是两个不同的工具,分别在编写代码和生成可执行文件的过程中起着不同的作用。下面是它们的详细介绍: 1. 编辑器(Editor) 编辑器 是用来编写和编辑代码的工具。C 代码就是通过编辑器编写…...
计算机视觉(二)—— MDPI特刊推荐
特刊征稿 01 期刊名称: Applied Computer Vision and Pattern Recognition: 2nd Volume 截止时间: 摘要提交截止日期:2024年10月30日 投稿截止日期:2024年12月30日 目标及范围: 包括但不限于以下领域:…...
交叉编译工具链的安装及带wiringPi库的交叉编译实现
交叉编译工具链的安装及带wiringPi库的交叉编译实现 交叉编译的概念交叉编译工具链的安装下载交叉编译工具链配置环境遍变量编译程序到ARM平台 带wiringPi库的交叉编译下载编译wiringPi库调用树莓派的wringPi库 交叉编译的概念 交叉编译是在一个平台上生成另一个平台上的可执行…...
java: 程序包org.junit.jupiter.api不存在
明明idea没有报错,引用包也没问题,为啥提示java: 程序包org.junit.jupiter.api不存在? 配置!还TMD是配置! 如果是引用包的版本不对或者其他,直接就是引用报错或者pom里面飘红了。 这个应该是把generat…...
代码随想录刷题day32丨动态规划理论基础,509. 斐波那契数, 70. 爬楼梯, 746. 使用最小花费爬楼梯
代码随想录刷题day32丨动态规划理论基础,509. 斐波那契数, 70. 爬楼梯, 746. 使用最小花费爬楼梯 1.动态规划理论基础 动态规划,英文:Dynamic Programming,简称DP,如果某一问题有很多重叠子问题…...
为什么矩阵特征值之和等于主对角线元素之和,特征值乘积等于行列式值
首先给出特征值和特征向量的定义。 设A是n阶矩阵,如果数λ和n维非零向量x使关系式 Axλx (1) 成…...
学生学籍管理系统可行性分析报告
引言 一、编写目的 随着科学技术的不断提高,计算机科学日渐成熟,其强大的功能已为人们深刻认识,它已进入人类社会的各个领域并发挥着越来越重要的作用。而学籍管理系统软件,可广泛应用于全日制大、中小学及其他各类学校,系统涵盖了小学、初中、高中学籍…...
C#排序算法新境界:深度剖析与高效实现基数排序
基数排序(Radix Sort)是一种非比较型整数排序算法,其原理是将整数按位数切割成不同的数字,然后按每个位数进行比较。具体来说,基数排序有两种方法: 最低位优先(LSD, Least Significant Digit f…...
玩机搞机-----如何简单的使用ADB指令来卸载和冻结系统应用 无需root权限 详细操作图示教程
同类博文: 玩机搞机---卸载内置软件 无root权限卸载不需要的软件 安全卸载_无需root卸载彻底内置软件-CSDN博客 在很多时候我们需要卸载一些系统级的app。但如果直接手机端进行卸载的话。是无法正常卸载的。其实我们可以通过有些成品工具或者完全靠ADB指令来进行卸…...
如何通过 Apache Camel 将数据导入 Elasticsearch
作者:来自 Elastic Andre Luiz 使用 Apache Camel 将数据提取到 Elasticsearch 的过程将搜索引擎的稳健性与集成框架的灵活性相结合。在本文中,我们将探讨 Apache Camel 如何简化和优化将数据提取到 Elasticsearch。为了说明此功能,我们将实…...
打造民国风格炫酷个人网页:用HTML和CSS3传递民国风韵
附源码!!! 感谢支持 小弟不断创作网站demo感兴趣的可以关注支持一下 对了 俺在结尾带上了自己用的 背景 大家可以尝试换一下效果更好哦~~~ 如何创建一个民国风格的炫酷网页 在这篇博客中,我们将展示如何制作一个结合民国风格和…...
豆包MarsCode编程助手:产品功能解析与应用场景探索!
随着现代技术的不断进化升级,人工智能正在逐步改变着我们的日常工作方式。特别是对于复杂的项目,代码编写、优化、调试、测试等环节充满挑战。为了简化这些环节、提高开发效率,许多智能编程工具应运而生,豆包MarsCode 编程助手就是…...
爬虫全网抓取
爬虫全网抓取是指利用网络爬虫技术,通过自动化的方式遍历互联网上各个网站、论坛、博客等,从这些网页中提取所需的数据。它通常涉及以下几个步骤: 目标设定:确定要抓取哪些类型的网页内容,比如新闻、商品信息、用户评论…...
【计算机组成原理】详细解读带符号整数在计算机中的运算
有符号整数的运算 导读一、补码的优势二、补码的加法运算三、补码的减法运算四、原码、反码、补码的特性结语 导读 大家好,很高兴又和大家见面啦!!! 经过前面的介绍,我们已经初步认识了有符号整数的三种表示形式&…...
vue3常见的bug 修复bug
Vue 3 作为 Vue.js 的最新版本,在性能、开发体验以及代码可维护性等方面带来了显著的提升。然而,就像任何软件框架一样,Vue 3 在使用过程中也可能遇到一些典型的bug或问题。以下是一些可能遇到的典型问题: 响应式系统相关的问题&…...
C++课程笔记 类和对象
类概念 结构体:只要属性 类:有属性也有方法 c可以省略struct c不行 #include<iostream> using namespace std;typedef struct queue1 {int a;queue1 q() {queue1 q(2);return q;};queue1(){}queue1(int qa){a qa;} }q1; int main() {queue1 Q1;…...
美胸-年美-造相Z-Turbo在网络安全领域的创新应用:恶意代码可视化分析
美胸-年美-造相Z-Turbo在网络安全领域的创新应用:恶意代码可视化分析 1. 当安全分析遇上图像生成:一个意想不到的跨界组合 最近在调试一个自动化威胁分析流程时,我偶然发现了一个有趣的现象:当把一段混淆后的JavaScript恶意代码…...
【部署】windows下虚拟机OpenClaw Ubuntu 24.04.4 安装指南
未来已来,只需一句指令,养龙虾专栏导航,持续更新ing… 概述 前置环境:win10/11、vmware等虚拟机(安装时注意勾选VMware Tools、cpu可以分配2C,内存建议4G,硬盘空间建议给40G) 系统要求 Node.js 22+:安装脚本可自动检测并安装(下文补充手动安装方案); Ubuntu 24.0…...
leOS2:基于看门狗定时器的轻量级嵌入式调度器
1. leOS2:基于看门狗定时器的轻量级嵌入式调度器 leOS2(little embedded Operating System 2)是一个专为资源受限的8位AVR微控制器设计的极简实时调度器。它不依赖于通用定时器(如Timer0/Timer1),而是创造…...
Python 3.15 JIT不是“可选优化”——而是CPython官方首次强制嵌入的LLVM后端(2024 Q3起新项目默认启用)
第一章:Python 3.15 JIT 的历史定位与架构革命Python 3.15 标志着 CPython 运行时的一次范式跃迁——它首次将生产就绪的、默认启用的即时编译(JIT)引擎深度集成至解释器核心,而非作为外部补丁或实验性分支存在。这一设计终结了自…...
1815《中国城市统计年鉴》面板数据(1985-2024)
1、搜说数据皮皮侠2、使用兑换码 516004233462b5Qy0SoHIf26 获取注意:兑换码2026.3.30(不包括30号)前有效!数据简介《中国城市统计年鉴》是国家统计局城市社会经济调查司主办的、全面反映中国城市经济和社会发展情况的资料性年刊。…...
SVPWM/AZSPWM的simulink仿真 AZSPWM(Advanced Zero Se...
SVPWM/AZSPWM的simulink仿真 AZSPWM(Advanced Zero Sequence Pulse Width Modulation,先进零序脉宽调制)是一种改进的脉宽调制技术,主要应用于三相逆变器中,通过引入零序分量来优化输出电压的波形和性能。 AZSPWM的目标…...
PDF24 Creator离线版隐藏技巧:5个连官网都没说的自动化妙用
PDF24 Creator离线版隐藏技巧:5个连官网都没说的自动化妙用 如果你经常需要处理PDF文档,可能已经听说过PDF24 Creator这款免费工具。但大多数人仅仅停留在基础功能的使用上,比如简单的PDF合并、分割或转换。今天我要分享的是PDF24 Creator离线…...
全向轮底盘运动控制:嵌入式PID与逆运动学实现
1. 全向轮底盘控制库(omni_wheel)技术解析与工程实践1.1 项目背景与工程定位omni_wheel是为B团队自主移动机器人开发的底层运动控制模块,最初版本发布于2018年7月10日。从其原始README描述“PIDかけて一方向に進むだけのプログラムでござんす…...
从‘偏差-方差’到一行代码:用NumPy/PyTorch五步实现GAE,附PPO实战避坑点
从‘偏差-方差’到一行代码:用NumPy/PyTorch五步实现GAE,附PPO实战避坑点 强化学习中的策略优化常常面临一个核心挑战:如何准确评估动作的价值?广义优势估计(GAE)通过巧妙平衡偏差与方差,成为PP…...
如何高效使用AsrTools:快速上手指南与实用功能详解
如何高效使用AsrTools:快速上手指南与实用功能详解 【免费下载链接】AsrTools ✨ AsrTools: Smart Voice-to-Text Tool | Efficient Batch Processing | User-Friendly Interface | No GPU Required | Supports SRT/TXT Output | Turn your audio into accurate tex…...
