Kafka+PostgreSql,构建一个总线服务
之前开发的系统,用到了RabbitMQ和SQL Server作为总线服务的传输层和存储层,最近一直在看Kafka和PostgreSql相关的知识,想着是不是可以把服务总线的技术栈切换到这个上面。今天花了点时间试了试,过程还是比较顺利的,后续就是搭建基础服务的事情了。这里简单分享一下。
环境安装
安装Kafka
官方文档:Apache Kafka,可以直接参考,我这里简单介绍下我在本地搭建开发环境的过程,还是遇到了一个小坑。
我这里是在本地WSL 2环境下进行的安装,安装过程就参考官方文档的推荐流程即可
下载安装包
注意,这里要下载编译后的包,不嫌麻烦的话,可以下载源代码,编译后再使用。
wget -c https://downloads.apache.org/kafka/3.8.0/kafka_2.12-3.8.0.tgz
安装
tar -xzf kafka_2.13-3.8.0.tgz
cd kafka_2.13-3.8.0
这里安装完成后的路径是这样子的
重点关注的就是bin,config和logs这3个目录。
启动服务
官方提供了2中启动策略,一个是KRaft,一个是Zookeeper,我这里用的zookeeper
先启动zookeeper服务
bin/zookeeper-server-start.sh config/zookeeper.properties
在启动kafka服务
bin/kafka-server-start.sh config/server.properties
后面的zookeeper.properties和server.properties是配置文件,后续有配置需求的时候可以修改,比如监听地址,brokerid等等,长这样👇
启动后控制台的输出是这样
这样,一个kafka的服务节点就启动了。
对了,kafka是依赖java环境的,安装之前本地要安装jdk,我这里使用的是openjdk,也是ok的。
*端口转发(仅WSL2环境)
在WSL2环境下,需要配置下端口转发,不然宿主机连接不到broker,
netsh interface portproxy add v4tov4 listenport=9092 listenaddress=0.0.0.0 connectport=9092 connectaddress=172.28.240.79
后面那个ip地址就写宿主机给WSL环境下发的地址
此外,宿主机和wsl环境都放开9092(或者你设置的)端口
链接测试
这里有很多客户端的ui工具或者插件可以连接Kafka,官方本身也提供了测试命令,比如官方文档里给的测试案例就是用这几个命令
本地开发的话,我这里用的vs code的tools for apache kafka@ 这个插件,在插件市场用关键字搜索完成,安装即可
至此,一个本地的Kafka节点就基本配置完成了
安装PostgreSql
这个我老早就装好了,一些安装过程没有截图,就忽略吧,大家有需求的可以问一下各种GPT
也可以用docker,快速部署一个节点做本地的测试。
docker run --name postgres -p 5432:5432 -e POSTGRES_PASSWORD=mysecretpassword -d postgres
开发测试
新建项目
这里因为我是用的IDE做开发,所以直接创建个web项目就好,也可以用命令行来创建。
总之创建完成后,我的项目长这样
安装依赖
我这里是用的是dotnet.cap这个系列组件,然后为了测试方便,数据库的orm适用的是dapper,主要是图快,大家实际项目中可以用习惯的orm就好。
这里我的项目文件长这样
<Project Sdk="Microsoft.NET.Sdk.Web"><PropertyGroup><TargetFramework>net8.0</TargetFramework><Nullable>enable</Nullable><ImplicitUsings>enable</ImplicitUsings></PropertyGroup><ItemGroup><PackageReference Include="Dapper" Version="2.1.35" /><PackageReference Include="DotNetCore.CAP" Version="8.2.0" /><PackageReference Include="DotNetCore.CAP.Dashboard" Version="8.2.0" /><PackageReference Include="DotNetCore.CAP.Kafka" Version="8.2.0" /><PackageReference Include="DotNetCore.CAP.PostgreSql" Version="8.2.0" /></ItemGroup></Project>
注入服务
这里主要注入pg和Kafka
builder.Services.AddCap(x =>
{x.UsePostgreSql("User ID={pg用户名};Password={pg密码};Host={pg地址};Port=5432;Database=maigcTestDb;");x.UseKafka("localhost:9092");x.UseDashboard();
});
测试的业务代码
在常规的controller中注入服务
public class ValuesController(ICapPublisher producer) : Controller, ICapSubscribe
{/*业务代码*/
}
//上面这是最新的写法,以前那种构造函数的写法也是ok的
public class Values2Controller : Controller
{private ICapPublisher _capPublisher;public Values2Controller(ICapPublisher capPublisher){_capPublisher = capPublisher;}
}
写一个生产者接口
public async Task<IActionResult> Producer()
{Console.WriteLine("生产者发布消息: " + DateTime.Now);await producer.PublishAsync("sample.kafka.postgrsql", DateTime.Now);return Ok();
}
再写一个延时发送消息的生产者接口
public async Task<IActionResult> ProducerDelay()
{Console.WriteLine("生产者发布延时消息: " + DateTime.Now);await producer.PublishDelayAsync(TimeSpan.FromSeconds(delaySeconds), "sample.kafka.postgrsql", DateTime.Now);return Ok();
}
创建消费者
[CapSubscribe("sample.kafka.postgrsql")]
public void Test2(DateTime value)
{Console.WriteLine("订阅到消息: " + value);
}
我们访问下接口看下控制台的打印效果
可以看到,订阅到的时间和生产者发送的实际是一致的。
再试下延时发送
我们延时了10秒发布,这里生产者执行生产消息后,过了10秒,被消费者订阅到。
我们延时了10秒发布,这里生产者执行生产消息后,过了10秒,被消费者订阅到。
再看下PostG里保存的消息记录
这是生产记录
这是消费记录
注意,在CAP的机制里,这些持久化的消息记录是可以设置过期时间的,也就是如果我们每天的并发量很高,产生的消息非常多,可以设置一个过期时间,比如7天,一个月,到期后,这些持久化的数据就会自动清除掉。
CAP的官方文档里,还有更多案例,大家感兴趣也可以去试试,当然除了CAP还有MediatR,MassTransit这类组件,也可以轻松实现消息总线的机制。
好了,到此我们的测试就结束了,从安装Kafka,到创建这个新项目并跑通这个测试服务,也就2个小时,所以,这个迁移成本应该还是非常高效的。
小总结
实际上,我们的生产环境中,正正常运行的一套总线服务,依赖的是RabbitMQ和SQL Server,RabbitMQ还好,SQL Server在以后应该不会是做项目的首选数据库了,尤其是做一些高并发的项目,不是说它性能不够,而是成本太高,社区版的限制有太多,还是要早做规划,提前准备更加适合未来发展的方案,而PostgreSql是目前最受全球开发者欢迎的关系数据库,社区活跃度非常高,开源协议对企业也十分友好,即便是面对国内高标准的信创要求,也完全没问题,是绝佳的首选。
至于Kafka,这是目前世界上最为流行的消息队列,性能,可用性,可扩展性等各方面都比其他消息队列要好上一点。阿里后来推出的RocketMQ,也是基于Kafka的设计原理做了简化和更加适应国内环境的一些调整,根骨还是来自Kafka。而且就生态环境而言,无论国内还是国外,Kafka都是遥遥领先,对dotnet框架的支持,Kafka也远比RocketMQ更好(RocketMQ更多的还是用在java环境里),所以我们再选型的时候,优先考虑的还是Kafka。
更多关于这些内容的知识,大家感兴趣可以去搜一下或者找个AI问一下。
好了,就这些吧。
相关文章:

