当前位置: 首页 > news >正文

2、Kafka 生产者

3.1 生产者消息发送流程
3.1.1 发送原理
在消息发送的过程中,涉及到了两个线程——main 线程和 Sender 线程。在 main 线程
中创建了一个双端队列 RecordAccumulator。main 线程将消息发送给 RecordAccumulator,
Sender 线程不断从 RecordAccumulator 中拉取消息发送到 Kafka Broker。
在这里插入图片描述
3.1.2 生产者重要参数列表

在这里插入图片描述
3.2 异步发送 API
3.2.1 普通异步发送
1)需求:创建 Kafka 生产者,采用异步的方式发送到 Kafka Broker
在这里插入图片描述
2)代码编写
(1)创建工程 kafka
(2)导入依赖

<dependencies><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.0.0</version></dependency>
</dependencies>

(3)创建包名:com.atguigu.kafka.producer
(4)编写不带回调函数的 API 代码

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class CustomProducer {public static void main(String[] args) throws 
InterruptedException {// 1. 创建 kafka 生产者的配置对象Properties properties = new Properties();// 2. 给 kafka 配置对象添加配置信息:bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
"hadoop102:9092");// key,value 序列化(必须):key.serializer,value.serializerproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringSerializer");// 3. 创建 kafka 生产者对象KafkaProducer<String, String> kafkaProducer = new 
KafkaProducer<String, String>(properties);// 4. 调用 send 方法,发送消息for (int i = 0; i < 5; i++) {kafkaProducer.send(new 
ProducerRecord<>("first","atguigu " + i));}// 5. 关闭资源kafkaProducer.close();}
}

测试:
①在 hadoop102 上开启 Kafka 消费者。

[hadoop103 kafka]$ bin/kafka-console-consumer.sh --
bootstrap-server hadoop102:9092 --topic first

②在 IDEA 中执行代码,观察 hadoop102 控制台中是否接收到消息

[hadoop102 kafka]$ bin/kafka-console-consumer.sh --
bootstrap-server hadoop102:9092 --topic first
atguigu 0
atguigu 1
atguigu 2
atguigu 3
atguigu 4

3.2.2 带回调函数的异步发送
回调函数会在 producer 收到 ack 时调用,为异步调用,该方法有两个参数,分别是元
数据信息(RecordMetadata)和异常信息(Exception),如果 Exception 为 null,说明消息发
送成功,如果 Exception 不为 null,说明消息发送失败。
注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试。

import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class CustomProducerCallback {public static void main(String[] args) throws 
InterruptedException {// 1. 创建 kafka 生产者的配置对象Properties properties = new Properties();// 2. 给 kafka 配置对象添加配置信息properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
"hadoop102:9092");// key,value 序列化(必须):key.serializer,value.serializerproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());// 3. 创建 kafka 生产者对象KafkaProducer<String, String> kafkaProducer = new 
KafkaProducer<String, String>(properties);// 4. 调用 send 方法,发送消息for (int i = 0; i < 5; i++) {// 添加回调kafkaProducer.send(new ProducerRecord<>("first", 
"prince " + i), new Callback() {
// 该方法在 Producer 收到 ack 时调用,为异步调用@Overridepublic void onCompletion(RecordMetadata metadata, 
Exception exception) {if (exception == null) {// 没有异常,输出信息到控制台System.out.println(" 主题: " + 
metadata.topic() + "->" + "分区:" + metadata.partition());} else {// 出现异常打印exception.printStackTrace();}}});// 延迟一会会看到数据发往不同分区Thread.sleep(2);}// 5. 关闭资源kafkaProducer.close();}
}

测试:
①在 hadoop102 上开启 Kafka 消费者。

[hadoop103 kafka]$ bin/kafka-console-consumer.sh --
bootstrap-server hadoop102:9092 --topic first

②在 IDEA 中执行代码,观察 hadoop102 控制台中是否接收到消息。

[hadoop102 kafka]$ bin/kafka-console-consumer.sh --
bootstrap-server hadoop102:9092 --topic first
prince 0
prince 1
prince 2
prince 3
prince 4

