当前位置: 首页 > news >正文

2、Kafka 生产者

3.1 生产者消息发送流程
3.1.1 发送原理
在消息发送的过程中,涉及到了两个线程——main 线程和 Sender 线程。在 main 线程
中创建了一个双端队列 RecordAccumulator。main 线程将消息发送给 RecordAccumulator,
Sender 线程不断从 RecordAccumulator 中拉取消息发送到 Kafka Broker。
在这里插入图片描述
3.1.2 生产者重要参数列表

在这里插入图片描述
3.2 异步发送 API
3.2.1 普通异步发送
1)需求:创建 Kafka 生产者,采用异步的方式发送到 Kafka Broker
在这里插入图片描述
2)代码编写
(1)创建工程 kafka
(2)导入依赖

<dependencies><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.0.0</version></dependency>
</dependencies>

(3)创建包名:com.atguigu.kafka.producer
(4)编写不带回调函数的 API 代码

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class CustomProducer {public static void main(String[] args) throws 
InterruptedException {// 1. 创建 kafka 生产者的配置对象Properties properties = new Properties();// 2. 给 kafka 配置对象添加配置信息:bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
"hadoop102:9092");// key,value 序列化(必须):key.serializer,value.serializerproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringSerializer");// 3. 创建 kafka 生产者对象KafkaProducer<String, String> kafkaProducer = new 
KafkaProducer<String, String>(properties);// 4. 调用 send 方法,发送消息for (int i = 0; i < 5; i++) {kafkaProducer.send(new 
ProducerRecord<>("first","atguigu " + i));}// 5. 关闭资源kafkaProducer.close();}
}

测试:
①在 hadoop102 上开启 Kafka 消费者。

[hadoop103 kafka]$ bin/kafka-console-consumer.sh --
bootstrap-server hadoop102:9092 --topic first

②在 IDEA 中执行代码,观察 hadoop102 控制台中是否接收到消息

[hadoop102 kafka]$ bin/kafka-console-consumer.sh --
bootstrap-server hadoop102:9092 --topic first
atguigu 0
atguigu 1
atguigu 2
atguigu 3
atguigu 4

3.2.2 带回调函数的异步发送
回调函数会在 producer 收到 ack 时调用,为异步调用,该方法有两个参数,分别是元
数据信息(RecordMetadata)和异常信息(Exception),如果 Exception 为 null,说明消息发
送成功,如果 Exception 不为 null,说明消息发送失败。
注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试。

import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class CustomProducerCallback {public static void main(String[] args) throws 
InterruptedException {// 1. 创建 kafka 生产者的配置对象Properties properties = new Properties();// 2. 给 kafka 配置对象添加配置信息properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
"hadoop102:9092");// key,value 序列化(必须):key.serializer,value.serializerproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());// 3. 创建 kafka 生产者对象KafkaProducer<String, String> kafkaProducer = new 
KafkaProducer<String, String>(properties);// 4. 调用 send 方法,发送消息for (int i = 0; i < 5; i++) {// 添加回调kafkaProducer.send(new ProducerRecord<>("first", 
"prince " + i), new Callback() {
// 该方法在 Producer 收到 ack 时调用,为异步调用@Overridepublic void onCompletion(RecordMetadata metadata, 
Exception exception) {if (exception == null) {// 没有异常,输出信息到控制台System.out.println(" 主题: " + 
metadata.topic() + "->" + "分区:" + metadata.partition());} else {// 出现异常打印exception.printStackTrace();}}});// 延迟一会会看到数据发往不同分区Thread.sleep(2);}// 5. 关闭资源kafkaProducer.close();}
}

测试:
①在 hadoop102 上开启 Kafka 消费者。

[hadoop103 kafka]$ bin/kafka-console-consumer.sh --
bootstrap-server hadoop102:9092 --topic first

②在 IDEA 中执行代码,观察 hadoop102 控制台中是否接收到消息。

[hadoop102 kafka]$ bin/kafka-console-consumer.sh --
bootstrap-server hadoop102:9092 --topic first
prince 0
prince 1
prince 2
prince 3
prince 4

