kafka下载安装部署
Apache kafka 是一个分布式的基于push-subscribe的消息系统,它具备快速、可扩展、可持久化的特点。它现在是Apache旗下的一个开源系统,作为hadoop生态系统的一部分,被各种商业公司广泛应用。它的最大的特性就是可以实时的处理大量数据以满足各种需求场景:比如基于hadoop的批处理系统、低延迟的实时系统、storm/spark流式处理引擎。
kafka的特性:
1.高吞吐量、低延迟:kafka每秒可以处理几十万条消息,它的延迟最低只有几毫秒
2.可扩展性:kafka集群支持热扩展
3.持久性、可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失
4.容错性:允许集群中节点失败(若副本数量为n,则允许n-1个节点失败)
5.高并发:支持数千个客户端同时读写
kafka是一个分布式消息系统,由linkedin使用scala编写,用作LinkedIn的活动流(Activity Stream)和运营数据处理管道(Pipeline)的基础。具有高水平扩展和高吞吐量。
Kafka和其他主流分布式消息系统的对比

1、下载安装kafka
Kafka需要依赖JAVA环境运行,需要先下载安装JDK。Kafka支持内置的Zookeeper和引用外部的Zookeeper,如果使用外部的zookeeper,需要提前下载安装zookeeper (zookeeper下载安装部署)。
在安装jdk之前,先卸载Linux系统自带的jdk。
通过 rpm -qa | grep jdk 命令查看系统自带的jdk,并通过 rpm -e --nodeps命令逐个卸载。

Jdk8下载地址:Java Downloads | Oracle

下载后上传到Linux系统的某个目录下,解压并移动到/usr/local目录下。
tar -zxvf jdk-8u391-linux-x64.tar.gz
mv jdk1.8.0_391 /usr/local/jdk1.8/jdk1.8.0_391
配置环境变量,修改 /etc/profile 文件,添加如下jdk的配置。
#set java environment
export JAVA_HOME=/usr/local/jdk1.8/jdk1.8.0_391
export JRE_HOME=$JAVA_HOME/jre
export CLASSPATH=$JAVA_HOME/lib:$JRE_HOME/lib:$CLASSPATHexport PATH=$JAVA_HOME/bin:$JRE_HOME/bin:$PATH:$HOME/bin
然后执行 source /etc/profile 命令使得修改立即生效。
kafka下载地址:Apache Kafka
在/usr/local/目录下创建kafka目录,并在kafka目录下通过wget命令下载kafka压缩包,或者将在Windows系统中下载好的kafka压缩包通过Xftp传到kafka目录中,然后解压。

tar -zxvf kafka_2.12-3.6.1.tgz
最后使用root用户修改/etc/profile文件,添加kafka启动bin目录,以便在任何目录下都可以通过cd $KAFKA_HOME命令进入到kafka安装目录。最后通过source /etc/profile 命令使得修改生效。
配置环境变量,修改 /etc/profile 文件,在最后加上如下配置:
export KAFKA_HOME=/usr/local/kafka/kafka_2.12-3.6.1
export PATH=$KAFKA_HOME/bin:$PATH
然后执行 source /etc/profile 命令使得修改立即生效。
2、单机部署
2.1、修改配置文件
在 /usr/local/kafka/kafka_2.12-3.6.1 目录下创建一个用于存放日志的目录mylogs,在server.properties配置文件中会使用到这个目录。
mkdir -p /usr/local/kafka/kafka_2.12-3.6.1/mylogs