③在 IDEA 控制台观察回调信息。

主题:first->分区:0
主题:first->分区:0
主题:first->分区:1
主题:first->分区:1
主题:first->分区:1

3.3 同步发送 API
只需在异步发送的基础上,再调用一下 get()方法即可。

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class CustomProducerSync {public static void main(String[] args) throws
InterruptedException, ExecutionException {// 1. 创建 kafka 生产者的配置对象Properties properties = new Properties();// 2. 给 kafka 配置对象添加配置信息properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102
:9092");// key,value 序列化(必须):key.serializer,value.serializerproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());// 3. 创建 kafka 生产者对象KafkaProducer<String, String> kafkaProducer = new 
KafkaProducer<String, String>(properties);
// 4. 调用 send 方法,发送消息for (int i = 0; i < 10; i++) {// 异步发送 默认
// kafkaProducer.send(new 
ProducerRecord<>("first","kafka" + i));// 同步发送kafkaProducer.send(new 
ProducerRecord<>("first","kafka" + i)).get();}// 5. 关闭资源kafkaProducer.close();}
}

测试:
①在 hadoop102 上开启 Kafka 消费者。

[hadoop103 kafka]$ bin/kafka-console-consumer.sh --
bootstrap-server hadoop102:9092 --topic first

②在 IDEA 中执行代码,观察 hadoop102 控制台中是否接收到消息。

[hadoop102 kafka]$ bin/kafka-console-consumer.sh --
bootstrap-server hadoop102:9092 --topic first
atguigu 0
atguigu 1
atguigu 2
atguigu 3
atguigu 4

3.4 生产者分区
3.4.1 分区好处
在这里插入图片描述
3.4.2 生产者发送消息的分区策略
1)默认的分区器 DefaultPartitioner
在 IDEA 中 ctrl +n,全局查找 DefaultPartitioner。

/**
* The default partitioning strategy:
* <ul>
* <li>If a partition is specified in the record, use it
* <li>If no partition is specified but a key is present choose a 
partition based on a hash of the key
* <li>If no partition or key is present choose the sticky 
partition that changes when the batch is full.
* 
* See KIP-480 for details about sticky partitioning.
*/
public class DefaultPartitioner implements Partitioner {… …
}

在这里插入图片描述
2)案例一
将数据发往指定 partition 的情况下,例如,将所有数据发往分区 1 中。

import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class CustomProducerCallbackPartitions {public static void main(String[] args) {// 1. 创建 kafka 生产者的配置对象Properties properties = new Properties();// 2. 给 kafka 配置对象添加配置信息properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102
:9092");// key,value 序列化(必须):key.serializer,value.serializerproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());KafkaProducer<String, String> kafkaProducer = new 
KafkaProducer<>(properties);for (int i = 0; i < 5; i++) {// 指定数据发送到 1 号分区,key 为空(IDEA 中 ctrl + p 查看参数)kafkaProducer.send(new ProducerRecord<>("first", 
1,"","prince " + i), new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, 
Exception e) {if (e == null){System.out.println(" 主题: " + 
metadata.topic() + "->" + "分区:" + metadata.partition());}else {e.printStackTrace();}}});}kafkaProducer.close();}
}

测试:
①在 hadoop102 上开启 Kafka 消费者。

[hadoop103 kafka]$ bin/kafka-console-consumer.sh --
bootstrap-server hadoop102:9092 --topic first

②在 IDEA 中执行代码,观察 hadoop102 控制台中是否接收到消息。

[hadoop102 kafka]$ bin/kafka-console-consumer.sh --
bootstrap-server hadoop102:9092 --topic first
prince 0
prince 1
prince 2
prince 3
prince 4

③在 IDEA 控制台观察回调信息。

主题:first->分区:1
主题:first->分区:1
主题:first->分区:1
主题:first->分区:1
主题:first->分区:1

3)案例二
没有指明 partition 值但有 key 的情况下,将 key 的 hash 值与 topic 的 partition 数进行取
余得到 partition 值。

