当前位置: 首页 > article >正文

2.部署kafka:9092

官方文档:http://kafka.apache.org/documentation.html

(虽然kafka中集成了zookeeper,但还是建议使用独立的zk集群)

Kafka3台集群搭建环境:

操作系统: centos7

防火墙:全关

3台zookeeper集群内的机器,1台logstash

软件版本: zookeeper-3.4.12.tar.gz

软件版本kafka_2.12-2.1.0.tgz

安装软件

(3台zookeeper集群的机器)

# tar xf kafka_2.12-2.1.0.tgz -C /usr/local/

# ln -s /usr/local/kafka_2.12-2.1.0/ /usr/local/kafka

创建数据目录(3台)

# mkdir /data/kafka-logs

修改第一台配置文件

(注意不同颜色标记的部分)

# egrep -v "^$|^#" /usr/local/kafka/config/server.properties

broker.id=1 #当前机器在集群中的唯一标识,和zookeeper的myid性质一样

listeners=PLAINTEXT://192.168.148.141:9092 #监听套接字

num.network.threads=3 #这个是borker进行网络处理的线程数

num.io.threads=8 #这个是borker进行I/O处理的线程数

socket.send.buffer.bytes=102400 #发送缓冲区buffer大小,数据不是一下子就发送的,先回存储到缓冲区了到达一定的大小后在发送,能提高性能

socket.receive.buffer.bytes=102400 #kafka接收缓冲区大小,当数据到达一定大小后在序列化到磁盘

socket.request.max.bytes=104857600 #这个参数是向kafka请求消息或者向kafka发送消息的请请求的最大数,这个值不能超过java的堆栈大小

log.dirs=/data/kafka-logs #消息存放的目录,这个目录可以配置为“,”逗号分割的表达式,上面的num.io.threads要大于这个目录的个数

#如果配置多个目录,新创建的topic把消息持久化在分区数最少那一个目录中

num.partitions=1 #默认的分区数,一个topic默认1个分区数

num.recovery.threads.per.data.dir=1 #在启动时恢复日志和关闭时刷新日志时每个数据目录的线程的数量,默认1

offsets.topic.replication.factor=2

transaction.state.log.replication.factor=1

transaction.state.log.min.isr=1

log.retention.hours=168 #默认消息的最大持久化时间,168小时,7天

message.max.byte=5242880 #消息保存的最大值5M

default.replication.factor=2 #kafka保存消息的副本数

replica.fetch.max.bytes=5242880 #取消息的最大字节数

log.segment.bytes=1073741824 #这个参数是:因为kafka的消息是以追加的形式落地到文件,当超过这个值的时候,kafka会新起一个文件

log.retention.check.interval.ms=300000 #每隔300000毫秒去检查上面配置的log失效时间,到目录查看是否有过期的消息如果有,删除

zookeeper.connect=192.168.148.141:2181,192.168.148.142:2181,192.168.148.143:2181

zookeeper.connection.timeout.ms=6000

group.initial.rebalance.delay.ms=0

修改另外两台配置文件

#scp /usr/local/kafka/config/server.properties kafka-2:/usr/local/kafka/config/

broker.id=2

listeners=PLAINTEXT://192.168.148.142:9092

# scp /usr/local/kafka/config/server.properties kafka-3:/usr/local/kafka/config/

broker.id=3

listeners=PLAINTEXT://192.168.148.143:9092

启动kafka(3台)

[root@host1 ~]# /usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties &

查看启动情况(3台)

[root@host1 ~]# jps

10754 QuorumPeerMain

11911 Kafka

12287 Jps

创建topic来验证

[root@host1 ~]# /usr/local/kafka/bin/kafka-topics.sh --create --zookeeper 192.168.148.143:2181 --replication-factor 2 --partitions 1 --topic cien

出现Created topic "cien"验证成功运行

在一台服务器上创建一个发布者

[root@host2 ~]# /usr/local/kafka/bin/kafka-console-producer.sh --broker-list 192.168.148.141:9092 --topic cien

> hello kafka

> ni hao ya

>

在另一台服务器上创建一个订阅者

[root@host3 ~]# /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.148.142:9092 --topic cien --from-beginning

...

hello kafka

ni hao ya

如果都能接收到,说明kafka部署成功!

[root@host3 ~]# /usr/local/kafka/bin/kafka-topics.sh --zookeeper 192.168.10.23:2181 --list #查看所有topic

[root@host3 ~]# /usr/local/kafka/bin/kafka-topics.sh --describe --zookeeper 192.168.10.23:2181 --topic qianfeng #查看指定topic的详细信息

