当前位置: 首页 > article >正文

centos7.9使用docker-compose安装kafka

docker-compose配置文件

services:zookeeper:image: confluentinc/cp-zookeeper:7.0.1hostname: zookeepercontainer_name: zookeeperports:- "2181:2181"environment:ZOOKEEPER_CLIENT_PORT: 2181ZOOKEEPER_TICK_TIME: 2000kafka:image: confluentinc/cp-kafka:7.0.1hostname: kafkacontainer_name: kafkadepends_on:- zookeeperports:- "9092:9092"environment:KAFKA_BROKER_ID: 1KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXTKAFKA_LISTENERS: PLAINTEXT://:9092,PLAINTEXT_INTERNAL://:29092KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.1.85:9092,PLAINTEXT_INTERNAL://kafka:29092KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT_INTERNALKAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1kafka-manager:image: hlebalbau/kafka-manager:stablecontainer_name: kafka-managerdepends_on:- zookeeperports:- "9002:9000"environment:ZK_HOSTS: "zookeeper:2181"kAFKA_BROKERS: 192.168.1.85:9092KAFKA_MANAGER_AUTH_ENABLED: "false"

application.properties文件

spring.application.name=kafka_demo
# application.properties
spring.kafka.bootstrap-servers=192.168.1.85:9092
spring.kafka.consumer.group-id=my-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

生产者controller

import com.yykj.kafka_demo.service.KafkaProducerService;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;@RestController
public class KafkaTestController {private final KafkaProducerService kafkaProducerService;public KafkaTestController(KafkaProducerService kafkaProducerService) {this.kafkaProducerService = kafkaProducerService;}@GetMapping("/send")public String sendMessageToKafka(@RequestParam(value = "topic", defaultValue = "test-topic") String topic,@RequestParam(value = "message", defaultValue = "Hello Kafka!") String message) {kafkaProducerService.sendMessage(topic, message);return "消息已发送: " + message + " 到主题: " + topic;}
}

生产者service

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;@Service
public class KafkaProducerService {private final KafkaTemplate<String, String> kafkaTemplate;public KafkaProducerService(KafkaTemplate<String, String> kafkaTemplate) {this.kafkaTemplate = kafkaTemplate;}/*** 发送消息到指定主题* @param topic 主题名称* @param message 消息内容*/public void sendMessage(String topic, String message) {kafkaTemplate.send(topic, message).whenComplete((result, ex) -> {if (ex == null) {System.out.println("消息发送成功: " + message +", 分区: " + result.getRecordMetadata().partition() +", 偏移量: " + result.getRecordMetadata().offset());} else {System.err.println("消息发送失败: " + ex.getMessage());}});}
}

消费者监听

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;@Service
public class KafkaConsumerService {// 监听指定的主题,groupId用于区分不同的消费者组@KafkaListener(topics = "${kafka.topic:test-topic}", groupId = "${kafka.group-id:test-group}")public void consumeMessage(ConsumerRecord<String, String> record) {System.out.printf("收到消息 -> 主题: %s, 分区: %d, 偏移量: %d, 键: %s, 值: %s%n",record.topic(),record.partition(),record.offset(),record.key(),record.value());// 这里可以添加你的业务逻辑处理}
}

pom文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.5.0</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.yykj</groupId><artifactId>kafka_demo</artifactId><version>0.0.1-SNAPSHOT</version><name>kafka_demo</name><description>kafka_demo</description><url/><licenses><license/></licenses><developers><developer/></developers><scm><connection/><developerConnection/><tag/><url/></scm><properties><java.version>17</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><!-- Spring Boot Starter for Kafka --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>3.3.6</version></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><configuration><annotationProcessorPaths><path><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></path></annotationProcessorPaths></configuration></plugin><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><configuration><excludes><exclude><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></exclude></excludes></configuration></plugin></plugins></build></project>

kafka的启动:

tiup cdc cli changefeed create   --server=http://192.168.1.85:8300   --changefeed-id="kafka-debezium"   --sink-uri="kafka://192.168.1.85:9092/test-tidbmessage?protocol=debezium&kafka-version=2.4.0&partition-num=6&max-message-bytes=67108864&replication-factor=1"   --config=config-cdc.yml