import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class CustomProducerCallback {public static void main(String[] args) {Properties properties = new Properties();properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102
:9092");properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());KafkaProducer<String, String> kafkaProducer = new 
KafkaProducer<>(properties);for (int i = 0; i < 5; i++) {// 依次指定 key 值为 a,b,f ,数据 key 的 hash 值与 3 个分区求余,
分别发往 120kafkaProducer.send(new ProducerRecord<>("first", 
"a","prince " + i), new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, 
Exception e) {if (e == null){System.out.println(" 主题: " + 
metadata.topic() + "->" + "分区:" + metadata.partition());}else {e.printStackTrace();}}});}kafkaProducer.close();}
}

测试:
①key="a"时,在控制台查看结果。

主题:first->分区:1
主题:first->分区:1
主题:first->分区:1
主题:first->分区:1
主题:first->分区:1

②key="b"时,在控制台查看结果。

主题:first->分区:2
主题:first->分区:2
主题:first->分区:2
主题:first->分区:2
主题:first->分区:2

③key="f"时,在控制台查看结果。

主题:first->分区:0
主题:first->分区:0
主题:first->分区:0
主题:first->分区:0
主题:first->分区:0

3.4.3 自定义分区器
如果研发人员可以根据企业需求,自己重新实现分区器。
1)需求
例如我们实现一个分区器实现,发送过来的数据中如果包含 atguigu,就发往 0 号分区,
不包含 atguigu,就发往 1 号分区。
2)实现步骤
(1)定义类实现 Partitioner 接口。
(2)重写 partition()方法。

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;
/**
* 1. 实现接口 Partitioner
* 2. 实现 3 个方法:partition,close,configure
* 3. 编写 partition 方法,返回分区号
*/
public class MyPartitioner implements Partitioner {/* 返回信息对应的分区* @param topic 主题* @param key 消息的 key* @param keyBytes 消息的 key 序列化后的字节数组* @param value 消息的 value* @param valueBytes 消息的 value 序列化后的字节数组* @param cluster 集群元数据可以查看分区信息* @return*/@Overridepublic int partition(String topic, Object key, byte[] 
keyBytes, Object value, byte[] valueBytes, Cluster cluster) {// 获取消息String msgValue = value.toString();// 创建 partitionint partition;// 判断消息是否包含 atguiguif (msgValue.contains("atguigu")){partition = 0;}else {partition = 1;}// 返回分区号return partition;}// 关闭资源@Overridepublic void close() {}// 配置方法@Overridepublic void configure(Map<String, ?> configs) {}
}

(3)使用分区器的方法,在生产者的配置中添加分区器参数。

import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class CustomProducerCallbackPartitions {public static void main(String[] args) throws 
InterruptedException {
Properties properties = new Properties();properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102
:9092");properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());// 添加自定义分区器
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.atgui
gu.kafka.producer.MyPartitioner");KafkaProducer<String, String> kafkaProducer = new 
KafkaProducer<>(properties);for (int i = 0; i < 5; i++) {kafkaProducer.send(new ProducerRecord<>("first", 
"prince " + i), new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, 
Exception e) {if (e == null){System.out.println(" 主题: " + 
metadata.topic() + "->" + "分区:" + metadata.partition());}else {e.printStackTrace();}}});}kafkaProducer.close();}
}

(4)测试
①在 hadoop102 上开启 Kafka 消费者。

[hadoop103 kafka]$ bin/kafka-console-consumer.sh --
bootstrap-server hadoop102:9092 --topic first

②在 IDEA 控制台观察回调信息。

主题:first->分区:0
主题:first->分区:0
主题:first->分区:0
主题:first->分区:0
主题:first->分区:0