修改kafka的配置文件:server.properties。
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.# see kafka.server.KafkaConfig for additional details and defaults############################# Server Basics ############################## The id of the broker. This must be set to a unique integer for each broker.
broker.id=0port=9092
host.name=192.168.10.188############################# Socket Server Settings ############################## The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
# FORMAT:
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://192.168.10.188:9092# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600############################# Log Basics ############################## A comma separated list of directories under which to store log files
#log.dirs=/tmp/kafka-logs
#log.dirs=D:/MySoftware/Install/tools/kafka/logs
log.dirs=/usr/local/kafka/kafka_2.12-3.6.1/mylogs# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1############################# Internal Topic Settings #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1############################# Log Flush Policy ############################## Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
# 1. Durability: Unflushed data may be lost if you are not using replication.
# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000############################# Log Retention Policy ############################## The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000############################# Zookeeper ############################## Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
# zookeeper.connect=localhost:2181
zookeeper.connect=192.168.10.188:2181/kafka# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=18000############################# Group Coordinator Settings ############################## The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
# The default value for this is 3 seconds.
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
group.initial.rebalance.delay.ms=0#Delete topic
delete.topic.enable=true
在server.properties中只添加了port和host.name,并修改了log.dirs、zookeeper.connect属性,其余都是默认。另外,还要开启listeners属性,不然在后面启动consumer接受消息时看不到消息。
配置文件中参数详解:
broker.id=0 #当前机器在集群中的唯一标识,和zookeeper的myid性质一样
port=9092 #当前kafka对外提供服务的端口默认是9092
host.name=192.168.7.100 #这个参数默认是关闭的,在0.8.1有个bug,DNS解析问题,失败率的问题。改成自己centos的ip地址。
num.network.threads=3 #这个是borker进行网络处理的线程数
num.io.threads=8 #这个是borker进行I/O处理的线程数
log.dirs=/opt/kafka/kafkalogs/ #消息存放的目录,这个目录可以配置为“,”逗号分割的表达式,上面的num.io.threads要大于这个目录的个数这个目录,如果配置多个目录,新创建的topic他把消息持久化的地方是,当前以逗号分割的目录中,那个分区数最少就放那一个
socket.send.buffer.bytes=102400 #发送缓冲区buffer大小,数据不是一下子就发送的,先回存储到缓冲区了到达一定的大小后在发送,能提高性能
socket.receive.buffer.bytes=102400 #kafka接收缓冲区大小,当数据到达一定大小后在序列化到磁盘
socket.request.max.bytes=104857600 #这个参数是向kafka请求消息或者向kafka发送消息的请请求的最大数,这个值不能超过java的堆栈大小
num.partitions=1 #默认的分区数,一个topic默认1个分区数
log.retention.hours=168 #默认消息的最大持久化时间,168小时,7天
message.max.byte=5242880 #消息保存的最大值5M
default.replication.factor=2 #kafka保存消息的副本数,如果一个副本失效了,另一个还可以继续提供服务
replica.fetch.max.bytes=5242880 #取消息的最大直接数
log.segment.bytes=1073741824 #这个参数是:因为kafka的消息是以追加的形式落地到文件,当超过这个值的时候,kafka会新起一个文件
log.retention.check.interval.ms=300000 #每隔300000毫秒去检查上面配置的log失效时间(log.retention.hours=168 ),到目录查看是否有过期的消息如果有,删除
log.cleaner.enable=false #是否启用log压缩,一般不用启用,启用的话可以提高性能
zookeeper.connect=192.168.7.100:2181,192.168.7.101:2181,192.168.7.107:2181/kafka #设置zookeeper的连接端口,在集群配置时,要把所有机器的ip地址都要写上,这里以三个机器为例。如果是单机部署,只需要写一个ip地址就行了。
注意:在zookeeper.connect的最后加上/kafka是因为kafka需要依赖zookeeper,在kafka启动之后默认会在zookeeper服务所在节点的根目录下创建很多与kafka有关的目录,这样就会导致zookeeper服务所在节点的根目录下的文件很多很乱。另外,如果多个kafka共用一个zookeeper,就会导致zookeeper服务的根目录下各个kafka文件更加混乱。所以在zookeeper.connect的最后加上/kafka是为了在kafka启动时将创建的文件都放到zookeeper节点根目录下的/kafka子目录下。多个kafka共用一个zookeeper时可以分别配置自己的子目录以示区分。
启动zookeeper和kafka之后,会自动在zookeeper节点上创建/kafka目录。

