kafka使用入门案例与踩坑记录
每次用到kafka时都会出现各种奇怪的问题,综合实践,下面汇总下主要操作步骤:
Docker镜像形式启动
zookeeper启动
docker run -d --name zookeeper -p 2181:2181 -t wurstmeister/zookeeper
kafka启动
docker run --name kafka01 -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=150.158.16.123:12348 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://150.158.16.123:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -d wurstmeister/kafka
进入kafka容器
docker exec -it [容器id] /bin/bash
创建topic
进入容器,在/opt/kafka_2.13-2.8.1/bin 目录下创建topic
./kafka-topics.sh --create --zookeeper 150.158.16.123:12348 --replication-factor 1 --partitions 1 --topic mykafka
./kafka-topics.sh --create --zookeeper 150.158.16.123:2181 --replication-factor 1 --partitions 1 --topic mykafka
运行生产者

运行消费者

单机形式启动
前提
1、Linux 机器
2、环境已准备好JDK,如果还没有装,推荐用yum一键安装
yum install -y java-1.8.0-openjdk.x86_64
检验:
[root@localhost ~]# java -version
openjdk version "1.8.0_362"
OpenJDK Runtime Environment (build 1.8.0_362-b08)
OpenJDK 64-Bit Server VM (build 25.362-b08, mixed mode)
3、将kafka压缩包上传到你的Linux
配置文件关注config目录下的zookeeper.properties和server.properties,启动服务时要指定
配置-启动
有默认配置,可不做修改(有需要可以自定义启动端口和数据存放位置等参数)
1、先启动自带的 Zookeeper:
[root@localhost bin]# ./zookeeper-server-start.sh ../config/zookeeper.properties
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
[2023-02-26 14:14:52,759] INFO Reading configuration from: ../config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2023-02-26 14:14:52,766] INFO autopurge.snapRetainCount set to 3 (org.apache.zookeeper.server.DatadirCleanupManager)
[2023-02-26 14:14:52,767] INFO autopurge.purgeInterval set to 0 (org.apache.zookeeper.server.DatadirCleanupManager)
[2023-02-26 14:14:52,767] INFO Purge task is not scheduled. (org.apache.zookeeper.server.DatadirCleanupManager)
[2023-02-26 14:14:52,767] WARN Either no config or no quorum defined in config, running in standalone mode (org.apache.zookeeper.server.quorum.QuorumPeerMain)
[2023-02-26 14:14:52,783] INFO Reading configuration from: ../config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2023-02-26 14:14:52,784] INFO Starting server (org.apache.zookeeper.server.ZooKeeperServerMain)
[2023-02-26 14:14:52,796] INFO Server environment:zookeeper.version=3.4.14-4c25d480e66aadd371de8bd2fd8da255ac140bcf, built on 03/06/2019 16:18 GMT (org.apache.zookeeper.server.ZooKeeperServer)
[2023-02-26 14:14:52,796] INFO Server environment:host.name=localhost (org.apache.zookeeper.server.ZooKeeperServer)
[2023-02-26 14:14:52,796] INFO Server environment:java.version=1.8.0_362 (org.apache.zookeeper.server.ZooKeeperServer)
[2023-02-26 14:14:52,796] INFO Server environment:java.vendor=Red Hat, Inc. (org.apache.zookeeper.server.ZooKeeperServer)
[2023-02-26 14:14:52,796] INFO Server environment:java.home=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.362.b08-1.el7_9.x86_64/jre (org.apache.zookeeper.server.ZooKeeperServer)
(省略大部分)
2、启动 Kafka
[root@localhost kafka_2.12-2.3.0]# bin/kafka-server-start.sh config/server.properties
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
[2023-02-26 14:16:00,261] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
[2023-02-26 14:16:01,004] INFO Registered signal handlers for TERM, INT, HUP (org.apache.kafka.common.utils.LoggingSignalHandler)
[2023-02-26 14:16:01,024] INFO starting (kafka.server.KafkaServer)
[2023-02-26 14:16:01,025] INFO Connecting to zookeeper on localhost:2181 (kafka.server.KafkaServer)
[2023-02-26 14:16:01,068] INFO [ZooKeeperClient Kafka server] Initializing a new session to localhost:2181. (kafka.zookeeper.ZooKeeperClient)
[2023-02-26 14:16:01,072] INFO Client environment:zookeeper.version=3.4.14-4c25d480e66aadd371de8bd2fd8da255ac140bcf, built on 03/06/2019 16:18 GMT (org.apache.zookeeper.ZooKeeper)
[2023-02-26 14:16:01,072] INFO Client environment:host.name=localhost (org.apache.zookeeper.ZooKeeper)
[2023-02-26 14:16:01,072] INFO Client environment:java.version=1.8.0_362 (org.apache.zookeeper.ZooKeeper)
[2023-02-26 14:16:01,072] INFO Client environment:java.vendor=Red Hat, Inc. (org.apache.zookeeper.ZooKeeper)
[2023-02-26 14:16:01,072] INFO Client environment:java.home=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.362.b08-1.el7_9.x86_64/jre (org.apache.zookeeper.ZooKeeper)
(省略大部分)
上述步骤只要启动过程没有报错信息,一般是没有问题的
测试
1、创建个topic
[root@localhost bin]# ./kafka-topics.sh --create --zookeeper 192.168.154.134:2181 --replication-factor 1 --partitions 1 --topic test
Created topic test.
2、查看topic列表
[root@localhost bin]# ./kafka-topics.sh --zookeeper 192.168.154.134:2181 --list
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
test
3、启动生产者
[root@localhost bin]# ./kafka-console-producer.sh --broker-list 192.168.154.134:9092 --topic test
>hi
>什么意思啊
4、启动消费者
[root@localhost bin]# ./kafka-console-consumer.sh --bootstrap-server 192.168.154.134:9092 --topic test
hi
什么意思啊
正常启动,OK!
可视化:kafka-manager
镜像下载
docker pull sheepkiller/kafka-manager
运行容器
docker run -d --name kafka-manager -p 12349:9000 --link zookeeper --link kafka01 --env ZK_HOSTS=zookeeper:2181 sheepkiller/kafka-manager
然后访问对应的IP:端口即可进入管理页面
注意:
ZK_HOSTS后面在web页面上要用到!
管理界面
进入主页面后,点击 Add Cluster 添加集群信息