config-cdc.yml配置

force-replicate=true
[filter]
# 只同步 law 数据库下的三张表
rules = ['law.sys_dict', 'law.sys_user', 'law.sys_role']

相关文章:

centos7.9使用docker-compose安装kafka

docker-compose配置文件 services:zookeeper:image: confluentinc/cp-zookeeper:7.0.1hostname: zookeepercontainer_name: zookeeperports:- "2181:2181"environment:ZOOKEEPER_CLIENT_PORT: 2181ZOOKEEPER_TICK_TIME: 2000kafka:image: confluentinc/cp-kafka:7.0…...

ETL 工具与数据中台的关系与区别

ETL 工具和数据中台作为数据处理领域的关键概念&#xff0c;虽然存在一定的关联&#xff0c;但二者有着明显的区别。本文将深入剖析 ETL 工具与数据中台之不同。 一、ETL 工具概述 ETL 是数据仓库技术中的核心技术之一&#xff0c;其全称为 Extract&#xff08;抽取&#xff…...

SQLMesh Typed Macros:让SQL宏更强大、更安全、更易维护

在SQL开发中&#xff0c;宏&#xff08;Macros&#xff09;是一种强大的工具&#xff0c;可以封装重复逻辑&#xff0c;提高代码复用性。然而&#xff0c;传统的SQL宏往往缺乏类型安全&#xff0c;容易导致运行时错误&#xff0c;且难以维护。SQLMesh 引入了 Typed Macros&…...

DeepSpeed-Ulysses:支持极长序列 Transformer 模型训练的系统优化方法

DeepSpeed-Ulysses&#xff1a;支持极长序列 Transformer 模型训练的系统优化方法 flyfish 名字 Ulysses “Ulysses” 和 “奥德修斯&#xff08;Odysseus&#xff09;” 指的是同一人物&#xff0c;“Ulysses” 是 “Odysseus” 的拉丁化版本 《尤利西斯》&#xff08;詹姆…...

Docker 使用镜像[SpringBoot之Docker实战系列] - 第537篇

历史文章&#xff08;文章累计530&#xff09; 《国内最全的Spring Boot系列之一》 《国内最全的Spring Boot系列之二》 《国内最全的Spring Boot系列之三》 《国内最全的Spring Boot系列之四》 《国内最全的Spring Boot系列之五》 《国内最全的Spring Boot系列之六》 《…...

解锁MCP:AI大模型的万能工具箱

摘要&#xff1a;MCP&#xff08;Model Context Protocol&#xff0c;模型上下文协议&#xff09;是由Anthropic开源发布的一项技术&#xff0c;旨在作为AI大模型与外部数据和工具之间沟通的“通用语言”。它通过标准化协议&#xff0c;让大模型能够自动调用外部工具完成任务&a…...

Error in beforeDestroy hook: “Error: [ElementForm]unpected width “

使用 element 的 form 时候报错&#xff1a; vue.runtime.esm.js:3065 Error: [ElementForm]unpected width at VueComponent.getLabelWidthIndex (element-ui.common.js:23268:1) at VueComponent.deregisterLabelWidth (element-ui.common.js:23281:1) at Vue…...

vscode包含工程文件路径

在 VSCode 中配置 includePath 以自动识别并包含上层目录及其所有子文件夹&#xff0c;需结合通配符和相对/绝对路径实现。以下是具体操作步骤及原理说明&#xff1a; 1. 使用通配符 ** 递归包含所有子目录 在 c_cpp_properties.json 的 includePath 中&#xff0c;${workspac…...

私有知识库 Coco AI 实战(七):摄入本地 PDF 文件

是否有些本地文件要检索&#xff1f;没问题。我们先对 PDF 类的文件进行处理&#xff0c;其他的文件往后稍。 Coco Server Token 创建一个 token 备用。 PDF_Reader 直接写个 python 程序解析 PDF 内容&#xff0c;上传到 Coco Server 就行了。还记得以前都是直接写入 Coco …...