③在 IDEA 控制台观察回调信息。

主题:first->分区:0
主题:first->分区:0
主题:first->分区:1
主题:first->分区:1
主题:first->分区:1

3.3 同步发送 API
只需在异步发送的基础上,再调用一下 get()方法即可。

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class CustomProducerSync {public static void main(String[] args) throws
InterruptedException, ExecutionException {// 1. 创建 kafka 生产者的配置对象Properties properties = new Properties();// 2. 给 kafka 配置对象添加配置信息properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102
:9092");// key,value 序列化(必须):key.serializer,value.serializerproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());// 3. 创建 kafka 生产者对象KafkaProducer<String, String> kafkaProducer = new 
KafkaProducer<String, String>(properties);
// 4. 调用 send 方法,发送消息for (int i = 0; i < 10; i++) {// 异步发送 默认
// kafkaProducer.send(new 
ProducerRecord<>("first","kafka" + i));// 同步发送kafkaProducer.send(new 
ProducerRecord<>("first","kafka" + i)).get();}// 5. 关闭资源kafkaProducer.close();}
}

测试:
①在 hadoop102 上开启 Kafka 消费者。

[hadoop103 kafka]$ bin/kafka-console-consumer.sh --
bootstrap-server hadoop102:9092 --topic first

②在 IDEA 中执行代码,观察 hadoop102 控制台中是否接收到消息。

[hadoop102 kafka]$ bin/kafka-console-consumer.sh --
bootstrap-server hadoop102:9092 --topic first
atguigu 0
atguigu 1
atguigu 2
atguigu 3
atguigu 4

3.4 生产者分区
3.4.1 分区好处
在这里插入图片描述
3.4.2 生产者发送消息的分区策略
1)默认的分区器 DefaultPartitioner
在 IDEA 中 ctrl +n,全局查找 DefaultPartitioner。

/**
* The default partitioning strategy:
* <ul>
* <li>If a partition is specified in the record, use it
* <li>If no partition is specified but a key is present choose a 
partition based on a hash of the key
* <li>If no partition or key is present choose the sticky 
partition that changes when the batch is full.
* 
* See KIP-480 for details about sticky partitioning.
*/
public class DefaultPartitioner implements Partitioner {… …
}

在这里插入图片描述
2)案例一
将数据发往指定 partition 的情况下,例如,将所有数据发往分区 1 中。

import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class CustomProducerCallbackPartitions {public static void main(String[] args) {// 1. 创建 kafka 生产者的配置对象Properties properties = new Properties();// 2. 给 kafka 配置对象添加配置信息properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102
:9092");// key,value 序列化(必须):key.serializer,value.serializerproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());KafkaProducer<String, String> kafkaProducer = new 
KafkaProducer<>(properties);for (int i = 0; i < 5; i++) {// 指定数据发送到 1 号分区,key 为空(IDEA 中 ctrl + p 查看参数)kafkaProducer.send(new ProducerRecord<>("first", 
1,"","prince " + i), new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, 
Exception e) {if (e == null){System.out.println(" 主题: " + 
metadata.topic() + "->" + "分区:" + metadata.partition());}else {e.printStackTrace();}}});}kafkaProducer.close();}
}

测试:
①在 hadoop102 上开启 Kafka 消费者。

[hadoop103 kafka]$ bin/kafka-console-consumer.sh --
bootstrap-server hadoop102:9092 --topic first

②在 IDEA 中执行代码,观察 hadoop102 控制台中是否接收到消息。

[hadoop102 kafka]$ bin/kafka-console-consumer.sh --
bootstrap-server hadoop102:9092 --topic first
prince 0
prince 1
prince 2
prince 3
prince 4

③在 IDEA 控制台观察回调信息。

主题:first->分区:1
主题:first->分区:1
主题:first->分区:1
主题:first->分区:1
主题:first->分区:1

3)案例二
没有指明 partition 值但有 key 的情况下,将 key 的 hash 值与 topic 的 partition 数进行取
余得到 partition 值。

