当前位置: 首页 > news >正文

Kafka+PostgreSql,构建一个总线服务

之前开发的系统,用到了RabbitMQ和SQL Server作为总线服务的传输层和存储层,最近一直在看Kafka和PostgreSql相关的知识,想着是不是可以把服务总线的技术栈切换到这个上面。今天花了点时间试了试,过程还是比较顺利的,后续就是搭建基础服务的事情了。这里简单分享一下。

环境安装

安装Kafka

官方文档:Apache Kafka,可以直接参考,我这里简单介绍下我在本地搭建开发环境的过程,还是遇到了一个小坑。

我这里是在本地WSL 2环境下进行的安装,安装过程就参考官方文档的推荐流程即可

下载安装包

注意,这里要下载编译后的包,不嫌麻烦的话,可以下载源代码,编译后再使用。

wget -c https://downloads.apache.org/kafka/3.8.0/kafka_2.12-3.8.0.tgz

安装

tar -xzf kafka_2.13-3.8.0.tgz
cd kafka_2.13-3.8.0

这里安装完成后的路径是这样子的

重点关注的就是bin,config和logs这3个目录。

启动服务

官方提供了2中启动策略,一个是KRaft,一个是Zookeeper,我这里用的zookeeper

先启动zookeeper服务

bin/zookeeper-server-start.sh config/zookeeper.properties

在启动kafka服务

bin/kafka-server-start.sh config/server.properties

后面的zookeeper.properties和server.properties是配置文件,后续有配置需求的时候可以修改,比如监听地址,brokerid等等,长这样👇

启动后控制台的输出是这样

这样,一个kafka的服务节点就启动了。

对了,kafka是依赖java环境的,安装之前本地要安装jdk,我这里使用的是openjdk,也是ok的。

*端口转发(仅WSL2环境)

在WSL2环境下,需要配置下端口转发,不然宿主机连接不到broker,

netsh interface portproxy add v4tov4 listenport=9092 listenaddress=0.0.0.0 connectport=9092 connectaddress=172.28.240.79

后面那个ip地址就写宿主机给WSL环境下发的地址

此外,宿主机和wsl环境都放开9092(或者你设置的)端口

链接测试

这里有很多客户端的ui工具或者插件可以连接Kafka,官方本身也提供了测试命令,比如官方文档里给的测试案例就是用这几个命令

本地开发的话,我这里用的vs code的tools for apache kafka@ 这个插件,在插件市场用关键字搜索完成,安装即可

至此,一个本地的Kafka节点就基本配置完成了

安装PostgreSql

这个我老早就装好了,一些安装过程没有截图,就忽略吧,大家有需求的可以问一下各种GPT

也可以用docker,快速部署一个节点做本地的测试。

docker run --name postgres -p 5432:5432 -e POSTGRES_PASSWORD=mysecretpassword -d postgres

开发测试

新建项目

这里因为我是用的IDE做开发,所以直接创建个web项目就好,也可以用命令行来创建。

总之创建完成后,我的项目长这样

安装依赖

我这里是用的是dotnet.cap这个系列组件,然后为了测试方便,数据库的orm适用的是dapper,主要是图快,大家实际项目中可以用习惯的orm就好。

这里我的项目文件长这样

<Project Sdk="Microsoft.NET.Sdk.Web"><PropertyGroup><TargetFramework>net8.0</TargetFramework><Nullable>enable</Nullable><ImplicitUsings>enable</ImplicitUsings></PropertyGroup><ItemGroup><PackageReference Include="Dapper" Version="2.1.35" /><PackageReference Include="DotNetCore.CAP" Version="8.2.0" /><PackageReference Include="DotNetCore.CAP.Dashboard" Version="8.2.0" /><PackageReference Include="DotNetCore.CAP.Kafka" Version="8.2.0" /><PackageReference Include="DotNetCore.CAP.PostgreSql" Version="8.2.0" /></ItemGroup></Project>

注入服务

这里主要注入pg和Kafka

