kafka使用入门案例与踩坑记录
每次用到kafka时都会出现各种奇怪的问题,综合实践,下面汇总下主要操作步骤:
Docker镜像形式启动
zookeeper启动
docker run -d --name zookeeper -p 2181:2181 -t wurstmeister/zookeeper
kafka启动
docker run --name kafka01 -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=150.158.16.123:12348 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://150.158.16.123:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -d wurstmeister/kafka
进入kafka容器
docker exec -it [容器id] /bin/bash
创建topic
进入容器,在/opt/kafka_2.13-2.8.1/bin 目录下创建topic
./kafka-topics.sh --create --zookeeper 150.158.16.123:12348 --replication-factor 1 --partitions 1 --topic mykafka
./kafka-topics.sh --create --zookeeper 150.158.16.123:2181 --replication-factor 1 --partitions 1 --topic mykafka
运行生产者

运行消费者

单机形式启动
前提
1、Linux 机器
2、环境已准备好JDK,如果还没有装,推荐用yum一键安装
yum install -y java-1.8.0-openjdk.x86_64
检验:
[root@localhost ~]# java -version
openjdk version "1.8.0_362"
OpenJDK Runtime Environment (build 1.8.0_362-b08)
OpenJDK 64-Bit Server VM (build 25.362-b08, mixed mode)
3、将kafka压缩包上传到你的Linux
配置文件关注config目录下的zookeeper.properties和server.properties,启动服务时要指定
配置-启动
有默认配置,可不做修改(有需要可以自定义启动端口和数据存放位置等参数)
1、先启动自带的 Zookeeper:
[root@localhost bin]# ./zookeeper-server-start.sh ../config/zookeeper.properties
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
[2023-02-26 14:14:52,759] INFO Reading configuration from: ../config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2023-02-26 14:14:52,766] INFO autopurge.snapRetainCount set to 3 (org.apache.zookeeper.server.DatadirCleanupManager)
[2023-02-26 14:14:52,767] INFO autopurge.purgeInterval set to 0 (org.apache.zookeeper.server.DatadirCleanupManager)
[2023-02-26 14:14:52,767] INFO Purge task is not scheduled. (org.apache.zookeeper.server.DatadirCleanupManager)
[2023-02-26 14:14:52,767] WARN Either no config or no quorum defined in config, running in standalone mode (org.apache.zookeeper.server.quorum.QuorumPeerMain)
[2023-02-26 14:14:52,783] INFO Reading configuration from: ../config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2023-02-26 14:14:52,784] INFO Starting server (org.apache.zookeeper.server.ZooKeeperServerMain)
[2023-02-26 14:14:52,796] INFO Server environment:zookeeper.version=3.4.14-4c25d480e66aadd371de8bd2fd8da255ac140bcf, built on 03/06/2019 16:18 GMT (org.apache.zookeeper.server.ZooKeeperServer)
[2023-02-26 14:14:52,796] INFO Server environment:host.name=localhost (org.apache.zookeeper.server.ZooKeeperServer)
[2023-02-26 14:14:52,796] INFO Server environment:java.version=1.8.0_362 (org.apache.zookeeper.server.ZooKeeperServer)
[2023-02-26 14:14:52,796] INFO Server environment:java.vendor=Red Hat, Inc. (org.apache.zookeeper.server.ZooKeeperServer)
[2023-02-26 14:14:52,796] INFO Server environment:java.home=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.362.b08-1.el7_9.x86_64/jre (org.apache.zookeeper.server.ZooKeeperServer)
(省略大部分)
2、启动 Kafka
[root@localhost kafka_2.12-2.3.0]# bin/kafka-server-start.sh config/server.properties
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
[2023-02-26 14:16:00,261] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
[2023-02-26 14:16:01,004] INFO Registered signal handlers for TERM, INT, HUP (org.apache.kafka.common.utils.LoggingSignalHandler)
[2023-02-26 14:16:01,024] INFO starting (kafka.server.KafkaServer)
[2023-02-26 14:16:01,025] INFO Connecting to zookeeper on localhost:2181 (kafka.server.KafkaServer)
[2023-02-26 14:16:01,068] INFO [ZooKeeperClient Kafka server] Initializing a new session to localhost:2181. (kafka.zookeeper.ZooKeeperClient)
[2023-02-26 14:16:01,072] INFO Client environment:zookeeper.version=3.4.14-4c25d480e66aadd371de8bd2fd8da255ac140bcf, built on 03/06/2019 16:18 GMT (org.apache.zookeeper.ZooKeeper)
[2023-02-26 14:16:01,072] INFO Client environment:host.name=localhost (org.apache.zookeeper.ZooKeeper)
[2023-02-26 14:16:01,072] INFO Client environment:java.version=1.8.0_362 (org.apache.zookeeper.ZooKeeper)
[2023-02-26 14:16:01,072] INFO Client environment:java.vendor=Red Hat, Inc. (org.apache.zookeeper.ZooKeeper)
[2023-02-26 14:16:01,072] INFO Client environment:java.home=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.362.b08-1.el7_9.x86_64/jre (org.apache.zookeeper.ZooKeeper)
(省略大部分)
上述步骤只要启动过程没有报错信息,一般是没有问题的
测试
1、创建个topic
[root@localhost bin]# ./kafka-topics.sh --create --zookeeper 192.168.154.134:2181 --replication-factor 1 --partitions 1 --topic test
Created topic test.
2、查看topic列表
[root@localhost bin]# ./kafka-topics.sh --zookeeper 192.168.154.134:2181 --list
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
test
3、启动生产者
[root@localhost bin]# ./kafka-console-producer.sh --broker-list 192.168.154.134:9092 --topic test
>hi
>什么意思啊
4、启动消费者
[root@localhost bin]# ./kafka-console-consumer.sh --bootstrap-server 192.168.154.134:9092 --topic test
hi
什么意思啊
正常启动,OK!
可视化:kafka-manager
镜像下载
docker pull sheepkiller/kafka-manager
运行容器
docker run -d --name kafka-manager -p 12349:9000 --link zookeeper --link kafka01 --env ZK_HOSTS=zookeeper:2181 sheepkiller/kafka-manager
然后访问对应的IP:端口即可进入管理页面
注意:
ZK_HOSTS后面在web页面上要用到!
管理界面
进入主页面后,点击 Add Cluster 添加集群信息

