2、Kafka 生产者
3.1 生产者消息发送流程
3.1.1 发送原理
在消息发送的过程中,涉及到了两个线程——main 线程和 Sender 线程。在 main 线程
中创建了一个双端队列 RecordAccumulator。main 线程将消息发送给 RecordAccumulator,
Sender 线程不断从 RecordAccumulator 中拉取消息发送到 Kafka Broker。

3.1.2 生产者重要参数列表

3.2 异步发送 API
3.2.1 普通异步发送
1)需求:创建 Kafka 生产者,采用异步的方式发送到 Kafka Broker

2)代码编写
(1)创建工程 kafka
(2)导入依赖
<dependencies><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.0.0</version></dependency>
</dependencies>
(3)创建包名:com.atguigu.kafka.producer
(4)编写不带回调函数的 API 代码
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class CustomProducer {public static void main(String[] args) throws
InterruptedException {// 1. 创建 kafka 生产者的配置对象Properties properties = new Properties();// 2. 给 kafka 配置对象添加配置信息:bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"hadoop102:9092");// key,value 序列化(必须):key.serializer,value.serializerproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");// 3. 创建 kafka 生产者对象KafkaProducer<String, String> kafkaProducer = new
KafkaProducer<String, String>(properties);// 4. 调用 send 方法,发送消息for (int i = 0; i < 5; i++) {kafkaProducer.send(new
ProducerRecord<>("first","atguigu " + i));}// 5. 关闭资源kafkaProducer.close();}
}
测试:
①在 hadoop102 上开启 Kafka 消费者。
[hadoop103 kafka]$ bin/kafka-console-consumer.sh --
bootstrap-server hadoop102:9092 --topic first
②在 IDEA 中执行代码,观察 hadoop102 控制台中是否接收到消息
[hadoop102 kafka]$ bin/kafka-console-consumer.sh --
bootstrap-server hadoop102:9092 --topic first
atguigu 0
atguigu 1
atguigu 2
atguigu 3
atguigu 4
3.2.2 带回调函数的异步发送
回调函数会在 producer 收到 ack 时调用,为异步调用,该方法有两个参数,分别是元
数据信息(RecordMetadata)和异常信息(Exception),如果 Exception 为 null,说明消息发
送成功,如果 Exception 不为 null,说明消息发送失败。
注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试。
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class CustomProducerCallback {public static void main(String[] args) throws
InterruptedException {// 1. 创建 kafka 生产者的配置对象Properties properties = new Properties();// 2. 给 kafka 配置对象添加配置信息properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"hadoop102:9092");// key,value 序列化(必须):key.serializer,value.serializerproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());// 3. 创建 kafka 生产者对象KafkaProducer<String, String> kafkaProducer = new
KafkaProducer<String, String>(properties);// 4. 调用 send 方法,发送消息for (int i = 0; i < 5; i++) {// 添加回调kafkaProducer.send(new ProducerRecord<>("first",
"prince " + i), new Callback() {
// 该方法在 Producer 收到 ack 时调用,为异步调用@Overridepublic void onCompletion(RecordMetadata metadata,
Exception exception) {if (exception == null) {// 没有异常,输出信息到控制台System.out.println(" 主题: " +
metadata.topic() + "->" + "分区:" + metadata.partition());} else {// 出现异常打印exception.printStackTrace();}}});// 延迟一会会看到数据发往不同分区Thread.sleep(2);}// 5. 关闭资源kafkaProducer.close();}
}
测试:
①在 hadoop102 上开启 Kafka 消费者。
[hadoop103 kafka]$ bin/kafka-console-consumer.sh --
bootstrap-server hadoop102:9092 --topic first
②在 IDEA 中执行代码,观察 hadoop102 控制台中是否接收到消息。
[hadoop102 kafka]$ bin/kafka-console-consumer.sh --
bootstrap-server hadoop102:9092 --topic first
prince 0
prince 1
prince 2
prince 3
prince 4
③在 IDEA 控制台观察回调信息。
主题:first->分区:0
主题:first->分区:0
主题:first->分区:1
主题:first->分区:1
主题:first->分区:1
3.3 同步发送 API
只需在异步发送的基础上,再调用一下 get()方法即可。
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class CustomProducerSync {public static void main(String[] args) throws
InterruptedException, ExecutionException {// 1. 创建 kafka 生产者的配置对象Properties properties = new Properties();// 2. 给 kafka 配置对象添加配置信息properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102
:9092");// key,value 序列化(必须):key.serializer,value.serializerproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());// 3. 创建 kafka 生产者对象KafkaProducer<String, String> kafkaProducer = new
KafkaProducer<String, String>(properties);
// 4. 调用 send 方法,发送消息for (int i = 0; i < 10; i++) {// 异步发送 默认
// kafkaProducer.send(new
ProducerRecord<>("first","kafka" + i));// 同步发送kafkaProducer.send(new
ProducerRecord<>("first","kafka" + i)).get();}// 5. 关闭资源kafkaProducer.close();}
}
测试:
①在 hadoop102 上开启 Kafka 消费者。
[hadoop103 kafka]$ bin/kafka-console-consumer.sh --
bootstrap-server hadoop102:9092 --topic first
②在 IDEA 中执行代码,观察 hadoop102 控制台中是否接收到消息。
[hadoop102 kafka]$ bin/kafka-console-consumer.sh --
bootstrap-server hadoop102:9092 --topic first
atguigu 0
atguigu 1
atguigu 2
atguigu 3
atguigu 4
3.4 生产者分区
3.4.1 分区好处