GitLab 18.0 正式发布,15.0 将不再受技术支持,须升级【二】

GitLab 是一个全球知名的一体化 DevOps 平台&#xff0c;很多人都通过私有化部署 GitLab 来进行源代码托管。极狐GitLab 是 GitLab 在中国的发行版&#xff0c;专门为中国程序员服务。可以一键式部署极狐GitLab。 学习极狐GitLab 的相关资料&#xff1a; 极狐GitLab 官网极狐…...

NtfsLookupAttributeByName函数分析之和Scb->AttributeName的关系

第一部分&#xff1a; VOID FindFirstIndexEntry ( IN PIRP_CONTEXT IrpContext, IN PSCB Scb, IN PVOID Value, IN OUT PINDEX_CONTEXT IndexContext ) { 。。。。。。 // // Lookup the attribute record from the Scb. // if (!NtfsLookupAt…...

STM32H7系列USART驱动区别解析 stm32h7xx_hal_usart.c与stm32h7xx_ll_usart.c的区别?

在STM32H7系列中&#xff0c;stm32h7xx_hal_usart.c和stm32h7xx_ll_usart.c是ST提供的两种不同层次的USART驱动程序&#xff0c;主要区别在于设计理念、抽象层次和使用场景&#xff1a; 1. HAL库&#xff08;Hardware Abstraction Layer&#xff09; 文件&#xff1a;stm32h7x…...

网络原理 | TCP与UDP协议的区别以及回显服务器的实现

目录 TCP与UDP协议的区别 基于 UDP 协议实现回显服务器 UDP Socket 编程常用 Api UDP 服务器 UDP 客户端 基于 TCP 协议实现回显服务器 TCP Socket 编程常用 Api TCP 服务器 TCP 客户端 TCP 服务端常见的 bug 客户端发送数据后&#xff0c;没有响应 服务器仅支持…...

IP动态伪装开关

IP动态伪装开关 在OpenWrt系统中&#xff0c;IP动态伪装&#xff08;IP Masquerading&#xff09;是一种网络地址转换&#xff08;NAT&#xff09;技术&#xff0c;用于在私有网络和公共网络之间转换IP地址。它通常用于允许多个设备共享单个公共IP地址访问互联网。以下是关于O…...

【Unity3D】将自动生成的脚本包含到C#工程文件中

我们知道&#xff0c;在用C#开发中&#xff0c;通过vs编辑器新建的脚本&#xff0c;会自动包含到vs工程中&#xff0c;而通过外部创建&#xff0c;比如复制别的工程或代码创建的C#脚本不会包含到vs工程。 在我们的日常开发中&#xff0c;通常会自动创建C#脚本&#xff0c;特别…...

解决leetcode第3509题.最大化交错和为K的子序列乘积

3509.最大化交错和为K的子序列乘积 难度&#xff1a;困难 问题描述&#xff1a; 给你一个整数数组nums和两个整数k与limit&#xff0c;你的任务是找到一个非空的子序列&#xff0c;满足以下条件&#xff1a; 它的交错和等于k。 在乘积不超过limit的前提下&#xff0c;最大…...

【Python 深度学习】1D~3D iou计算

一维iou 二维 import numpy as npdef iou_1d(set_a, set_b):# 获得集合A和B的边界 x1, x2 set_ay1, y2 set_b# 计算交集的上下界low max(x1,y1)high - min(x2, y2)# 计算交集if high - low < 0:inter 0else:inter high - low# 计算并集union (x2 -x1) (y2 - y1) - in…...

java23

1.美化界面 添加背景图片 所以我们添加背景图片要放在后面添加 添加图片边框 绝对路径&#xff1a; 相对(模块)路径&#xff1a; 第一个是绝对路径&#xff0c;第二个是相对路径&#xff0c;但是斜杠的方向不对 总结&#xff1a; 2.图片移动 先实现KeyListener接口&#xf…...

嵌入式工程师常用软件

1、 Git Git 是公司常用的版本管理工具&#xff0c;人人都要会。在线的 git 教程可以参考菜鸟教程&#xff1a; https://www.runoob.com/git/git-tutorial.html 电子书教程请在搜索栏搜索&#xff1a; git Git 教程很多&#xff0c;常用的命令如下&#xff0c;这些命令可…...