3.5 生产经验——生产者如何提高吞吐量
在这里插入图片描述

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class CustomProducerParameters {public static void main(String[] args) throws 
InterruptedException {// 1. 创建 kafka 生产者的配置对象Properties properties = new Properties();// 2. 给 kafka 配置对象添加配置信息:bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
"hadoop102:9092");// key,value 序列化(必须):key.serializer,value.serializerproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringSerializer");// batch.size:批次大小,默认 16Kproperties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);// linger.ms:等待时间,默认 0properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);// RecordAccumulator:缓冲区大小,默认 32M:buffer.memoryproperties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);// compression.type:压缩,默认 none,可配置值 gzip、snappy、
lz4 和 zstd
properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy");// 3. 创建 kafka 生产者对象KafkaProducer<String, String> kafkaProducer = new 
KafkaProducer<String, String>(properties);// 4. 调用 send 方法,发送消息for (int i = 0; i < 5; i++) {kafkaProducer.send(new 
ProducerRecord<>("first","prince " + i));}// 5. 关闭资源kafkaProducer.close();}
} 

测试
①在 hadoop102 上开启 Kafka 消费者。

[hadoop103 kafka]$ bin/kafka-console-consumer.sh --
bootstrap-server hadoop102:9092 --topic first

②在 IDEA 中执行代码,观察 hadoop102 控制台中是否接收到消息。

[hadoop102 kafka]$ bin/kafka-console-consumer.sh --
bootstrap-server hadoop102:9092 --topic first
prince 0
prince 1
prince 2
prince 3
prince 4

3.6 生产经验——数据可靠性
1)ack 应答原理
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
2)代码配置

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class CustomProducerAck {public static void main(String[] args) throws 
InterruptedException {// 1. 创建 kafka 生产者的配置对象Properties properties = new Properties();// 2. 给 kafka 配置对象添加配置信息:bootstrap.servers102:9092");// key,value 序列化(必须):key.serializer,value.serializerproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());// 设置 acksproperties.put(ProducerConfig.ACKS_CONFIG, "all");// 重试次数 retries,默认是 int 最大值,2147483647properties.put(ProducerConfig.RETRIES_CONFIG, 3);// 3. 创建 kafka 生产者对象KafkaProducer<String, String> kafkaProducer = new 
KafkaProducer<String, String>(properties);// 4. 调用 send 方法,发送消息for (int i = 0; i < 5; i++) {kafkaProducer.send(new 
ProducerRecord<>("first","prince " + i));}// 5. 关闭资源kafkaProducer.close();}
}

3.7 生产经验——数据去重
3.7.1 数据传递语义
在这里插入图片描述
3.7.2 幂等性
1)幂等性原理
在这里插入图片描述
2)如何使用幂等性
开启参数 enable.idempotence 默认为 true,false 关闭。

3.7.3 生产者事务
1)Kafka 事务原理
在这里插入图片描述
2)Kafka 的事务一共有如下 5 个 API

// 1 初始化事务
void initTransactions();
// 2 开启事务
void beginTransaction() throws ProducerFencedException;
// 3 在事务内提交已经消费的偏移量(主要用于消费者)
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,String consumerGroupId) throws 
ProducerFencedException;
// 4 提交事务
void commitTransaction() throws ProducerFencedException;
// 5 放弃事务(类似于回滚事务的操作)
void abortTransaction() throws ProducerFencedException;

3)单个 Producer,使用事务保证消息的仅一次发送

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class CustomProducerTransactions {public static void main(String[] args) throws 
InterruptedException {// 1. 创建 kafka 生产者的配置对象Properties properties = new Properties();// 2. 给 kafka 配置对象添加配置信息properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");// key,value 序列化properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());// 设置事务 id(必须),事务 id 任意起名properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, 
"transaction_id_0");// 3. 创建 kafka 生产者对象KafkaProducer<String, String> kafkaProducer = new 
KafkaProducer<String, String>(properties);// 初始化事务kafkaProducer.initTransactions();// 开启事务kafkaProducer.beginTransaction();try {// 4. 调用 send 方法,发送消息for (int i = 0; i < 5; i++) {// 发送消息kafkaProducer.send(new ProducerRecord<>("first", 
"prince " + i));}
// int i = 1 / 0;// 提交事务kafkaProducer.commitTransaction();} catch (Exception e) {// 终止事务kafkaProducer.abortTransaction();} finally {// 5. 关闭资源kafkaProducer.close();}}
}