2.2、配置和启动zookeeper
方式一:使用kafka自带的zookeeper,修改zookeeper.properties配置文件:
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# the directory where the snapshot is stored.
# dataDir=D:/MySoftware/Install/tools/kafka/tmp/zookeeper
dataDir=/usr/local/kafka/kafka_2.12-3.6.1/tmp/zk/data
dataLogDir=/usr/local/kafka/kafka_2.12-3.6.1/tmp/zk/logs
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0
# Disable the adminserver by default to avoid port conflicts.
# Set the port to something non-conflicting if choosing to enable this
admin.enableServer=false
# admin.serverPort=8080
进入到kafka安装目录下,执行如下命令启动zookeeper:
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/zookeeper-server-start.sh config/zookeeper.properties
方式二:使用外部安装的zookeeper。
这里使用外部安装的zookeeper。由于这里kafka是单机部署,所以zookeeper也要使用单机部署,具体步骤见 zookeeper下载安装部署 中的单机部署zookeeper部分。因为之前已经安装并配置了zookeeper,所以这里不在配置了,直接启动就行了。
进入到zookeeper安装目录下的bin目录中,执行如下命令启动zookeeper服务端。
./zkServer.sh start
./zkServer.sh start
2.3、启动kafka
切记:启动kafka之前必须先启动zookeeper。
进入到kafka的bin目录下,启动kafka。参数-daemon的含义是指启动的服务进程是作为后台进程(守护进程模式)启动,不加就是作为前端线程来启动。Kafka在启动一段时间后,如果出现服务自动关闭情况,可在启动kafka的时使用守护进程模式启动,即在原启动命令中加 -daemon。启动之后用jps命令检查是否启动。启动命令:./kafka-server-start.sh -daemon ../config/server.properties
# 进入到 bin 目录
./kafka-server-start.sh -daemon ../config/server.properties# 或者进入到kafka安装目录,执行如下命令
bin/kafka-server-start.sh -daemon config/server.properties

2.4、创建、查看、删除topic
创建topic:
创建一个名字为testKafka的topic,只有一个副本,一个分区。
进入到kafka安装目录的bin目录下,执行kafka-topics.sh脚本。
--zookeeper参数表示要指定zookeeper服务的安装节点,多个节点可以用逗号分隔。并且最后加上在server.properties配置文件中zookeeper.connect属性设置的kafka启动时存储信息的路径,上面配置文件中zookeeper.connect属性配置的路径是/kafka。
命令:./kafka-topics.sh --zookeeper 192.168.10.188:2181/kafka --create --replication-factor 1 --partitions 1 --topic testKafka
./kafka-topics.sh --zookeeper 192.168.10.188:2181/kafka --create --replication-factor 1 --partitions 1 --topic testKafka
#参数解释
--replication-factor 1 #副本因子是1
--partitions 1 #创建1个分区
--topic testKafka #主题为testKafka
192.168.10.188:2181是在server.properties文件中配置的zookeeper.connect,这个是zk的连接端口。
查看topic及topic状态:
查看topic的命令:./kafka-topics.sh -zookeeper 192.168.10.188:2181 -list
或者:./kafka-topics.sh --zookeeper 192.168.10.188:2181 --list
./kafka-topics.sh --zookeeper 192.168.10.188:2181 --list

查看topic状态的命令:./kafka-topics.sh --zookeeper 192.168.10.188:2181 --describe --topic testKafka
./kafka-topics.sh --zookeeper 192.168.10.188:2181 --describe --topic testKafka

leader:负责处理消息的读和写,leader是从所有节点中随机选择的.
replicas:列出了所有的副本节点,不管节点是否在服务中.
isr:是正在服务中的节点.
此处是单机部署kafka,只有一个broker,在server.properties文件中broker.id=0,所以此处leader是节点为0的broker。
删除topic:
命令:./kafka-topics.sh --zookeeper 192.168.10.188:2181 --delete --topic testKafka
./kafka-topics.sh --zookeeper 192.168.10.188:2181 --delete --topic testKafka


