kafka使用入门案例与踩坑记录
每次用到kafka时都会出现各种奇怪的问题,综合实践,下面汇总下主要操作步骤:
Docker镜像形式启动
zookeeper启动
docker run -d --name zookeeper -p 2181:2181 -t wurstmeister/zookeeper
kafka启动
docker run --name kafka01 -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=150.158.16.123:12348 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://150.158.16.123:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -d wurstmeister/kafka
进入kafka容器
docker exec -it [容器id] /bin/bash
创建topic
进入容器,在/opt/kafka_2.13-2.8.1/bin
目录下创建topic
./kafka-topics.sh --create --zookeeper 150.158.16.123:12348 --replication-factor 1 --partitions 1 --topic mykafka
./kafka-topics.sh --create --zookeeper 150.158.16.123:2181 --replication-factor 1 --partitions 1 --topic mykafka
运行生产者
运行消费者
单机形式启动
前提
1、Linux 机器
2、环境已准备好JDK,如果还没有装,推荐用yum一键安装
yum install -y java-1.8.0-openjdk.x86_64
检验:
[root@localhost ~]# java -version
openjdk version "1.8.0_362"
OpenJDK Runtime Environment (build 1.8.0_362-b08)
OpenJDK 64-Bit Server VM (build 25.362-b08, mixed mode)
3、将kafka压缩包上传到你的Linux
配置文件关注config
目录下的zookeeper.properties
和server.properties
,启动服务时要指定
配置-启动
有默认配置,可不做修改(有需要可以自定义启动端口和数据存放位置等参数)
1、先启动自带的 Zookeeper:
[root@localhost bin]# ./zookeeper-server-start.sh ../config/zookeeper.properties
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
[2023-02-26 14:14:52,759] INFO Reading configuration from: ../config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2023-02-26 14:14:52,766] INFO autopurge.snapRetainCount set to 3 (org.apache.zookeeper.server.DatadirCleanupManager)
[2023-02-26 14:14:52,767] INFO autopurge.purgeInterval set to 0 (org.apache.zookeeper.server.DatadirCleanupManager)
[2023-02-26 14:14:52,767] INFO Purge task is not scheduled. (org.apache.zookeeper.server.DatadirCleanupManager)
[2023-02-26 14:14:52,767] WARN Either no config or no quorum defined in config, running in standalone mode (org.apache.zookeeper.server.quorum.QuorumPeerMain)
[2023-02-26 14:14:52,783] INFO Reading configuration from: ../config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2023-02-26 14:14:52,784] INFO Starting server (org.apache.zookeeper.server.ZooKeeperServerMain)
[2023-02-26 14:14:52,796] INFO Server environment:zookeeper.version=3.4.14-4c25d480e66aadd371de8bd2fd8da255ac140bcf, built on 03/06/2019 16:18 GMT (org.apache.zookeeper.server.ZooKeeperServer)
[2023-02-26 14:14:52,796] INFO Server environment:host.name=localhost (org.apache.zookeeper.server.ZooKeeperServer)
[2023-02-26 14:14:52,796] INFO Server environment:java.version=1.8.0_362 (org.apache.zookeeper.server.ZooKeeperServer)
[2023-02-26 14:14:52,796] INFO Server environment:java.vendor=Red Hat, Inc. (org.apache.zookeeper.server.ZooKeeperServer)
[2023-02-26 14:14:52,796] INFO Server environment:java.home=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.362.b08-1.el7_9.x86_64/jre (org.apache.zookeeper.server.ZooKeeperServer)
(省略大部分)
2、启动 Kafka
[root@localhost kafka_2.12-2.3.0]# bin/kafka-server-start.sh config/server.properties
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
[2023-02-26 14:16:00,261] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
[2023-02-26 14:16:01,004] INFO Registered signal handlers for TERM, INT, HUP (org.apache.kafka.common.utils.LoggingSignalHandler)
[2023-02-26 14:16:01,024] INFO starting (kafka.server.KafkaServer)
[2023-02-26 14:16:01,025] INFO Connecting to zookeeper on localhost:2181 (kafka.server.KafkaServer)
[2023-02-26 14:16:01,068] INFO [ZooKeeperClient Kafka server] Initializing a new session to localhost:2181. (kafka.zookeeper.ZooKeeperClient)
[2023-02-26 14:16:01,072] INFO Client environment:zookeeper.version=3.4.14-4c25d480e66aadd371de8bd2fd8da255ac140bcf, built on 03/06/2019 16:18 GMT (org.apache.zookeeper.ZooKeeper)
[2023-02-26 14:16:01,072] INFO Client environment:host.name=localhost (org.apache.zookeeper.ZooKeeper)
[2023-02-26 14:16:01,072] INFO Client environment:java.version=1.8.0_362 (org.apache.zookeeper.ZooKeeper)
[2023-02-26 14:16:01,072] INFO Client environment:java.vendor=Red Hat, Inc. (org.apache.zookeeper.ZooKeeper)
[2023-02-26 14:16:01,072] INFO Client environment:java.home=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.362.b08-1.el7_9.x86_64/jre (org.apache.zookeeper.ZooKeeper)
(省略大部分)
上述步骤只要启动过程没有报错信息,一般是没有问题的
测试
1、创建个topic
[root@localhost bin]# ./kafka-topics.sh --create --zookeeper 192.168.154.134:2181 --replication-factor 1 --partitions 1 --topic test
Created topic test.
2、查看topic列表
[root@localhost bin]# ./kafka-topics.sh --zookeeper 192.168.154.134:2181 --list
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
test
3、启动生产者
[root@localhost bin]# ./kafka-console-producer.sh --broker-list 192.168.154.134:9092 --topic test
>hi
>什么意思啊
4、启动消费者
[root@localhost bin]# ./kafka-console-consumer.sh --bootstrap-server 192.168.154.134:9092 --topic test
hi
什么意思啊
正常启动,OK!
可视化:kafka-manager
镜像下载
docker pull sheepkiller/kafka-manager
运行容器
docker run -d --name kafka-manager -p 12349:9000 --link zookeeper --link kafka01 --env ZK_HOSTS=zookeeper:2181 sheepkiller/kafka-manager
然后访问对应的IP:端口
即可进入管理页面
注意:
ZK_HOSTS
后面在web页面上要用到!
管理界面
进入主页面后,点击 Add Cluster
添加集群信息
然后填写配置信息,主要填写集群名称,Zookeeper的Hosts,还有指定kafka版本(选个跟你所使用的kafka版本号最接近的就行),其他的一些配置按默认的就行。
当你正确连接上以后,就能看到你的集群啦,如:
更多关于kafka可视化操作就由你慢慢探索吧!这里将你引进门!
注意:
如果你在启动
kafka manager
这个容器时指定了ZK_HOSTS
,那么Cluster Zookeeper Hosts这项填的内容要和ZK_HOSTS
一致,否则会出现连接不上,连接超时等情况。如下图:
另外有些配置默认值是1,但是你得将其改成1以上的整数,否则不能正确保存提交。如:
注意
kafka版本不同,响应的api有区别
本版本是2.11
注意3.x是 --bootstrap-server localhost:9092方式新建,kafka2.x是以–zookeeper方式创建。下面查看新建的topic。
奇葩问题
1.重启docker失败?
[root@localhost ~]# systemctl restart docker
Job for docker.service failed because the control process exited with error code. See "systemctl status docker.service" and "journalctl -xe" for details.
[root@localhost ~]# journalctl -xe
-- The result is failed.
2月 22 02:01:53 localhost.localdomain systemd[1]: Unit docker.service entered failed state.
2月 22 02:01:53 localhost.localdomain systemd[1]: Unit docker.service entered failed state.
2月 22 02:01:53 localhost.localdomain systemd[1]: docker.service failed.
2月 22 02:01:55 localhost.localdomain systemd[1]: docker.service holdoff time over, scheduling restart.
2月 22 02:01:55 localhost.localdomain systemd[1]: Stopped Docker Application Container Engine.
-- Subject: Unit docker.service has finished shutting down
-- Defined-By: systemd
-- Support: http://lists.freedesktop.org/mailman/listinfo/systemd-devel
--
-- Unit docker.service has finished shutting down.
2月 22 02:01:55 localhost.localdomain systemd[1]: start request repeated too quickly for docker.service
2月 22 02:01:55 localhost.localdomain systemd[1]: Failed to start Docker Application Container Engine.
-- Subject: Unit docker.service has failed
-- Defined-By: systemd
-- Support: http://lists.freedesktop.org/mailman/listinfo/systemd-devel
--
-- Unit docker.service has failed.
--
-- The result is failed.
原因:修改文件/etc/docker/daemon.json
时不规范,可能存在空格什么的
解决:
[root@localhost ~]# cat <<EOF >/etc/docker/daemon.json
> {
> "registry-mirrors": ["https://registry.docker-cn.com"]
> }
> EOF
[root@localhost ~]# cat /etc/docker/daemon.json
{
"registry-mirrors": ["https://registry.docker-cn.com"]
}
[root@localhost ~]#
[root@localhost ~]# systemctl daemon-reload
[root@localhost ~]# systemctl restart docker
2.查询镜像无果?
[root@localhost ~]# docker search kafka
Error response from daemon: Get "https://index.docker.io/v1/search?q=kafka&n=25": x509: certificate has expired or is not yet valid: current time 2023-02-22T02:08:25+08:00 is before 2023-02-22T00:00:00Z
原因:虚拟机时间与外部时间不一致
解决:
[root@localhost ~]# date
2023年 02月 22日 星期三 02:09:50 CST
[root@localhost ~]# ntpdate cn.pool.ntp.org
26 Feb 13:31:38 ntpdate[44996]: step time server 119.28.206.193 offset 386475.634457 sec
[root@localhost ~]# date
2023年 02月 26日 星期日 13:31:48 CST
[root@localhost ~]# docker search kafka
NAME DESCRIPTION STARS OFFICIAL AUTOMATED
bitnami/kafka Apache Kafka is a distributed streaming plat… 615 [OK]
ubuntu/kafka Apache Kafka, a distributed event streaming … 25
bitnami/kafka-exporter 9
ibmcom/kafka Docker Image for IBM Cloud Private-CE (Commu… 6
bitnami/kafka-trigger-controller Source for this controller is in the kubeles… 5
ibmcom/kafka-python-console-sample Docker image for the IBM Event Streams Pytho… 2
openwhisk/kafkaprovider Apache OpenWhisk event provider service for … 2 [OK]
3.Docker容器内如何安装vim?
-
apt-get install vim (可能提示你安装失败!继续往下)
-
agt-get update 同步 /etc/apt/sources.list 和 /etc/apt/sources.list.d 中列出的源的索引
配置国内镜像源:
echo "deb http://mirrors.163.com/debian/ jessie main non-free contrib" >/etc/apt/sources.listecho "deb http://mirrors.163.com/debian/ jessie-proposed-updates main non-free contrib" >>/etc/apt/sources.listecho "deb-src http://mirrors.163.com/debian/ jessie main non-free contrib" >>/etc/apt/sources.listecho "deb-src http://mirrors.163.com/debian/ jessie-proposed-updates main non-free contrib" >>/etc/apt/sources.list
-
返回第一步
4.无法启动kafka?
kafka.common.KafkaException: Socket server failed to bind to 150.158.16.123:9092: 无法指定被请求的地址.at kafka.network.Acceptor.openServerSocket(SocketServer.scala:327)at kafka.network.Acceptor.<init>(SocketServer.scala:252)at kafka.network.SocketServer$$anonfun$startup$1.apply(SocketServer.scala:91)at kafka.network.SocketServer$$anonfun$startup$1.apply(SocketServer.scala:83)at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)at kafka.network.SocketServer.startup(SocketServer.scala:83)at kafka.server.KafkaServer.startup(KafkaServer.scala:222)at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:38)at kafka.Kafka$.main(Kafka.scala:65)at kafka.Kafka.main(Kafka.scala)
注意,上面是配置里面有个地址写得不对,listeners=PLAINTEXT://10.20.30.153:9092后接的是内网地址,通过ip addr即可查看,如我的机器
一个写内网地址,一个写外网地址即可
本次分享到这,下期见!
相关文章:

kafka使用入门案例与踩坑记录
每次用到kafka时都会出现各种奇怪的问题,综合实践,下面汇总下主要操作步骤: Docker镜像形式启动 zookeeper启动 docker run -d --name zookeeper -p 2181:2181 -t wurstmeister/zookeeperkafka启动 docker run --name kafka01 -p 9092:909…...

系统启动太慢,调优后我直呼Nice
问题背景最近在负责一个订单系统的业务研发,本来不是件困难的事。但是服务的启动时间很慢,慢的令人发指。单次启动的时间约在10多分钟左右,基本一次迭代、开发,大部分的时间都花在了启动项目上。忍无可忍的我,终于决定…...

java知识点
文章目录异常写法JVM加载反射访问private调用方法动态代理注解元数据:TargetRetention元注解泛型编写泛型擦拭法局限通配符无限定通配符(<?>)集合重写方法和实现类IO流字节与字符转换同步和异步可以设置编码的类Print*类Files时间与日期时区一种二种三种异常…...

文件的打开关闭和顺序读写
目录 一、文件的打开与关闭 (一)文件指针 (二) 文件的打开和关闭 二、文件的顺序读写 (一)fputc 1. 介绍 2. 举例 (二)fgetc 1. 介绍 2. 举例1 3. 举例2 (三&…...

(十八)操作系统-进程互斥的软件实现方法
文章目录一、知识总览二、单标志法三、双标志先检查法四、双标志后检查法五、Peterson算法六、总结一、知识总览 二、单标志法 算法思想:两个进程在访问临界区后,会把使用临界区的权限转交给另一个进程。也就是说每个进程进入临界区的权限只能被另一个进…...

2023年三月份图形化一级打卡试题
活动时间 从2023年3月1日至3月21日,每天一道编程题。 本次打卡的规则如下: 小朋友每天利用10~15分钟做一道编程题,遇到问题就来群内讨论,我来给大家答疑。 小朋友做完题目后,截图到朋友圈打卡并把打卡的截图发到活动群…...

linux 防火墙管理-firewalld
什么是Firewalld 当前很多linux系统中都默认使用 firewalld(Dynamic Firewall Manager of Linux systems,Linux系统的动态防火墙管理器)服务作为防火墙配置管理工具。 “firewalld”是firewall daemon。它提供了一个动态管理的防火墙&#x…...
2023年最新大厂开发面试题(滴滴,华为,京东,腾讯,头条)
2023年最新大厂开发面试题!!! 滴滴篇 B树、B-树的区别? 数据库隔离级别,幻读和不可重复读的区别? 有 hell, well, hello, world 等字符串组,现在问能否拼接成 helloworld,代码实现。 快排算…...

2023年三月份图形化三级打卡试题
活动时间 从2023年3月1日至3月21日,每天一道编程题。 本次打卡的规则如下: 小朋友每天利用10~15分钟做一道编程题,遇到问题就来群内讨论,我来给大家答疑。 小朋友做完题目后,截图到朋友圈打卡并把打卡的截图发到活动群…...

蓝桥杯算法模板
模拟散列表拉链法import java.io.*; import java.util.*; public class a1 {static int n;static int N100003;static int[] hnew int[N];static int[] enew int[N];static int[] nenew int[N]; static int idx; static void insert(int x){int k(x%NN)%N;e[idx]x;ne[idx]h[k];…...

python之并发编程
一、并发编程之多进程 1.multiprocessing模块介绍 python中的多线程无法利用多核优势,如果想要充分地使用多核CPU的资源(os.cpu_count()查看),在python中大部分情况需要使用多进程。Python提供了multiprocessing。 multiprocess…...
Vue.js自定义事件的使用(实现父子之间的通信)
vue v-model修饰符:.lazy、.number、.trim $attrs数据的透传,在组件(这个是写在App.vue中),数据就透传到student组件中,在template中可以直接使用{{$attrs.students}}获取数据 通过defineProps定义的属性在attrs中就…...

第12天-商品维护(发布商品、商品管理、SPU管理)
1.发布商品流程 发布商品分为5个步骤: 基本信息规格参数销售属性SKU信息保存完成 2.发布商品-基本信息 2.1.会员等级-会员服务 2.1.1.会员服务-网关配置 在网关增加会员服务的路由配置 - id: member_routeuri: lb://gmall-memberpredicates:- Path/api/member/…...

动态分区分配计算
动态分区分配 内存连续分配管理分为: 单一连续分配固定分区分配动态分区分配(本篇所讲) 首次适应算法(First Fit,FF) 该算法又称最先适应算法,要求空闲分区按照首地址递增的顺序排列。 优点…...

【云原生】k8s的pod基本概念
一、资源限制 Pod 是 kubernetes 中最小的资源管理组件,Pod 也是最小化运行容器化应用的资源对象。一个 Pod 代表着集群中运行的一个进程。kubernetes 中其他大多数组件都是围绕着 Pod 来进行支撑和扩展 Pod 功能的,例如用于管理 Pod 运行的 StatefulSe…...
【史上最全面esp32教程】激光与食人鱼模块篇
文章目录食人鱼模块模块介绍连线说明操作激光模块模块介绍连线说明操作总结提示:以下是本篇文章正文内容,下面案例可供参考 食人鱼模块 模块介绍 采用食人鱼LED设计制作一个发光的电子模块,其实他的本质和LED无区别。 连线说明 名称接线…...
《代码整洁之道》二之有意义的命名
1.有意义的命名 1.1 名副其实 取个好名字需要花时间,但是价值远超取名的时间,一旦发现更好的名称就换掉旧的。这么做,读你代码的人都会很开心。 变量名、方法名、类名称需要清晰的告诉别人含义,如果名称需要注释来补充…...
天气预测demo
天气预测1 数据集介绍1.1 训练集1.2 测试集2 导入数据进行数据分析2.1 浏览数据2.2 探索数据2.2.1 查看数据类型1 数据集介绍 1.1 训练集 训练集中共有116369个样本,每个样本有23个特征,特征具体介绍如下: 列名解释Date:日期&a…...

HDMI协议介绍(四)--Video
目录 视频格式 RGB444 YUV444 YUV422 YUV420 Color Depth Video控制信号 Pixel Repetition HDMI支持多种视频格式和分辨率。以hdmi1.4和2.0协议来说,视频格式支持RGB444、YUV444、YUV422和YUV420,其中RGB444和YUV444一般都是要求支持的。 视频格式…...

微信授权登录流程以及公众号配置方法(golang后端)
一、准备一个已经认证OK的微信公众号和已经备案的域名,且解析好配置好https证书。 1.如上图 微信公众号 > 基本配置 ,设置开发者密码 2.设置IP白名单,白名单填写提供后端服务的服务器公网IP 二、公众号服务器配置。 1.找到基本配置 2.将服…...

(LeetCode 每日一题) 3442. 奇偶频次间的最大差值 I (哈希、字符串)
题目:3442. 奇偶频次间的最大差值 I 思路 :哈希,时间复杂度0(n)。 用哈希表来记录每个字符串中字符的分布情况,哈希表这里用数组即可实现。 C版本: class Solution { public:int maxDifference(string s) {int a[26]…...
OpenLayers 可视化之热力图
注:当前使用的是 ol 5.3.0 版本,天地图使用的key请到天地图官网申请,并替换为自己的key 热力图(Heatmap)又叫热点图,是一种通过特殊高亮显示事物密度分布、变化趋势的数据可视化技术。采用颜色的深浅来显示…...

【Python】 -- 趣味代码 - 小恐龙游戏
文章目录 文章目录 00 小恐龙游戏程序设计框架代码结构和功能游戏流程总结01 小恐龙游戏程序设计02 百度网盘地址00 小恐龙游戏程序设计框架 这段代码是一个基于 Pygame 的简易跑酷游戏的完整实现,玩家控制一个角色(龙)躲避障碍物(仙人掌和乌鸦)。以下是代码的详细介绍:…...
论文解读:交大港大上海AI Lab开源论文 | 宇树机器人多姿态起立控制强化学习框架(二)
HoST框架核心实现方法详解 - 论文深度解读(第二部分) 《Learning Humanoid Standing-up Control across Diverse Postures》 系列文章: 论文深度解读 + 算法与代码分析(二) 作者机构: 上海AI Lab, 上海交通大学, 香港大学, 浙江大学, 香港中文大学 论文主题: 人形机器人…...

CTF show Web 红包题第六弹
提示 1.不是SQL注入 2.需要找关键源码 思路 进入页面发现是一个登录框,很难让人不联想到SQL注入,但提示都说了不是SQL注入,所以就不往这方面想了 先查看一下网页源码,发现一段JavaScript代码,有一个关键类ctfs…...
django filter 统计数量 按属性去重
在Django中,如果你想要根据某个属性对查询集进行去重并统计数量,你可以使用values()方法配合annotate()方法来实现。这里有两种常见的方法来完成这个需求: 方法1:使用annotate()和Count 假设你有一个模型Item,并且你想…...
渲染学进阶内容——模型
最近在写模组的时候发现渲染器里面离不开模型的定义,在渲染的第二篇文章中简单的讲解了一下关于模型部分的内容,其实不管是方块还是方块实体,都离不开模型的内容 🧱 一、CubeListBuilder 功能解析 CubeListBuilder 是 Minecraft Java 版模型系统的核心构建器,用于动态创…...

【Zephyr 系列 10】实战项目:打造一个蓝牙传感器终端 + 网关系统(完整架构与全栈实现)
🧠关键词:Zephyr、BLE、终端、网关、广播、连接、传感器、数据采集、低功耗、系统集成 📌目标读者:希望基于 Zephyr 构建 BLE 系统架构、实现终端与网关协作、具备产品交付能力的开发者 📊篇幅字数:约 5200 字 ✨ 项目总览 在物联网实际项目中,**“终端 + 网关”**是…...

JUC笔记(上)-复习 涉及死锁 volatile synchronized CAS 原子操作
一、上下文切换 即使单核CPU也可以进行多线程执行代码,CPU会给每个线程分配CPU时间片来实现这个机制。时间片非常短,所以CPU会不断地切换线程执行,从而让我们感觉多个线程是同时执行的。时间片一般是十几毫秒(ms)。通过时间片分配算法执行。…...

ABAP设计模式之---“简单设计原则(Simple Design)”
“Simple Design”(简单设计)是软件开发中的一个重要理念,倡导以最简单的方式实现软件功能,以确保代码清晰易懂、易维护,并在项目需求变化时能够快速适应。 其核心目标是避免复杂和过度设计,遵循“让事情保…...