然后填写配置信息,主要填写集群名称,Zookeeper的Hosts,还有指定kafka版本(选个跟你所使用的kafka版本号最接近的就行),其他的一些配置按默认的就行。
当你正确连接上以后,就能看到你的集群啦,如:


更多关于kafka可视化操作就由你慢慢探索吧!这里将你引进门!
注意:
如果你在启动
kafka manager这个容器时指定了ZK_HOSTS,那么Cluster Zookeeper Hosts这项填的内容要和ZK_HOSTS一致,否则会出现连接不上,连接超时等情况。如下图:
另外有些配置默认值是1,但是你得将其改成1以上的整数,否则不能正确保存提交。如:
注意
kafka版本不同,响应的api有区别
本版本是2.11
注意3.x是 --bootstrap-server localhost:9092方式新建,kafka2.x是以–zookeeper方式创建。下面查看新建的topic。


奇葩问题
1.重启docker失败?
[root@localhost ~]# systemctl restart docker
Job for docker.service failed because the control process exited with error code. See "systemctl status docker.service" and "journalctl -xe" for details.
[root@localhost ~]# journalctl -xe
-- The result is failed.
2月 22 02:01:53 localhost.localdomain systemd[1]: Unit docker.service entered failed state.
2月 22 02:01:53 localhost.localdomain systemd[1]: Unit docker.service entered failed state.
2月 22 02:01:53 localhost.localdomain systemd[1]: docker.service failed.
2月 22 02:01:55 localhost.localdomain systemd[1]: docker.service holdoff time over, scheduling restart.
2月 22 02:01:55 localhost.localdomain systemd[1]: Stopped Docker Application Container Engine.
-- Subject: Unit docker.service has finished shutting down
-- Defined-By: systemd
-- Support: http://lists.freedesktop.org/mailman/listinfo/systemd-devel
--
-- Unit docker.service has finished shutting down.
2月 22 02:01:55 localhost.localdomain systemd[1]: start request repeated too quickly for docker.service
2月 22 02:01:55 localhost.localdomain systemd[1]: Failed to start Docker Application Container Engine.
-- Subject: Unit docker.service has failed
-- Defined-By: systemd
-- Support: http://lists.freedesktop.org/mailman/listinfo/systemd-devel
--
-- Unit docker.service has failed.
--
-- The result is failed.
原因:修改文件/etc/docker/daemon.json时不规范,可能存在空格什么的
解决:
[root@localhost ~]# cat <<EOF >/etc/docker/daemon.json
> {
> "registry-mirrors": ["https://registry.docker-cn.com"]
> }
> EOF
[root@localhost ~]# cat /etc/docker/daemon.json
{
"registry-mirrors": ["https://registry.docker-cn.com"]
}
[root@localhost ~]#
[root@localhost ~]# systemctl daemon-reload
[root@localhost ~]# systemctl restart docker
2.查询镜像无果?
[root@localhost ~]# docker search kafka
Error response from daemon: Get "https://index.docker.io/v1/search?q=kafka&n=25": x509: certificate has expired or is not yet valid: current time 2023-02-22T02:08:25+08:00 is before 2023-02-22T00:00:00Z
原因:虚拟机时间与外部时间不一致
解决:
[root@localhost ~]# date
2023年 02月 22日 星期三 02:09:50 CST
[root@localhost ~]# ntpdate cn.pool.ntp.org
26 Feb 13:31:38 ntpdate[44996]: step time server 119.28.206.193 offset 386475.634457 sec
[root@localhost ~]# date
2023年 02月 26日 星期日 13:31:48 CST
[root@localhost ~]# docker search kafka
NAME DESCRIPTION STARS OFFICIAL AUTOMATED
bitnami/kafka Apache Kafka is a distributed streaming plat… 615 [OK]
ubuntu/kafka Apache Kafka, a distributed event streaming … 25
bitnami/kafka-exporter 9
ibmcom/kafka Docker Image for IBM Cloud Private-CE (Commu… 6
bitnami/kafka-trigger-controller Source for this controller is in the kubeles… 5
ibmcom/kafka-python-console-sample Docker image for the IBM Event Streams Pytho… 2
openwhisk/kafkaprovider Apache OpenWhisk event provider service for … 2 [OK]
3.Docker容器内如何安装vim?
-
apt-get install vim (可能提示你安装失败!继续往下)
-
agt-get update 同步 /etc/apt/sources.list 和 /etc/apt/sources.list.d 中列出的源的索引
配置国内镜像源:
echo "deb http://mirrors.163.com/debian/ jessie main non-free contrib" >/etc/apt/sources.listecho "deb http://mirrors.163.com/debian/ jessie-proposed-updates main non-free contrib" >>/etc/apt/sources.listecho "deb-src http://mirrors.163.com/debian/ jessie main non-free contrib" >>/etc/apt/sources.listecho "deb-src http://mirrors.163.com/debian/ jessie-proposed-updates main non-free contrib" >>/etc/apt/sources.list -
返回第一步
4.无法启动kafka?
kafka.common.KafkaException: Socket server failed to bind to 150.158.16.123:9092: 无法指定被请求的地址.at kafka.network.Acceptor.openServerSocket(SocketServer.scala:327)at kafka.network.Acceptor.<init>(SocketServer.scala:252)at kafka.network.SocketServer$$anonfun$startup$1.apply(SocketServer.scala:91)at kafka.network.SocketServer$$anonfun$startup$1.apply(SocketServer.scala:83)at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)at kafka.network.SocketServer.startup(SocketServer.scala:83)at kafka.server.KafkaServer.startup(KafkaServer.scala:222)at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:38)at kafka.Kafka$.main(Kafka.scala:65)at kafka.Kafka.main(Kafka.scala)
注意,上面是配置里面有个地址写得不对,listeners=PLAINTEXT://10.20.30.153:9092后接的是内网地址,通过ip addr即可查看,如我的机器