#Delete topic
delete.topic.enable=true
2.5、启动producer和consumer
在介绍启动producer和consumer的命令之前,先简单了解一下broker-list、bootstrap-servers和zookeeper几个参数。
1.broker:kafka服务端,可以是一个服务器也可以是一个集群。producer和consumer都相当于这个服务端的客户端。
2.broker-list:指定kafka集群中的一个或多个服务器,一般在使用kafka-console-producer.sh的时候,这个参数是必备参数,另外一个必备的参数是topic。
3.bootstrap-servers指的是kafka目标集群的服务器地址,这和broker-list功能一样,不过在启动producer时要求用broker-list,在启动consumer时用bootstrap-servers。
4. zookeeper指的是zk服务器或zk集群的地址。旧版本(0.9以前)的kafka,消费的进度(offset)是写在zk中的,所以启动consumer需要知道zk的地址。后来的版本都统一由broker管理,所以在启动consumer时就用bootstrap-server。
启动producer并发送消息,发送消息之后用Ctrl+C结束。
命令:./kafka-console-producer.sh --broker-list 192.168.10.188:9092 --topic testKafka
./kafka-console-producer.sh --broker-list 192.168.10.188:9092 --topic testKafka
启动consumer并接受消息。按Ctrl+C结束。
命令:./kafka-console-consumer.sh --zookeeper 192.168.10.188:2181 --topic testKafka --from-beginning (参数zookeeper被bootstrap-server代替了)
或者:./kafka-console-consumer.sh --bootstrap-server 192.168.10.188:9092 --topic testKafka --from-beginning
./kafka-console-consumer.sh --bootstrap-server 192.168.10.188:9092 --topic testKafka --from-beginning
参数--zookeeper 192.168.10.188:2181中的ip和port是zookeeper节点的ip和zookeeper的port,参数--bootstrap-server 192.168.10.188:9092中的ip和port是kafka节点的ip和kafka的port。
2.6、查看消费者组以及消息是否积压
查看消费者组的命令:
bin/kafka-consumer-groups.sh --bootstrap-server 192.168.10.188:9092 --list
bin/kafka-consumer-groups.sh --bootstrap-server 192.168.10.188:9092 --list
查看消息是否有积压的命令:
bin/kafka-consumer-groups.sh --bootstrap-server 192.168.10.188:9092 --describe --group consumer-group-01
bin/kafka-consumer-groups.sh --bootstrap-server 192.168.10.188:9092 --describe --group consumer-group-01