Topic:qianfeng PartitionCount:1 ReplicationFactor:2 Configs:

Topic: qianfeng Partition: 0 Leader: 2 Replicas: 2,3 Isr: 2,3

[root@host3 ~]# /usr/local/kafka/bin/kafka-topics.sh --delete --zookeeper 192.168.10.23:2181 --topic qianfeng #删除topic

Topic qianfeng is marked for deletion.

Note: This will have no impact if delete.topic.enable is not set to true.

配置elfk集群订阅和zookeeper和kafka

配置第一台logstash生产消息输出到kafka

yum -y install wget

wget https://d6.injdk.cn/oraclejdk/8/jdk-8u341-linux-x64.rpm

yum localinstall jdk-8u341-linux-x64.rpm -y

java -version

1.安装logstash

tar xf logstash-6.4.1.tar.gz -C /usr/local

ln -s /usr/local/logstash-6.4.1 /usr/local/logstash

2.修改配置文件

cd /usr/local/logstash/config/

vim logstash.yml

http.host: "0.0.0.0"

3.编写配置文件

不要过滤, logstash会将message内容写入到队列中

# cd /usr/local/logstash/config/

# vim logstash-kafka.conf

input {file {type => "sys-log"path => "/var/log/messages"start_position => beginning}
}
output {kafka {bootstrap_servers => "192.168.148.141:9092,192.168.148.142:9092,192.168.148.143:9092"     #输出到kafka集群topic_id => "sys-log-messages"         #主题名称compression_type => "snappy"         #压缩类型codec =>  "json"}
}

启动logstash

# /usr/local/logstash/bin/logstash -f logstash-kafka.conf

在kafka上查看主题,发现已经有了sys-log-messages,说明写入成功了

[root@host2 ~]# /usr/local/kafka/bin/kafka-topics.sh --zookeeper 192.168.148.141:2181 --list

__consumer_offsets

qianfeng

sys-log-messages

[root@host2 ~]# /usr/local/kafka/bin/kafka-topics.sh --describe --zookeeper 192.168.148.141:2181 --topic sys-log-messages

Topic:sys-log-messages PartitionCount:1 ReplicationFactor:2 Configs:

Topic: sys-log-messages Partition: 0 Leader: 3 Replicas: 3,2 Isr: 3,2

配置第二台logstash,订阅kafka日志,输出到es集群

# cat kafka-es.conf

input {kafka {bootstrap_servers => "192.168.148.141:9092,192.168.148.142:9092,192.168.148.143:9092" topics => "sys-log-messages"          #kafka主题名称codec => "json"auto_offset_reset => "earliest"}
}output {elasticsearch {hosts => ["192.168.148.131:9200","192.168.148.132:9200"]index => "kafka-%{type}-%{+YYYY.MM.dd}"}
}

相关文章:

2.部署kafka:9092

官方文档:http://kafka.apache.org/documentation.html (虽然kafka中集成了zookeeper,但还是建议使用独立的zk集群) Kafka3台集群搭建环境: 操作系统: centos7 防火墙:全关 3台zookeeper集群内的机器,1台logstash 软件版本: …...

springboot博客系统详解与实现(后端实现)

目录 前言: 项目介绍 一、项目的准备工作 1.1 数据准备 1.2 项目创建 1.3 前端页面的准备 1.4 配置配置文件 二、公共模块 2.1 根据需求完成公共层代码的编写 2.1.1 定义业务状态枚举 2.1.2 统一返回结果 2.1.3 定义项目异常 2.1.4 统一异常处理 三、业…...

14.12 Auto-GPT OutputParser 架构设计:构建安全可控的大模型输出管道

Auto-GPT OutputParser 架构设计:构建安全可控的大模型输出管道 关键词:Auto-GPT 输出解析、结构化响应控制、内容安全过滤、多格式输出适配、错误恢复机制 1. OutputParser 的核心作用与设计挑战 输出解析的三大核心任务: #mermaid-svg-sUqVk51rX50EHefe {font-family:&q…...

seacmsv9注入管理员账号密码+orderby+limit

一、网上收集: 海洋影视管理系统(seacms,海洋cms)是一套专为不同需求的站长而设计的视频点播系统,采 用的是 php5.Xmysql 的架构,seacmsv9漏洞文件:./comment/api/index.php,漏洞参数…...

企业级大模型应用的Java-Python异构融合架构实践

一、后端语言相关技术生态 Python语言 Python在AI计算领域拥有全面的生态支持: 底层工具库: Pandas、NumPy、SciPy、Matplotlib深度学习框架: PyTorch、TensorFlow领域专用框架: HuggingFace Transformers(社区生态为主) 常见Python框架 …...

C#连接sql server

连接时,出现如下提示: ERROR [IM014] [Microsoft][ODBC 驱动程序管理器] 在指定的 DSN 中,驱动程序和应用程序之间的体系结构不匹配 原因是odbc的驱动和应用程序的架构不一致。我的odbc如下所示: 显示为64位,而c#程序显…...

粉色和紫色渐变壁纸怎么设计?

粉色和紫色的渐变壁纸设计可以打造极为浪漫的氛围,这两种颜色的搭配极具梦幻感与浪漫气息,常被用于各种浪漫主题的设计之中。以下是关于粉色和紫色渐变壁纸的设计方法: 一、渐变方向设计 横向渐变:从画面左侧的粉色过渡到右侧的紫…...

计算机网络:从底层原理到前沿应用,解锁数字世界的连接密码

计算机网络:从底层原理到前沿应用,解锁数字世界的连接密码 在信息如洪流般奔涌的时代,计算机网络宛如无形的脉络,贯穿于我们生活的每一个角落。它不仅是数据传输的通道,更是连接全球、驱动创新的核心力量。从日常的网络…...

AOP基础-01.快速入门

一.AOP 对于统计每一个业务方法的耗时这一操作,如果再业务层的每一个方法前获取方法运行的开始时间,方法结束获取结束时间,然后计算执行耗时,那这样就太繁琐了。能不能定义一个模板方法,使得该方法能够在业务层的方法执…...

Linux主机用户登陆安全配置

Linux主机用户登陆安全配置 在Linux主机上进行用户登录安全配置是一个重要的安全措施,可以防止未经授权的访问。以下是如何创建用户hbu、赋予其sudo权限,以及禁止root用户SSH登录,以及通过ssh key管理主机用户登陆。 创建用户hbu 使用具有…...

Solidity 开发环境

Solidity 开发环境 Solidity编辑器:Solidity编辑器是⼀种专⻔⽤于编写和编辑Solidity代码的编辑器。常⽤的Solidity编辑器包括 Visual Studio Code、Atom和Sublime Text。以太坊开发环境:以太坊开发环境(Ethereum Development Environment&a…...

图像处理、数据挖掘、数据呈现

目录 图像处理方法 阈值分割 图像处理方法 图像平滑 图像锐化 图像增强 阈值分割 边缘检测 阈值分割 特征提取 提取边界 区域提取 主成分压缩 POI 多源数据 数据挖掘 多源数据提取 关联度提取 位置集群, 新闻事件, 权限 个人喜好 历史…...

Go小技巧易错点100例(二十三)

本期分享: 1.Go Module控制Go版本 2.int转string注意事项 3.Go项目查看mod依赖关系 Go Module控制Go版本 当我们开发Go项目涉及到两台及以上的机器,而且它们又刚好是不同操作系统的时候,可能就要把代码挪到另一台机器上重新编译&#xff…...

JVM生产环境问题定位与解决实战(三):揭秘Java飞行记录器(JFR)的强大功能

提到飞行记录器,或许你的脑海中并未立刻浮现出清晰的画面,但一说起“黑匣子”,想必大多数人都能恍然大悟,知晓其重要性及用途。在航空领域,黑匣子作为不可或缺的设备,默默记录着飞行过程中的每一项关键数据…...

使用Docker Desktop部署GitLab

1. 环境准备 确保Windows 10/11系统支持虚拟化技术(需在BIOS中开启Intel VT-x/AMD-V)内存建议≥8GB,存储空间≥100GB 2. 安装Docker Desktop 访问Docker官网下载安装包安装时勾选"Use WSL 2 instead of Hyper-V"(推荐…...

MySQL数据库连接池泄露导致MySQL Server超时关闭连接

前言 最近做项目,发现老项目出现xxx,这个错误其实很简单,出现在MySQL数据库Server端对长时间没有使用的client连接执行清楚处理,因为是druid数据库,且在github也出现这样的issue:The last packet successf…...

力扣 下一个排列

交换位置,双指针,排序。 题目 下一个排列即在组成的排列中的下一个大的数,然后当这个排列为降序时即这个排列最大,因为大的数在前面,降序排列的下一个数即升序。所以,要是想找到当前排列的下一个排列&…...

Fisher散度:从信息几何到机器学习的隐藏利器

Fisher散度:从信息几何到机器学习的隐藏利器 在机器学习和统计学中,比较两个概率分布的差异是常见任务,比如评估真实分布与模型预测分布的差距。KL散度(Kullback-Leibler Divergence)可能是大家熟悉的选择&#xff0c…...

事务管理-03.事务进阶-propagation属性

一.工具 在介绍事务的propagation属性前,我们首先介绍一个工具:Grep Console,该工具用来实现将idea输出出的日志信息进行选择性的高亮展示。 当要选择日志中的某一部分高亮展示时,只需要右键点击Add Highlight即可。此时日志中所…...

Pretraining Language Models with Text-Attributed Heterogeneous Graphs

Pretraining Language Models with Text-Attributed Heterogeneous Graphs EMNLP 推荐指数:#paper/⭐⭐#​ 贡献: 我们研究了在更复杂的数据结构上预训练LM的问题,即,TAHG。与大多数只能从每个节点的文本描述中学习的PLM不同&…...

模型疑问图像、嵌入、推理类型与说明

在进行模型使用的时候,有时候会碰到模型存在模型类型需要选择的情况,如下面deepseek模型选择模型类型图像、嵌入、推理。 以下是针对此问题的了解与说明: DeepSeek 模型是一个多模态人工智能模型,能够同时处理图像和文本数据,并在多种任务中实现高效的嵌入表示和推理。以下…...

WiFi IEEE 802.11协议精读:IEEE 802.11-2007,6,MAC service definition MAC服务定义

继续精读IEEE 802.11-2007 6,MAC service definition MAC服务定义 6.1 MAC服务概述 6.1.1 数据服务 此服务为对等逻辑链路控制(LLC)实体提供交换MAC服务数据单元(MSDU)的能力。为支持此服务,本地媒体访…...

Visual Studio Code 跨平台安装与配置指南(附官方下载链接)

一、软件定位与核心功能 Visual Studio Code(简称VS Code)是微软开发的开源跨平台代码编辑器,支持超过50种编程语言的智能补全、调试和版本控制功能。2025版本新增AI辅助编程模块,可自动生成单元测试代码和API文档注释。 二、下载…...

deepseek自动化代码生成

使用流程 效果第一步:注册生成各种大模型的API第二步:注册成功后生成API第三步:下载vscode在vscode中下载agent,这里推荐使用cline 第四步:安装完成后,设置模型信息第一步选择API provider: Ope…...

RK3568开发笔记-AD7616调试笔记

目录 前言 一、AD7616介绍 高分辨率 高速采样速率 宽模拟输入范围 集成丰富功能 二、原理图连接 三、设备树配置 四、内核驱动配置 五、AD芯片测试 总结 前言 在嵌入式数据采集领域,将模拟信号精准转换为数字信号至关重要。AD7616 作为一款性能卓越的 16 位模数转换器…...

【DeepSeek开源:会带来多大的影响】

DeepSeek 开源,震撼登场对云计算行业的冲击 巨头云厂商的新机遇 DeepSeek 开源后,为云计算行业带来了巨大的变革,尤其是为巨头云厂商创造了新的发展机遇。以阿里云为例,它作为云计算行业的领军者,与 DeepSeek 的合作…...

transformer架构嵌入层位置编码之动态NTK-aware位置编码

前文,我们已经构建了一个小型的字符级语言模型,是在transformer架构基础上实现的最基本的模型,我们肯定是希望对该模型进行改进和完善的。所以我们的另外一篇文章也从数据预处理、模型架构、训练策略、评估方法、代码结构、错误处理、性能优化等多个方面提出具体的改进点,但…...

OceanBase + DeepSeek:5分钟免费搭建企业知识库

过去一个月,DeepSeek 在全球范围内引发了热烈讨论。其突破性的 AI 能力使其日流量显著超越 Claude 和 Perplexity,吸引了众多企业和技术专家的高度关注。随着 AI 技术的不断进步,企业正面临一场深刻的智能化变革——如何通过 AI 重构业务&…...

水利工程安全包括哪几个方面

水利工程安全培训的内容主要包括以下几个方面: 基础知识和技能培训 : 法律法规 :学习水利工程相关的安全生产法律法规,了解安全生产标准及规范。 事故案例 :通过分析事故案例,了解事故原因和教训&#x…...

基于 sklearn 的均值偏移聚类算法的应用

基于 sklearn 的均值偏移聚类算法的应用 在机器学习和数据挖掘中,聚类算法是一类非常重要的无监督学习方法。它的目的是将数据集中的数据点划分为若干个类,使得同一类的样本点彼此相似,而不同类的样本点相互之间差异较大。均值偏移聚类&…...