import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class CustomProducerCallback {public static void main(String[] args) {Properties properties = new Properties();properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102
:9092");properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());KafkaProducer<String, String> kafkaProducer = new 
KafkaProducer<>(properties);for (int i = 0; i < 5; i++) {// 依次指定 key 值为 a,b,f ,数据 key 的 hash 值与 3 个分区求余,
分别发往 120kafkaProducer.send(new ProducerRecord<>("first", 
"a","prince " + i), new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, 
Exception e) {if (e == null){System.out.println(" 主题: " + 
metadata.topic() + "->" + "分区:" + metadata.partition());}else {e.printStackTrace();}}});}kafkaProducer.close();}
}

测试:
①key="a"时,在控制台查看结果。

主题:first->分区:1
主题:first->分区:1
主题:first->分区:1
主题:first->分区:1
主题:first->分区:1

②key="b"时,在控制台查看结果。

主题:first->分区:2
主题:first->分区:2
主题:first->分区:2
主题:first->分区:2
主题:first->分区:2

③key="f"时,在控制台查看结果。

主题:first->分区:0
主题:first->分区:0
主题:first->分区:0
主题:first->分区:0
主题:first->分区:0

3.4.3 自定义分区器
如果研发人员可以根据企业需求,自己重新实现分区器。
1)需求
例如我们实现一个分区器实现,发送过来的数据中如果包含 atguigu,就发往 0 号分区,
不包含 atguigu,就发往 1 号分区。
2)实现步骤
(1)定义类实现 Partitioner 接口。
(2)重写 partition()方法。

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;
/**
* 1. 实现接口 Partitioner
* 2. 实现 3 个方法:partition,close,configure
* 3. 编写 partition 方法,返回分区号
*/
public class MyPartitioner implements Partitioner {/* 返回信息对应的分区* @param topic 主题* @param key 消息的 key* @param keyBytes 消息的 key 序列化后的字节数组* @param value 消息的 value* @param valueBytes 消息的 value 序列化后的字节数组* @param cluster 集群元数据可以查看分区信息* @return*/@Overridepublic int partition(String topic, Object key, byte[] 
keyBytes, Object value, byte[] valueBytes, Cluster cluster) {// 获取消息String msgValue = value.toString();// 创建 partitionint partition;// 判断消息是否包含 atguiguif (msgValue.contains("atguigu")){partition = 0;}else {partition = 1;}// 返回分区号return partition;}// 关闭资源@Overridepublic void close() {}// 配置方法@Overridepublic void configure(Map<String, ?> configs) {}
}

(3)使用分区器的方法,在生产者的配置中添加分区器参数。

import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class CustomProducerCallbackPartitions {public static void main(String[] args) throws 
InterruptedException {
Properties properties = new Properties();properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102
:9092");properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());// 添加自定义分区器
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.atgui
gu.kafka.producer.MyPartitioner");KafkaProducer<String, String> kafkaProducer = new 
KafkaProducer<>(properties);for (int i = 0; i < 5; i++) {kafkaProducer.send(new ProducerRecord<>("first", 
"prince " + i), new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, 
Exception e) {if (e == null){System.out.println(" 主题: " + 
metadata.topic() + "->" + "分区:" + metadata.partition());}else {e.printStackTrace();}}});}kafkaProducer.close();}
}

(4)测试
①在 hadoop102 上开启 Kafka 消费者。

[hadoop103 kafka]$ bin/kafka-console-consumer.sh --
bootstrap-server hadoop102:9092 --topic first

②在 IDEA 控制台观察回调信息。

主题:first->分区:0
主题:first->分区:0
主题:first->分区:0
主题:first->分区:0
主题:first->分区:0