3.8 生产经验——数据有序
在这里插入图片描述
3.9 生产经验——数据乱序
在这里插入图片描述

相关文章:

2、Kafka 生产者

3.1 生产者消息发送流程 3.1.1 发送原理 在消息发送的过程中&#xff0c;涉及到了两个线程——main 线程和 Sender 线程。在 main 线程 中创建了一个双端队列 RecordAccumulator。main 线程将消息发送给 RecordAccumulator&#xff0c; Sender 线程不断从 RecordAccumulator 中…...

使用CDN构建读取缓存设计

在构建需要高吞吐量和最小响应时间的系统的API时&#xff0c;缓存几乎是不可避免的。每个在分布式系统上工作的开发人员都曾在某个时候使用过某种缓存机制。在本文中&#xff0c;我们将探讨如何使用CDN构建读取缓存设计&#xff0c;不仅可以优化您的API&#xff0c;还可以降低基…...

windows上下载github上的linux内核项目遇到的问题

问题一&#xff1a;clone的时候报错 Cloning into G:\github\linux... POST git-upload-pack (gzip 27925 to 14032 bytes) remote: Counting objects: 6012062, done. remote: Compressing objects: 100% (1031/1031), done. remote: Total 6012062 (delta 893), reused 342 (…...

Leetcode 15:三数之和

给你一个整数数组 nums &#xff0c;判断是否存在三元组 [nums[i], nums[j], nums[k]] 满足 i ! j、i ! k 且 j ! k &#xff0c;同时还满足 nums[i] nums[j] nums[k] 0 。请 你返回所有和为 0 且不重复的三元组。 注意&#xff1a;答案中不可以包含重复的三元组。 解题思…...

npm常用命令与操作篇

npm简介 npm是什么 npm 的英文是&#xff0c;node package manager&#xff0c;是 node 的包管理工具 为什么需要npm 类比建造汽车一样&#xff0c;如果发动机、车身、轮胎、玻璃等等都自己做的话&#xff0c;几十年也做不完。但是如果有不同的厂商&#xff0c;已经帮我们把…...

Go 语言的垃圾回收机制:自动化内存管理

在编程的世界中&#xff0c;内存管理一直是一个重要的问题。不正确的内存管理可能导致内存泄漏和程序崩溃。Go 语言以其高效的垃圾回收机制而闻名&#xff0c;使开发者从手动内存管理的烦恼中解脱出来。本文将深入探讨Go语言的垃圾回收机制&#xff0c;介绍它的工作原理以及如何…...

java-各种成员变量初始化过程-待完善

前置条件 一、本文章讨论的成员变量 public static final String aa "aa";public static final Integer bb 1;public static final Students cc new Students();public static String aa1 "aa";public static Integer bb1 1;public static String bb2…...

059:mapboxGL监听键盘事件,通过eastTo控制左右旋转

第059个 点击查看专栏目录 本示例是介绍演示如何在vue+mapbox中监听键盘事件,通过eastTo控制左右旋转。 本例通过easeTo方法来加减一定数值的bearing角度,通过.addEventListener的方法来监听键盘的按键动作。这里一定要设置interactive: false, 否则展现不出来旋转效果。 直…...

jdk对linux cgroup v2容器化环境识别情况

Linux各发行版将cgroups v2作为默认的情况如下&#xff1a; Container-Optimized OS&#xff08;从 M97 开始&#xff09;Ubuntu&#xff08;从 21.10 开始&#xff0c;推荐 22.04&#xff09;Debian GNU/Linux&#xff08;从 Debian 11 Bullseye 开始&#xff09;Fedora&…...

vue3后台管理系统之顶部tabbar组件搭建

1.1静态页面搭建 <template><div class"tabbar"><div class"tabbar_left"><!-- 面包屑 --><Breadcrumb /></div><div class"tabbar_right"><!-- 设置 --><Setting /></div></di…...

安装Apache2.4