一个写内网地址,一个写外网地址即可

本次分享到这,下期见!
相关文章:
kafka使用入门案例与踩坑记录
每次用到kafka时都会出现各种奇怪的问题,综合实践,下面汇总下主要操作步骤: Docker镜像形式启动 zookeeper启动 docker run -d --name zookeeper -p 2181:2181 -t wurstmeister/zookeeperkafka启动 docker run --name kafka01 -p 9092:909…...
系统启动太慢,调优后我直呼Nice
问题背景最近在负责一个订单系统的业务研发,本来不是件困难的事。但是服务的启动时间很慢,慢的令人发指。单次启动的时间约在10多分钟左右,基本一次迭代、开发,大部分的时间都花在了启动项目上。忍无可忍的我,终于决定…...
java知识点
文章目录异常写法JVM加载反射访问private调用方法动态代理注解元数据:TargetRetention元注解泛型编写泛型擦拭法局限通配符无限定通配符(<?>)集合重写方法和实现类IO流字节与字符转换同步和异步可以设置编码的类Print*类Files时间与日期时区一种二种三种异常…...
文件的打开关闭和顺序读写
目录 一、文件的打开与关闭 (一)文件指针 (二) 文件的打开和关闭 二、文件的顺序读写 (一)fputc 1. 介绍 2. 举例 (二)fgetc 1. 介绍 2. 举例1 3. 举例2 (三&…...
(十八)操作系统-进程互斥的软件实现方法
文章目录一、知识总览二、单标志法三、双标志先检查法四、双标志后检查法五、Peterson算法六、总结一、知识总览 二、单标志法 算法思想:两个进程在访问临界区后,会把使用临界区的权限转交给另一个进程。也就是说每个进程进入临界区的权限只能被另一个进…...
2023年三月份图形化一级打卡试题
活动时间 从2023年3月1日至3月21日,每天一道编程题。 本次打卡的规则如下: 小朋友每天利用10~15分钟做一道编程题,遇到问题就来群内讨论,我来给大家答疑。 小朋友做完题目后,截图到朋友圈打卡并把打卡的截图发到活动群…...
linux 防火墙管理-firewalld
什么是Firewalld 当前很多linux系统中都默认使用 firewalld(Dynamic Firewall Manager of Linux systems,Linux系统的动态防火墙管理器)服务作为防火墙配置管理工具。 “firewalld”是firewall daemon。它提供了一个动态管理的防火墙&#x…...
2023年最新大厂开发面试题(滴滴,华为,京东,腾讯,头条)
2023年最新大厂开发面试题!!! 滴滴篇 B树、B-树的区别? 数据库隔离级别,幻读和不可重复读的区别? 有 hell, well, hello, world 等字符串组,现在问能否拼接成 helloworld,代码实现。 快排算…...
2023年三月份图形化三级打卡试题
活动时间 从2023年3月1日至3月21日,每天一道编程题。 本次打卡的规则如下: 小朋友每天利用10~15分钟做一道编程题,遇到问题就来群内讨论,我来给大家答疑。 小朋友做完题目后,截图到朋友圈打卡并把打卡的截图发到活动群…...
蓝桥杯算法模板
模拟散列表拉链法import java.io.*; import java.util.*; public class a1 {static int n;static int N100003;static int[] hnew int[N];static int[] enew int[N];static int[] nenew int[N]; static int idx; static void insert(int x){int k(x%NN)%N;e[idx]x;ne[idx]h[k];…...
python之并发编程
一、并发编程之多进程 1.multiprocessing模块介绍 python中的多线程无法利用多核优势,如果想要充分地使用多核CPU的资源(os.cpu_count()查看),在python中大部分情况需要使用多进程。Python提供了multiprocessing。 multiprocess…...
Vue.js自定义事件的使用(实现父子之间的通信)
vue v-model修饰符:.lazy、.number、.trim $attrs数据的透传,在组件(这个是写在App.vue中),数据就透传到student组件中,在template中可以直接使用{{$attrs.students}}获取数据 通过defineProps定义的属性在attrs中就…...
第12天-商品维护(发布商品、商品管理、SPU管理)
1.发布商品流程 发布商品分为5个步骤: 基本信息规格参数销售属性SKU信息保存完成 2.发布商品-基本信息 2.1.会员等级-会员服务 2.1.1.会员服务-网关配置 在网关增加会员服务的路由配置 - id: member_routeuri: lb://gmall-memberpredicates:- Path/api/member/…...
动态分区分配计算
动态分区分配 内存连续分配管理分为: 单一连续分配固定分区分配动态分区分配(本篇所讲) 首次适应算法(First Fit,FF) 该算法又称最先适应算法,要求空闲分区按照首地址递增的顺序排列。 优点…...
【云原生】k8s的pod基本概念
一、资源限制 Pod 是 kubernetes 中最小的资源管理组件,Pod 也是最小化运行容器化应用的资源对象。一个 Pod 代表着集群中运行的一个进程。kubernetes 中其他大多数组件都是围绕着 Pod 来进行支撑和扩展 Pod 功能的,例如用于管理 Pod 运行的 StatefulSe…...
【史上最全面esp32教程】激光与食人鱼模块篇
文章目录食人鱼模块模块介绍连线说明操作激光模块模块介绍连线说明操作总结提示:以下是本篇文章正文内容,下面案例可供参考 食人鱼模块 模块介绍 采用食人鱼LED设计制作一个发光的电子模块,其实他的本质和LED无区别。 连线说明 名称接线…...
《代码整洁之道》二之有意义的命名
1.有意义的命名 1.1 名副其实 取个好名字需要花时间,但是价值远超取名的时间,一旦发现更好的名称就换掉旧的。这么做,读你代码的人都会很开心。 变量名、方法名、类名称需要清晰的告诉别人含义,如果名称需要注释来补充…...
天气预测demo
天气预测1 数据集介绍1.1 训练集1.2 测试集2 导入数据进行数据分析2.1 浏览数据2.2 探索数据2.2.1 查看数据类型1 数据集介绍 1.1 训练集 训练集中共有116369个样本,每个样本有23个特征,特征具体介绍如下: 列名解释Date:日期&a…...
HDMI协议介绍(四)--Video
目录 视频格式 RGB444 YUV444 YUV422 YUV420 Color Depth Video控制信号 Pixel Repetition HDMI支持多种视频格式和分辨率。以hdmi1.4和2.0协议来说,视频格式支持RGB444、YUV444、YUV422和YUV420,其中RGB444和YUV444一般都是要求支持的。 视频格式…...
微信授权登录流程以及公众号配置方法(golang后端)
一、准备一个已经认证OK的微信公众号和已经备案的域名,且解析好配置好https证书。 1.如上图 微信公众号 > 基本配置 ,设置开发者密码 2.设置IP白名单,白名单填写提供后端服务的服务器公网IP 二、公众号服务器配置。 1.找到基本配置 2.将服…...
(二)TensorRT-LLM | 模型导出(v0.20.0rc3)
0. 概述 上一节 对安装和使用有个基本介绍。根据这个 issue 的描述,后续 TensorRT-LLM 团队可能更专注于更新和维护 pytorch backend。但 tensorrt backend 作为先前一直开发的工作,其中包含了大量可以学习的地方。本文主要看看它导出模型的部分&#x…...
Caliper 配置文件解析:config.yaml
Caliper 是一个区块链性能基准测试工具,用于评估不同区块链平台的性能。下面我将详细解释你提供的 fisco-bcos.json 文件结构,并说明它与 config.yaml 文件的关系。 fisco-bcos.json 文件解析 这个文件是针对 FISCO-BCOS 区块链网络的 Caliper 配置文件,主要包含以下几个部…...
ArcGIS Pro制作水平横向图例+多级标注
今天介绍下载ArcGIS Pro中如何设置水平横向图例。 之前我们介绍了ArcGIS的横向图例制作:ArcGIS横向、多列图例、顺序重排、符号居中、批量更改图例符号等等(ArcGIS出图图例8大技巧),那这次我们看看ArcGIS Pro如何更加快捷的操作。…...
如何理解 IP 数据报中的 TTL?
目录 前言理解 前言 面试灵魂一问:说说对 IP 数据报中 TTL 的理解?我们都知道,IP 数据报由首部和数据两部分组成,首部又分为两部分:固定部分和可变部分,共占 20 字节,而即将讨论的 TTL 就位于首…...
第7篇:中间件全链路监控与 SQL 性能分析实践
7.1 章节导读 在构建数据库中间件的过程中,可观测性 和 性能分析 是保障系统稳定性与可维护性的核心能力。 特别是在复杂分布式场景中,必须做到: 🔍 追踪每一条 SQL 的生命周期(从入口到数据库执行)&#…...
关于uniapp展示PDF的解决方案
在 UniApp 的 H5 环境中使用 pdf-vue3 组件可以实现完整的 PDF 预览功能。以下是详细实现步骤和注意事项: 一、安装依赖 安装 pdf-vue3 和 PDF.js 核心库: npm install pdf-vue3 pdfjs-dist二、基本使用示例 <template><view class"con…...
SQL Server 触发器调用存储过程实现发送 HTTP 请求
文章目录 需求分析解决第 1 步:前置条件,启用 OLE 自动化方式 1:使用 SQL 实现启用 OLE 自动化方式 2:Sql Server 2005启动OLE自动化方式 3:Sql Server 2008启动OLE自动化第 2 步:创建存储过程第 3 步:创建触发器扩展 - 如何调试?第 1 步:登录 SQL Server 2008第 2 步…...
云原生周刊:k0s 成为 CNCF 沙箱项目
开源项目推荐 HAMi HAMi(原名 k8s‑vGPU‑scheduler)是一款 CNCF Sandbox 级别的开源 K8s 中间件,通过虚拟化 GPU/NPU 等异构设备并支持内存、计算核心时间片隔离及共享调度,为容器提供统一接口,实现细粒度资源配额…...
【WebSocket】SpringBoot项目中使用WebSocket
1. 导入坐标 如果springboot父工程没有加入websocket的起步依赖,添加它的坐标的时候需要带上版本号。 <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId> </dep…...
论文阅读:Matting by Generation
今天介绍一篇关于 matting 抠图的文章,抠图也算是计算机视觉里面非常经典的一个任务了。从早期的经典算法到如今的深度学习算法,已经有很多的工作和这个任务相关。这两年 diffusion 模型很火,大家又开始用 diffusion 模型做各种 CV 任务了&am…...