builder.Services.AddCap(x =>
{x.UsePostgreSql("User ID={pg用户名};Password={pg密码};Host={pg地址};Port=5432;Database=maigcTestDb;");x.UseKafka("localhost:9092");x.UseDashboard();
});

测试的业务代码

在常规的controller中注入服务

public class ValuesController(ICapPublisher producer) : Controller, ICapSubscribe
{/*业务代码*/
}
//上面这是最新的写法,以前那种构造函数的写法也是ok的
public class Values2Controller : Controller
{private ICapPublisher _capPublisher;public Values2Controller(ICapPublisher capPublisher){_capPublisher = capPublisher;}
}

写一个生产者接口

public async Task<IActionResult> Producer()
{Console.WriteLine("生产者发布消息: " + DateTime.Now);await producer.PublishAsync("sample.kafka.postgrsql", DateTime.Now);return Ok();
}

再写一个延时发送消息的生产者接口

public async Task<IActionResult> ProducerDelay()
{Console.WriteLine("生产者发布延时消息: " + DateTime.Now);await producer.PublishDelayAsync(TimeSpan.FromSeconds(delaySeconds), "sample.kafka.postgrsql", DateTime.Now);return Ok();
}

创建消费者

[CapSubscribe("sample.kafka.postgrsql")]
public void Test2(DateTime value)
{Console.WriteLine("订阅到消息: " + value);
}

我们访问下接口看下控制台的打印效果

可以看到,订阅到的时间和生产者发送的实际是一致的。

再试下延时发送

我们延时了10秒发布,这里生产者执行生产消息后,过了10秒,被消费者订阅到。

我们延时了10秒发布,这里生产者执行生产消息后,过了10秒,被消费者订阅到。

再看下PostG里保存的消息记录

这是生产记录

这是消费记录

注意,在CAP的机制里,这些持久化的消息记录是可以设置过期时间的,也就是如果我们每天的并发量很高,产生的消息非常多,可以设置一个过期时间,比如7天,一个月,到期后,这些持久化的数据就会自动清除掉。

CAP的官方文档里,还有更多案例,大家感兴趣也可以去试试,当然除了CAP还有MediatR,MassTransit这类组件,也可以轻松实现消息总线的机制。

好了,到此我们的测试就结束了,从安装Kafka,到创建这个新项目并跑通这个测试服务,也就2个小时,所以,这个迁移成本应该还是非常高效的。

小总结

实际上,我们的生产环境中,正正常运行的一套总线服务,依赖的是RabbitMQ和SQL Server,RabbitMQ还好,SQL Server在以后应该不会是做项目的首选数据库了,尤其是做一些高并发的项目,不是说它性能不够,而是成本太高,社区版的限制有太多,还是要早做规划,提前准备更加适合未来发展的方案,而PostgreSql是目前最受全球开发者欢迎的关系数据库,社区活跃度非常高,开源协议对企业也十分友好,即便是面对国内高标准的信创要求,也完全没问题,是绝佳的首选。

至于Kafka,这是目前世界上最为流行的消息队列,性能,可用性,可扩展性等各方面都比其他消息队列要好上一点。阿里后来推出的RocketMQ,也是基于Kafka的设计原理做了简化和更加适应国内环境的一些调整,根骨还是来自Kafka。而且就生态环境而言,无论国内还是国外,Kafka都是遥遥领先,对dotnet框架的支持,Kafka也远比RocketMQ更好(RocketMQ更多的还是用在java环境里),所以我们再选型的时候,优先考虑的还是Kafka。

更多关于这些内容的知识,大家感兴趣可以去搜一下或者找个AI问一下。

好了,就这些吧。

相关文章:

Kafka+PostgreSql,构建一个总线服务

之前开发的系统&#xff0c;用到了RabbitMQ和SQL Server作为总线服务的传输层和存储层&#xff0c;最近一直在看Kafka和PostgreSql相关的知识&#xff0c;想着是不是可以把服务总线的技术栈切换到这个上面。今天花了点时间试了试&#xff0c;过程还是比较顺利的&#xff0c;后续…...

电脑怎么录屏?四款录屏工具分享

