Kafka from Beginner to Expert
Kafka
Installation
ZooKeeper mode
Create the software directory
mkdir /opt/soft
cd /opt/soft
Download
wget https://downloads.apache.org/kafka/3.4.0/kafka_2.13-3.4.0.tgz
Extract
tar -zxvf kafka_2.13-3.4.0.tgz
Rename the directory
mv kafka_2.13-3.4.0 kafka
Configure environment variables
vim /etc/profile
export KAFKA_HOME=/opt/soft/kafka
export PATH=$PATH:$KAFKA_HOME/bin
Edit the configuration file
The configuration files live in the kafka/config directory.
# Create the data directory on every node
mkdir -p /opt/soft/kafka-logs
vim /opt/soft/kafka/config/server.properties
Mainly modify the following three parameters:
broker.id=1 (note: each node must use a different id)
change log.dirs=/tmp/kafka-logs to log.dirs=/opt/soft/kafka-logs
change zookeeper.connect=localhost:2181 to
zookeeper.connect=spark01:2181,spark02:2181,spark03:2181/kafka
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# This configuration file is intended for use in ZK-based mode, where Apache ZooKeeper is required.
# See kafka.server.KafkaConfig for additional details and defaults

############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1

############################# Socket Server Settings #############################

# The address the socket server listens on. If not configured, the host name will be equal to the value of
# java.net.InetAddress.getCanonicalHostName(), with PLAINTEXT listener name, and port 9092.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
#listeners=PLAINTEXT://:9092

# Listener name, hostname and port the broker will advertise to clients.
# If not set, it uses the value for "listeners".
#advertised.listeners=PLAINTEXT://your.host.name:9092

# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3

# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600

############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=/opt/soft/kafka-logs

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
#log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=spark01:2181,spark02:2181,spark03:2181/kafka

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=18000

############################# Group Coordinator Settings #############################

# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
# The default value for this is 3 seconds.
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
group.initial.rebalance.delay.ms=0
Distribute the configuration to the other nodes
scp -r /opt/soft/kafka root@spark02:/opt/soft
scp -r /opt/soft/kafka root@spark03:/opt/soft
scp /etc/profile root@spark02:/etc
scp /etc/profile root@spark03:/etc
Refresh the environment variables on all nodes
source /etc/profile
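After reloading the profile, you can verify that the Kafka scripts are on the PATH by asking any of them for their version; for this installation the output should mention 3.4.0:
kafka-topics.sh --version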
Start and stop
Start or stop each node individually
kafka-server-start.sh -daemon /opt/soft/kafka/config/server.properties
kafka-server-stop.sh
Cluster start/stop script
vim kafka-service.sh
#!/bin/bash

case $1 in
"start"){
    for i in spark01 spark02 spark03
    do
        echo "------------- starting kafka on $i ------------"
        ssh $i "/opt/soft/kafka/bin/kafka-server-start.sh -daemon /opt/soft/kafka/config/server.properties"
    done
}
;;
"stop"){
    for i in spark01 spark02 spark03
    do
        echo "------------- stopping kafka on $i ------------"
        ssh $i "/opt/soft/kafka/bin/kafka-server-stop.sh"
    done
}
;;
esac
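To use the script (assuming it sits on a node that has passwordless SSH to spark01, spark02 and spark03, as the scp/ssh steps above imply), make it executable and pass start or stop:
chmod +x kafka-service.sh
./kafka-service.sh start   # start the brokers on all three nodes
./kafka-service.sh stop    # stop the brokers on all three nodes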
KRaft mode
Create the software directory
mkdir /opt/soft
cd /opt/soft
Download
wget https://downloads.apache.org/kafka/3.4.0/kafka_2.13-3.4.0.tgz
Extract
tar -zxvf kafka_2.13-3.4.0.tgz
Rename the directory
mv kafka_2.13-3.4.0 kafka
Configure environment variables
vim /etc/profile
export KAFKA_HOME=/opt/soft/kafka
export PATH=$PATH:$KAFKA_HOME/bin
Edit the configuration file
The configuration files live in the kafka/config/kraft directory.
# Create the data directory on every node
mkdir -p /opt/soft/kraft-combined-logs
vim /opt/soft/kafka/config/kraft/server.properties
Mainly modify the following parameters:
- process.roles=broker,controller
- node.id=1 (note: each node must use a different id)
- change controller.quorum.voters=1@localhost:9093 to controller.quorum.voters=1@spark01:9093,2@spark02:9093,3@spark03:9093
- change advertised.listeners=PLAINTEXT://localhost:9092 to advertised.listeners=PLAINTEXT://spark01:9092 (on each node, advertise that node's own host name)
- change log.dirs=/tmp/kraft-combined-logs to log.dirs=/opt/soft/kraft-combined-logs
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# This configuration file is intended for use in KRaft mode, where
# Apache ZooKeeper is not present. See config/kraft/README.md for details.

############################# Server Basics #############################

# The role of this server. Setting this puts us in KRaft mode
process.roles=broker,controller

# The node id associated with this instance's roles
node.id=1

# The connect string for the controller quorum
controller.quorum.voters=1@spark01:9093,2@spark02:9093,3@spark03:9093

############################# Socket Server Settings #############################

# The address the socket server listens on.
# Combined nodes (i.e. those with `process.roles=broker,controller`) must list the controller listener here at a minimum.
# If the broker listener is not defined, the default listener will use a host name that is equal to the value of java.net.InetAddress.getCanonicalHostName(),
# with PLAINTEXT listener name, and port 9092.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://:9092,CONTROLLER://:9093

# Name of listener used for communication between brokers.
inter.broker.listener.name=PLAINTEXT

# Listener name, hostname and port the broker will advertise to clients.
# If not set, it uses the value for "listeners".
advertised.listeners=PLAINTEXT://spark01:9092

# A comma-separated list of the names of the listeners used by the controller.
# If no explicit mapping set in `listener.security.protocol.map`, default will be using PLAINTEXT protocol
# This is required if running in KRaft mode.
controller.listener.names=CONTROLLER

# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3

# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600

############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=/opt/soft/kraft-combined-logs

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000
Distribute the configuration to the other nodes
scp -r /opt/soft/kafka root@spark02:/opt/soft
scp -r /opt/soft/kafka root@spark03:/opt/soft
scp /etc/profile root@spark02:/etc
scp /etc/profile root@spark03:/etc
Refresh the environment variables on all nodes
source /etc/profile
Initialize the cluster data directories
Generate a unique ID for the storage directories
kafka-storage.sh random-uuid
Example output:
JfRaZDSORA2xK8pMSCa9AQ
Format the Kafka storage directories with that ID
Note: this must be run once on every node
kafka-storage.sh format -t JfRaZDSORA2xK8pMSCa9AQ \
-c /opt/soft/kafka/config/kraft/server.properties
Output:
Formatting /opt/soft/kraft-combined-logs with metadata.version 3.4-IV0.
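Since every node must be formatted with the same cluster ID, a small loop over SSH saves repeating the command by hand. This is only a sketch; it assumes the same passwordless SSH used by the scp steps above and reuses the UUID generated earlier:
CLUSTER_ID=JfRaZDSORA2xK8pMSCa9AQ   # the value printed by kafka-storage.sh random-uuid
for i in spark01 spark02 spark03
do
  ssh $i "/opt/soft/kafka/bin/kafka-storage.sh format -t $CLUSTER_ID -c /opt/soft/kafka/config/kraft/server.properties"
done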
Start and stop
Start or stop each node individually
kafka-server-start.sh -daemon /opt/soft/kafka/config/kraft/server.properties
kafka-server-stop.sh
Cluster start/stop script
vim kafka-service.sh
#!/bin/bash

case $1 in
"start"){
    for i in spark01 spark02 spark03
    do
        echo "------------- starting kafka on $i ------------"
        ssh $i "/opt/soft/kafka/bin/kafka-server-start.sh -daemon /opt/soft/kafka/config/kraft/server.properties"
    done
}
;;
"stop"){
    for i in spark01 spark02 spark03
    do
        echo "------------- stopping kafka on $i ------------"
        ssh $i "/opt/soft/kafka/bin/kafka-server-stop.sh"
    done
}
;;
esac
Command-line operations
Topic commands
View the topic command parameters
kafka-topics.sh
Parameter | Description
---|---
--bootstrap-server <String: server to connect to> | Host name and port of the Kafka broker(s) to connect to
--topic <String: topic> | Name of the topic to operate on
--create | Create a topic
--delete | Delete a topic
--alter | Modify a topic
--list | List all topics
--describe | Show a detailed topic description
--partitions <Integer: # of partitions> | Set the number of partitions
--replication-factor <Integer: replication factor> | Set the partition replication factor
--config <String: name=value> | Override a default configuration
List all topics on the cluster
kafka-topics.sh --bootstrap-server spark01:9092,spark02:9092,spark03:9092 --list
Create the lihaozhe topic
Option descriptions:
--topic sets the topic name
--partitions sets the number of partitions
--replication-factor sets the number of replicas
kafka-topics.sh --bootstrap-server spark01:9092,spark02:9092,spark03:9092 \
--topic lihaozhe --create --partitions 1 --replication-factor 3
Describe the topic
kafka-topics.sh --bootstrap-server spark01:9092,spark02:9092,spark03:9092 \
--describe --topic lihaozhe
Output:
Topic: lihaozhe	TopicId: kJWVrG0xQQSaFcrWGMYEGg	PartitionCount: 1	ReplicationFactor: 3	Configs:
	Topic: lihaozhe	Partition: 0	Leader: 1	Replicas: 1,2,3	Isr: 1,2,3
Change the number of partitions
Note:
The number of partitions can only be increased, never decreased.
The replication factor cannot be changed from the command line.
kafka-topics.sh --bootstrap-server spark01:9092,spark02:9092,spark03:9092 \
--alter --topic lihaozhe --partitions 3
After the change succeeds, describing the topic again shows:
Topic: lihaozhe	TopicId: kJWVrG0xQQSaFcrWGMYEGg	PartitionCount: 3	ReplicationFactor: 3	Configs:
	Topic: lihaozhe	Partition: 0	Leader: 1	Replicas: 1,2,3	Isr: 1,2,3
	Topic: lihaozhe	Partition: 1	Leader: 2	Replicas: 2,3,1	Isr: 2,3,1
	Topic: lihaozhe	Partition: 2	Leader: 3	Replicas: 3,1,2	Isr: 3,1,2
Producer commands
View the console producer parameters
kafka-console-producer.sh
Parameter | Description
---|---
--bootstrap-server <String: server to connect to> | Host name and port of the Kafka broker(s) to connect to
--topic <String: topic> | Name of the topic to operate on
key.serializer | Serializer class for the message key (fully qualified class name required)
value.serializer | Serializer class for the message value (fully qualified class name required)
buffer.memory | Total size of the RecordAccumulator buffer, default 32 MB
batch.size | Maximum size of one batch in the buffer, default 16 KB. Increasing it can improve throughput, but too large a value increases latency
linger.ms | If a batch has not reached batch.size, the sender sends it after linger.ms anyway. Unit: ms, default 0 (no delay). In production 5-100 ms is recommended
acks | 0: no acknowledgement before the data is persisted; 1: the leader acknowledges after receiving the data; -1 (all): the leader and every node in the ISR acknowledge after receiving the data. Default is -1; -1 and all are equivalent
max.in.flight.requests.per.connection | Maximum number of unacknowledged requests, default 5. With idempotence enabled it must be between 1 and 5
retries | Number of retries when a send fails. Default is the maximum int value, 2147483647. To keep message ordering with retries enabled, also set max.in.flight.requests.per.connection=1, otherwise later messages may succeed while a failed one is still being retried
retry.backoff.ms | Interval between two retries, default 100 ms
enable.idempotence | Whether to enable idempotence, default true (enabled)
compression.type | Compression for all data sent by the producer. Default none (no compression); supported types: none, gzip, snappy, lz4 and zstd
Send messages
kafka-console-producer.sh --bootstrap-server spark01:9092,spark02:9092,spark03:9092 --topic lihaozhe
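The console producer can also send keyed messages. The sketch below relies on the parse.key and key.separator properties of kafka-console-producer.sh (the ':' separator is just an example); everything typed before the separator becomes the record key:
kafka-console-producer.sh --bootstrap-server spark01:9092,spark02:9092,spark03:9092 \
--topic lihaozhe --property parse.key=true --property key.separator=: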
Consumer commands
View the console consumer parameters
kafka-console-consumer.sh
Parameter | Description
---|---
--bootstrap-server <String: server to connect to> | Host name and port of the Kafka broker(s) to connect to
--topic <String: topic> | Name of the topic to operate on
--from-beginning | Consume from the beginning of the topic
--group <String: consumer group id> | Name of the consumer group to use
Consume data from the lihaozhe topic
kafka-console-consumer.sh --bootstrap-server spark01:9092,spark02:9092,spark03:9092 \
--topic lihaozhe
Read all data in the topic, including historical data
kafka-console-consumer.sh --bootstrap-server spark01:9092,spark02:9092,spark03:9092 \
--topic lihaozhe --from-beginning
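The --group parameter from the table above lets several console consumers share the partitions of a topic as one consumer group; the group name used here is only an example:
kafka-console-consumer.sh --bootstrap-server spark01:9092,spark02:9092,spark03:9092 \
--topic lihaozhe --group lihaozhe-group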
Producer
Producer send flow
RecordAccumulator: every producer maintains a fixed-size block of memory that is used to merge individual messages into batches before sending, which improves throughput and reduces bandwidth usage.
The size of the RecordAccumulator is configurable through buffer.memory; the default is 33554432 (32 MB).
The RecordAccumulator's memory is divided into two parts.
The first part is the memory already in use, which mainly holds queues: one queue is created per topic partition to hold the batches waiting to be sent for that partition.
The second part is unused memory, which is split into pooled memory and the remaining non-pooled memory (nonPooledAvailableMemory).
The pooled memory consists of multiple ByteBuffers of batch.size (default 16 KB) kept in a queue; all remaining space forms the non-pooled free memory.
Java API
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.lihaozhe</groupId>
    <artifactId>kafka-code</artifactId>
    <version>1.0.0</version>
    <packaging>jar</packaging>

    <name>kafka</name>
    <url>http://maven.apache.org</url>

    <properties>
        <jdk.version>1.8</jdk.version>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <maven.test.failure.ignore>true</maven.test.failure.ignore>
        <maven.test.skip>true</maven.test.skip>
    </properties>

    <dependencies>
        <!-- junit-jupiter-api -->
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-api</artifactId>
            <version>5.9.3</version>
            <scope>test</scope>
        </dependency>
        <!-- junit-jupiter-engine -->
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-engine</artifactId>
            <version>5.9.3</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.26</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>2.20.0</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba.fastjson2</groupId>
            <artifactId>fastjson2</artifactId>
            <version>2.0.31</version>
        </dependency>
        <dependency>
            <groupId>com.github.binarywang</groupId>
            <artifactId>java-testdata-generator</artifactId>
            <version>1.1.2</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.33</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.4.0</version>
        </dependency>
    </dependencies>

    <build>
        <finalName>${project.name}</finalName>
        <!--<outputDirectory>../package</outputDirectory>-->
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.11.0</version>
                <configuration>
                    <!-- source file encoding -->
                    <encoding>UTF-8</encoding>
                    <!-- JDK version used for compilation -->
                    <source>${jdk.version}</source>
                    <target>${jdk.version}</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-clean-plugin</artifactId>
                <version>3.2.0</version>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-resources-plugin</artifactId>
                <version>3.3.1</version>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-war-plugin</artifactId>
                <version>3.3.2</version>
            </plugin>
            <!-- skip JUnit tests during packaging -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.22.2</version>
                <configuration>
                    <skip>true</skip>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
Producer
Producer: asynchronous send to a topic, without a callback
com.lihaozhe.producer.AsyncProducer
package com.lihaozhe.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

/**
 * Producer that sends data to a topic asynchronously, without a callback.
 * Start a console consumer beforehand to watch the results:
 * kafka-console-consumer.sh --bootstrap-server spark01:9092,spark02:9092,spark03:9092 --topic lihaozhe
 *
 * @author 李昊哲
 * @version 1.0.0 2023/5/15 1:45 PM
 */
public class AsyncProducer {
    public static void main(String[] args) {
        // 1. Basic configuration
        Properties properties = new Properties();
        // Cluster connection: bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "spark01:9092,spark02:9092,spark03:9092");
        // Serializer classes for key and value
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // 2. Create the Kafka producer
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        // 3. Send data
        for (int i = 0; i < 5; i++) {
            producer.send(new ProducerRecord<>("lihaozhe", "李昊哲" + i));
        }
        // 4. Release resources
        producer.close();
        System.out.println("success");
    }
}
Producer: synchronous send to a topic, without a callback
com.lihaozhe.producer.SyncProducer
package com.lihaozhe.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

/**
 * Producer that sends data to a topic synchronously, without a callback.
 * Start a console consumer beforehand to watch the results:
 * kafka-console-consumer.sh --bootstrap-server spark01:9092,spark02:9092,spark03:9092 --topic lihaozhe
 *
 * @author 李昊哲
 * @version 1.0.0 2023/5/15 1:45 PM
 */
public class SyncProducer {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 1. Basic configuration
        Properties properties = new Properties();
        // Cluster connection: bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "spark01:9092,spark02:9092,spark03:9092");
        // Serializer classes for key and value
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // 2. Create the Kafka producer
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        // 3. Send data; get() blocks until the broker acknowledges each record
        for (int i = 0; i < 5; i++) {
            producer.send(new ProducerRecord<>("lihaozhe", "李昊哲" + i)).get();
        }
        // 4. Release resources
        producer.close();
        System.out.println("success");
    }
}
Producer: asynchronous send to a topic, with a callback
com.lihaozhe.producer.AsyncProducerCallback
package com.lihaozhe.producer;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

/**
 * Producer that sends data to a topic asynchronously, with a callback.
 * Start a console consumer beforehand to watch the results:
 * kafka-console-consumer.sh --bootstrap-server spark01:9092,spark02:9092,spark03:9092 --topic lihaozhe
 *
 * @author 李昊哲
 * @version 1.0.0 2023/5/15 1:45 PM
 */
public class AsyncProducerCallback {
    public static void main(String[] args) throws InterruptedException {
        // 1. Basic configuration
        Properties properties = new Properties();
        // Cluster connection: bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "spark01:9092,spark02:9092,spark03:9092");
        // Serializer classes for key and value
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // 2. Create the Kafka producer
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        // 3. Send data with a callback that reports the target topic and partition
        for (int i = 0; i < 500; i++) {
            producer.send(new ProducerRecord<>("lihaozhe", "李昊哲" + i), (metadata, exception) -> {
                if (exception == null) {
                    System.out.println("topic: " + metadata.topic() + "\tpartition: " + metadata.partition());
                }
            });
            Thread.sleep(2);
        }
        // 4. Release resources
        producer.close();
        System.out.println("success");
    }
}
Producer: asynchronous send to a topic, specifying the partition number explicitly
com.lihaozhe.producer.AsyncProducerCallbackPartitions01
package com.lihaozhe.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

/**
 * Producer that sends data to a topic asynchronously, with a callback,
 * specifying the partition number explicitly.
 * Start a console consumer beforehand to watch the results:
 * kafka-console-consumer.sh --bootstrap-server spark01:9092,spark02:9092,spark03:9092 --topic lihaozhe
 *
 * @author 李昊哲
 * @version 1.0.0 2023/5/15 1:45 PM
 */
public class AsyncProducerCallbackPartitions01 {
    public static void main(String[] args) throws InterruptedException {
        // 1. Basic configuration
        Properties properties = new Properties();
        // Cluster connection: bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "spark01:9092,spark02:9092,spark03:9092");
        // Serializer classes for key and value
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // 2. Create the Kafka producer
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        // 3. Send data
        for (int i = 0; i < 500; i++) {
            // ProducerRecord arguments: topic, partition, key, value
            producer.send(new ProducerRecord<>("lihaozhe", 0, null, "李昊哲" + i), (metadata, exception) -> {
                if (exception == null) {
                    System.out.println("topic: " + metadata.topic() + "\tpartition: " + metadata.partition());
                }
            });
            Thread.sleep(2);
        }
        // 4. Release resources
        producer.close();
        System.out.println("success");
    }
}
Producer: asynchronous send where the partition is derived from the hash of the key modulo the number of partitions
com.lihaozhe.producer.AsyncProducerCallbackPartitions02
package com.lihaozhe.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

/**
 * Producer that sends data to a topic asynchronously, with a callback.
 * The partition is derived from the hash of the key modulo the number of partitions.
 * Start a console consumer beforehand to watch the results:
 * kafka-console-consumer.sh --bootstrap-server spark01:9092,spark02:9092,spark03:9092 --topic lihaozhe
 *
 * @author 李昊哲
 * @version 1.0.0 2023/5/15 1:45 PM
 */
public class AsyncProducerCallbackPartitions02 {
    public static void main(String[] args) throws InterruptedException {
        // 1. Basic configuration
        Properties properties = new Properties();
        // Cluster connection: bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "spark01:9092,spark02:9092,spark03:9092");
        // Serializer classes for key and value
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // 2. Create the Kafka producer
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        // 3. Send data
        for (int i = 0; i < 500; i++) {
            // the hash of the character "a" is 97
            producer.send(new ProducerRecord<>("lihaozhe", "a", "李昊哲" + i), (metadata, exception) -> {
                if (exception == null) {
                    System.out.println("topic: " + metadata.topic() + "\tpartition: " + metadata.partition());
                }
            });
            Thread.sleep(2);
        }
        // 4. Release resources
        producer.close();
        System.out.println("success");
    }
}
Producer: asynchronous send using a custom partitioner
Custom partitioner class
com.lihaozhe.producer.MyPartitioner
package com.lihaozhe.producer;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;

/**
 * Custom partitioner.
 *
 * @author 李昊哲
 * @version 1.0.0 2023/5/15 4:01 PM
 */
public class MyPartitioner implements Partitioner {
    /**
     * @param topic      The topic name
     * @param key        The key to partition on (or null if no key)
     * @param keyBytes   The serialized key to partition on (or null if no key)
     * @param value      The value to partition on or null
     * @param valueBytes The serialized value to partition on or null
     * @param cluster    The current cluster metadata
     * @return partition
     */
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // Route records to a partition based on the message content
        String msg = value.toString();
        if (msg.contains("李哲")) {
            return 0;
        } else if (msg.contains("李昊哲")) {
            return 1;
        } else {
            return 2;
        }
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }
}
com.lihaozhe.producer.AsyncProducerCallbackPartitions03
package com.lihaozhe.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Arrays;
import java.util.List;
import java.util.Properties;

/**
 * Producer that sends data to a topic asynchronously, with a callback,
 * using the custom partitioner.
 * Start a console consumer beforehand to watch the results:
 * kafka-console-consumer.sh --bootstrap-server spark01:9092,spark02:9092,spark03:9092 --topic lihaozhe
 *
 * @author 李昊哲
 * @version 1.0.0 2023/5/15 1:45 PM
 */
public class AsyncProducerCallbackPartitions03 {
    public static void main(String[] args) throws InterruptedException {
        // 1. Basic configuration
        Properties properties = new Properties();
        // Cluster connection: bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "spark01:9092,spark02:9092,spark03:9092");
        // Serializer classes for key and value
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Register the custom partitioner; the fully qualified class name is required
        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class.getName());
        // 2. Create the Kafka producer
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        // 3. Send data
        List<String> names = Arrays.asList("李昊哲", "李哲", "李大宝");
        for (int i = 0; i < 500; i++) {
            producer.send(new ProducerRecord<>("lihaozhe", names.get(i % names.size())), (metadata, exception) -> {
                if (exception == null) {
                    System.out.println("topic: " + metadata.topic() + "\tpartition: " + metadata.partition());
                }
            });
            Thread.sleep(2);
        }
        // 4. Release resources
        producer.close();
        System.out.println("success");
    }
}
Tuning producer send parameters
com.lihaozhe.producer.AsyncProducerParameters
package com.lihaozhe.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

/**
 * Tuning producer send parameters.
 * Start a console consumer beforehand to watch the results:
 * kafka-console-consumer.sh --bootstrap-server spark01:9092,spark02:9092,spark03:9092 --topic lihaozhe
 *
 * @author 李昊哲
 * @version 1.0.0 2023/5/15 1:45 PM
 */
public class AsyncProducerParameters {
    public static void main(String[] args) {
        // 1. Basic configuration
        Properties properties = new Properties();
        // Cluster connection: bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "spark01:9092,spark02:9092,spark03:9092");
        // Serializer classes for key and value
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Buffer size (buffer.memory)
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        // Batch size (batch.size)
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        // linger.ms
        properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        // Compression: none, gzip, snappy, lz4, zstd
        properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
        // 2. Create the Kafka producer
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        // 3. Send data
        for (int i = 0; i < 5; i++) {
            producer.send(new ProducerRecord<>("lihaozhe", "李昊哲" + i));
        }
        // 4. Release resources
        producer.close();
        System.out.println("success");
    }
}
Tuning producer send parameters: acks and retries
com.lihaozhe.producer.AsyncProducerAck
package com.lihaozhe.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

/**
 * Producer that sends data to a topic asynchronously, with a callback.
 * Demonstrates tuning acks and retries.
 * Start a console consumer beforehand to watch the results:
 * kafka-console-consumer.sh --bootstrap-server spark01:9092,spark02:9092,spark03:9092 --topic lihaozhe
 *
 * @author 李昊哲
 * @version 1.0.0 2023/5/15 1:45 PM
 */
public class AsyncProducerAck {
    public static void main(String[] args) throws InterruptedException {
        // 1. Basic configuration
        Properties properties = new Properties();
        // Cluster connection: bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "spark01:9092,spark02:9092,spark03:9092");
        // Serializer classes for key and value
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // acks
        properties.put(ProducerConfig.ACKS_CONFIG, "1");
        // retries: number of retry attempts
        properties.put(ProducerConfig.RETRIES_CONFIG, 3);
        // 2. Create the Kafka producer
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        // 3. Send data
        for (int i = 0; i < 500; i++) {
            producer.send(new ProducerRecord<>("lihaozhe", "李昊哲" + i), (metadata, exception) -> {
                if (exception == null) {
                    System.out.println("topic: " + metadata.topic() + "\tpartition: " + metadata.partition());
                }
            });
            Thread.sleep(2);
        }
        // 4. Release resources
        producer.close();
        System.out.println("success");
    }
}
Transactions
com.lihaozhe.producer.AsyncProducerTransactions
package com.lihaozhe.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

/**
 * Producer that sends data to a topic inside a transaction.
 * Start a console consumer beforehand to watch the results:
 * kafka-console-consumer.sh --bootstrap-server spark01:9092,spark02:9092,spark03:9092 --topic lihaozhe
 *
 * @author 李昊哲
 * @version 1.0.0 2023/5/15 1:45 PM
 */
public class AsyncProducerTransactions {
    public static void main(String[] args) {
        // 1. Basic configuration
        Properties properties = new Properties();
        // Cluster connection: bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "spark01:9092,spark02:9092,spark03:9092");
        // Serializer classes for key and value
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Transactional id
        properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transactional_id_01");
        // 2. Create the Kafka producer
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        producer.initTransactions();
        producer.beginTransaction();
        try {
            // 3. Send data
            for (int i = 0; i < 5; i++) {
                producer.send(new ProducerRecord<>("lihaozhe", "李昊哲" + i));
            }
            // int i = 1 / 0;  // uncomment to force an exception and trigger the abort path
            producer.commitTransaction();
            System.out.println("success");
        } catch (Exception e) {
            System.out.println("failed");
            producer.abortTransaction();
        } finally {
            // 4. Release resources
            producer.close();
        }
    }
}
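When reading data written by a transactional producer, the console consumer can be asked to skip uncommitted or aborted records. This sketch assumes the --isolation-level option of kafka-console-consumer.sh (whose default is read_uncommitted):
kafka-console-consumer.sh --bootstrap-server spark01:9092,spark02:9092,spark03:9092 \
--topic lihaozhe --from-beginning --isolation-level read_committed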