3.4.2 生产者发送消息的分区策略
1)默认的分区器 DefaultPartitioner
在 IDEA 中 ctrl +n,全局查找 DefaultPartitioner。
/**
* The default partitioning strategy:
* <ul>
* <li>If a partition is specified in the record, use it
* <li>If no partition is specified but a key is present choose a
partition based on a hash of the key
* <li>If no partition or key is present choose the sticky
partition that changes when the batch is full.
*
* See KIP-480 for details about sticky partitioning.
*/
public class DefaultPartitioner implements Partitioner {… …
}

2)案例一
将数据发往指定 partition 的情况下,例如,将所有数据发往分区 1 中。
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class CustomProducerCallbackPartitions {public static void main(String[] args) {// 1. 创建 kafka 生产者的配置对象Properties properties = new Properties();// 2. 给 kafka 配置对象添加配置信息properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102
:9092");// key,value 序列化(必须):key.serializer,value.serializerproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());KafkaProducer<String, String> kafkaProducer = new
KafkaProducer<>(properties);for (int i = 0; i < 5; i++) {// 指定数据发送到 1 号分区,key 为空(IDEA 中 ctrl + p 查看参数)kafkaProducer.send(new ProducerRecord<>("first",
1,"","prince " + i), new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata,
Exception e) {if (e == null){System.out.println(" 主题: " +
metadata.topic() + "->" + "分区:" + metadata.partition());}else {e.printStackTrace();}}});}kafkaProducer.close();}
}
测试:
①在 hadoop102 上开启 Kafka 消费者。
[hadoop103 kafka]$ bin/kafka-console-consumer.sh --
bootstrap-server hadoop102:9092 --topic first
②在 IDEA 中执行代码,观察 hadoop102 控制台中是否接收到消息。
[hadoop102 kafka]$ bin/kafka-console-consumer.sh --
bootstrap-server hadoop102:9092 --topic first
prince 0
prince 1
prince 2
prince 3
prince 4
③在 IDEA 控制台观察回调信息。
主题:first->分区:1
主题:first->分区:1
主题:first->分区:1
主题:first->分区:1
主题:first->分区:1
3)案例二
没有指明 partition 值但有 key 的情况下,将 key 的 hash 值与 topic 的 partition 数进行取
余得到 partition 值。
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class CustomProducerCallback {public static void main(String[] args) {Properties properties = new Properties();properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102
:9092");properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());KafkaProducer<String, String> kafkaProducer = new
KafkaProducer<>(properties);for (int i = 0; i < 5; i++) {// 依次指定 key 值为 a,b,f ,数据 key 的 hash 值与 3 个分区求余,
分别发往 1、2、0kafkaProducer.send(new ProducerRecord<>("first",
"a","prince " + i), new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata,
Exception e) {if (e == null){System.out.println(" 主题: " +
metadata.topic() + "->" + "分区:" + metadata.partition());}else {e.printStackTrace();}}});}kafkaProducer.close();}
}
测试:
①key="a"时,在控制台查看结果。
主题:first->分区:1
主题:first->分区:1
主题:first->分区:1
主题:first->分区:1
主题:first->分区:1
②key="b"时,在控制台查看结果。
主题:first->分区:2
主题:first->分区:2
主题:first->分区:2
主题:first->分区:2
主题:first->分区:2
③key="f"时,在控制台查看结果。
主题:first->分区:0
主题:first->分区:0
主题:first->分区:0
主题:first->分区:0
主题:first->分区:0
3.4.3 自定义分区器
如果研发人员可以根据企业需求,自己重新实现分区器。
1)需求
例如我们实现一个分区器实现,发送过来的数据中如果包含 atguigu,就发往 0 号分区,
不包含 atguigu,就发往 1 号分区。
2)实现步骤
(1)定义类实现 Partitioner 接口。
(2)重写 partition()方法。
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;
/**
* 1. 实现接口 Partitioner
* 2. 实现 3 个方法:partition,close,configure
* 3. 编写 partition 方法,返回分区号
*/
public class MyPartitioner implements Partitioner {/* 返回信息对应的分区* @param topic 主题* @param key 消息的 key* @param keyBytes 消息的 key 序列化后的字节数组* @param value 消息的 value* @param valueBytes 消息的 value 序列化后的字节数组* @param cluster 集群元数据可以查看分区信息* @return*/@Overridepublic int partition(String topic, Object key, byte[]
keyBytes, Object value, byte[] valueBytes, Cluster cluster) {// 获取消息String msgValue = value.toString();// 创建 partitionint partition;// 判断消息是否包含 atguiguif (msgValue.contains("atguigu")){partition = 0;}else {partition = 1;}// 返回分区号return partition;}// 关闭资源@Overridepublic void close() {}// 配置方法@Overridepublic void configure(Map<String, ?> configs) {}
}
(3)使用分区器的方法,在生产者的配置中添加分区器参数。
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class CustomProducerCallbackPartitions {public static void main(String[] args) throws
InterruptedException {
Properties properties = new Properties();properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102
:9092");properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());// 添加自定义分区器
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.atgui
gu.kafka.producer.MyPartitioner");KafkaProducer<String, String> kafkaProducer = new
KafkaProducer<>(properties);for (int i = 0; i < 5; i++) {kafkaProducer.send(new ProducerRecord<>("first",
"prince " + i), new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata,
Exception e) {if (e == null){System.out.println(" 主题: " +
metadata.topic() + "->" + "分区:" + metadata.partition());}else {e.printStackTrace();}}});}kafkaProducer.close();}
}
(4)测试
①在 hadoop102 上开启 Kafka 消费者。
[hadoop103 kafka]$ bin/kafka-console-consumer.sh --
bootstrap-server hadoop102:9092 --topic first
②在 IDEA 控制台观察回调信息。
主题:first->分区:0
主题:first->分区:0
主题:first->分区:0
主题:first->分区:0
主题:first->分区:0
3.5 生产经验——生产者如何提高吞吐量

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class CustomProducerParameters {public static void main(String[] args) throws
InterruptedException {// 1. 创建 kafka 生产者的配置对象Properties properties = new Properties();// 2. 给 kafka 配置对象添加配置信息:bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"hadoop102:9092");// key,value 序列化(必须):key.serializer,value.serializerproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");// batch.size:批次大小,默认 16Kproperties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);// linger.ms:等待时间,默认 0properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);// RecordAccumulator:缓冲区大小,默认 32M:buffer.memoryproperties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);// compression.type:压缩,默认 none,可配置值 gzip、snappy、
lz4 和 zstd
properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy");// 3. 创建 kafka 生产者对象KafkaProducer<String, String> kafkaProducer = new
KafkaProducer<String, String>(properties);// 4. 调用 send 方法,发送消息for (int i = 0; i < 5; i++) {kafkaProducer.send(new
ProducerRecord<>("first","prince " + i));}// 5. 关闭资源kafkaProducer.close();}
}
测试
①在 hadoop102 上开启 Kafka 消费者。
[hadoop103 kafka]$ bin/kafka-console-consumer.sh --
bootstrap-server hadoop102:9092 --topic first
②在 IDEA 中执行代码,观察 hadoop102 控制台中是否接收到消息。
[hadoop102 kafka]$ bin/kafka-console-consumer.sh --
bootstrap-server hadoop102:9092 --topic first
prince 0
prince 1
prince 2
prince 3
prince 4
3.6 生产经验——数据可靠性
1)ack 应答原理