然后填写配置信息,主要填写集群名称,Zookeeper的Hosts,还有指定kafka版本(选个跟你所使用的kafka版本号最接近的就行),其他的一些配置按默认的就行。
当你正确连接上以后,就能看到你的集群啦,如:


更多关于kafka可视化操作就由你慢慢探索吧!这里将你引进门!
注意:
如果你在启动
kafka manager这个容器时指定了ZK_HOSTS,那么Cluster Zookeeper Hosts这项填的内容要和ZK_HOSTS一致,否则会出现连接不上,连接超时等情况。如下图:
另外有些配置默认值是1,但是你得将其改成1以上的整数,否则不能正确保存提交。如:
注意
kafka版本不同,响应的api有区别
本版本是2.11
注意3.x是 --bootstrap-server localhost:9092方式新建,kafka2.x是以–zookeeper方式创建。下面查看新建的topic。


奇葩问题
1.重启docker失败?
[root@localhost ~]# systemctl restart docker
Job for docker.service failed because the control process exited with error code. See "systemctl status docker.service" and "journalctl -xe" for details.
[root@localhost ~]# journalctl -xe
-- The result is failed.
2月 22 02:01:53 localhost.localdomain systemd[1]: Unit docker.service entered failed state.
2月 22 02:01:53 localhost.localdomain systemd[1]: Unit docker.service entered failed state.
2月 22 02:01:53 localhost.localdomain systemd[1]: docker.service failed.
2月 22 02:01:55 localhost.localdomain systemd[1]: docker.service holdoff time over, scheduling restart.
2月 22 02:01:55 localhost.localdomain systemd[1]: Stopped Docker Application Container Engine.
-- Subject: Unit docker.service has finished shutting down
-- Defined-By: systemd
-- Support: http://lists.freedesktop.org/mailman/listinfo/systemd-devel
--
-- Unit docker.service has finished shutting down.
2月 22 02:01:55 localhost.localdomain systemd[1]: start request repeated too quickly for docker.service
2月 22 02:01:55 localhost.localdomain systemd[1]: Failed to start Docker Application Container Engine.
-- Subject: Unit docker.service has failed
-- Defined-By: systemd
-- Support: http://lists.freedesktop.org/mailman/listinfo/systemd-devel
--
-- Unit docker.service has failed.
--
-- The result is failed.
原因:修改文件/etc/docker/daemon.json时不规范,可能存在空格什么的
解决:
[root@localhost ~]# cat <<EOF >/etc/docker/daemon.json
> {
> "registry-mirrors": ["https://registry.docker-cn.com"]
> }
> EOF
[root@localhost ~]# cat /etc/docker/daemon.json
{
"registry-mirrors": ["https://registry.docker-cn.com"]
}
[root@localhost ~]#
[root@localhost ~]# systemctl daemon-reload
[root@localhost ~]# systemctl restart docker
2.查询镜像无果?
[root@localhost ~]# docker search kafka
Error response from daemon: Get "https://index.docker.io/v1/search?q=kafka&n=25": x509: certificate has expired or is not yet valid: current time 2023-02-22T02:08:25+08:00 is before 2023-02-22T00:00:00Z
原因:虚拟机时间与外部时间不一致
解决:
[root@localhost ~]# date
2023年 02月 22日 星期三 02:09:50 CST
[root@localhost ~]# ntpdate cn.pool.ntp.org
26 Feb 13:31:38 ntpdate[44996]: step time server 119.28.206.193 offset 386475.634457 sec
[root@localhost ~]# date
2023年 02月 26日 星期日 13:31:48 CST
[root@localhost ~]# docker search kafka
NAME DESCRIPTION STARS OFFICIAL AUTOMATED
bitnami/kafka Apache Kafka is a distributed streaming plat… 615 [OK]
ubuntu/kafka Apache Kafka, a distributed event streaming … 25
bitnami/kafka-exporter 9
ibmcom/kafka Docker Image for IBM Cloud Private-CE (Commu… 6
bitnami/kafka-trigger-controller Source for this controller is in the kubeles… 5
ibmcom/kafka-python-console-sample Docker image for the IBM Event Streams Pytho… 2
openwhisk/kafkaprovider Apache OpenWhisk event provider service for … 2 [OK]
3.Docker容器内如何安装vim?
-
apt-get install vim (可能提示你安装失败!继续往下)
-
agt-get update 同步 /etc/apt/sources.list 和 /etc/apt/sources.list.d 中列出的源的索引
配置国内镜像源:
echo "deb http://mirrors.163.com/debian/ jessie main non-free contrib" >/etc/apt/sources.listecho "deb http://mirrors.163.com/debian/ jessie-proposed-updates main non-free contrib" >>/etc/apt/sources.listecho "deb-src http://mirrors.163.com/debian/ jessie main non-free contrib" >>/etc/apt/sources.listecho "deb-src http://mirrors.163.com/debian/ jessie-proposed-updates main non-free contrib" >>/etc/apt/sources.list -
返回第一步
4.无法启动kafka?
kafka.common.KafkaException: Socket server failed to bind to 150.158.16.123:9092: 无法指定被请求的地址.at kafka.network.Acceptor.openServerSocket(SocketServer.scala:327)at kafka.network.Acceptor.<init>(SocketServer.scala:252)at kafka.network.SocketServer$$anonfun$startup$1.apply(SocketServer.scala:91)at kafka.network.SocketServer$$anonfun$startup$1.apply(SocketServer.scala:83)at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)at kafka.network.SocketServer.startup(SocketServer.scala:83)at kafka.server.KafkaServer.startup(KafkaServer.scala:222)at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:38)at kafka.Kafka$.main(Kafka.scala:65)at kafka.Kafka.main(Kafka.scala)
注意,上面是配置里面有个地址写得不对,listeners=PLAINTEXT://10.20.30.153:9092后接的是内网地址,通过ip addr即可查看,如我的机器