作为一个刚刚踏入视频创作领域的新手&#xff0c;我一直在寻找一款适合自己的录屏软件。最近&#xff0c;我尝试了四款市面上比较热门的录屏工具。今天&#xff0c;就让我来分享一下我的使用体验&#xff0c;希望能给同样在寻找录屏软件的朋友们一些参考。 一、福昕录屏大师 …...

Java代码审计篇 | ofcms系统审计思路讲解 - 篇4 | XXE漏洞审计

文章目录 Java代码审计篇 | ofcms系统审计思路讲解 - 篇4 | XXE漏洞审计0. 前言1. XXE代码审计【有1处】1.1. 搜索JRXmlLoader1.1.1. JRAntApiWriteTask1.1.2. JRAntUpdateTask1.1.3. TableReportContextXmlRule1.1.4. JasperCompileManager【存在漏洞】 1.2. 搜索XMLReader1.2…...

Leetcode 每日一题:Word Ladder

写在前面&#xff1a; 今天我们来看一道图论的题&#xff0c;这道题目是我做过目前最难与图论联想到的一道题目之一。如果没有提示的话&#xff0c;我们很容易往 dp 等解决 array 问题的方向去解决它&#xff0c;经过我超过 2个小时的思考我觉得这种方向是没前途的&#xff5e…...

c++ 编辑器 和 编译器 的详细解释

在 C 开发中&#xff0c;编辑器 和 编译器 是两个不同的工具&#xff0c;分别在编写代码和生成可执行文件的过程中起着不同的作用。下面是它们的详细介绍&#xff1a; 1. 编辑器&#xff08;Editor&#xff09; 编辑器 是用来编写和编辑代码的工具。C 代码就是通过编辑器编写…...

计算机视觉(二)—— MDPI特刊推荐

特刊征稿 01 期刊名称&#xff1a; Applied Computer Vision and Pattern Recognition: 2nd Volume 截止时间&#xff1a; 摘要提交截止日期&#xff1a;2024年10月30日 投稿截止日期&#xff1a;2024年12月30日 目标及范围&#xff1a; 包括但不限于以下领域&#xff1a…...

交叉编译工具链的安装及带wiringPi库的交叉编译实现

交叉编译工具链的安装及带wiringPi库的交叉编译实现 交叉编译的概念交叉编译工具链的安装下载交叉编译工具链配置环境遍变量编译程序到ARM平台 带wiringPi库的交叉编译下载编译wiringPi库调用树莓派的wringPi库 交叉编译的概念 交叉编译是在一个平台上生成另一个平台上的可执行…...

java: 程序包org.junit.jupiter.api不存在

明明idea没有报错&#xff0c;引用包也没问题&#xff0c;为啥提示java: 程序包org.junit.jupiter.api不存在&#xff1f; 配置&#xff01;还TMD是配置&#xff01; 如果是引用包的版本不对或者其他&#xff0c;直接就是引用报错或者pom里面飘红了。 这个应该是把generat…...

代码随想录刷题day32丨动态规划理论基础,509. 斐波那契数, 70. 爬楼梯, 746. 使用最小花费爬楼梯

代码随想录刷题day32丨动态规划理论基础&#xff0c;509. 斐波那契数&#xff0c; 70. 爬楼梯&#xff0c; 746. 使用最小花费爬楼梯 1.动态规划理论基础 动态规划&#xff0c;英文&#xff1a;Dynamic Programming&#xff0c;简称DP&#xff0c;如果某一问题有很多重叠子问题…...

为什么矩阵特征值之和等于主对角线元素之和,特征值乘积等于行列式值

首先给出特征值和特征向量的定义。 设A是n阶矩阵&#xff0c;如果数λ和n维非零向量x使关系式 Axλx &#xff08;1&#xff09; 成…...

学生学籍管理系统可行性分析报告

引言 一、编写目的 随着科学技术的不断提高,计算机科学日渐成熟,其强大的功能已为人们深刻认识,它已进入人类社会的各个领域并发挥着越来越重要的作用。而学籍管理系统软件&#xff0c;可广泛应用于全日制大、中小学及其他各类学校&#xff0c;系统涵盖了小学、初中、高中学籍…...