2)代码配置
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class CustomProducerAck {public static void main(String[] args) throws
InterruptedException {// 1. 创建 kafka 生产者的配置对象Properties properties = new Properties();// 2. 给 kafka 配置对象添加配置信息:bootstrap.servers102:9092");// key,value 序列化(必须):key.serializer,value.serializerproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());// 设置 acksproperties.put(ProducerConfig.ACKS_CONFIG, "all");// 重试次数 retries,默认是 int 最大值,2147483647properties.put(ProducerConfig.RETRIES_CONFIG, 3);// 3. 创建 kafka 生产者对象KafkaProducer<String, String> kafkaProducer = new
KafkaProducer<String, String>(properties);// 4. 调用 send 方法,发送消息for (int i = 0; i < 5; i++) {kafkaProducer.send(new
ProducerRecord<>("first","prince " + i));}// 5. 关闭资源kafkaProducer.close();}
}
3.7 生产经验——数据去重
3.7.1 数据传递语义

3.7.2 幂等性
1)幂等性原理

2)如何使用幂等性
开启参数 enable.idempotence 默认为 true,false 关闭。
3.7.3 生产者事务
1)Kafka 事务原理

2)Kafka 的事务一共有如下 5 个 API
// 1 初始化事务
void initTransactions();
// 2 开启事务
void beginTransaction() throws ProducerFencedException;
// 3 在事务内提交已经消费的偏移量(主要用于消费者)
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,String consumerGroupId) throws
ProducerFencedException;
// 4 提交事务
void commitTransaction() throws ProducerFencedException;
// 5 放弃事务(类似于回滚事务的操作)
void abortTransaction() throws ProducerFencedException;
3)单个 Producer,使用事务保证消息的仅一次发送
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class CustomProducerTransactions {public static void main(String[] args) throws
InterruptedException {// 1. 创建 kafka 生产者的配置对象Properties properties = new Properties();// 2. 给 kafka 配置对象添加配置信息properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");// key,value 序列化properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());// 设置事务 id(必须),事务 id 任意起名properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,
"transaction_id_0");// 3. 创建 kafka 生产者对象KafkaProducer<String, String> kafkaProducer = new
KafkaProducer<String, String>(properties);// 初始化事务kafkaProducer.initTransactions();// 开启事务kafkaProducer.beginTransaction();try {// 4. 调用 send 方法,发送消息for (int i = 0; i < 5; i++) {// 发送消息kafkaProducer.send(new ProducerRecord<>("first",
"prince " + i));}
// int i = 1 / 0;// 提交事务kafkaProducer.commitTransaction();} catch (Exception e) {// 终止事务kafkaProducer.abortTransaction();} finally {// 5. 关闭资源kafkaProducer.close();}}
}
3.8 生产经验——数据有序