LitCTF2025 WEB

星愿信箱 使用的是python&#xff0c;那么大概率是ssti注入 测试{{5*5}} 发现需要包含文字&#xff0c;那么添加文字 可以看到被waf过滤了&#xff0c;直接抓包查看参数上fenjing 可以看到这里是json格式&#xff0c;其实fenjing也是支持json格式的 https://github.com/Marv…...

Redisson WatchDog会一直续期吗?

取决于加锁的方式。 Lock 方法有2种形式&#xff0c;如果指定了leaseTime &#xff08;且不为-1&#xff09;&#xff0c; 不会启用watchDog机制. 如果没有指定leaseTime&#xff0c; 则会启动watchDog机制&#xff0c;且会一直续期&#xff0c;除非线程宕调或者续期失败。 p…...

Linux 下VS Code 的使用

这里以创建helloworld 为例。 Step 0:准备工作&#xff1a; Install Visual Studio Code. Install the C extension for VS Code. You can install the C/C extension by searching for c in the Extensions view (CtrlShiftX). Step 1: 创建工作目录 helloworld&#xff0…...

Android开发namespace奇葩bug

Android开发namespace奇葩bug namespace "com.yibanxxx.yiban"buildFeatures {buildConfig true}namespace 对应你的module的清单下的package...

watchEffect

在处理复杂异步逻辑时&#xff0c;Vue 3 的 watchEffect 相比传统的 watch 具有以下优势&#xff1a; 1. 自动追踪依赖 watchEffect 会自动收集其回调中使用的所有响应式依赖&#xff0c;无需手动指定监听源&#xff1a; import { ref, watchEffect } from vue;const count …...

Qt 布局管理器的层级关系

1、HomeWidget.h头文件&#xff1a; #ifndef HOMEWIDGET_H #define HOMEWIDGET_H#include <QWidget> #include <QPushButton> #include <QVBoxLayout> #include <QHBoxLayout>class HomeWidget : public QWidget {Q_OBJECTpublic:HomeWidget(QWidget …...

Android 之 kotlin 语言学习笔记一

参考官方文档&#xff1a;https://developer.android.google.cn/kotlin/learn?hlzh-cn 1、变量声明 Kotlin 使用两个不同的关键字&#xff08;即 val 和 var&#xff09;来声明变量。 val 用于值从不更改的变量。使用 val 声明的变量无法重新赋值。var 用于值可以更改的变量…...

maven模块化开发

使用方法 将项目安装到本地仓库 mvn install 的作用 运行 mvn install 时&#xff0c;Maven 会执行项目的整个构建生命周期&#xff08;包括 compile、test、package 等阶段&#xff09;&#xff0c;最终将构建的 artifact 安装到本地仓库&#xff08;默认路径为 ~/.m2/repos…...

为什么要使用stream流

总的来说就是 它支持链式调用&#xff0c;方便 不会修改原始数据源&#xff0c;而是生成一个新的流或结果 中间操作不会立即执行&#xff0c;只有在终端操作触发时才会真正执行 注意事项 无状态操作&#xff1a;Stream 操作应该是无状态的&#xff0c;不要依赖外部变量的状…...

语义分割的image

假设图像的尺寸为 3x3&#xff0c;并且是 RGB 图像&#xff08;有 3 个通道&#xff09;。每个通道的像素值范围为 [0, 1]&#xff0c;我们将构造一个 batch_size 2 的图像批次。 Image: tensor([[[[0.1347, 0.4583, 0.7102], # 第一张图像的红色通道[0.1774, 0.0328, 0.308…...

云原生安全之网络IP协议:从基础到实践指南

&#x1f525;「炎码工坊」技术弹药已装填&#xff01; 点击关注 → 解锁工业级干货【工具实测|项目避坑|源码燃烧指南】 一、基础概念 IP协议&#xff08;Internet Protocol&#xff09;是互联网通信的核心协议族之一&#xff0c;负责在设备间传递数据包。其核心特性包括&…...