C#排序算法新境界:深度剖析与高效实现基数排序

基数排序&#xff08;Radix Sort&#xff09;是一种非比较型整数排序算法&#xff0c;其原理是将整数按位数切割成不同的数字&#xff0c;然后按每个位数进行比较。具体来说&#xff0c;基数排序有两种方法&#xff1a; 最低位优先&#xff08;LSD, Least Significant Digit f…...

玩机搞机-----如何简单的使用ADB指令来卸载和冻结系统应用 无需root权限 详细操作图示教程

同类博文&#xff1a; 玩机搞机---卸载内置软件 无root权限卸载不需要的软件 安全卸载_无需root卸载彻底内置软件-CSDN博客 在很多时候我们需要卸载一些系统级的app。但如果直接手机端进行卸载的话。是无法正常卸载的。其实我们可以通过有些成品工具或者完全靠ADB指令来进行卸…...

如何通过 Apache Camel 将数据导入 Elasticsearch

作者&#xff1a;来自 Elastic Andre Luiz 使用 Apache Camel 将数据提取到 Elasticsearch 的过程将搜索引擎的稳健性与集成框架的灵活性相结合。在本文中&#xff0c;我们将探讨 Apache Camel 如何简化和优化将数据提取到 Elasticsearch。为了说明此功能&#xff0c;我们将实…...

打造民国风格炫酷个人网页:用HTML和CSS3传递民国风韵

附源码&#xff01;&#xff01;&#xff01; 感谢支持 小弟不断创作网站demo感兴趣的可以关注支持一下 对了 俺在结尾带上了自己用的 背景 大家可以尝试换一下效果更好哦~~~ 如何创建一个民国风格的炫酷网页 在这篇博客中&#xff0c;我们将展示如何制作一个结合民国风格和…...

豆包MarsCode编程助手:产品功能解析与应用场景探索!

随着现代技术的不断进化升级&#xff0c;人工智能正在逐步改变着我们的日常工作方式。特别是对于复杂的项目&#xff0c;代码编写、优化、调试、测试等环节充满挑战。为了简化这些环节、提高开发效率&#xff0c;许多智能编程工具应运而生&#xff0c;豆包MarsCode 编程助手就是…...

爬虫全网抓取

爬虫全网抓取是指利用网络爬虫技术&#xff0c;通过自动化的方式遍历互联网上各个网站、论坛、博客等&#xff0c;从这些网页中提取所需的数据。它通常涉及以下几个步骤&#xff1a; 目标设定&#xff1a;确定要抓取哪些类型的网页内容&#xff0c;比如新闻、商品信息、用户评论…...

【计算机组成原理】详细解读带符号整数在计算机中的运算

有符号整数的运算 导读一、补码的优势二、补码的加法运算三、补码的减法运算四、原码、反码、补码的特性结语 导读 大家好&#xff0c;很高兴又和大家见面啦&#xff01;&#xff01;&#xff01; 经过前面的介绍&#xff0c;我们已经初步认识了有符号整数的三种表示形式&…...

vue3常见的bug 修复bug

Vue 3 作为 Vue.js 的最新版本&#xff0c;在性能、开发体验以及代码可维护性等方面带来了显著的提升。然而&#xff0c;就像任何软件框架一样&#xff0c;Vue 3 在使用过程中也可能遇到一些典型的bug或问题。以下是一些可能遇到的典型问题&#xff1a; 响应式系统相关的问题&…...

C++课程笔记 类和对象

类概念 结构体&#xff1a;只要属性 类&#xff1a;有属性也有方法 c可以省略struct c不行 #include<iostream> using namespace std;typedef struct queue1 {int a;queue1 q() {queue1 q(2);return q;};queue1(){}queue1(int qa){a qa;} }q1; int main() {queue1 Q1;…...

R语言AI模型部署方案:精准离线运行详解

R语言AI模型部署方案:精准离线运行详解 一、项目概述 本文将构建一个完整的R语言AI部署解决方案,实现鸢尾花分类模型的训练、保存、离线部署和预测功能。核心特点: 100%离线运行能力自包含环境依赖生产级错误处理跨平台兼容性模型版本管理# 文件结构说明 Iris_AI_Deployme…...