二、安装配置Apache&#xff1a; 中文官网&#xff1a;Apache 中文网 官网 (p2hp.com) 我下的是图中那个版本&#xff0c;最新的64位 下载下后解压缩。如解压到D:\tool\Apache24 PS&#xff1a;特别要注意使用的场景和64位还是32位版本 2、修改Apcahe配置文件 2.1配置Apache…...

KWin、libdrm、DRM从上到下全过程 —— drmModeAddFBxxx(9)

接前一篇文章:KWin、libdrm、DRM从上到下全过程 —— drmModeAddFBxxx(8) 上一回讲完了drm_internal_framebuffer_create函数中的framebuffer_check函数中的drm_get_format_info函数,本文继续讲解framebuffer_check函数中的余下步骤。为了便于理解,再次贴出framebuffer_ch…...

Win10使用nginx,注册到服务设置自启与后台运行,解决 Access is denied 问题

安装 nginx 下载 nginx 官网下载页面&#xff1a;https://nginx.org/en/download.html直接选择当前最新 window 版本的或找到自己需要的版本下载即可 安装使用 下载完成后是有一个压缩包文件&#xff0c;直接解压到自己需要的目录下即可。是免安装的&#xff0c;解压即用简…...

短视频矩阵系统源头开发

一、智能剪辑、矩阵分发、无人直播、爆款文案于一体独立应用开发 抖去推----主要针对本地生活的----移动端(小程序软件系统&#xff0c;目前是全国源头独立开发)&#xff0c;开发功能大拆解分享&#xff0c;功能大拆解&#xff1a; 7大模型剪辑法&#xff08;数学阶乘&#x…...

《windows核心编程》第1章 错误处理

一、错误信息的获取 1.1 C库错误信息 1、获取错误信息 #include <stdio.h> #include <stdlib.h> #include <string.h>int main() {fopen("D:\\ASC", "r");printf("%s\n", strerror(errno));getchar();return 0; } 2、设置错…...

解剖—单链表相关OJ练习题

目录 一、移除链表元素 二、找出链表的中间节点 三、合并两个有序链表 四、反转链表 五、求链表中倒数第k个结点 六、链表分割 七、链表的回文结构 八、判断链表是否相交 九、判断链表中是否有环(一) 十、 判断链表中是否有环(二) 注&#xff1a;第六题和第七题牛…...

php对接飞书机器人

有同事接到对接飞书机器人任务&#xff0c;开发中遇到响应错误&#xff1a; {"code": 19021,"msg": "sign match fail or timestamp is not within one hour from current time" } 意思应该就是签名错误或者时间戳不在有效范围内等&#xff0c…...

中间件安全-CVE复现IISApacheTomcatNginx漏洞复现

目录 中间件安全&CVE复现&IIS&Apache&Tomcat&Nginx漏洞复现中间件-IIS安全问题中间件-Nginx安全问题漏洞复现Nginx 解析漏洞复现Nginx 文件名逻辑漏洞 中间件-Apache-RCE&目录遍历&文件解析等安全问题漏洞复现漏洞复现CVE_2021_42013 RCE代码执行&…...

@ResponseBodyAdvice @RequestBodyAdivce失效

背景 最近项目要有向外部提供服务的能力&#xff0c;但是考虑到数据安全问题&#xff0c;要对接口进行加解密&#xff1b;实现加解密的方案有很多&#xff0c;比如过滤器、拦截器、继承RequestResponseBodyMethodProcessor什么的&#xff0c;不过我最近正在了解ResponseBodyAd…...

【c#】Quartz开源任务调度框架学习及练习Demo

Quartz开源任务调度框架学习及练习Demo 1、定义、作用 2、原理 3、使用步骤 4、使用场景 5、Demo代码参考示例 6、注意事项 7、一些Trigger属性说明 1、定义、作用 Quartz是一个开源的任务调度框架&#xff0c;作用是支持开发人员可以定时处理业务&#xff0c;比如定时…...

[2025CVPR]DeepVideo-R1:基于难度感知回归GRPO的视频强化微调框架详解

突破视频大语言模型推理瓶颈,在多个视频基准上实现SOTA性能 一、核心问题与创新亮点 1.1 GRPO在视频任务中的两大挑战 ​安全措施依赖问题​ GRPO使用min和clip函数限制策略更新幅度,导致: 梯度抑制:当新旧策略差异过大时梯度消失收敛困难:策略无法充分优化# 传统GRPO的梯…...

vscode里如何用git

打开vs终端执行如下&#xff1a; 1 初始化 Git 仓库&#xff08;如果尚未初始化&#xff09; git init 2 添加文件到 Git 仓库 git add . 3 使用 git commit 命令来提交你的更改。确保在提交时加上一个有用的消息。 git commit -m "备注信息" 4 …...

Prompt Tuning、P-Tuning、Prefix Tuning的区别

一、Prompt Tuning、P-Tuning、Prefix Tuning的区别 1. Prompt Tuning(提示调优) 核心思想:固定预训练模型参数,仅学习额外的连续提示向量(通常是嵌入层的一部分)。实现方式:在输入文本前添加可训练的连续向量(软提示),模型只更新这些提示参数。优势:参数量少(仅提…...

VB.net复制Ntag213卡写入UID

本示例使用的发卡器&#xff1a;https://item.taobao.com/item.htm?ftt&id615391857885 一、读取旧Ntag卡的UID和数据 Private Sub Button15_Click(sender As Object, e As EventArgs) Handles Button15.Click轻松读卡技术支持:网站:Dim i, j As IntegerDim cardidhex, …...

【入坑系列】TiDB 强制索引在不同库下不生效问题

文章目录 背景SQL 优化情况线上SQL运行情况分析怀疑1:执行计划绑定问题?尝试:SHOW WARNINGS 查看警告探索 TiDB 的 USE_INDEX 写法Hint 不生效问题排查解决参考背景 项目中使用 TiDB 数据库,并对 SQL 进行优化了,添加了强制索引。 UAT 环境已经生效,但 PROD 环境强制索…...

YSYX学习记录(八)

C语言&#xff0c;练习0&#xff1a; 先创建一个文件夹&#xff0c;我用的是物理机&#xff1a; 安装build-essential 练习1&#xff1a; 我注释掉了 #include <stdio.h> 出现下面错误 在你的文本编辑器中打开ex1文件&#xff0c;随机修改或删除一部分&#xff0c;之后…...

oracle与MySQL数据库之间数据同步的技术要点

Oracle与MySQL数据库之间的数据同步是一个涉及多个技术要点的复杂任务。由于Oracle和MySQL的架构差异&#xff0c;它们的数据同步要求既要保持数据的准确性和一致性&#xff0c;又要处理好性能问题。以下是一些主要的技术要点&#xff1a; 数据结构差异 数据类型差异&#xff…...

spring:实例工厂方法获取bean

spring处理使用静态工厂方法获取bean实例&#xff0c;也可以通过实例工厂方法获取bean实例。 实例工厂方法步骤如下&#xff1a; 定义实例工厂类&#xff08;Java代码&#xff09;&#xff0c;定义实例工厂&#xff08;xml&#xff09;&#xff0c;定义调用实例工厂&#xff…...

C++ 基础特性深度解析

目录 引言 一、命名空间&#xff08;namespace&#xff09; C 中的命名空间​ 与 C 语言的对比​ 二、缺省参数​ C 中的缺省参数​ 与 C 语言的对比​ 三、引用&#xff08;reference&#xff09;​ C 中的引用​ 与 C 语言的对比​ 四、inline&#xff08;内联函数…...

SpringBoot+uniapp 的 Champion 俱乐部微信小程序设计与实现,论文初版实现

摘要 本论文旨在设计并实现基于 SpringBoot 和 uniapp 的 Champion 俱乐部微信小程序&#xff0c;以满足俱乐部线上活动推广、会员管理、社交互动等需求。通过 SpringBoot 搭建后端服务&#xff0c;提供稳定高效的数据处理与业务逻辑支持&#xff1b;利用 uniapp 实现跨平台前…...