3.5 生产经验——生产者如何提高吞吐量
在这里插入图片描述

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class CustomProducerParameters {public static void main(String[] args) throws 
InterruptedException {// 1. 创建 kafka 生产者的配置对象Properties properties = new Properties();// 2. 给 kafka 配置对象添加配置信息:bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
"hadoop102:9092");// key,value 序列化(必须):key.serializer,value.serializerproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringSerializer");// batch.size:批次大小,默认 16Kproperties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);// linger.ms:等待时间,默认 0properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);// RecordAccumulator:缓冲区大小,默认 32M:buffer.memoryproperties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);// compression.type:压缩,默认 none,可配置值 gzip、snappy、
lz4 和 zstd
properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy");// 3. 创建 kafka 生产者对象KafkaProducer<String, String> kafkaProducer = new 
KafkaProducer<String, String>(properties);// 4. 调用 send 方法,发送消息for (int i = 0; i < 5; i++) {kafkaProducer.send(new 
ProducerRecord<>("first","prince " + i));}// 5. 关闭资源kafkaProducer.close();}
} 

测试
①在 hadoop102 上开启 Kafka 消费者。

[hadoop103 kafka]$ bin/kafka-console-consumer.sh --
bootstrap-server hadoop102:9092 --topic first

②在 IDEA 中执行代码,观察 hadoop102 控制台中是否接收到消息。

[hadoop102 kafka]$ bin/kafka-console-consumer.sh --
bootstrap-server hadoop102:9092 --topic first
prince 0
prince 1
prince 2
prince 3
prince 4

3.6 生产经验——数据可靠性
1)ack 应答原理
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
2)代码配置

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class CustomProducerAck {public static void main(String[] args) throws 
InterruptedException {// 1. 创建 kafka 生产者的配置对象Properties properties = new Properties();// 2. 给 kafka 配置对象添加配置信息:bootstrap.servers102:9092");// key,value 序列化(必须):key.serializer,value.serializerproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());// 设置 acksproperties.put(ProducerConfig.ACKS_CONFIG, "all");// 重试次数 retries,默认是 int 最大值,2147483647properties.put(ProducerConfig.RETRIES_CONFIG, 3);// 3. 创建 kafka 生产者对象KafkaProducer<String, String> kafkaProducer = new 
KafkaProducer<String, String>(properties);// 4. 调用 send 方法,发送消息for (int i = 0; i < 5; i++) {kafkaProducer.send(new 
ProducerRecord<>("first","prince " + i));}// 5. 关闭资源kafkaProducer.close();}
}

3.7 生产经验——数据去重
3.7.1 数据传递语义
在这里插入图片描述
3.7.2 幂等性
1)幂等性原理
在这里插入图片描述
2)如何使用幂等性
开启参数 enable.idempotence 默认为 true,false 关闭。

3.7.3 生产者事务
1)Kafka 事务原理
在这里插入图片描述
2)Kafka 的事务一共有如下 5 个 API

// 1 初始化事务
void initTransactions();
// 2 开启事务
void beginTransaction() throws ProducerFencedException;
// 3 在事务内提交已经消费的偏移量(主要用于消费者)
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,String consumerGroupId) throws 
ProducerFencedException;
// 4 提交事务
void commitTransaction() throws ProducerFencedException;
// 5 放弃事务(类似于回滚事务的操作)
void abortTransaction() throws ProducerFencedException;

3)单个 Producer,使用事务保证消息的仅一次发送

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class CustomProducerTransactions {public static void main(String[] args) throws 
InterruptedException {// 1. 创建 kafka 生产者的配置对象Properties properties = new Properties();// 2. 给 kafka 配置对象添加配置信息properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");// key,value 序列化properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());// 设置事务 id(必须),事务 id 任意起名properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, 
"transaction_id_0");// 3. 创建 kafka 生产者对象KafkaProducer<String, String> kafkaProducer = new 
KafkaProducer<String, String>(properties);// 初始化事务kafkaProducer.initTransactions();// 开启事务kafkaProducer.beginTransaction();try {// 4. 调用 send 方法,发送消息for (int i = 0; i < 5; i++) {// 发送消息kafkaProducer.send(new ProducerRecord<>("first", 
"prince " + i));}
// int i = 1 / 0;// 提交事务kafkaProducer.commitTransaction();} catch (Exception e) {// 终止事务kafkaProducer.abortTransaction();} finally {// 5. 关闭资源kafkaProducer.close();}}
}