Kafka+PostgreSql,构建一个总线服务
之前开发的系统,用到了RabbitMQ和SQL Server作为总线服务的传输层和存储层,最近一直在看Kafka和PostgreSql相关的知识,想着是不是可以把服务总线的技术栈切换到这个上面。今天花了点时间试了试,过程还是比较顺利的,后续…...

电脑怎么录屏?四款录屏工具分享
作为一个刚刚踏入视频创作领域的新手,我一直在寻找一款适合自己的录屏软件。最近,我尝试了四款市面上比较热门的录屏工具。今天,就让我来分享一下我的使用体验,希望能给同样在寻找录屏软件的朋友们一些参考。 一、福昕录屏大师 …...

Java代码审计篇 | ofcms系统审计思路讲解 - 篇4 | XXE漏洞审计
文章目录 Java代码审计篇 | ofcms系统审计思路讲解 - 篇4 | XXE漏洞审计0. 前言1. XXE代码审计【有1处】1.1. 搜索JRXmlLoader1.1.1. JRAntApiWriteTask1.1.2. JRAntUpdateTask1.1.3. TableReportContextXmlRule1.1.4. JasperCompileManager【存在漏洞】 1.2. 搜索XMLReader1.2…...

Leetcode 每日一题:Word Ladder
写在前面: 今天我们来看一道图论的题,这道题目是我做过目前最难与图论联想到的一道题目之一。如果没有提示的话,我们很容易往 dp 等解决 array 问题的方向去解决它,经过我超过 2个小时的思考我觉得这种方向是没前途的~…...
c++ 编辑器 和 编译器 的详细解释
在 C 开发中,编辑器 和 编译器 是两个不同的工具,分别在编写代码和生成可执行文件的过程中起着不同的作用。下面是它们的详细介绍: 1. 编辑器(Editor) 编辑器 是用来编写和编辑代码的工具。C 代码就是通过编辑器编写…...