3.9 生产经验——数据乱序

相关文章:
2、Kafka 生产者
3.1 生产者消息发送流程 3.1.1 发送原理 在消息发送的过程中,涉及到了两个线程——main 线程和 Sender 线程。在 main 线程 中创建了一个双端队列 RecordAccumulator。main 线程将消息发送给 RecordAccumulator, Sender 线程不断从 RecordAccumulator 中…...
使用CDN构建读取缓存设计
在构建需要高吞吐量和最小响应时间的系统的API时,缓存几乎是不可避免的。每个在分布式系统上工作的开发人员都曾在某个时候使用过某种缓存机制。在本文中,我们将探讨如何使用CDN构建读取缓存设计,不仅可以优化您的API,还可以降低基…...
windows上下载github上的linux内核项目遇到的问题
问题一:clone的时候报错 Cloning into G:\github\linux... POST git-upload-pack (gzip 27925 to 14032 bytes) remote: Counting objects: 6012062, done. remote: Compressing objects: 100% (1031/1031), done. remote: Total 6012062 (delta 893), reused 342 (…...
Leetcode 15:三数之和
给你一个整数数组 nums ,判断是否存在三元组 [nums[i], nums[j], nums[k]] 满足 i ! j、i ! k 且 j ! k ,同时还满足 nums[i] nums[j] nums[k] 0 。请 你返回所有和为 0 且不重复的三元组。 注意:答案中不可以包含重复的三元组。 解题思…...
npm常用命令与操作篇
npm简介 npm是什么 npm 的英文是,node package manager,是 node 的包管理工具 为什么需要npm 类比建造汽车一样,如果发动机、车身、轮胎、玻璃等等都自己做的话,几十年也做不完。但是如果有不同的厂商,已经帮我们把…...
Go 语言的垃圾回收机制:自动化内存管理
在编程的世界中,内存管理一直是一个重要的问题。不正确的内存管理可能导致内存泄漏和程序崩溃。Go 语言以其高效的垃圾回收机制而闻名,使开发者从手动内存管理的烦恼中解脱出来。本文将深入探讨Go语言的垃圾回收机制,介绍它的工作原理以及如何…...
java-各种成员变量初始化过程-待完善
前置条件 一、本文章讨论的成员变量 public static final String aa "aa";public static final Integer bb 1;public static final Students cc new Students();public static String aa1 "aa";public static Integer bb1 1;public static String bb2…...
059:mapboxGL监听键盘事件,通过eastTo控制左右旋转
第059个 点击查看专栏目录 本示例是介绍演示如何在vue+mapbox中监听键盘事件,通过eastTo控制左右旋转。 本例通过easeTo方法来加减一定数值的bearing角度,通过.addEventListener的方法来监听键盘的按键动作。这里一定要设置interactive: false, 否则展现不出来旋转效果。 直…...
jdk对linux cgroup v2容器化环境识别情况
Linux各发行版将cgroups v2作为默认的情况如下: Container-Optimized OS(从 M97 开始)Ubuntu(从 21.10 开始,推荐 22.04)Debian GNU/Linux(从 Debian 11 Bullseye 开始)Fedora&…...
vue3后台管理系统之顶部tabbar组件搭建
1.1静态页面搭建 <template><div class"tabbar"><div class"tabbar_left"><!-- 面包屑 --><Breadcrumb /></div><div class"tabbar_right"><!-- 设置 --><Setting /></div></di…...
安装Apache2.4
二、安装配置Apache: 中文官网:Apache 中文网 官网 (p2hp.com) 我下的是图中那个版本,最新的64位 下载下后解压缩。如解压到D:\tool\Apache24 PS:特别要注意使用的场景和64位还是32位版本 2、修改Apcahe配置文件 2.1配置Apache…...
KWin、libdrm、DRM从上到下全过程 —— drmModeAddFBxxx(9)
接前一篇文章:KWin、libdrm、DRM从上到下全过程 —— drmModeAddFBxxx(8) 上一回讲完了drm_internal_framebuffer_create函数中的framebuffer_check函数中的drm_get_format_info函数,本文继续讲解framebuffer_check函数中的余下步骤。为了便于理解,再次贴出framebuffer_ch…...
Win10使用nginx,注册到服务设置自启与后台运行,解决 Access is denied 问题
安装 nginx 下载 nginx 官网下载页面:https://nginx.org/en/download.html直接选择当前最新 window 版本的或找到自己需要的版本下载即可 安装使用 下载完成后是有一个压缩包文件,直接解压到自己需要的目录下即可。是免安装的,解压即用简…...
短视频矩阵系统源头开发
一、智能剪辑、矩阵分发、无人直播、爆款文案于一体独立应用开发 抖去推----主要针对本地生活的----移动端(小程序软件系统,目前是全国源头独立开发),开发功能大拆解分享,功能大拆解: 7大模型剪辑法(数学阶乘&#x…...
《windows核心编程》第1章 错误处理
一、错误信息的获取 1.1 C库错误信息 1、获取错误信息 #include <stdio.h> #include <stdlib.h> #include <string.h>int main() {fopen("D:\\ASC", "r");printf("%s\n", strerror(errno));getchar();return 0; } 2、设置错…...
解剖—单链表相关OJ练习题
目录 一、移除链表元素 二、找出链表的中间节点 三、合并两个有序链表 四、反转链表 五、求链表中倒数第k个结点 六、链表分割 七、链表的回文结构 八、判断链表是否相交 九、判断链表中是否有环(一) 十、 判断链表中是否有环(二) 注:第六题和第七题牛…...
php对接飞书机器人
有同事接到对接飞书机器人任务,开发中遇到响应错误: {"code": 19021,"msg": "sign match fail or timestamp is not within one hour from current time" } 意思应该就是签名错误或者时间戳不在有效范围内等,…...
中间件安全-CVE复现IISApacheTomcatNginx漏洞复现
目录 中间件安全&CVE复现&IIS&Apache&Tomcat&Nginx漏洞复现中间件-IIS安全问题中间件-Nginx安全问题漏洞复现Nginx 解析漏洞复现Nginx 文件名逻辑漏洞 中间件-Apache-RCE&目录遍历&文件解析等安全问题漏洞复现漏洞复现CVE_2021_42013 RCE代码执行&…...
@ResponseBodyAdvice @RequestBodyAdivce失效
背景 最近项目要有向外部提供服务的能力,但是考虑到数据安全问题,要对接口进行加解密;实现加解密的方案有很多,比如过滤器、拦截器、继承RequestResponseBodyMethodProcessor什么的,不过我最近正在了解ResponseBodyAd…...
【c#】Quartz开源任务调度框架学习及练习Demo
Quartz开源任务调度框架学习及练习Demo 1、定义、作用 2、原理 3、使用步骤 4、使用场景 5、Demo代码参考示例 6、注意事项 7、一些Trigger属性说明 1、定义、作用 Quartz是一个开源的任务调度框架,作用是支持开发人员可以定时处理业务,比如定时…...
MySQL 隔离级别:脏读、幻读及不可重复读的原理与示例
一、MySQL 隔离级别 MySQL 提供了四种隔离级别,用于控制事务之间的并发访问以及数据的可见性,不同隔离级别对脏读、幻读、不可重复读这几种并发数据问题有着不同的处理方式,具体如下: 隔离级别脏读不可重复读幻读性能特点及锁机制读未提交(READ UNCOMMITTED)允许出现允许…...
使用分级同态加密防御梯度泄漏
抽象 联邦学习 (FL) 支持跨分布式客户端进行协作模型训练,而无需共享原始数据,这使其成为在互联和自动驾驶汽车 (CAV) 等领域保护隐私的机器学习的一种很有前途的方法。然而,最近的研究表明&…...
最新SpringBoot+SpringCloud+Nacos微服务框架分享
文章目录 前言一、服务规划二、架构核心1.cloud的pom2.gateway的异常handler3.gateway的filter4、admin的pom5、admin的登录核心 三、code-helper分享总结 前言 最近有个活蛮赶的,根据Excel列的需求预估的工时直接打骨折,不要问我为什么,主要…...
在四层代理中还原真实客户端ngx_stream_realip_module
一、模块原理与价值 PROXY Protocol 回溯 第三方负载均衡(如 HAProxy、AWS NLB、阿里 SLB)发起上游连接时,将真实客户端 IP/Port 写入 PROXY Protocol v1/v2 头。Stream 层接收到头部后,ngx_stream_realip_module 从中提取原始信息…...
spring:实例工厂方法获取bean
spring处理使用静态工厂方法获取bean实例,也可以通过实例工厂方法获取bean实例。 实例工厂方法步骤如下: 定义实例工厂类(Java代码),定义实例工厂(xml),定义调用实例工厂ÿ…...
【python异步多线程】异步多线程爬虫代码示例
claude生成的python多线程、异步代码示例,模拟20个网页的爬取,每个网页假设要0.5-2秒完成。 代码 Python多线程爬虫教程 核心概念 多线程:允许程序同时执行多个任务,提高IO密集型任务(如网络请求)的效率…...
零基础设计模式——行为型模式 - 责任链模式
第四部分:行为型模式 - 责任链模式 (Chain of Responsibility Pattern) 欢迎来到行为型模式的学习!行为型模式关注对象之间的职责分配、算法封装和对象间的交互。我们将学习的第一个行为型模式是责任链模式。 核心思想:使多个对象都有机会处…...
Element Plus 表单(el-form)中关于正整数输入的校验规则
目录 1 单个正整数输入1.1 模板1.2 校验规则 2 两个正整数输入(联动)2.1 模板2.2 校验规则2.3 CSS 1 单个正整数输入 1.1 模板 <el-formref"formRef":model"formData":rules"formRules"label-width"150px"…...
MacOS下Homebrew国内镜像加速指南(2025最新国内镜像加速)
macos brew国内镜像加速方法 brew install 加速formula.jws.json下载慢加速 🍺 最新版brew安装慢到怀疑人生?别怕,教你轻松起飞! 最近Homebrew更新至最新版,每次执行 brew 命令时都会自动从官方地址 https://formulae.…...
Rust 开发环境搭建
环境搭建 1、开发工具RustRover 或者vs code 2、Cygwin64 安装 https://cygwin.com/install.html 在工具终端执行: rustup toolchain install stable-x86_64-pc-windows-gnu rustup default stable-x86_64-pc-windows-gnu 2、Hello World fn main() { println…...