3.8 生产经验——数据有序
在这里插入图片描述
3.9 生产经验——数据乱序
在这里插入图片描述

相关文章:

2、Kafka 生产者

3.1 生产者消息发送流程 3.1.1 发送原理 在消息发送的过程中&#xff0c;涉及到了两个线程——main 线程和 Sender 线程。在 main 线程 中创建了一个双端队列 RecordAccumulator。main 线程将消息发送给 RecordAccumulator&#xff0c; Sender 线程不断从 RecordAccumulator 中…...

使用CDN构建读取缓存设计

在构建需要高吞吐量和最小响应时间的系统的API时&#xff0c;缓存几乎是不可避免的。每个在分布式系统上工作的开发人员都曾在某个时候使用过某种缓存机制。在本文中&#xff0c;我们将探讨如何使用CDN构建读取缓存设计&#xff0c;不仅可以优化您的API&#xff0c;还可以降低基…...

windows上下载github上的linux内核项目遇到的问题

问题一&#xff1a;clone的时候报错 Cloning into G:\github\linux... POST git-upload-pack (gzip 27925 to 14032 bytes) remote: Counting objects: 6012062, done. remote: Compressing objects: 100% (1031/1031), done. remote: Total 6012062 (delta 893), reused 342 (…...

Leetcode 15:三数之和

给你一个整数数组 nums &#xff0c;判断是否存在三元组 [nums[i], nums[j], nums[k]] 满足 i ! j、i ! k 且 j ! k &#xff0c;同时还满足 nums[i] nums[j] nums[k] 0 。请 你返回所有和为 0 且不重复的三元组。 注意&#xff1a;答案中不可以包含重复的三元组。 解题思…...

npm常用命令与操作篇

npm简介 npm是什么 npm 的英文是&#xff0c;node package manager&#xff0c;是 node 的包管理工具 为什么需要npm 类比建造汽车一样&#xff0c;如果发动机、车身、轮胎、玻璃等等都自己做的话&#xff0c;几十年也做不完。但是如果有不同的厂商&#xff0c;已经帮我们把…...

Go 语言的垃圾回收机制:自动化内存管理

在编程的世界中&#xff0c;内存管理一直是一个重要的问题。不正确的内存管理可能导致内存泄漏和程序崩溃。Go 语言以其高效的垃圾回收机制而闻名&#xff0c;使开发者从手动内存管理的烦恼中解脱出来。本文将深入探讨Go语言的垃圾回收机制&#xff0c;介绍它的工作原理以及如何…...

java-各种成员变量初始化过程-待完善

前置条件 一、本文章讨论的成员变量 public static final String aa "aa";public static final Integer bb 1;public static final Students cc new Students();public static String aa1 "aa";public static Integer bb1 1;public static String bb2…...

059:mapboxGL监听键盘事件,通过eastTo控制左右旋转

第059个 点击查看专栏目录 本示例是介绍演示如何在vue+mapbox中监听键盘事件,通过eastTo控制左右旋转。 本例通过easeTo方法来加减一定数值的bearing角度,通过.addEventListener的方法来监听键盘的按键动作。这里一定要设置interactive: false, 否则展现不出来旋转效果。 直…...

jdk对linux cgroup v2容器化环境识别情况

Linux各发行版将cgroups v2作为默认的情况如下&#xff1a; Container-Optimized OS&#xff08;从 M97 开始&#xff09;Ubuntu&#xff08;从 21.10 开始&#xff0c;推荐 22.04&#xff09;Debian GNU/Linux&#xff08;从 Debian 11 Bullseye 开始&#xff09;Fedora&…...

vue3后台管理系统之顶部tabbar组件搭建

1.1静态页面搭建 <template><div class"tabbar"><div class"tabbar_left"><!-- 面包屑 --><Breadcrumb /></div><div class"tabbar_right"><!-- 设置 --><Setting /></div></di…...

安装Apache2.4