上图是在windows系统中执行kafka命令的截图,与Linux系统命令类似。上图中GROUP表示消费者组,TOPIC表示消息主题,PARTITION表示分区,CURRENT-OFFSET表示当前消费的消息条数,LOG-END-OFFSET表示kafka中生产的消息条数,LAG表示kafka中有多少条消息还未消费,也就是有多少条积压的消息。
在kafka中,消费者是按批次拉取数据的,每一批次拉取的数据条数是0-n条,每个消费者可以拉取多个分区的数据,但是一个分区的数据只能被同一个消费者组中的一个消费者拉取。如果一个消费者拉取多个分区的数据,那么拉取的这一批次的数据就包含多个分区的数据。消费者处理完这批数据之后,会将offset提交到__consumer_offsets这个topic中,__consumer_offsets(是一个topic)就是用于维护消费者消费到哪条数据offset的,是按照分区粒度维护的,各个分区的offset是互不影响的。例如一个consumer拉取两个分区(p0、p1)的数据,如果p0分区的数据处理完并将offset提交到__consumer_offsets中,而p1分区的数据还未处理完,p1分区的offset还未提交到__consumer_offsets中,此时consumer异常重启,consumer不会再拉取p0分区上次已消费的数据,但是会重新拉取p1分区上次消费但未提交的数据。
__consumer_offsets这个topic是kafka自动创建的,当consumer消费数据之后,consumer就会把offset提交到__consumer_offsets中。
2.7、关闭zookeeper和kafka
关闭kafka的命令:./kafka-server-stop.sh (必须进到kafka的bin目录下才能执行该命令)
关闭zk的命令:./zkServer.sh stop (必须进到zookeeper的bin目录下才能执行该命令)
3、集群部署
集群部署的步骤与单机部署几乎是一样的,主要的区别在于kafka的配置文件。
Kafka集群是把状态保存在Zookeeper中的,首先要搭建Zookeeper集群。
Zookeeper的集群部署具体步骤见 zookeeper下载安装部署 中的集群部署zookeeper部分。这里与zookeeper集群部署一样,仍然使用三台计算机构成kafka集群。下面先在一台计算机上部署kafka,另外两台计算机的配置与这一台完全一样,只需修改配置文件中对应节点的ip和broker.id。假设三台计算机的ip地址分别是192.168.1.128、192.168.1.129、192.168.1.130。
3.1、修改配置文件
在 /usr/local/kafka/kafka_2.12-3.6.1 目录下创建一个用于存放日志的目录mylogs,在server.properties配置文件中会使用到这个目录。
mkdir -p /usr/local/kafka/kafka_2.12-3.6.1/mylogs
修改kafka的配置文件:server.properties。
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.# see kafka.server.KafkaConfig for additional details and defaults############################# Server Basics ############################## The id of the broker. This must be set to a unique integer for each broker.
broker.id=1port=9092
host.name=192.168.1.128############################# Socket Server Settings ############################## The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
# FORMAT:
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://192.168.1.128:9092# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600############################# Log Basics ############################## A comma separated list of directories under which to store log files
#log.dirs=/tmp/kafka-logs
#log.dirs=D:/MySoftware/Install/tools/kafka/logs
log.dirs=/usr/local/kafka/kafka_2.12-3.6.1/mylogs# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1############################# Internal Topic Settings #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1############################# Log Flush Policy ############################## Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
# 1. Durability: Unflushed data may be lost if you are not using replication.
# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000############################# Log Retention Policy ############################## The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000############################# Zookeeper ############################## Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
# zookeeper.connect=localhost:2181
zookeeper.connect=192.168.1.128:2181,192.168.1.129:2181,192.168.1.130:2181/kafka# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=18000############################# Group Coordinator Settings ############################## The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
# The default value for this is 3 seconds.
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
group.initial.rebalance.delay.ms=0#Delete topic
delete.topic.enable=true


在server.properties文件中主要配置的就是broker.id、port、host.name、listeners、log.dirs和zookeeper.connect这六个属性,其他的都是默认值。
在zookeeper.connect的最后加上/kafka是因为kafka需要依赖zookeeper,在kafka启动之后默认会在zookeeper服务所在节点的根目录下创建很多与kafka有关的目录,这样就会导致zookeeper服务所在节点的根目录下的文件很多很乱。另外,如果多个kafka共用一个zookeeper,就会导致zookeeper服务的根目录下各个kafka文件更加混乱。所以在zookeeper.connect的最后加上/kafka是为了在kafka启动时将创建的文件都放到zookeeper节点根目录下的/kafka子目录下。多个kafka共用一个zookeeper时可以分别配置自己的子目录以示区分。
启动zookeeper和kafka之后,会自动在zookeeper节点上创建/kafka目录。