计算机视觉(二)—— MDPI特刊推荐
特刊征稿 01 期刊名称: Applied Computer Vision and Pattern Recognition: 2nd Volume 截止时间: 摘要提交截止日期:2024年10月30日 投稿截止日期:2024年12月30日 目标及范围: 包括但不限于以下领域:…...

交叉编译工具链的安装及带wiringPi库的交叉编译实现
交叉编译工具链的安装及带wiringPi库的交叉编译实现 交叉编译的概念交叉编译工具链的安装下载交叉编译工具链配置环境遍变量编译程序到ARM平台 带wiringPi库的交叉编译下载编译wiringPi库调用树莓派的wringPi库 交叉编译的概念 交叉编译是在一个平台上生成另一个平台上的可执行…...

java: 程序包org.junit.jupiter.api不存在
明明idea没有报错,引用包也没问题,为啥提示java: 程序包org.junit.jupiter.api不存在? 配置!还TMD是配置! 如果是引用包的版本不对或者其他,直接就是引用报错或者pom里面飘红了。 这个应该是把generat…...

代码随想录刷题day32丨动态规划理论基础,509. 斐波那契数, 70. 爬楼梯, 746. 使用最小花费爬楼梯
代码随想录刷题day32丨动态规划理论基础,509. 斐波那契数, 70. 爬楼梯, 746. 使用最小花费爬楼梯 1.动态规划理论基础 动态规划,英文:Dynamic Programming,简称DP,如果某一问题有很多重叠子问题…...
为什么矩阵特征值之和等于主对角线元素之和,特征值乘积等于行列式值
首先给出特征值和特征向量的定义。 设A是n阶矩阵,如果数λ和n维非零向量x使关系式 Axλx (1) 成…...