遍历 Map 类型集合的方法汇总

1 方法一 先用方法 keySet() 获取集合中的所有键。再通过 gey(key) 方法用对应键获取值 import java.util.HashMap; import java.util.Set;public class Test {public static void main(String[] args) {HashMap hashMap new HashMap();hashMap.put("语文",99);has…...

人工智能(大型语言模型 LLMs)对不同学科的影响以及由此产生的新学习方式

今天是关于AI如何在教学中增强学生的学习体验&#xff0c;我把重要信息标红了。人文学科的价值被低估了 ⬇️ 转型与必要性 人工智能正在深刻地改变教育&#xff0c;这并非炒作&#xff0c;而是已经发生的巨大变革。教育机构和教育者不能忽视它&#xff0c;试图简单地禁止学生使…...

在Mathematica中实现Newton-Raphson迭代的收敛时间算法(一般三次多项式)

考察一般的三次多项式&#xff0c;以r为参数&#xff1a; p[z_, r_] : z^3 (r - 1) z - r; roots[r_] : z /. Solve[p[z, r] 0, z]&#xff1b; 此多项式的根为&#xff1a; 尽管看起来这个多项式是特殊的&#xff0c;其实一般的三次多项式都是可以通过线性变换化为这个形式…...

华为OD机考-机房布局

import java.util.*;public class DemoTest5 {public static void main(String[] args) {Scanner in new Scanner(System.in);// 注意 hasNext 和 hasNextLine 的区别while (in.hasNextLine()) { // 注意 while 处理多个 caseSystem.out.println(solve(in.nextLine()));}}priv…...

android13 app的触摸问题定位分析流程

一、知识点 一般来说,触摸问题都是app层面出问题,我们可以在ViewRootImpl.java添加log的方式定位;如果是touchableRegion的计算问题,就会相对比较麻烦了,需要通过adb shell dumpsys input > input.log指令,且通过打印堆栈的方式,逐步定位问题,并找到修改方案。 问题…...

HubSpot推出与ChatGPT的深度集成引发兴奋与担忧

上周三&#xff0c;HubSpot宣布已构建与ChatGPT的深度集成&#xff0c;这一消息在HubSpot用户和营销技术观察者中引发了极大的兴奋&#xff0c;但同时也存在一些关于数据安全的担忧。 许多网络声音声称&#xff0c;这对SaaS应用程序和人工智能而言是一场范式转变。 但向任何技…...

Kubernetes 网络模型深度解析:Pod IP 与 Service 的负载均衡机制,Service到底是什么?

Pod IP 的本质与特性 Pod IP 的定位 纯端点地址&#xff1a;Pod IP 是分配给 Pod 网络命名空间的真实 IP 地址&#xff08;如 10.244.1.2&#xff09;无特殊名称&#xff1a;在 Kubernetes 中&#xff0c;它通常被称为 “Pod IP” 或 “容器 IP”生命周期&#xff1a;与 Pod …...

【Elasticsearch】Elasticsearch 在大数据生态圈的地位 实践经验

Elasticsearch 在大数据生态圈的地位 & 实践经验 1.Elasticsearch 的优势1.1 Elasticsearch 解决的核心问题1.1.1 传统方案的短板1.1.2 Elasticsearch 的解决方案 1.2 与大数据组件的对比优势1.3 关键优势技术支撑1.4 Elasticsearch 的竞品1.4.1 全文搜索领域1.4.2 日志分析…...

链式法则中 复合函数的推导路径 多变量“信息传递路径”

非常好&#xff0c;我们将之前关于偏导数链式法则中不能“约掉”偏导符号的问题&#xff0c;统一使用 二重复合函数&#xff1a; z f ( u ( x , y ) , v ( x , y ) ) \boxed{z f(u(x,y),\ v(x,y))} zf(u(x,y), v(x,y))​ 来全面说明。我们会展示其全微分形式&#xff08;偏导…...