【2024】kafka streams的详细使用与案例练习(2)
目录
- 前言
- 使用
- 1、整体结构
- 1.1、序列化
- 2、 Kafka Streams 常用的 API
- 2.1、 StreamsBuilder
- 2.2、 KStream 和 KTable
- 2.3、 filter和 filterNot
- 2.4、 map 和 mapValues
- 2.5、 flatMap 和 flatMapValues
- 2.6、 groupByKey 和 groupBy
- 2.7、 count、reduce 和 aggregate
- 2.8、 join 和 leftJoin
- 2.9、 to 和 toTable
- 2.10、 foreach
- 3、简单案例练习
- 3.1、通过streams实现数据流处理,把字符串装为大写
- 3.2、map 使用,把内容转为key,把value转为内容长度
- 3.3、filter和 filterNot使用过滤消息
- 3.4、通过selectKey配置key
- 4、复杂对象案例练习
- 4.1、自定义序列化
- 4.2、发送消息
- 4.3、通过KafkaStreams接收复杂对象,并且使用split
前言
上一篇文章已经对kafka streams
进行了大致的介绍以及简单案例使用,这一片文章主要是对kafka streams
常用API的介绍以及使用,如果需要看概念介绍的可以看
Kafka Streams详细介绍与具体使用
下面直接开始进行kafka streams
使用操作
使用
1、整体结构
在我们整体处理
streams
时,总共就分为三部分
第一部分是创建配置,告诉kafka我们的连接信息,通过StreamsConfig传递,然后创建一个StreamsBuilder类用于构建
kafka streams
拓扑的主要类。该类主要用于定义数据流处理的拓扑结构。
还有就是通过Serde
进行一个序列化第二部分主要是构建流处理拓扑,通过
StreamsBuilder
类得到源Processor
处理器也就是KStream
类,然后通过KStream的API方法得到不同的Processor
处理器(重点)第三部分通过配置流处理拓扑,创建流处理实例,然后通过
kafkaStreams.start()
方法启动,结束时再通过kafkaStreams.close()
关闭
我们在使用时,1和3基本上都是相同的,一般我们要处理的就是2,根据不同的业务需求,我们得到不同的Processor
处理器,然后发送到不同的topic
1.1、序列化
在使用Kafka Streams
时,我们需要把从topic接收到的消息进行序列化,然后再把返回topic的消息进行反序列化,
可以和kafka发送一样使用Properties类直接定义全部的,但这样会有很大的局限性,因为我们发送给不同的处理过的topic消息往往都会是不同的类型,所以我们会使用到Serde进行该操作,再每次和topic进行读取或写入消息时通过Serde进行指定key和消息的类型。
基础数据类型都有默认的写好的通过Serdes方法可以获得,如果需要定义复杂的则需要自己定义一个序列化和反序列化的类。
如下:
Serde<String> stringSerde = Serdes.String()
:String类型Serde<Purchase> propertiesSerde = Serdes.serdeFrom(serializer, deserializer)
:自定义复杂类型JsonSerializer<Purchase> serializer = new JsonSerializer<>();
:自定义的序列化类JsonDeserializer<Purchase> deserializer = new JsonDeserializer<>(Purchase.class);
:反序列化类
kafka也有默认的可以直接用的:org.springframework.kafka.support.serializer.JsonSerializer
,但一般建议自定义序列化
2、 Kafka Streams 常用的 API
2.1、 StreamsBuilder
StreamsBuilder
是构建 Kafka Streams 拓扑的主要类。它用于定义数据流处理的拓扑结构。
StreamsBuilder builder = new StreamsBuilder();
2.2、 KStream 和 KTable
KStream
:表示一个无界的数据流,每个记录都是一个键值对。
KStream<String, String> stream = builder.stream("input-topic");
KTable
:表示一个表,每个键对应一个最新的值。
KTable<String, Long> table = builder.table("input-topic");
2.3、 filter和 filterNot
filter
:根据条件过滤记录。
KStream<String, String> filteredStream = stream.filter((key, value) -> value.contains("important"));
filterNot
:过滤掉不符合条件的记录。
KStream<String, String> filteredStream = stream.filterNot((key, value) -> value.contains("unimportant"));
2.4、 map 和 mapValues
map
:对每个记录的键和值进行转换。
KStream<String, Integer> mappedStream = stream.map((key, value) -> new KeyValue<>(key, value.length()));
mapValues
:仅对记录的值进行转换。
KStream<String, Integer> mappedStream = stream.mapValues(value -> value.length());
2.5、 flatMap 和 flatMapValues
flatMap
:将每个输入记录映射为零个或多个输出记录。
KStream<String, String> flatMappedStream = stream.flatMap((key, value) -> {List<KeyValue<String, String>> result = new ArrayList<>();for (String word : value.split(" ")) {result.add(new KeyValue<>(key, word));}return result;});
flatMapValues
:仅将每个输入记录的值映射为零个或多个输出值。
****KStream<String, String> flatMappedStream = stream.flatMapValues(value -> Arrays.asList(value.split(" ")));
2.6、 groupByKey 和 groupBy
groupByKey
:根据记录的键进行分组。
KGroupedStream<String, String> groupedStream = stream.groupByKey();
groupBy
:根据新的键进行分组。
KGroupedStream<String, String> groupedStream = stream.groupBy((key, value) -> value);
2.7、 count、reduce 和 aggregate
count
:计算每个分组中的记录数。
KTable<String, Long> countTable = groupedStream.count();
reduce
:对每个分组中的记录进行归约操作。
KTable<String, String> reducedTable = groupedStream.reduce((aggValue, newValue) -> aggValue + newValue);
aggregate
:自定义聚合操作。
KTable<String, Integer> aggregatedTable = groupedStream.aggregate(() -> 0,(key, value, aggregate) -> aggregate + value.length(),Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as("aggregated-store"));
2.8、 join 和 leftJoin
join
:对两个流或表进行内连接操作。
KStream<String, String> joinedStream = stream.join(otherStream,(value1, value2) -> value1 + value2,JoinWindows.of(Duration.ofMinutes(5)));
leftJoin
:对两个流或表进行左连接操作。
KStream<String, String> leftJoinedStream = stream.leftJoin(otherStream,(value1, value2) -> value1 + (value2 == null ? "" : value2),JoinWindows.of(Duration.ofMinutes(5)));
2.9、 to 和 toTable
to
:将流中的记录写入到Kafka主题。
**stream.to("output-topic");**
toTable
:将流转换为表。
KTable<String, String> table = stream.toTable();
2.10、 foreach
对每个记录执行一个操作,但不返回新的流。
stream.foreach((key, value) -> System.out.println(key + ": " + value));
上面这些就是KafkaStreams常用到的一些API,通过组合使用这些API,你可以构建复杂的流处理拓扑,以满足各种数据处理需求。
3、简单案例练习
使用脚本
#先进入容器
docker exec -it kafka-server /bin/bash#创建topic(把ip换为自己的)
/opt/kafka/bin/kafka-topics.sh --create --topic sell.purchase.transaction --bootstrap-server localhost:9092 --partitions 2 --replication-factor 1# 进入生产者发消息
/opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic input-topic#进入消费者监听
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic sell.purchase.transaction
3.1、通过streams实现数据流处理,把字符串装为大写
@Slf4j
public class KafkaStreamsYellingApp {
// appidprivate final static String APPLICATION_ID = "yelling_app_id";private final static String INPUT_TOPIC = "input-topic";private final static String OUTPUT_TOPIC = "out-topic";private final static String BOOTSTRAP_SERVERS = "localhost:9092";public static void main(String[] args) throws InterruptedException {//1、配置客户端和序列化器
// 配置kafka stream属性连接Properties properties = new Properties();properties.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);StreamsConfig streamsConfig = new StreamsConfig(properties);
// 配置键值对的序列化/反序列化Serdes对象Serde<String> stringSerde = Serdes.String();
// 构建流处理拓扑(用于输出)StreamsBuilder builder = new StreamsBuilder();//2、构建流处理拓扑
// 数据源处理器:从指定的topic中取出数据KStream<String, String> inputStream = builder.stream(INPUT_TOPIC, Consumed.with(stringSerde, stringSerde));
//KStream<String, String> upperStream = inputStream.peek((key, value) -> {log.info("[收集]key:{},value:{}", key, value);}).filter((key, value) -> value.length() > 5).mapValues(time -> time.toUpperCase()).peek((key, value) -> log.info("[过滤结束]key:{},value:{}", key, value));
// 日志打印upperStream处理器的数据upperStream.print(Printed.toSysOut());
// 把upperStream处理器的数据输出到指定的topic中upperStream.to(OUTPUT_TOPIC, Produced.with(stringSerde, stringSerde));//3、通过配置流处理拓扑,创建流处理实例KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), streamsConfig);
// jvm关闭时,把流也关闭CountDownLatch downLatch = new CountDownLatch(1);Runtime.getRuntime().addShutdownHook(new Thread(() -> {kafkaStreams.close();downLatch.countDown();log.info("关闭流处理");}));kafkaStreams.start();log.info("启动执行!");}
}
3.2、map 使用,把内容转为key,把value转为内容长度
@Slf4j
public class KafkaStreamsYellingApp2 {private final static String APPLICATION_ID = "yelling_app_id";private final static String INPUT_TOPIC = "input-topic";private final static String OUTPUT_TOPIC = "output-topic";private final static String BOOTSTRAP_SERVERS = "localhost:9092";public static void main(String[] args) throws InterruptedException {// 配置kafka stream属性连接Properties properties = new Properties();properties.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);StreamsConfig streamsConfig = new StreamsConfig(properties);
// 配置键值对的序列化/反序列化Serdes对象Serde<String> stringSerde = Serdes.String();Serde<Integer> integerSerde = Serdes.Integer();
// 构建流处理拓扑(用于输出)StreamsBuilder builder = new StreamsBuilder();
// 数据源处理器:从指定的topic中取出数据KStream<String, String> inputStream = builder.stream(INPUT_TOPIC, Consumed.with(stringSerde, stringSerde));
//KStream<String, Integer> k1 = inputStream.map((noKey, value) -> KeyValue.pair(value, value.length()));k1.print(Printed.<String,Integer>toSysOut().withLabel("map"));// k1.to(OUTPUT_TOPIC, Produced.with(stringSerde, integerSerde));KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), streamsConfig);// jvm关闭时,把流也关闭CountDownLatch downLatch = new CountDownLatch(1);Runtime.getRuntime().addShutdownHook(new Thread(() -> {kafkaStreams.close();downLatch.countDown();log.info("关闭流处理");}));kafkaStreams.start();log.info("启动执行!");}}
3.3、filter和 filterNot使用过滤消息
/*** filter和 filterNot使用*/
@Slf4j
public class KafkaStreamsYellingApp3 {private final static String APPLICATION_ID = "yelling_app_id";private final static String INPUT_TOPIC = "input-topic";private final static String OUTPUT_TOPIC = "output-topic";private final static String BOOTSTRAP_SERVERS = "localhost:9092";public static void main(String[] args) throws InterruptedException {// 配置kafka stream属性连接Properties properties = new Properties();properties.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);StreamsConfig streamsConfig = new StreamsConfig(properties);
// 配置键值对的序列化/反序列化Serdes对象Serde<String> stringSerde = Serdes.String();Serde<Integer> integerSerde = Serdes.Integer();
// 构建流处理拓扑(用于输出)StreamsBuilder builder = new StreamsBuilder();
// 数据源处理器:从指定的topic中取出数据KStream<String, String> inputStream = builder.stream(INPUT_TOPIC, Consumed.with(stringSerde, stringSerde));
//inputStream.filter((key, value) -> (value.contains("kafka")), Named.as("filtering-processor")).print(Printed.<String,String>toSysOut().withLabel("filtering"));inputStream.filterNot((key, value) -> (value.contains("kafka")), Named.as("filtering-not-processor")).print(Printed.<String,String>toSysOut().withLabel("filtering-not"));KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), streamsConfig);// jvm关闭时,把流也关闭CountDownLatch downLatch = new CountDownLatch(1);Runtime.getRuntime().addShutdownHook(new Thread(() -> {kafkaStreams.close();downLatch.countDown();log.info("关闭流处理");}));kafkaStreams.start();log.info("启动执行!");}
}
3.4、通过selectKey配置key
@Slf4j
public class KafkaStreamsYellingApp4 {private final static String APPLICATION_ID = "yelling_app_id";private final static String INPUT_TOPIC = "input-topic";private final static String OUTPUT_TOPIC = "output-topic";private final static String BOOTSTRAP_SERVERS = "localhost:9092";public static void main(String[] args) throws InterruptedException {// 配置kafka stream属性连接Properties properties = new Properties();properties.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);StreamsConfig streamsConfig = new StreamsConfig(properties);
// 配置键值对的序列化/反序列化Serdes对象Serde<String> stringSerde = Serdes.String();Serde<Integer> integerSerde = Serdes.Integer();
// 构建流处理拓扑(用于输出)StreamsBuilder builder = new StreamsBuilder();
// 数据源处理器:从指定的topic中取出数据KStream<String, String> inputStream = builder.stream(INPUT_TOPIC, Consumed.with(stringSerde, stringSerde));
//inputStream.flatMap( //把一个消息的内容通过转为多条消息以map的方式返回,(key, value) -> Arrays.stream(value.split(" ")) //通过使用java的stream把内容转为map.map(e -> KeyValue.pair(e, e.length())).collect(Collectors.toList())).print(Printed.<String,Integer>toSysOut().withLabel("flatMap"));inputStream.flatMapValues( //不改变key,直接转换valuevalue -> Arrays.stream(value.split(" ")).map(String::toUpperCase).toList()).print(Printed.<String,String>toSysOut().withLabel("flatMapValues"));// 配置keyinputStream.selectKey((key, value) -> value).print(Printed.<String,String>toSysOut().withLabel("selectKey"));KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), streamsConfig);// jvm关闭时,把流也关闭CountDownLatch downLatch = new CountDownLatch(1);Runtime.getRuntime().addShutdownHook(new Thread(() -> {kafkaStreams.close();downLatch.countDown();log.info("关闭流处理");}));kafkaStreams.start();log.info("启动执行!");}}
4、复杂对象案例练习
4.1、自定义序列化
对接收topic的消息到拓扑还是发送消息到topic都需要进行序列化和反序列化
- 序列化
public class JsonSerializer<T> implements Serializer<T> {private Gson gson= new Gson();public void configure(Map<String ,?> map, boolean b) {}public byte[] serialize(String topic, T t) {return gson.toJson(t).getBytes();}@Overridepublic void close() {Serializer.super.close();}
}
- 反序列化
/*** 反序列化* @param <T>*/
public class JsonDeserializer<T> implements Deserializer<T> {private Gson gson= new Gson();private Class<T> deserializeClass;public JsonDeserializer(Class<T> deserializeClass){this.deserializeClass=deserializeClass;}public JsonDeserializer(){}@Override@SuppressWarnings("unchecked")public void configure(Map<String,?> map, boolean b){if (deserializeClass == null){deserializeClass = (Class<T>) map.get("serializedClass");}}@Overridepublic T deserialize(String topic, byte[] data) {if (data == null){return null;}return gson.fromJson(new String(data),deserializeClass);}@Overridepublic void close() {}
}
4.2、发送消息
/*** 生产消息到kafka*/
@Slf4j
public class MyProducer {private static final String BOOTSTRAP_SERVERS = "localhost:9092";private final static String TOPIC_NAME = "production-topic";public static void main(String[] args) throws ExecutionException, InterruptedException {Properties props = new Properties();
// 设置参数props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,BOOTSTRAP_SERVERS);
// 设置序列化props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 连接客户端KafkaProducer<String, String> producer = new KafkaProducer<>(props);for (int i = 0; i < 5; i++) {Purchase purchase = new Purchase();purchase.setId("12431234253");purchase.setDate(new Date().toString());purchase.setName("苹果");purchase.setPrice(100.0+i);String json = new JSONObject(purchase).toString();ProducerRecord<String, String> producerRecord = new ProducerRecord<>(TOPIC_NAME, 0,"my-keyValue3", json);// 同步send(producer,producerRecord);}}/*** @param producer: 客户端对象* @return void* 同步发送* @date 2024/3/22 17:09*/private static void send(KafkaProducer<String, String> producer,ProducerRecord<String, String> producerRecord) throws InterruptedException, ExecutionException {// 等待发送成功的阻塞方法RecordMetadata metadata = producer.send(producerRecord).get();log.info("同步发送消息"+ "topic-"+metadata.topic()+"====partition:"+metadata.partition()+"=====offset:"+metadata.offset());}}
4.3、通过KafkaStreams接收复杂对象,并且使用split
split使用介绍
BranchedKStream<K, V> split(final Named named)
:split方法一个参数,该参数主要是用来定义分裂后的分支的一个统一前缀。会返回一个BranchedKStream
对象。
BranchedKStream
:主要用来使用分裂校验的
branch(Predicate<? super K, ? super V> predicate, Branched<K, V> branched)
:会接收两个参数,前面的是Predicate
用做校验判断作为判断条件的,后面的是一个Branched
对象,用作处理满足该分支条件拆分出来的消息进行处理defaultBranch(Branched<K, V> branched)
:作为结尾方法用作对不满足前面的全部branch
条件的消息,进行一个最后处理noDefaultBranch()
:结尾方法,表示对接下来的消息不做处理
Predicate
:判断验证条件,满足该条件的消息会被拆分出来到当前分支
Branched
:对分裂到当前分支的消息进行处理
as(final String name)
:给该分支添加一个名字,不做处理。withFunction()
:对该分支的消息进行处理,然后会把处理后的消息返回到结果的map中去withConsumer()
:对该分支的消息进行处理,不会把消息返回回去
- 具体代码
在测试使用前需要把几个topic先创建出来
@Slf4j
public class ZMartKafkaStreamsApp {private final static String APPLICATION_ID = "ZMart_app_id";private static final String BOOTSTRAP_SERVERS = "localhost:9092";private final static String TOPIC_NAME = "production-topic";private final static String APPLE_TOPIC_NAME = "apple-topic";private final static String WATERMELON_TOPIC_NAME = "watermelon-topic";private final static String OUT_TOPIC_NAME = "out-topic";public static void main(String[] args) throws InterruptedException {StreamsConfig streamsConfig = new StreamsConfig(getProperties());JsonSerializer<Purchase> serializer = new JsonSerializer<>();JsonDeserializer<Purchase> deserializer = new JsonDeserializer<>(Purchase.class);Serde<Purchase> propertiesSerde = Serdes.serdeFrom(serializer, deserializer);Serde<String> stringSerde = Serdes.String();StreamsBuilder builder = new StreamsBuilder();KStream<String, Purchase> inputStream = builder.stream(TOPIC_NAME, Consumed.with(stringSerde, propertiesSerde));// 谓词条件 判断把流输送到哪个分支上Predicate<String, Purchase> isWatermelon = (key, value) -> {String name = value.getName();return name.equals("西瓜");};// split:分裂流Map<String, KStream<String, Purchase>> stringKStreamMap = inputStream.peek((k,v)-> log.info("[分裂消息===>] k:{},value:{}" ,k,v)).split(Named.as("split-"))//拼接的key前缀.branch(isWatermelon, Branched.withFunction(ks -> { //对满足isWatermelon条件的分支的消息进行处理的处理器
// ks.print(Printed.<String, Purchase>toSysOut().withLabel("西瓜分支"));return ks.mapValues(v -> {String modifiedNumber = v.getId().replaceAll("(?<=\\d)\\d(?=\\d)", "*");v.setId( modifiedNumber);return v;});},"watermelon")) //这个地方需要指定name,用作为返回的map的key 如该分支存放到返回的map的key为:split-watermelon.branch((key, value) -> value.getName().equals("苹果"),Branched.withConsumer( //不满足上面的条件的会继续向下匹配,满足条件直接发送ks -> ks.peek((k, v) -> log.info("[苹果分支] value:{}" ,v)).to(APPLE_TOPIC_NAME, Produced.with(stringSerde, propertiesSerde)),"apple")).defaultBranch(Branched.withConsumer( //把不满足前面所以branch条件的消息发送到默认主题ks -> ks.to(OUT_TOPIC_NAME, Produced.with(stringSerde, propertiesSerde)),"defaultBranch"));// 把上面的消息打印出来stringKStreamMap.forEach((s, kStream) ->kStream.print(Printed.<String, Purchase>toSysOut().withLabel(s)));KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), streamsConfig);
// jvm关闭时,把流也关闭CountDownLatch latch = new CountDownLatch(1);Runtime.getRuntime().addShutdownHook(new Thread(() -> {kafkaStreams.close();latch.countDown();log.info("The Kafka Streams 执行关闭!");}));kafkaStreams.start();log.info("kafka streams 启动成功!>>>>");latch.await();}@NotNullprivate static Properties getProperties() {Properties properties = new Properties();properties.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,BOOTSTRAP_SERVERS);return properties;}
}
相关文章:

【2024】kafka streams的详细使用与案例练习(2)
目录 前言使用1、整体结构1.1、序列化 2、 Kafka Streams 常用的 API2.1、 StreamsBuilder2.2、 KStream 和 KTable2.3、 filter和 filterNot2.4、 map 和 mapValues2.5、 flatMap 和 flatMapValues2.6、 groupByKey 和 groupBy2.7、 count、reduce 和 aggregate2.8、 join 和 …...

qt 简单实验 读取json格式的配置文件
1.概要 2.代码 //#include "mainwindow.h"#include <QApplication> #include <QFile> #include <QJsonDocument> #include <QJsonObject> #include <QDebug> //读取json数据的配置文件QJsonObject readJsonConfigFile(const QString …...

Docker常用命令与实战示例
docker 1. 安装2. 常用命令3. 存储4. 网络5. redis主从复制示例6. wordpress示例7. DockerFile8. 一键安装超多中间件(compose) 1. 安装 以centOS系统为例 # 移除旧版本docker sudo yum remove docker \docker-client \docker-client-latest \docker-c…...
数据结构(基础知识)
基础概念: 数据:数据是信息的载体,是描述客观事物属性的数,字符及所有能输入到计算机中并被计算机程序识别和处理的符号的集合 数据元素:是数据的基本单位,在程序中常作为一个整体来考虑 数据对象&#…...

计算机网络:网络层 - 路由选择协议
计算机网络:网络层 - 路由选择协议 路由器的结构路由选择协议概述自治系统 AS内部网关协议路由信息协议 RIP距离向量算法RIP报文格式收敛问题 开放最短路径优先 OSPF基本工作原理自治系统分区 外部网关协议BGP-4 路由器的结构 如图所示,路由器被分为路由…...

JupyterLab使用指南(六):JupyterLab的 Widget 控件
1. 什么是 Widget 控件 JupyterLab 中的 Widget 控件是一种交互式的小部件,可以用于创建动态的、响应用户输入的界面。通过使用 ipywidgets 库,用户可以在 Jupyter notebook 中创建滑块、按钮、文本框、选择器等控件,从而实现数据的交互式展…...

OpenCV 特征点检测与匹配
一 OpenCV特征场景 ①图像搜索,如以图搜图; ②拼图游戏; ③图像拼接,将两长有关联得图拼接到一起; 1 拼图方法 寻找特征 特征是唯一的 可追踪的 能比较的 二 角点 在特征中最重要的是角点 灰度剃度的最大值对应的…...

css布局之flex应用
/*父 100*/.parent-div {/* 这里添加你想要的属性 */display: flex;flex-direction: row; //行justify-content: space-between; //左右对齐align-items: center;flex-wrap: wrap; //换行}/*中 90 10 */.middle-div {/* 这里添加你想要的属性 */display: flex;flex-direction:…...
树莓派4B设置AP热点步骤
树莓派4B设置AP热点步骤:先进入root模式 预先进行apt-get update 第1步:安装network-manager sudo apt-get install network-manager第2步:安装git apt-get install git apt-get install util-linux procps hostapd iproute2 iw haveged …...

Java程序之百鸡百钱问题
题目: 百钱买百鸡的问题算是一套非常经典的不定方程的问题,题目很简单:公鸡5文钱一只,母鸡3文钱一只,小鸡3只一文钱,用100文钱买一百只鸡,其中公鸡,母鸡,小鸡都必须要有,…...

Mybatis——动态sql
if标签 用于判断条件是否成立。使用test属性进行条件判断,如果条件为true,则拼接sql。 <where>标签用于识别语句是否需要连接词and,识别sql语句。 package com.t0.maybatisc.mapper;import com.t0.maybatisc.pojo.Emp; import org.a…...

可视化大屏开发系列——页面布局
页面布局是可视化大屏的基础,想要拥有一个基本美观的大屏,就得考虑页面整体模块的宽高自适应,我们自然就会想到具有强大灵活性flex布局,再借助百分比布局来辅助。至此,大屏页面布局问题即可得到解决。 可视化大屏开发系…...
Python statistics 模块
Python 的 statistics 模块提供了一组用于执行各种统计计算的函数,包括平均值、中位数、标准差、方差以及其他统计量。让我来简单介绍一下。 首先,你可以使用以下方式导入 statistics 模块: python import statistics 接下来,…...
wireshark常见使用表达式
目录 1. 捕获过滤器 (Capture Filters)基本捕获过滤器组合捕获过滤器 2. 显示过滤器 (Display Filters)基本显示过滤器复杂显示过滤器协议特定显示过滤器 3. 进阶显示过滤器技巧使用函数和操作符逻辑操作符 4. 常见网络协议过滤表达式示例HTTP 协议HTTPS 协议DNS 协议DHCP 协议…...

用Java获取键盘输入数的个十百位数
这段Java代码是一个简单的程序,用于接收用户输入的一个三位数,并将其分解为个位、十位和百位数字,然后分别打印出来。下面是代码的详细解释: 导入所需类库: import java.util.Scanner;:导入Scanner类,用于从…...

第10章 启动过程组 (制定项目章程)
第10章 启动过程组 9.1制定项目章程,在第三版教材第356~360页; 文字图片音频方式 视频12 第一个知识点:主要输出 1、项目章程(重要知识点) 项目目的 为了稳定与发展公司的客户群(抽象,非具体) 可测量的项目…...

html侧导航栏客服栏
ico 替换 ICO <html xmlns"http://www.w3.org/1999/xhtml"><head><meta http-equiv"Content-Type" content"text/html; charsetutf-8"><title>返回顶部</title><script src"js/jquery-2.0.3.min.js"…...

Clonable接口和拷贝
Hello~小伙伴们!本篇学习Clonable接口与深拷贝,一起往下看吧~(画图水平有限,两张图,,我真的画了巨久,求路过的朋友来个3连~阿阿阿~~~) 目录 1、Clonable接口概念 2、拷贝 2、1浅拷贝 2、2深拷贝 1、Clon…...

关于小蛋の编程和小蛋编程为同一作者的说明
小蛋の编程和小蛋编程的作品为同一人制作,因前者为父母的手机号进行注册,现用本人手机号注册了新账号小蛋编程,后续文章将在新账号小蛋编程上进行刊登,同时在小蛋编程上对原账号文章进行转载。此账号不再发布帖子,请大…...
大数据平台之Spark
Apache Spark 是一个开源的分布式计算系统,主要用于大规模数据处理和分析。它由UC Berkeley AMPLab开发,并由Apache Software Foundation维护。Spark旨在提供比Hadoop MapReduce更快的处理速度和更丰富的功能,特别是在处理迭代算法和交互式数…...

地震勘探——干扰波识别、井中地震时距曲线特点
目录 干扰波识别反射波地震勘探的干扰波 井中地震时距曲线特点 干扰波识别 有效波:可以用来解决所提出的地质任务的波;干扰波:所有妨碍辨认、追踪有效波的其他波。 地震勘探中,有效波和干扰波是相对的。例如,在反射波…...

《Qt C++ 与 OpenCV:解锁视频播放程序设计的奥秘》
引言:探索视频播放程序设计之旅 在当今数字化时代,多媒体应用已渗透到我们生活的方方面面,从日常的视频娱乐到专业的视频监控、视频会议系统,视频播放程序作为多媒体应用的核心组成部分,扮演着至关重要的角色。无论是在个人电脑、移动设备还是智能电视等平台上,用户都期望…...

el-switch文字内置
el-switch文字内置 效果 vue <div style"color:#ffffff;font-size:14px;float:left;margin-bottom:5px;margin-right:5px;">自动加载</div> <el-switch v-model"value" active-color"#3E99FB" inactive-color"#DCDFE6"…...
C++中string流知识详解和示例
一、概览与类体系 C 提供三种基于内存字符串的流,定义在 <sstream> 中: std::istringstream:输入流,从已有字符串中读取并解析。std::ostringstream:输出流,向内部缓冲区写入内容,最终取…...
数据库分批入库
今天在工作中,遇到一个问题,就是分批查询的时候,由于批次过大导致出现了一些问题,一下是问题描述和解决方案: 示例: // 假设已有数据列表 dataList 和 PreparedStatement pstmt int batchSize 1000; // …...

多种风格导航菜单 HTML 实现(附源码)
下面我将为您展示 6 种不同风格的导航菜单实现,每种都包含完整 HTML、CSS 和 JavaScript 代码。 1. 简约水平导航栏 <!DOCTYPE html> <html lang"zh-CN"> <head><meta charset"UTF-8"><meta name"viewport&qu…...

Maven 概述、安装、配置、仓库、私服详解
目录 1、Maven 概述 1.1 Maven 的定义 1.2 Maven 解决的问题 1.3 Maven 的核心特性与优势 2、Maven 安装 2.1 下载 Maven 2.2 安装配置 Maven 2.3 测试安装 2.4 修改 Maven 本地仓库的默认路径 3、Maven 配置 3.1 配置本地仓库 3.2 配置 JDK 3.3 IDEA 配置本地 Ma…...

如何在网页里填写 PDF 表格?
有时候,你可能希望用户能在你的网站上填写 PDF 表单。然而,这件事并不简单,因为 PDF 并不是一种原生的网页格式。虽然浏览器可以显示 PDF 文件,但原生并不支持编辑或填写它们。更糟的是,如果你想收集表单数据ÿ…...

GruntJS-前端自动化任务运行器从入门到实战
Grunt 完全指南:从入门到实战 一、Grunt 是什么? Grunt是一个基于 Node.js 的前端自动化任务运行器,主要用于自动化执行项目开发中重复性高的任务,例如文件压缩、代码编译、语法检查、单元测试、文件合并等。通过配置简洁的任务…...

DingDing机器人群消息推送
文章目录 1 新建机器人2 API文档说明3 代码编写 1 新建机器人 点击群设置 下滑到群管理的机器人,点击进入 添加机器人 选择自定义Webhook服务 点击添加 设置安全设置,详见说明文档 成功后,记录Webhook 2 API文档说明 点击设置说明 查看自…...