3.2、配置和启动zookeeper
Zookeeper的集群部署具体步骤见 zookeeper下载安装部署 中的集群部署zookeeper部分。
3.3、启动kafka
三个机器都要启动kafka。进入到kafka的bin目录下,启动kafka。启动之后用jps命令检查是否启动。
启动命令:./kafka-server-start.sh -daemon ../config/server.properties
# 进入到 bin 目录
./kafka-server-start.sh -daemon ../config/server.properties# 或者进入到kafka安装目录,执行如下命令
bin/kafka-server-start.sh -daemon config/server.properties
3.4、创建topic
创建一个名字为testKafka的topic,有两个副本,两个分区。
--zookeeper参数表示要指定zookeeper服务的安装节点,多个节点可以用逗号分隔。并且最后加上在server.properties配置文件中zookeeper.connect属性设置的kafka启动时存储信息的路径,上面配置文件中zookeeper.connect属性配置的路径是/kafka。
命令:./kafka-topics.sh --zookeeper 192.168.1.128:2181/kafka --create --replication-factor 2 --partitions 2 --topic testKafka
./kafka-topics.sh --zookeeper 192.168.1.128:2181/kafka --create --replication-factor 2 --partitions 2 --topic testKafka
3.5、启动producer和consumer
启动producer并发送消息,发送消息之后用Ctrl+C结束。
命令:./kafka-console-producer.sh --broker-list 192.168.1.128:9092 --topic testKafka
./kafka-console-producer.sh --broker-list 192.168.1.128:9092 --topic testKafka
启动consumer并接受消息。按Ctrl+C结束。
命令:./kafka-console-consumer.sh --bootstrap-server 192.168.1.128:9092 --topic testKafka --from-beginning
./kafka-console-consumer.sh --bootstrap-server 192.168.1.128:9092 --topic testKafka --from-beginning
相关文章:
kafka下载安装部署
Apache kafka 是一个分布式的基于push-subscribe的消息系统,它具备快速、可扩展、可持久化的特点。它现在是Apache旗下的一个开源系统,作为hadoop生态系统的一部分,被各种商业公司广泛应用。它的最大的特性就是可以实时的处理大量数据以满足各…...
python包管理工具:pipenv的基本使用
很多语言都提供了环境隔离的支持,例如nodejs的node_module,golang的go mod,python也有virtualenv和pyvenv等机制。 为了建立依赖快照,通常会用pip freeze > requirements.txt 命令生成一个requirements.txt文件,在…...
AI系统ChatGPT网站系统源码AI绘画详细搭建部署教程,支持GPT语音对话+DALL-E3文生图+GPT-4多模态模型识图理解
一、前言 SparkAi创作系统是基于ChatGPT进行开发的Ai智能问答系统和Midjourney绘画系统,支持OpenAI-GPT全模型国内AI全模型。本期针对源码系统整体测试下来非常完美,可以说SparkAi是目前国内一款的ChatGPT对接OpenAI软件系统。那么如何搭建部署AI创作Ch…...
MC-4/11/03/400步进电机驱动器的主要驱动方式有哪些?
MC-4/11/03/400步进电机驱动器的主要驱动方式有哪些? 步进电机驱动器是一种将电脉冲转化为角位移的执行机构。当步进驱动器接收到一个脉冲信号,它就驱动步进电机按设定的方向转动一个固定的角度,这个固定的角度被称为“步距角”。步进电机不能…...
大数据技术原理与应用期末复习(林子雨)
大数据技术原理与应用期末复习(林子雨) Hadoop的特性HBase编程实践NoSQL的四大类型键值数据库优点:缺点: 列族数据库优点:缺点: 文档数据库优点:缺点: 图数据库优点:缺点…...
C练习——魔术师猜三位数
题目: 有一种室内互动游戏,魔术师要每位观众心里想一个三位数abc(a、b、c分别是百位、十位和个位数字),然后魔术师让观众心中记下acb、bac、bca、cab、cba五个数以及这5个数的和值。只要观众说出这个和是多少…...
three.js 使用 tweenjs绘制相机运动动画
效果: 代码: <template><div><el-container><el-main><div class"box-card-left"><div id"threejs" style"border: 1px solid red"></div><div class"box-right"…...
Oracle VARCHAR和VARCHAR2区别
在Oracle数据库中,VARCHAR和VARCHAR2是两种不同的数据类型,它们的区别如下: 1.存储空间 VARCHAR和VARCHAR2在存储空间上有所不同。在Oracle 7及以下版本中,VARCHAR类型的长度是固定的,如果存储的数据长度小于定义的长…...
HarmonyOS 开发基础(八)Row和Column
HarmonyOS 开发基础(八)Row和Column 一、Column 容器 1、容器说明: 纵向容器主轴方向:从上到下纵向交叉轴方向:从左到右横向 2、容器属性: justifyContent:设置子元素在主轴方向的对齐格式…...
Visual Studio中项目添加链接文件
这个需求在VS里面使用还真不多见,只是最近在做项目的版本编号的时候遇到一个头大的问题,我一个解决方案下面有几十个类库,再发布的时候这几十个类库的版本号必须要统一,之前我们都是在单个的AssemblyInfo.cs里面去改相关的信息&am…...
做一个个人博客第一步该怎么做?
做一个个人博客第一步该怎么做? 好多零基础的同学们不知道怎么迈出第一步。 那么,就找一个现成的模板学一学呗,毕竟我们是高贵的Ctrl c v 工程师。 但是这样也有个问题,那就是,那些模板都,太!…...
vue前端开发自学练习,Props数据传递-类型校验,默认值的设置!
vue前端开发自学练习,Props数据传递-类型校验,默认值的设置! 实际上,vue开发框架的时候,充分考虑到了前端开发人员可能会遇到的各种各样的情况,比如大家经常遇到的,数据类型的校验,再比如,默认…...
Fooocus 使用笔记
目录 换装,换脸,修复畸形 比较和使用教程: 安装教程: github地址: 换装,换脸,修复畸形 🔥迄今最全!Fooocus AI绘图 详细教程 AI换装 AI换脸 AI修复畸形 - 西瓜视频 …...
18. 从零用Rust编写正反向代理, 主动式健康检查源码实现
wmproxy wmproxy是由Rust编写,已实现http/https代理,socks5代理, 反向代理,静态文件服务器,内网穿透,配置热更新等, 后续将实现websocket代理等,同时会将实现过程分享出来ÿ…...
[DM8] 达梦8配置兼容Oracle
查看版本信息 select *,id_code from v$version; 查询解释: DM Database Server 64 V8 1-1-190-21.03.12-136419-ENT 64 版本位数标识,64表示为64位版本,无64则表示为32位版本 V8 大版本号,目前主要是V7、V8 1-1-190…...
【Pytorch简介】1.Introduction 简介
Introduction 简介 大多数机器学习工作流涉及处理数据、创建模型、使用超参数优化模型,以及保存,然后推理已训练的模型。 本模块介绍在 PyTorch(一种常用的 Python ML 框架)中实现的完整机器学习 (ML) 工作流。 我们使用 Fashio…...
什么是Session以及如何在 NestJS 项目中的优雅管理 Session
前言 Web开发中一个常见的问题是用户身份的管理和状态保持。Session 就是处理这个问题的一个传统技术。在这篇文章中,我们将探讨Session是什么,为什么我们需要Session,以及在NestJS项目中如何优雅地管理Session。 什么是Session 众所周知&…...
高级分布式系统-第6讲 分布式系统的容错性--故障/错误/失效/异常
分布式系统容错性的概念 分布式系统的容错性: 当发生故障时, 分布式系统应当在进行恢复的同时继续以可接受的方式进行操作, 并且可以从部分失效中自动恢复, 且不会严重影响整体性能。 具体包括以下4个方面的内容: 可…...
网络多线程开发小项目--QQ登陆聊天功能(服务端推送新闻、离线留言和文件)
9.1.5、QQ登陆聊天功能(服务端推送新闻、离线留言和文件) 9.1.5.1、服务端推送新闻 1、需求分析 2、思路分析 3、代码实现 QQServer: 1)cn.com.agree.qqserver.service.SendNewsToAllClient package cn.com.agree.qqserver.s…...
Jtti:有哪些方法可以提升Tomcat的性能?
提升 Tomcat 性能是确保 Web 应用程序快速响应并能够处理高并发请求的关键任务。以下是一些提升 Tomcat 性能的常见方法: 1. 调整JVM参数: a. 内存分配: 增加 JVM 的堆内存(Heap Memory)以提高应用程序的内存容量。使用 -Xmx 和 -Xms 参数设置…...
应用升级/灾备测试时使用guarantee 闪回点迅速回退
1.场景 应用要升级,当升级失败时,数据库回退到升级前. 要测试系统,测试完成后,数据库要回退到测试前。 相对于RMAN恢复需要很长时间, 数据库闪回只需要几分钟。 2.技术实现 数据库设置 2个db_recovery参数 创建guarantee闪回点,不需要开启数据库闪回。…...
【OSG学习笔记】Day 18: 碰撞检测与物理交互
物理引擎(Physics Engine) 物理引擎 是一种通过计算机模拟物理规律(如力学、碰撞、重力、流体动力学等)的软件工具或库。 它的核心目标是在虚拟环境中逼真地模拟物体的运动和交互,广泛应用于 游戏开发、动画制作、虚…...
23-Oracle 23 ai 区块链表(Blockchain Table)
小伙伴有没有在金融强合规的领域中遇见,必须要保持数据不可变,管理员都无法修改和留痕的要求。比如医疗的电子病历中,影像检查检验结果不可篡改行的,药品追溯过程中数据只可插入无法删除的特性需求;登录日志、修改日志…...
Linux简单的操作
ls ls 查看当前目录 ll 查看详细内容 ls -a 查看所有的内容 ls --help 查看方法文档 pwd pwd 查看当前路径 cd cd 转路径 cd .. 转上一级路径 cd 名 转换路径 …...
全志A40i android7.1 调试信息打印串口由uart0改为uart3
一,概述 1. 目的 将调试信息打印串口由uart0改为uart3。 2. 版本信息 Uboot版本:2014.07; Kernel版本:Linux-3.10; 二,Uboot 1. sys_config.fex改动 使能uart3(TX:PH00 RX:PH01),并让boo…...
算法笔记2
1.字符串拼接最好用StringBuilder,不用String 2.创建List<>类型的数组并创建内存 List arr[] new ArrayList[26]; Arrays.setAll(arr, i -> new ArrayList<>()); 3.去掉首尾空格...
Xela矩阵三轴触觉传感器的工作原理解析与应用场景
Xela矩阵三轴触觉传感器通过先进技术模拟人类触觉感知,帮助设备实现精确的力测量与位移监测。其核心功能基于磁性三维力测量与空间位移测量,能够捕捉多维触觉信息。该传感器的设计不仅提升了触觉感知的精度,还为机器人、医疗设备和制造业的智…...
【Linux】Linux安装并配置RabbitMQ
目录 1. 安装 Erlang 2. 安装 RabbitMQ 2.1.添加 RabbitMQ 仓库 2.2.安装 RabbitMQ 3.配置 3.1.启动和管理服务 4. 访问管理界面 5.安装问题 6.修改密码 7.修改端口 7.1.找到文件 7.2.修改文件 1. 安装 Erlang 由于 RabbitMQ 是用 Erlang 编写的,需要先安…...
uni-app学习笔记三十五--扩展组件的安装和使用
由于内置组件不能满足日常开发需要,uniapp官方也提供了众多的扩展组件供我们使用。由于不是内置组件,需要安装才能使用。 一、安装扩展插件 安装方法: 1.访问uniapp官方文档组件部分:组件使用的入门教程 | uni-app官网 点击左侧…...
负载均衡器》》LVS、Nginx、HAproxy 区别
虚拟主机 先4,后7...