学生学籍管理系统可行性分析报告
引言 一、编写目的 随着科学技术的不断提高,计算机科学日渐成熟,其强大的功能已为人们深刻认识,它已进入人类社会的各个领域并发挥着越来越重要的作用。而学籍管理系统软件,可广泛应用于全日制大、中小学及其他各类学校,系统涵盖了小学、初中、高中学籍…...
C#排序算法新境界:深度剖析与高效实现基数排序
基数排序(Radix Sort)是一种非比较型整数排序算法,其原理是将整数按位数切割成不同的数字,然后按每个位数进行比较。具体来说,基数排序有两种方法: 最低位优先(LSD, Least Significant Digit f…...

玩机搞机-----如何简单的使用ADB指令来卸载和冻结系统应用 无需root权限 详细操作图示教程
同类博文: 玩机搞机---卸载内置软件 无root权限卸载不需要的软件 安全卸载_无需root卸载彻底内置软件-CSDN博客 在很多时候我们需要卸载一些系统级的app。但如果直接手机端进行卸载的话。是无法正常卸载的。其实我们可以通过有些成品工具或者完全靠ADB指令来进行卸…...

如何通过 Apache Camel 将数据导入 Elasticsearch
作者:来自 Elastic Andre Luiz 使用 Apache Camel 将数据提取到 Elasticsearch 的过程将搜索引擎的稳健性与集成框架的灵活性相结合。在本文中,我们将探讨 Apache Camel 如何简化和优化将数据提取到 Elasticsearch。为了说明此功能,我们将实…...

打造民国风格炫酷个人网页:用HTML和CSS3传递民国风韵
附源码!!! 感谢支持 小弟不断创作网站demo感兴趣的可以关注支持一下 对了 俺在结尾带上了自己用的 背景 大家可以尝试换一下效果更好哦~~~ 如何创建一个民国风格的炫酷网页 在这篇博客中,我们将展示如何制作一个结合民国风格和…...

豆包MarsCode编程助手:产品功能解析与应用场景探索!
随着现代技术的不断进化升级,人工智能正在逐步改变着我们的日常工作方式。特别是对于复杂的项目,代码编写、优化、调试、测试等环节充满挑战。为了简化这些环节、提高开发效率,许多智能编程工具应运而生,豆包MarsCode 编程助手就是…...
爬虫全网抓取
爬虫全网抓取是指利用网络爬虫技术,通过自动化的方式遍历互联网上各个网站、论坛、博客等,从这些网页中提取所需的数据。它通常涉及以下几个步骤: 目标设定:确定要抓取哪些类型的网页内容,比如新闻、商品信息、用户评论…...

【计算机组成原理】详细解读带符号整数在计算机中的运算
有符号整数的运算 导读一、补码的优势二、补码的加法运算三、补码的减法运算四、原码、反码、补码的特性结语 导读 大家好,很高兴又和大家见面啦!!! 经过前面的介绍,我们已经初步认识了有符号整数的三种表示形式&…...
vue3常见的bug 修复bug
Vue 3 作为 Vue.js 的最新版本,在性能、开发体验以及代码可维护性等方面带来了显著的提升。然而,就像任何软件框架一样,Vue 3 在使用过程中也可能遇到一些典型的bug或问题。以下是一些可能遇到的典型问题: 响应式系统相关的问题&…...

C++课程笔记 类和对象
类概念 结构体:只要属性 类:有属性也有方法 c可以省略struct c不行 #include<iostream> using namespace std;typedef struct queue1 {int a;queue1 q() {queue1 q(2);return q;};queue1(){}queue1(int qa){a qa;} }q1; int main() {queue1 Q1;…...

iOS 26 携众系统重磅更新,但“苹果智能”仍与国行无缘
美国西海岸的夏天,再次被苹果点燃。一年一度的全球开发者大会 WWDC25 如期而至,这不仅是开发者的盛宴,更是全球数亿苹果用户翘首以盼的科技春晚。今年,苹果依旧为我们带来了全家桶式的系统更新,包括 iOS 26、iPadOS 26…...

Swift 协议扩展精进之路:解决 CoreData 托管实体子类的类型不匹配问题(下)
概述 在 Swift 开发语言中,各位秃头小码农们可以充分利用语法本身所带来的便利去劈荆斩棘。我们还可以恣意利用泛型、协议关联类型和协议扩展来进一步简化和优化我们复杂的代码需求。 不过,在涉及到多个子类派生于基类进行多态模拟的场景下,…...
《Playwright:微软的自动化测试工具详解》
Playwright 简介:声明内容来自网络,将内容拼接整理出来的文档 Playwright 是微软开发的自动化测试工具,支持 Chrome、Firefox、Safari 等主流浏览器,提供多语言 API(Python、JavaScript、Java、.NET)。它的特点包括&a…...

Redis数据倾斜问题解决
Redis 数据倾斜问题解析与解决方案 什么是 Redis 数据倾斜 Redis 数据倾斜指的是在 Redis 集群中,部分节点存储的数据量或访问量远高于其他节点,导致这些节点负载过高,影响整体性能。 数据倾斜的主要表现 部分节点内存使用率远高于其他节…...

GruntJS-前端自动化任务运行器从入门到实战
Grunt 完全指南:从入门到实战 一、Grunt 是什么? Grunt是一个基于 Node.js 的前端自动化任务运行器,主要用于自动化执行项目开发中重复性高的任务,例如文件压缩、代码编译、语法检查、单元测试、文件合并等。通过配置简洁的任务…...

spring Security对RBAC及其ABAC的支持使用
RBAC (基于角色的访问控制) RBAC (Role-Based Access Control) 是 Spring Security 中最常用的权限模型,它将权限分配给角色,再将角色分配给用户。 RBAC 核心实现 1. 数据库设计 users roles permissions ------- ------…...
数据库正常,但后端收不到数据原因及解决
从代码和日志来看,后端SQL查询确实返回了数据,但最终user对象却为null。这表明查询结果没有正确映射到User对象上。 在前后端分离,并且ai辅助开发的时候,很容易出现前后端变量名不一致情况,还不报错,只是单…...

CTF show 数学不及格
拿到题目先查一下壳,看一下信息 发现是一个ELF文件,64位的 用IDA Pro 64 打开这个文件 然后点击F5进行伪代码转换 可以看到有五个if判断,第一个argc ! 5这个判断并没有起太大作用,主要是下面四个if判断 根据题目…...

python可视化:俄乌战争时间线关键节点与深层原因
俄乌战争时间线可视化分析:关键节点与深层原因 俄乌战争是21世纪欧洲最具影响力的地缘政治冲突之一,自2022年2月爆发以来已持续超过3年。 本文将通过Python可视化工具,系统分析这场战争的时间线、关键节点及其背后的深层原因,全面…...

英国云服务器上安装宝塔面板(BT Panel)
在英国云服务器上安装宝塔面板(BT Panel) 是完全可行的,尤其适合需要远程管理Linux服务器、快速部署网站、数据库、FTP、SSL证书等服务的用户。宝塔面板以其可视化操作界面和强大的功能广受国内用户欢迎,虽然官方主要面向中国大陆…...