当前位置: 首页 > news >正文

docker安装和使用kafka

1. 启动zookeeper

Kafka依赖zookeeper, 首先安装zookeeper
-p:设置映射端口(默认2181

docker run --name zookeeper \--network app-tier \-e ALLOW_ANONYMOUS_LOGIN=yes \--restart=always \-d bitnami/zookeeper:latest

2. 启动kafka

docker run --name kafka \--network app-tier \-p 9092:9092 \-e ALLOW_PLAINTEXT_LISTENER=yes \-e KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181 \-e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092	 \--restart=always \-d bitnami/kafka:latest
命令解释
ALLOW_PLAINTEXT_LISTENER=yes任何人可以访问
KAFKA_CFG_ZOOKEEPER_CONNECTzookeeper地址
KAFKA_CFG_ADVERTISED_LISTENERS当前kafka安装的主机地址 如果是服务器部署则配服务器IP或域名否则客户端监听消息会报地址错误

2. 启动kafka-map管理工具

docker run --name kafka-map \--network app-tier \-p 9001:8080 \-v /usr/local/kafka-map/data:/usr/local/kafka-map/data \-e DEFAULT_USERNAME=admin \-e DEFAULT_PASSWORD=admin \--restart=always \-d dushixiang/kafka-map:latest

启动成功后, 访问客户端: http://localhost:9001
账户: admin
密码: admin

在这里插入图片描述

3. springboot集成kafka

pom.xml配置

    <dependencies><!--kafka依赖--><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency></dependencies>        

配置application.yml

#------------------------------------spring----------------------------------
spring:#------------------------------------消息队列kafka配置----------------------------------kafka:#  kafka server的地址,如果有多个,使用逗号分割bootstrap-servers: localhost:9092producer:# 发生错误后,消息重发的次数。retries: 1#当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。batch-size: 16384# 设置生产者内存缓冲区的大小。32MB的批处理缓冲区buffer-memory: 33554432# 键的序列化方式key-serializer: org.apache.kafka.common.serialization.StringSerializer# 值的序列化方式value-serializer: org.apache.kafka.common.serialization.StringSerializer# acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。# acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。# acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。acks: 1properties:# 自定义拦截器interceptor.classes: com.wms.message.kafka.interceptor.CustomProducerInterceptor#自定义分区器partitioner.classes: com.wms.message.kafka.interceptor.CustomPartitionerconsumer:# 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5Dauto-commit-interval: 1S# 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:# latest(默认值)在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)# earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录auto-offset-reset: earliest# 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量enable-auto-commit: false# 键的反序列化方式key-deserializer: org.apache.kafka.common.serialization.StringDeserializer# 值的反序列化方式value-deserializer: org.apache.kafka.common.serialization.StringDeserializerproperties:# 自定义消费者拦截器interceptor.classes: com.wms.message.kafka.interceptor.CustomConsumerInterceptor# 默认消费者组group-id: code-safe-group# 设置最大轮询间隔时间(毫秒),默认值为 300000(5分钟)# 如果两次 poll() 之间的时间超过此配置值,可能导致 rebalance, 消费者会被剔除 此处设置10分钟max-poll-interval-ms: 600000# 批量一次最大拉取数据量max-poll-records: 1000batch:# 批消费并发量,小于或等于Topic的分区数concurrency: 3listener:# 在侦听器容器中运行的线程数。concurrency: 5#listner负责ack,每调用一次,就立即commitack-mode: manual_immediatemissing-topics-fatal: falsetopics:# 自定义主题名称twsm: webSocket_send_message_devgroup-id: group-idtopic-name:- topic1

测试发送消息到kafka

/*** Kafka测试** @version 1.0* @author: web* @date: 2024/1/18 15:07*/
@Slf4j
@RestController
@RequestMapping("/message/kafkaTest")
public class KafkaTestController extends BaseController
{@Autowiredprivate KafkaUtils kafkaUtils;/*** 生产者_推送消息到kafka** @param msg* @author: web* @return: AjaxResult* @date: 2024/1/18 15:16*/@PostMapping("/send")public AjaxResult send(@RequestBody Map<String, Object> msg){try{String userId = msg.get("userId").toString();Object content = msg.get("content");Message message = kafkaUtils.setMessage(userId, content);kafkaUtils.send(KafkaUtils.TOPIC_TEST, message);}catch (Exception e){log.error("生产者_推送消息到kafka发生异常");}return success();}/*** 消费者1** @param record* @param ack* @param topic* @author: web* @return: void* @date: 2024/1/18 15:07*/@KafkaListener(topics = KafkaUtils.TOPIC_TEST)public void topicTest1(ConsumerRecord<?, ?> record, Acknowledgment ack,@Header(KafkaHeaders.RECEIVED_TOPIC) String topic){Optional message = Optional.ofNullable(record.value());if (message.isPresent()){Object msg = message.get();log.info("topic.group1 消费了: Topic:" + topic + ",Message:" + msg);ack.acknowledge();}}/*** 消费者2** @param record* @param ack* @param topic* @author: web* @return: void* @date: 2024/1/18 15:07*///    @KafkaListener(topics = KafkaUtils.TOPIC_TEST, groupId = KafkaUtils.TOPIC_GROUP2)//    public void topicTest2(ConsumerRecord<?, ?> record, Acknowledgment ack,//                           @Header(KafkaHeaders.RECEIVED_TOPIC) String topic)//    {////        Optional message = Optional.ofNullable(record.value());//        if (message.isPresent())//        {//            Object msg = message.get();//            log.info("topic.group2 消费了: Topic:" + topic + ",Message:" + msg);//            ack.acknowledge();//        }//    }}

KafkaUtils类

/*** 生产者** @version: 1.0* @author: web* @date: 2024/1/18 10:37*/
@Component
@Slf4j
public class KafkaUtils
{@Resourceprivate KafkaTemplate<String, Object> kafkaTemplate;/*** 自定义topic*/public static final String TOPIC_TEST = "topic.code-safe";/*** 自定义消费组*/public static final String TOPIC_GROUP1 = "topic.group1";public static final String TOPIC_GROUP2 = "topic.group2";// 业务相关topic/*** 主题: webSocket发送消息到客户端*/public static String TOPIC_WEBSOCKET_SEND_MESSAGE;@Autowiredprivate String[] kafkaTopicName;/*** 获取配置文件中的盐值,并设置到静态变量中** @param topic 主题*/@Value("${spring.kafka.topics.twsm}")private void setTwsmTopic(String topic){TOPIC_WEBSOCKET_SEND_MESSAGE = topic;}/*** 发送消息** @param topic   主题* @param message 消息内容* @author: web* @return: void* @date: 2024/1/18 10:42*/public void send(String topic, Object message){if (StringUtils.isEmpty(topic) || StringUtils.isNull(message)){throw new ServiceException("生产者发送消息到kafka_主题或消息内容不能为空!");}String obj2String = JsonUtils.toJsonString(message);//        log.info("准备发送消息为:{}", obj2String);//发送消息ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, obj2String);// 监听回调future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>(){@Overridepublic void onFailure(Throwable throwable){//发送失败的处理log.error(topic + " - 生产者 发送消息失败:" + throwable.getMessage());}@Overridepublic void onSuccess(SendResult<String, Object> stringObjectSendResult){//成功的处理
//                log.info(topic + " - 生产者 发送消息成功:" + stringObjectSendResult.toString());}});}/*** 设置websocket发送的消息体** @param userId 用户ID* @param msg    消息内容* @author: web* @return: Message 消息对象* @date: 2024/1/19 11:36*/public Message setMessage(String userId, Object msg){Message message = new Message();message.setSendUserId(userId);message.setSendTime(DateUtils.getTime());message.setSendContent(String.valueOf(msg));return message;}
}

Message类

@Data
public class Message implements Serializable
{private static final long serialVersionUID = -118L;/*** 发送人ID*/private String sendUserId;/*** 发送人*///    private String sendUserName;/*** 发送时间*/private String sendTime;/*** 发送内容*/private String sendContent;
}

监听消息

/*** 消息接收监听器【分布式系统】** @version: 1.0* @author: web* @date: 2024/1/19 13:44*/
@Component
@Slf4j
public class MessageListener
{/*** 根据用户id发送消息到客户端** @param record* @param ack* @param topic* @author: web* @return: void* @date: 2024/1/20 22:05*/@KafkaListener(topics = "#{'${spring.kafka.topics.twsm}'}", groupId = "#{topicGroupId}")public void sendMessageByUserId(ConsumerRecord<String, String> record, Acknowledgment ack,@Header(KafkaHeaders.RECEIVED_TOPIC) String topic){Optional<String> optional = Optional.ofNullable(record.value());if (optional.isPresent()){Message message = JsonUtils.parseObject(optional.get(), Message.class);if (StringUtils.isNull(message)){log.error("消费者收到kafka消息的内容为空!");return;}
//            log.info("消费者收到kafka消息");String sendUserId = message.getSendUserId();String sendContent = message.getSendContent();// 确认收到消息ack.acknowledge();}}
}

相关文章:

docker安装和使用kafka

1. 启动zookeeper Kafka依赖zookeeper, 首先安装zookeeper -p&#xff1a;设置映射端口&#xff08;默认2181&#xff09; docker run --name zookeeper \--network app-tier \-e ALLOW_ANONYMOUS_LOGINyes \--restartalways \-d bitnami/zookeeper:latest2. 启动kafka docker…...

CTP-API开发系列之接口对接准备

CTP-API开发系列之接口对接准备 CTP-API开发系列之接口对接准备CTP-API文件清单CTP-API通用规则命名规则Spi与Api CTP-API通讯模式开发语言选择 CTP-API开发系列之接口对接准备 CTP-API文件清单 文件名说明ThostFtdcTraderApi.h交易接口&#xff0c;C头文件&#xff0c;包括 …...

C++小记 -链表

链表 文章目录 链表链表基础理论链表的类型单链表双链表循环链表 链表的存储方式链表的定义链表的操作添加节点删除节点 性能分析构建链表删除节点&#xff08;内存泄漏的坑&#xff09;1.直接移除2.使用虚拟头结点3.delete指针后&#xff0c;要将指针置为NULL&#xff01;&…...

网络协议学习DAY1

1.网络协议模型: OSI协议模型 应用层 实际发送的数据 表示层 发送的数据是否加密 会话层 是否建立会话连接 传输层 数据传输的方式&#xff08;数据报、流式&#xff09; 网…...

vue3中全局变量的定义和获取

在vue项目中&#xff0c;我们知道vue2定义全局变量是在main.js文件将变量挂载到vue.prototype.name"lisi"&#xff0c;在页面通过this.name去调用。但是在vue3中&#xff0c;这个定义全局变量有所改变&#xff1a; const app createApp(App) app.config.globalProp…...

1.2 数据模型 数据库系统概论

目录 1.2.1 两类数据模型 1.2.2 概念模型 1.信息世界中的基本概念 &#xff08;1&#xff09;实体 &#xff08;2&#xff09;属性 &#xff08;3&#xff09;码 &#xff08;4&#xff09;实体型 &#xff08;5&#xff09;实体集 &#xff08;6&#xff09;联系 2.…...

C#中openFileDialog 对话框不在最顶层,TopMost的异常情况

重点&#xff01;&#xff01;&#xff01;若 当前窗体this的TopMost是false&#xff0c;可以设置为true&#xff0c;这样打开的对话框就是最顶层 /// <summary> /// 设置窗体TopMost&#xff0c;缺点和其他程序ide有冲突。例如VS有断点的调试会卡死 /// </summary&g…...

信息安全与阿里云等保三级方案实践总结

信息安全在当今数字化时代变得至关重要&#xff0c;企业和组织需要采取有效措施来保护其数据和信息资产。阿里云作为中国领先的云服务提供商&#xff0c;提供了等保三级方案&#xff0c;帮助用户满足国家信息安全等级保护的要求。本文将探讨信息安全和阿里云等保三级方案的重要…...

嵌入式学习记录——线程

线程基本概念: 线程:线程是一个轻量级的进程,位于进程空间内部,一个进程中可以创建多个线程 1.线程创建: 线程独占栈空间,文本段、数据段和堆区与进程共享 2.线程调度: 与进程调度是一样的 宏观并行,微观串行 3.线程消亡: 与进程消亡是一样的 4.进程和线程…...

同步服务器操作系统公网仓库到本地 _ 统信UOS _ 麒麟KYLINOS

原文链接&#xff1a;同步服务器操作系统公网仓库到本地 | 统信UOS | 麒麟KYLINOS 在如今快速发展的信息技术时代&#xff0c;维护和更新服务器操作系统变得越来越重要。无论是为了提高安全性、增加新功能还是提升系统稳定性&#xff0c;同步公网源仓库到本地都是一个关键步骤。…...

【数仓】flume常见配置总结,以及示例

相关文章 【数仓】基本概念、知识普及、核心技术【数仓】数据分层概念以及相关逻辑【数仓】Hadoop软件安装及使用&#xff08;集群配置&#xff09;【数仓】Hadoop集群配置常用参数说明【数仓】zookeeper软件安装及集群配置【数仓】kafka软件安装及集群配置【数仓】flume软件安…...

统计信息锁定

在导入成功后我要收集下这些表的信息&#xff0c;结果发现好几张表都没法收集&#xff0c;用DBMS_STATS包显示ORA-20005&#xff1a;object statistics are locked (stattype ALL)&#xff0c;用Analyze命令显示ORA-38029&#xff1a; 对象统计信息已锁定。 解决办法很明确&a…...

光猫改为bridge模式

注意事项&#xff1a; 改成桥接模式后&#xff0c;光猫将不再拨号上网&#xff0c;建议提前记录自己的宽带账号&#xff0c;打10010申请修改自己的宽带密码。 光猫改好桥接之后&#xff0c;把宽带账号和密码输入到负责拨号上网的终端设备中&#xff0c;完成宽带PPPOE拨号设置。…...

回溯算法01-组合(Java)

1.组合 题目描述 给定两个整数 n 和 k&#xff0c;返回范围 [1, n] 中所有可能的 k 个数的组合。 你可以按 任何顺序 返回答案。 示例 1&#xff1a; 输入&#xff1a;n 4, k 2 输出&#xff1a; [[2,4],[3,4],[2,3],[1,2],[1,3],[1,4]]示例 2&#xff1a; 输入&#x…...

初始网络 --- 网络基础

目录 0、 前言 1、 计算机网络发展背景 1.1. 局域网(LAN) && 广域网(WAN) 2、 认识并理解协议 3、 初始网络协议 3.1. 协议分层 4、 TCP/IP 五层(或四层)模型 4.1. 简单了解TCP/IP层状体系 4.2. TCP/IP协议层状结构和计算机层状结构的关系 5、 OSI七层模型 …...

在Linux/Ubuntu/Debian中计算MD5,SHA256的方法

MD5&#xff08;消息摘要算法 5&#xff09;和 SHA-256&#xff08;安全哈希算法 256 位&#xff09;等流行的哈希算法广泛用于从任意数据生成固定大小的哈希值或校验和。 以下是这些算法及其计算方式的简要概述&#xff1a; MD5&#xff08;消息摘要算法5&#xff09;&#x…...

mybatis mysql insert 主键id为空

错误示范 java代码设置了param参数&#xff0c;但是sql 字段没有带上参数&#xff0c;例如 void insertV2(Param("historyDO") HistoryDO historyDO); <insert id"insertDuplicate" parameterType"com.test.entity.HistoryDO"keyProperty&…...

批次大小对ES写入性能影响初探

问题背景 ES使用bulk写入时每批次的大小对性能有什么影响&#xff1f;设置每批次多大为好&#xff1f; 一般来说&#xff0c;在Elasticsearch中&#xff0c;使用bulk API进行批量写入时&#xff0c;每批次的大小对性能有着显著的影响。具体来说&#xff0c;当批量请求的大小增…...

c语言十大核心用法

当然&#xff0c;以下是十个关于 C 语言用法的代码示例&#xff1a; 指针的基本用法&#xff1a; #include <stdio.h>int main() {int num 10;int *ptr;ptr &num;printf("The value of num is: %d\n", *ptr);return 0; }结构体的使用&#xff1a; #in…...

网页打开慢,这锅该谁背?

一、背景 工作中扯皮说不可避免且非常常见的事情. 开发与产品、开发和测试、前端和后端都会产生扯皮现象。今天要聊的一个问题就是前后端之间的扯皮问题。 网页打开太慢或者点击了某个按钮发现数据很久才显示出来&#xff0c;这个锅谁背? 做开发不能无凭据地胡乱甩锅, 我们…...

R语言AI模型部署方案:精准离线运行详解

R语言AI模型部署方案:精准离线运行详解 一、项目概述 本文将构建一个完整的R语言AI部署解决方案,实现鸢尾花分类模型的训练、保存、离线部署和预测功能。核心特点: 100%离线运行能力自包含环境依赖生产级错误处理跨平台兼容性模型版本管理# 文件结构说明 Iris_AI_Deployme…...

边缘计算医疗风险自查APP开发方案

核心目标:在便携设备(智能手表/家用检测仪)部署轻量化疾病预测模型,实现低延迟、隐私安全的实时健康风险评估。 一、技术架构设计 #mermaid-svg-iuNaeeLK2YoFKfao {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg…...

大数据零基础学习day1之环境准备和大数据初步理解

学习大数据会使用到多台Linux服务器。 一、环境准备 1、VMware 基于VMware构建Linux虚拟机 是大数据从业者或者IT从业者的必备技能之一也是成本低廉的方案 所以VMware虚拟机方案是必须要学习的。 &#xff08;1&#xff09;设置网关 打开VMware虚拟机&#xff0c;点击编辑…...

IT供电系统绝缘监测及故障定位解决方案

随着新能源的快速发展&#xff0c;光伏电站、储能系统及充电设备已广泛应用于现代能源网络。在光伏领域&#xff0c;IT供电系统凭借其持续供电性好、安全性高等优势成为光伏首选&#xff0c;但在长期运行中&#xff0c;例如老化、潮湿、隐裂、机械损伤等问题会影响光伏板绝缘层…...

[Java恶补day16] 238.除自身以外数组的乘积

给你一个整数数组 nums&#xff0c;返回 数组 answer &#xff0c;其中 answer[i] 等于 nums 中除 nums[i] 之外其余各元素的乘积 。 题目数据 保证 数组 nums之中任意元素的全部前缀元素和后缀的乘积都在 32 位 整数范围内。 请 不要使用除法&#xff0c;且在 O(n) 时间复杂度…...

Java多线程实现之Thread类深度解析

Java多线程实现之Thread类深度解析 一、多线程基础概念1.1 什么是线程1.2 多线程的优势1.3 Java多线程模型 二、Thread类的基本结构与构造函数2.1 Thread类的继承关系2.2 构造函数 三、创建和启动线程3.1 继承Thread类创建线程3.2 实现Runnable接口创建线程 四、Thread类的核心…...

以光量子为例,详解量子获取方式

光量子技术获取量子比特可在室温下进行。该方式有望通过与名为硅光子学&#xff08;silicon photonics&#xff09;的光波导&#xff08;optical waveguide&#xff09;芯片制造技术和光纤等光通信技术相结合来实现量子计算机。量子力学中&#xff0c;光既是波又是粒子。光子本…...

【Linux系统】Linux环境变量:系统配置的隐形指挥官

。# Linux系列 文章目录 前言一、环境变量的概念二、常见的环境变量三、环境变量特点及其相关指令3.1 环境变量的全局性3.2、环境变量的生命周期 四、环境变量的组织方式五、C语言对环境变量的操作5.1 设置环境变量&#xff1a;setenv5.2 删除环境变量:unsetenv5.3 遍历所有环境…...

根目录0xa0属性对应的Ntfs!_SCB中的FileObject是什么时候被建立的----NTFS源代码分析--重要

根目录0xa0属性对应的Ntfs!_SCB中的FileObject是什么时候被建立的 第一部分&#xff1a; 0: kd> g Breakpoint 9 hit Ntfs!ReadIndexBuffer: f7173886 55 push ebp 0: kd> kc # 00 Ntfs!ReadIndexBuffer 01 Ntfs!FindFirstIndexEntry 02 Ntfs!NtfsUpda…...

Kafka主题运维全指南:从基础配置到故障处理

#作者&#xff1a;张桐瑞 文章目录 主题日常管理1. 修改主题分区。2. 修改主题级别参数。3. 变更副本数。4. 修改主题限速。5.主题分区迁移。6. 常见主题错误处理常见错误1&#xff1a;主题删除失败。常见错误2&#xff1a;__consumer_offsets占用太多的磁盘。 主题日常管理 …...