二、安装配置Apache&#xff1a; 中文官网&#xff1a;Apache 中文网 官网 (p2hp.com) 我下的是图中那个版本&#xff0c;最新的64位 下载下后解压缩。如解压到D:\tool\Apache24 PS&#xff1a;特别要注意使用的场景和64位还是32位版本 2、修改Apcahe配置文件 2.1配置Apache…...

KWin、libdrm、DRM从上到下全过程 —— drmModeAddFBxxx(9)

接前一篇文章:KWin、libdrm、DRM从上到下全过程 —— drmModeAddFBxxx(8) 上一回讲完了drm_internal_framebuffer_create函数中的framebuffer_check函数中的drm_get_format_info函数,本文继续讲解framebuffer_check函数中的余下步骤。为了便于理解,再次贴出framebuffer_ch…...

Win10使用nginx,注册到服务设置自启与后台运行,解决 Access is denied 问题

安装 nginx 下载 nginx 官网下载页面&#xff1a;https://nginx.org/en/download.html直接选择当前最新 window 版本的或找到自己需要的版本下载即可 安装使用 下载完成后是有一个压缩包文件&#xff0c;直接解压到自己需要的目录下即可。是免安装的&#xff0c;解压即用简…...

短视频矩阵系统源头开发

一、智能剪辑、矩阵分发、无人直播、爆款文案于一体独立应用开发 抖去推----主要针对本地生活的----移动端(小程序软件系统&#xff0c;目前是全国源头独立开发)&#xff0c;开发功能大拆解分享&#xff0c;功能大拆解&#xff1a; 7大模型剪辑法&#xff08;数学阶乘&#x…...

《windows核心编程》第1章 错误处理

一、错误信息的获取 1.1 C库错误信息 1、获取错误信息 #include <stdio.h> #include <stdlib.h> #include <string.h>int main() {fopen("D:\\ASC", "r");printf("%s\n", strerror(errno));getchar();return 0; } 2、设置错…...

解剖—单链表相关OJ练习题

目录 一、移除链表元素 二、找出链表的中间节点 三、合并两个有序链表 四、反转链表 五、求链表中倒数第k个结点 六、链表分割 七、链表的回文结构 八、判断链表是否相交 九、判断链表中是否有环(一) 十、 判断链表中是否有环(二) 注&#xff1a;第六题和第七题牛…...

php对接飞书机器人

有同事接到对接飞书机器人任务&#xff0c;开发中遇到响应错误&#xff1a; {"code": 19021,"msg": "sign match fail or timestamp is not within one hour from current time" } 意思应该就是签名错误或者时间戳不在有效范围内等&#xff0c…...

中间件安全-CVE复现IISApacheTomcatNginx漏洞复现

目录 中间件安全&CVE复现&IIS&Apache&Tomcat&Nginx漏洞复现中间件-IIS安全问题中间件-Nginx安全问题漏洞复现Nginx 解析漏洞复现Nginx 文件名逻辑漏洞 中间件-Apache-RCE&目录遍历&文件解析等安全问题漏洞复现漏洞复现CVE_2021_42013 RCE代码执行&…...

@ResponseBodyAdvice @RequestBodyAdivce失效

背景 最近项目要有向外部提供服务的能力&#xff0c;但是考虑到数据安全问题&#xff0c;要对接口进行加解密&#xff1b;实现加解密的方案有很多&#xff0c;比如过滤器、拦截器、继承RequestResponseBodyMethodProcessor什么的&#xff0c;不过我最近正在了解ResponseBodyAd…...

【c#】Quartz开源任务调度框架学习及练习Demo

Quartz开源任务调度框架学习及练习Demo 1、定义、作用 2、原理 3、使用步骤 4、使用场景 5、Demo代码参考示例 6、注意事项 7、一些Trigger属性说明 1、定义、作用 Quartz是一个开源的任务调度框架&#xff0c;作用是支持开发人员可以定时处理业务&#xff0c;比如定时…...

SpringBoot-17-MyBatis动态SQL标签之常用标签

文章目录 1 代码1.1 实体User.java1.2 接口UserMapper.java1.3 映射UserMapper.xml1.3.1 标签if1.3.2 标签if和where1.3.3 标签choose和when和otherwise1.4 UserController.java2 常用动态SQL标签2.1 标签set2.1.1 UserMapper.java2.1.2 UserMapper.xml2.1.3 UserController.ja…...

linux之kylin系统nginx的安装

一、nginx的作用 1.可做高性能的web服务器 直接处理静态资源&#xff08;HTML/CSS/图片等&#xff09;&#xff0c;响应速度远超传统服务器类似apache支持高并发连接 2.反向代理服务器 隐藏后端服务器IP地址&#xff0c;提高安全性 3.负载均衡服务器 支持多种策略分发流量…...

【解密LSTM、GRU如何解决传统RNN梯度消失问题】

解密LSTM与GRU&#xff1a;如何让RNN变得更聪明&#xff1f; 在深度学习的世界里&#xff0c;循环神经网络&#xff08;RNN&#xff09;以其卓越的序列数据处理能力广泛应用于自然语言处理、时间序列预测等领域。然而&#xff0c;传统RNN存在的一个严重问题——梯度消失&#…...

镜像里切换为普通用户

如果你登录远程虚拟机默认就是 root 用户&#xff0c;但你不希望用 root 权限运行 ns-3&#xff08;这是对的&#xff0c;ns3 工具会拒绝 root&#xff09;&#xff0c;你可以按以下方法创建一个 非 root 用户账号 并切换到它运行 ns-3。 一次性解决方案&#xff1a;创建非 roo…...

聊一聊接口测试的意义有哪些?

目录 一、隔离性 & 早期测试 二、保障系统集成质量 三、验证业务逻辑的核心层 四、提升测试效率与覆盖度 五、系统稳定性的守护者 六、驱动团队协作与契约管理 七、性能与扩展性的前置评估 八、持续交付的核心支撑 接口测试的意义可以从四个维度展开&#xff0c;首…...

JavaScript 数据类型详解

JavaScript 数据类型详解 JavaScript 数据类型分为 原始类型&#xff08;Primitive&#xff09; 和 对象类型&#xff08;Object&#xff09; 两大类&#xff0c;共 8 种&#xff08;ES11&#xff09;&#xff1a; 一、原始类型&#xff08;7种&#xff09; 1. undefined 定…...

打手机检测算法AI智能分析网关V4守护公共/工业/医疗等多场景安全应用

一、方案背景​ 在现代生产与生活场景中&#xff0c;如工厂高危作业区、医院手术室、公共场景等&#xff0c;人员违规打手机的行为潜藏着巨大风险。传统依靠人工巡查的监管方式&#xff0c;存在效率低、覆盖面不足、判断主观性强等问题&#xff0c;难以满足对人员打手机行为精…...

Kubernetes 网络模型深度解析:Pod IP 与 Service 的负载均衡机制,Service到底是什么?

Pod IP 的本质与特性 Pod IP 的定位 纯端点地址&#xff1a;Pod IP 是分配给 Pod 网络命名空间的真实 IP 地址&#xff08;如 10.244.1.2&#xff09;无特殊名称&#xff1a;在 Kubernetes 中&#xff0c;它通常被称为 “Pod IP” 或 “容器 IP”生命周期&#xff1a;与 Pod …...

Python 训练营打卡 Day 47

注意力热力图可视化 在day 46代码的基础上&#xff0c;对比不同卷积层热力图可视化的结果 import torch import torch.nn as nn import torch.optim as optim from torchvision import datasets, transforms from torch.utils.data import DataLoader import matplotlib.pypl…...

软件工程 期末复习

瀑布模型&#xff1a;计划 螺旋模型&#xff1a;风险低 原型模型: 用户反馈 喷泉模型:代码复用 高内聚 低耦合&#xff1a;模块内部功能紧密 模块之间依赖程度小 高内聚&#xff1a;指的是一个模块内部的功能应该紧密相关。换句话说&#xff0c;一个模块应当只实现单一的功能…...