一个写内网地址,一个写外网地址即可

本次分享到这,下期见!
相关文章:
kafka使用入门案例与踩坑记录
每次用到kafka时都会出现各种奇怪的问题,综合实践,下面汇总下主要操作步骤: Docker镜像形式启动 zookeeper启动 docker run -d --name zookeeper -p 2181:2181 -t wurstmeister/zookeeperkafka启动 docker run --name kafka01 -p 9092:909…...
系统启动太慢,调优后我直呼Nice
问题背景最近在负责一个订单系统的业务研发,本来不是件困难的事。但是服务的启动时间很慢,慢的令人发指。单次启动的时间约在10多分钟左右,基本一次迭代、开发,大部分的时间都花在了启动项目上。忍无可忍的我,终于决定…...
java知识点
文章目录异常写法JVM加载反射访问private调用方法动态代理注解元数据:TargetRetention元注解泛型编写泛型擦拭法局限通配符无限定通配符(<?>)集合重写方法和实现类IO流字节与字符转换同步和异步可以设置编码的类Print*类Files时间与日期时区一种二种三种异常…...
文件的打开关闭和顺序读写
目录 一、文件的打开与关闭 (一)文件指针 (二) 文件的打开和关闭 二、文件的顺序读写 (一)fputc 1. 介绍 2. 举例 (二)fgetc 1. 介绍 2. 举例1 3. 举例2 (三&…...
(十八)操作系统-进程互斥的软件实现方法
文章目录一、知识总览二、单标志法三、双标志先检查法四、双标志后检查法五、Peterson算法六、总结一、知识总览 二、单标志法 算法思想:两个进程在访问临界区后,会把使用临界区的权限转交给另一个进程。也就是说每个进程进入临界区的权限只能被另一个进…...
2023年三月份图形化一级打卡试题
活动时间 从2023年3月1日至3月21日,每天一道编程题。 本次打卡的规则如下: 小朋友每天利用10~15分钟做一道编程题,遇到问题就来群内讨论,我来给大家答疑。 小朋友做完题目后,截图到朋友圈打卡并把打卡的截图发到活动群…...
linux 防火墙管理-firewalld
什么是Firewalld 当前很多linux系统中都默认使用 firewalld(Dynamic Firewall Manager of Linux systems,Linux系统的动态防火墙管理器)服务作为防火墙配置管理工具。 “firewalld”是firewall daemon。它提供了一个动态管理的防火墙&#x…...
2023年最新大厂开发面试题(滴滴,华为,京东,腾讯,头条)
2023年最新大厂开发面试题!!! 滴滴篇 B树、B-树的区别? 数据库隔离级别,幻读和不可重复读的区别? 有 hell, well, hello, world 等字符串组,现在问能否拼接成 helloworld,代码实现。 快排算…...
2023年三月份图形化三级打卡试题
活动时间 从2023年3月1日至3月21日,每天一道编程题。 本次打卡的规则如下: 小朋友每天利用10~15分钟做一道编程题,遇到问题就来群内讨论,我来给大家答疑。 小朋友做完题目后,截图到朋友圈打卡并把打卡的截图发到活动群…...
蓝桥杯算法模板
模拟散列表拉链法import java.io.*; import java.util.*; public class a1 {static int n;static int N100003;static int[] hnew int[N];static int[] enew int[N];static int[] nenew int[N]; static int idx; static void insert(int x){int k(x%NN)%N;e[idx]x;ne[idx]h[k];…...
python之并发编程
一、并发编程之多进程 1.multiprocessing模块介绍 python中的多线程无法利用多核优势,如果想要充分地使用多核CPU的资源(os.cpu_count()查看),在python中大部分情况需要使用多进程。Python提供了multiprocessing。 multiprocess…...
Vue.js自定义事件的使用(实现父子之间的通信)
vue v-model修饰符:.lazy、.number、.trim $attrs数据的透传,在组件(这个是写在App.vue中),数据就透传到student组件中,在template中可以直接使用{{$attrs.students}}获取数据 通过defineProps定义的属性在attrs中就…...
第12天-商品维护(发布商品、商品管理、SPU管理)
1.发布商品流程 发布商品分为5个步骤: 基本信息规格参数销售属性SKU信息保存完成 2.发布商品-基本信息 2.1.会员等级-会员服务 2.1.1.会员服务-网关配置 在网关增加会员服务的路由配置 - id: member_routeuri: lb://gmall-memberpredicates:- Path/api/member/…...
动态分区分配计算
动态分区分配 内存连续分配管理分为: 单一连续分配固定分区分配动态分区分配(本篇所讲) 首次适应算法(First Fit,FF) 该算法又称最先适应算法,要求空闲分区按照首地址递增的顺序排列。 优点…...
【云原生】k8s的pod基本概念
一、资源限制 Pod 是 kubernetes 中最小的资源管理组件,Pod 也是最小化运行容器化应用的资源对象。一个 Pod 代表着集群中运行的一个进程。kubernetes 中其他大多数组件都是围绕着 Pod 来进行支撑和扩展 Pod 功能的,例如用于管理 Pod 运行的 StatefulSe…...
【史上最全面esp32教程】激光与食人鱼模块篇
文章目录食人鱼模块模块介绍连线说明操作激光模块模块介绍连线说明操作总结提示:以下是本篇文章正文内容,下面案例可供参考 食人鱼模块 模块介绍 采用食人鱼LED设计制作一个发光的电子模块,其实他的本质和LED无区别。 连线说明 名称接线…...
《代码整洁之道》二之有意义的命名
1.有意义的命名 1.1 名副其实 取个好名字需要花时间,但是价值远超取名的时间,一旦发现更好的名称就换掉旧的。这么做,读你代码的人都会很开心。 变量名、方法名、类名称需要清晰的告诉别人含义,如果名称需要注释来补充…...
天气预测demo
天气预测1 数据集介绍1.1 训练集1.2 测试集2 导入数据进行数据分析2.1 浏览数据2.2 探索数据2.2.1 查看数据类型1 数据集介绍 1.1 训练集 训练集中共有116369个样本,每个样本有23个特征,特征具体介绍如下: 列名解释Date:日期&a…...
HDMI协议介绍(四)--Video
目录 视频格式 RGB444 YUV444 YUV422 YUV420 Color Depth Video控制信号 Pixel Repetition HDMI支持多种视频格式和分辨率。以hdmi1.4和2.0协议来说,视频格式支持RGB444、YUV444、YUV422和YUV420,其中RGB444和YUV444一般都是要求支持的。 视频格式…...
微信授权登录流程以及公众号配置方法(golang后端)
一、准备一个已经认证OK的微信公众号和已经备案的域名,且解析好配置好https证书。 1.如上图 微信公众号 > 基本配置 ,设置开发者密码 2.设置IP白名单,白名单填写提供后端服务的服务器公网IP 二、公众号服务器配置。 1.找到基本配置 2.将服…...
LabVIEW数字IO编程避坑指南:单点采样、连续采样到底怎么选?NI-MAX测试面板帮你验证
LabVIEW数字IO编程实战:采样模式选择与NI-MAX验证全攻略 在工业自动化测试领域,LabVIEW的数字IO模块是最基础也最常用的功能之一。许多工程师在初次接触数字IO编程时,往往会被各种采样模式搞得晕头转向——单点采样、N采样、连续采样…...
演讲口才课到底有没有用?上完三个月后的真实反馈
三个月前,林薇坐在会议室的角落里,手里攥着一份精心准备的方案,却迟迟没有开口。那一刻,她看着同事们侃侃而谈,心里反复问自己:为什么明明有想法,却说不出来?就是那个瞬间࿰…...
深度解构:指纹浏览器底层隔离与Python高并发RPA,如何重塑电商矩阵自动化架构?
大家好,我是林焱,一名专注电商底层业务逻辑与 RPA 自动化架构定制的独立开发者。 在 CSDN 的各个技术板块中,关于爬虫与反爬虫、并发调度、以及客户端架构的讨论一直是热点。而将这些技术综合应用到极致的领域之一,就是当下极度内…...
从仿真结果到科研图表:手把手教你用Tonyplot处理Silvaco TCAD数据
从仿真结果到科研图表:手把手教你用Tonyplot处理Silvaco TCAD数据 在半导体器件研究中,TCAD仿真数据的可视化呈现往往决定着研究成果的传达效果。许多研究者花费大量时间完成Silvaco仿真后,却苦于无法将原始数据转化为符合学术出版要求的专业…...
从龟速到极速:如何用trackerslist项目彻底解决BT下载瓶颈
从龟速到极速:如何用trackerslist项目彻底解决BT下载瓶颈 【免费下载链接】trackerslist Updated list of public BitTorrent trackers 项目地址: https://gitcode.com/GitHub_Trending/tr/trackerslist 你是否曾经面对BT下载时那令人沮丧的进度条࿱…...
Flutter+开源鸿蒙实战|城市共享驿站智能存取系统 Day7 最终闭环篇 多端适配演示+毕设总结+源码梳理+功能扩展
Flutter开源鸿蒙实战|城市共享驿站智能存取系统 Day7 最终闭环篇 多端适配演示毕设总结源码梳理功能扩展 欢迎加入开源鸿蒙跨平台社区:https://openharmonycrossplatform.csdn.net <!-- Schema.org 结构化数据 --> <script type"applicati…...
基于Refine框架的企业级后台管理系统实战开发指南
1. 项目概述与核心价值最近在梳理企业内部后台管理系统的技术栈时,我又一次把目光投向了refine这个框架。如果你也和我一样,长期被各种业务后台的重复性开发工作所困扰——比如没完没了的增删改查(CRUD)界面、复杂的权限控制、数据…...
终局架构:指纹隔离底座 + gRPC分布式调度,重塑千万级拼多多店群RPA集群
大家好,我是林焱,一名专注电商底层业务逻辑与 RPA 自动化架构定制的独立开发者。 在前面的几篇 CSDN 专栏中,我们探讨了如何利用“指纹浏览器底层隔离”解决风控关联问题,如何利用“EDA(事件驱动)”和“CD…...
基于MCP协议的Kubernetes智能运维助手:lazymac-k-mcp项目详解
1. 项目概述:一个为Kubernetes而生的MCP服务器如果你和我一样,日常工作中有一大半时间都在和Kubernetes集群打交道,那么你肯定对kubectl命令行工具又爱又恨。爱的是它功能强大,是操作K8s的瑞士军刀;恨的是它命令繁多&a…...
城市道路自动驾驶避障规划与MPC跟踪控制【附仿真】
✨ 长期致力于自动驾驶、路径规划、速度规划、跟踪控制、模型预测控制研究工作,擅长数据搜集与处理、建模仿真、程序编写、仿真设计。 ✅ 专业定制毕设、代码 ✅如需沟通交流,点击《获取方式》 (1)SL图五次多项式代价路径决策与凸…...


