当前位置: 首页 > news >正文

4.3、Flink任务怎样读取Kafka中的数据

目录

1、添加pom依赖

2、API使用说明

3、这是一个完整的入门案例

4、Kafka消息应该如何解析

4.1、只获取Kafka消息的value部分

​4.2、获取完整Kafka消息(key、value、Metadata)

4.3、自定义Kafka消息解析器

5、起始消费位点应该如何设置

​5.1、earliest()

5.2、latest()

5.3、timestamp()

6、Kafka分区扩容了,该怎么办 —— 动态分区检查

7、在加载KafkaSource时提取事件时间&添加水位线

7.1、使用内置的单调递增的水位线生成器 + kafka timestamp 为事件时间

7.2、使用内置的单调递增的水位线生成器 + kafka 消息中的 ID字段 为事件时间


1、添加pom依赖

我们可以使用Flink官方提供连接Kafka的工具flink-connector-kafka

该工具实现了一个消费者FlinkKafkaConsumer,可以用它来读取kafka的数据

如果想使用这个通用的Kafka连接工具,需要引入jar依赖

<!-- 引入 kafka连接器依赖-->
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>1.17.0</version>
</dependency>

2、API使用说明

官网链接:Apache Kafka 连接器

语法说明: 

// 1.初始化 KafkaSource 实例
KafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers(brokers)                           // 必填:指定broker连接信息 (为保证高可用,建议多指定几个节点)                     .setTopics("input-topic")                               // 必填:指定要消费的topic.setGroupId("my-group")                                 // 必填:指定消费者的groupid(不存在时会自动创建).setValueOnlyDeserializer(new SimpleStringSchema())     // 必填:指定反序列化器(用来解析kafka消息数据,转换为flink数据类型).setStartingOffsets(OffsetsInitializer.earliest())      // 可选:指定启动任务时的消费位点(不指定时,将默认使用 OffsetsInitializer.earliest()).build(); // 2.通过 fromSource + KafkaSource 获取 DataStreamSource
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

3、这是一个完整的入门案例

开发语言:java1.8

flink版本:flink1.17.0

public class ReadKafka {public static void main(String[] args) throws Exception {newAPI();}public static void newAPI() throws Exception {// 1.获取执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2.读取kafka数据KafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers("worker01:9092")               // 必填:指定broker连接信息 (为保证高可用,建议多指定几个节点).setTopics("20230810")                              // 必填:指定要消费的topic.setGroupId("FlinkConsumer")                        // 必填:指定消费者的groupid(不存在时会自动创建).setValueOnlyDeserializer(new SimpleStringSchema()) // 必填:指定反序列化器(用来解析kafka消息数据).setStartingOffsets(OffsetsInitializer.earliest())  // 可选:指定启动任务时的消费位点(不指定时,将默认使用 OffsetsInitializer.earliest()).build();env.fromSource(source,WatermarkStrategy.noWatermarks(),"Kafka Source").print();// 3.触发程序执行env.execute();}
}

4、Kafka消息应该如何解析

代码中需要提供一个反序列化器(Deserializer)来对 Kafka 的消息进行解析

反序列化器的功能:

                将Kafka ConsumerRecords转换为Flink处理的数据类型(Java/Scala对象)

反序列化器通过  setDeserializer(KafkaRecordDeserializationSchema.of(反序列化器类型)) 指定

下面介绍两种常用Kafka消息解析器:

        KafkaRecordDeserializationSchema.of(new JSONKeyValueDeserializationSchema(true)) :

                 1、返回完整的Kafka消息,将JSON字符串反序列化为ObjectNode对象

                 2、可以选择是否返回Kafak消息的Metadata信息,true-返回,false-不返回

        KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class) :

                1、只返回Kafka消息中的value部分 

4.1、只获取Kafka消息的value部分

4.2、获取完整Kafka消息(key、value、Metadata)

kafak消息格式:

                key =  {"nation":"蜀国"}

                value = {"ID":整数}

    public static void ParseMessageJSONKeyValue() throws Exception {// 1.获取执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2.读取kafka数据KafkaSource<ObjectNode> source = KafkaSource.<ObjectNode>builder().setBootstrapServers("worker01:9092")               // 必填:指定broker连接信息 (为保证高可用,建议多指定几个节点).setTopics("9527")                                  // 必填:指定要消费的topic.setGroupId("FlinkConsumer")                        // 必填:指定消费者的groupid(不存在时会自动创建)// 必填:指定反序列化器(将kafak消息解析为ObjectNode,json对象).setDeserializer(KafkaRecordDeserializationSchema.of(// includeMetadata = (true:返回Kafak元数据信息 false:不返回)new JSONKeyValueDeserializationSchema(true))).setStartingOffsets(OffsetsInitializer.latest())  // 可选:指定启动任务时的消费位点(不指定时,将默认使用 OffsetsInitializer.earliest()).build();env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source").print();// 3.触发程序执行env.execute();}

运行结果:    

常见报错: 

Caused by: java.io.IOException: Failed to deserialize consumer record ConsumerRecord(topic = 9527, partition = 0, leaderEpoch = 0, offset = 1064, CreateTime = 1691668775938, serialized key size = 4, serialized value size = 9, headers = RecordHeaders(headers = [], isReadOnly = false), key = [B@5e9eaab8, value = [B@67390400).at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:57)at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:53)... 14 more
Caused by: org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'xxxx': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')at [Source: (byte[])"xxxx"; line: 1, column: 5]

报错原因:

          出现这个报错,一般是使用flink读取fafka时,使用JSONKeyValueDeserializationSchema

来解析消息时,kafka消息中的key 或者 value 内容不符合json格式而造成的解析错误

例如下面这个格式,就会造成解析错误  key=1000,value=你好

那应该怎么解决呢?

        1、如果有权限修改Kafka消息格式,可以将Kafka消息key&value内容修改为Json格式

        2、如果没有权限修改Kafka消息格式(比如线上环境,修改比较困难),可以重新实现

       JSONKeyValueDeserializationSchema类,根据所需格式来解析Kafka消息(可以参考源码)

4.3、自定义Kafka消息解析器

        生产中对Kafka消息及解析的格式总是各种各样的,当flink预定义的解析器满足不了业务需求时,可以通过自定义kafka消息解析器来完成业务的支持

例如,当使用 MyJSONKeyValueDeserializationSchema 获取Kafka元数据时,只返回了 offset、topic、partition 三个字段信息,现在需要`kafka生产者写入数据时的timestamp`,就可以通过自定义kafka消息解析器来完成

代码示例:

// TODO 自定义Kafka消息解析器,在 metadata 中增加 timestamp字段
public class MyJSONKeyValueDeserializationSchema implements KafkaDeserializationSchema<ObjectNode>{private static final long serialVersionUID = 1509391548173891955L;private final boolean includeMetadata;private ObjectMapper mapper;public MyJSONKeyValueDeserializationSchema(boolean includeMetadata) {this.includeMetadata = includeMetadata;}@Overridepublic void open(DeserializationSchema.InitializationContext context) throws Exception {mapper = JacksonMapperFactory.createObjectMapper();}@Overridepublic ObjectNode deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {ObjectNode node = mapper.createObjectNode();if (record.key() != null) {node.set("key", mapper.readValue(record.key(), JsonNode.class));}if (record.value() != null) {node.set("value", mapper.readValue(record.value(), JsonNode.class));}if (includeMetadata) {node.putObject("metadata").put("offset", record.offset()).put("topic", record.topic()).put("partition", record.partition())// 添加 timestamp 字段.put("timestamp",record.timestamp());}return node;}@Overridepublic boolean isEndOfStream(ObjectNode nextElement) {return false;}@Overridepublic TypeInformation<ObjectNode> getProducedType() {return getForClass(ObjectNode.class);}}

运行结果:


5、起始消费位点应该如何设置

起始消费位点说明:

        起始消费位点是指 启动flink任务时,应该从哪个位置开始读取Kafka的消息   

        下面介绍下常用的三个设置:    

                OffsetsInitializer.earliest()  :

                        从最早位点开始消

                        这里的最早指的是Kafka消息保存的时长(默认为7天,生成环境各公司略有不同)

                        该这设置为默认设置,当不指定OffsetsInitializer.xxx时,默认为earliest() 

                OffsetsInitializer.latest()   :

                        从最末尾位点开始消费

                        这里的最末尾指的是flink任务启动时间点之后生产的消息

                OffsetsInitializer.timestamp(时间戳) :

                        从时间戳大于等于指定时间戳(毫秒)的数据开始消费

下面用案例说明下,三种设置的效果,kafak生成10条数据,如下:

5.1、earliest()

代码示例:

KafkaSource<ObjectNode> source = KafkaSource.<ObjectNode>builder().setBootstrapServers("worker01:9092").setTopics("23230811").setGroupId("FlinkConsumer")// 将kafka消息解析为Json对象,并返回元数据.setDeserializer(KafkaRecordDeserializationSchema.of(new JSONKeyValueDeserializationSchema(true)))// 设置起始消费位点:从最早位置开始消费(该设置为默认设置).setStartingOffsets(OffsetsInitializer.earliest()).build();

运行结果:

5.2、latest()

代码示例:

KafkaSource<ObjectNode> source = KafkaSource.<ObjectNode>builder().setBootstrapServers("worker01:9092").setTopics("23230811").setGroupId("FlinkConsumer")// 将kafka消息解析为Json对象,并返回元数据.setDeserializer(KafkaRecordDeserializationSchema.of(new JSONKeyValueDeserializationSchema(true)))// 设置起始消费位点:从最末尾位点开始消费.setStartingOffsets(OffsetsInitializer.latest()).build();

运行结果:

5.3、timestamp()

代码示例:

KafkaSource<ObjectNode> source = KafkaSource.<ObjectNode>builder().setBootstrapServers("worker01:9092").setTopics("23230811").setGroupId("FlinkConsumer")// 将kafka消息解析为Json对象,并返回元数据.setDeserializer(KafkaRecordDeserializationSchema.of(new MyJSONKeyValueDeserializationSchema(true)))// 设置起始消费位点:从指定时间戳后开始消费.setStartingOffsets(OffsetsInitializer.timestamp(1691722791273L)).build();

运行结果:


6、Kafka分区扩容了,该怎么办 —— 动态分区检查

        在flink1.13的时候,如果Kafka分区扩容了,只有通过重启flink任务,才能消费到新增分区的数据,小编就曾遇到过上游业务部门的kafka分区扩容了,并没有通知下游使用方,导致实时指标异常,甚至丢失了数据。

        在flink1.17的时候,可以通过`开启动态分区检查`,来实现不用重启flink任务,就能消费到新增分区的数据

开启分区检查:(默认不开启)

KafkaSource.builder().setProperty("partition.discovery.interval.ms", "10000"); // 每 10 秒检查一次新分区

代码示例:

KafkaSource<ObjectNode> source = KafkaSource.<ObjectNode>builder().setBootstrapServers("worker01:9092").setTopics("9527").setGroupId("FlinkConsumer")// 将kafka消息解析为Json对象,并返回元数据.setDeserializer(KafkaRecordDeserializationSchema.of(new JSONKeyValueDeserializationSchema(true)))// 设置起始消费位点:从最末尾位点开始消费.setStartingOffsets(OffsetsInitializer.latest())// 开启动态分区检查(默认不开启).setProperty("partition.discovery.interval.ms", "10000") // 每 10 秒检查一次新分区.build();

7、在加载KafkaSource时提取事件时间&添加水位线

可以在 fromSource(source,WatermarkStrategy,sourceName) 时,提取事件时间和制定水位线生成策略

注意:当不指定事件时间提取器时,Kafka Source 使用 Kafka 消息中的时间戳作为事件时间

7.1、使用内置的单调递增的水位线生成器 + kafka timestamp 为事件时间

代码示例:

    // 在读取Kafka消息时,提取事件时间&插入水位线public static void KafkaSourceExtractEventtimeAndWatermark() throws Exception {// 1.获取执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2.读取kafka数据KafkaSource<ObjectNode> source = KafkaSource.<ObjectNode>builder().setBootstrapServers("worker01:9092").setTopics("9527").setGroupId("FlinkConsumer")// 将kafka消息解析为Json对象,并返回元数据.setDeserializer(KafkaRecordDeserializationSchema.of(new MyJSONKeyValueDeserializationSchema(true)))// 设置起始消费位点:从最末尾位点开始消费.setStartingOffsets(OffsetsInitializer.latest()).build();env.fromSource(source,// 使用内置的单调递增的水位线生成器(默认使用 kafka的timestamp作为事件时间)WatermarkStrategy.forMonotonousTimestamps(),"Kafka Source")// 通过 ProcessFunction 查看提取的事件时间和水位线信息.process(new ProcessFunction<ObjectNode, String>() {@Overridepublic void processElement(ObjectNode kafkaJson, ProcessFunction<ObjectNode, String>.Context ctx, Collector<String> out) throws Exception {// 当前处理时间long currentProcessingTime = ctx.timerService().currentProcessingTime();// 当前水位线long currentWatermark = ctx.timerService().currentWatermark();StringBuffer record = new StringBuffer();record.append("========================================\n");record.append(kafkaJson + "\n");record.append("currentProcessingTime:" + currentProcessingTime + "\n");record.append("currentWatermark:" + currentWatermark + "\n");record.append("kafka-ID:" + Long.parseLong(kafkaJson.get("value").get("ID").toString()) + "\n");record.append("kafka-timestamp:" + Long.parseLong(kafkaJson.get("metadata").get("timestamp").toString()) + "\n");out.collect(record.toString());}}).print();// 3.触发程序执行env.execute();}

运行结果:

7.2、使用内置的单调递增的水位线生成器 + kafka 消息中的 ID字段 为事件时间

代码示例:

    // 在读取Kafka消息时,提取事件时间&插入水位线public static void KafkaSourceExtractEventtimeAndWatermark() throws Exception {// 1.获取执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2.读取kafka数据KafkaSource<ObjectNode> source = KafkaSource.<ObjectNode>builder().setBootstrapServers("worker01:9092").setTopics("9527").setGroupId("FlinkConsumer")// 将kafka消息解析为Json对象,并返回元数据.setDeserializer(KafkaRecordDeserializationSchema.of(new MyJSONKeyValueDeserializationSchema(true)))// 设置起始消费位点:从最末尾位点开始消费.setStartingOffsets(OffsetsInitializer.latest()).build();env.fromSource(source,// 使用内置的单调递增的水位线生成器(使用 kafka消息中的ID字段作为事件时间)WatermarkStrategy.<ObjectNode>forMonotonousTimestamps()// 提取 Kafka消息中的 ID字段作为 事件时间.withTimestampAssigner((json, timestamp) -> Long.parseLong(json.get("value").get("ID").toString())),"Kafka Source")// 通过 ProcessFunction 查看提取的事件时间和水位线信息.process(new ProcessFunction<ObjectNode, String>() {@Overridepublic void processElement(ObjectNode kafkaJson, ProcessFunction<ObjectNode, String>.Context ctx, Collector<String> out) throws Exception {// 当前处理时间long currentProcessingTime = ctx.timerService().currentProcessingTime();// 当前水位线long currentWatermark = ctx.timerService().currentWatermark();StringBuffer record = new StringBuffer();record.append("========================================\n");record.append(kafkaJson + "\n");record.append("currentProcessingTime:" + currentProcessingTime + "\n");record.append("currentWatermark:" + currentWatermark + "\n");record.append("kafka-ID:" + Long.parseLong(kafkaJson.get("value").get("ID").toString()) + "\n");record.append("kafka-timestamp:" + Long.parseLong(kafkaJson.get("metadata").get("timestamp").toString()) + "\n");out.collect(record.toString());}}).print();// 3.触发程序执行env.execute();}

运行结果:

相关文章:

4.3、Flink任务怎样读取Kafka中的数据

目录 1、添加pom依赖 2、API使用说明 3、这是一个完整的入门案例 4、Kafka消息应该如何解析 4.1、只获取Kafka消息的value部分 ​4.2、获取完整Kafka消息(key、value、Metadata) 4.3、自定义Kafka消息解析器 5、起始消费位点应该如何设置 ​5.1、earliest() 5.2、lat…...

C语言实例_和校验算法

一、算法介绍 和校验&#xff08;Checksum&#xff09;是一种简单的纠错算法&#xff0c;用于检测或验证数据传输或存储过程中的错误。它通过对数据进行计算并生成校验和&#xff0c;然后将校验和附加到数据中&#xff0c;在接收端再次计算校验和并进行比较&#xff0c;以确定…...

安全加密框架图——Oracle安全开发者

Oracle安全开发者 ACLs 设计 ACLs&#xff08;访问控制列表&#xff09;时&#xff0c;可以根据以下思路进行设计&#xff1a; 所有者文件权限&#xff1a;确定文件的所有者能够对文件执行哪些操作&#xff0c;如读取、写入、执行等。这可以根据文件的性质和拥有者的职责来决…...

Android databinding 被多次定义

一、报错&#xff1a; AndroidStudio运行代码时&#xff0c;编译器报 Type androidx.databinding.Bindable is defined multiple times...... 二、解决&#xff1a; 点击 Build -> Clean Project&#xff0c;关闭编译器再打开即可。 三、解决过程&#xff1a; 在使用Andro…...

云原生周刊:Kubernetes v1.28 新特性一览 | 2023.8.14

推荐一个 GitHub 仓库&#xff1a;Fast-Kubernetes。 Fast-Kubernetes 是一个涵盖了 Kubernetes 的实验室&#xff08;LABs&#xff09;的仓库。它提供了关于 Kubernetes 的各种主题和组件的详细内容&#xff0c;包括 Kubectl、Pod、Deployment、Service、ConfigMap、Volume、…...

机器学习之分类模型

机器学习之分类模型 概述分类模型逻辑回归最近邻分类朴素贝叶斯支持向量机决策树随机森林多层感知机基于集成学习的分类模型VotingBaggingStackingBlendingBoosting 概述 机器学习分类模型通过训练集进行学习&#xff0c;建立一个从输入空间 X X X到输出空间 Y Y Y&#xff08…...

学习Vue:创建第一个Vue实例

当您开始探索 Vue.js&#xff0c;第一步就是创建一个 Vue 实例。Vue 实例是 Vue.js 应用程序的核心构建块&#xff0c;它使您能够将数据与用户界面连接起来&#xff0c;实现动态交互。在本文中&#xff0c;我们将详细介绍如何创建您的第一个 Vue 实例。 步骤1&#xff1a;引入 …...

JavaFx基础学习【二】:Stage

一、介绍 窗口Stage为图中标绿部分&#xff1a; 实际为如下部分&#xff1a; 不同的操作系统表现的样式不同&#xff0c;以下都是以Windows操作系统为例&#xff0c;为了使大家更清楚Stage是那部分&#xff0c;直接看以下图&#xff0c;可能更清楚&#xff1a; 有点潦草&…...

C语言——动态内存函数(malloc、calloc、realloc、free)

了解动态内存函数 前言&#xff1a;一、malloc函数二、calloc函数三、realloc函数四、free函数 前言&#xff1a; 在C语言中&#xff0c;动态内存函数是块重要的知识点。以往&#xff0c;我们开辟空间都是固定得&#xff0c;数组编译结束后就不能继续给它开辟空间了&#xff0…...

Redis数据结构——Redis简单动态字符串SDS

定义 众所周知&#xff0c;Redis是由C语言写的。 对于字符串类型的数据存储&#xff0c;Redis并没有直接使用C语言中的字符串。 而是自己构建了一个结构体&#xff0c;叫做“简单动态字符串”&#xff0c;简称SDS&#xff0c;比C语言中的字符串更加灵活。 SDS的结构体是这样的…...

【计算机网络】TCP协议超详细讲解

文章目录 1. TCP简介2. TCP和UDP的区别3. TCP的报文格式4. 确认应答机制5. 超时重传6. 三次握手7. 为什么两次握手不行?8. 四次挥手9. 滑动窗口10. 流量控制11. 拥塞控制12. 延时应答13. 捎带应答14. 面向字节流15. TCP的连接异常处理 1. TCP简介 TCP协议广泛应用于可靠性要求…...

Salesforce特别元数据部署技巧

标准的picklist字段部署 <?xml version"1.0" encoding"UTF-8" standalone"yes"?> <Package xmlns"http://soap.sforce.com/2006/04/metadata"><types><members>Opportunity.StageName</members><…...

[前端系列第2弹]CSS入门教程:从零开始学习Web页面的样式和布局

在这篇文章中&#xff0c;我将介绍CSS的基本概念、语法、选择器、属性和值&#xff0c;以及如何使用它们来定义Web页面的外观和布局。还将给一些简单而实用的例子&#xff0c;可以跟着我一步一步地编写自己的CSS样式表。 目录 一、什么是CSS 二、CSS的语法 三、CSS的选择器 …...

非计算机科班如何丝滑转码?

转码&#xff0c;也就转行为程序员&#xff0c;已成为当今数字化时代的一种重要技能。随着科技的发展&#xff0c;越来越多的人开始意识到掌握编程技能的重要性&#xff0c;而非计算机科班出身的朋友们&#xff0c;想要丝滑转码&#xff0c;也许可以从以下几个方面入手。 一、明…...

亿发创新中医药信息化解决方案,自动化煎煮+调剂,打造智能中药房

传统中医药行业逐步复兴&#xff0c;同时互联网科技和人工智能等信息科技助力中医药行业逐步实现数字化转型。利用互联网、物联网、大数据等科技&#xff0c;实现现代科学与传统中医药的结合&#xff0c;提供智能配方颗粒调配系统、中药自动化调剂系统、中药煎配智能管理系统、…...

Vulnhub: MoneyBox: 1靶机

kali&#xff1a;192.168.111.111 靶机&#xff1a;192.168.111.194 信息收集 端口扫描 nmap -A -sC -v -sV -T5 -p- --scripthttp-enum 192.168.111.194 ftp匿名登录发现trytofind.jpg 目录爆破发现blogs目录 gobuster dir -u http://192.168.111.194 -w /usr/share/word…...

[国产MCU]-BL602开发实例-LCD1602 I2C驱动

LCD1602 I2C驱动 文章目录 LCD1602 I2C驱动1、LCD1602/LCD2004介绍2、硬件准备3、驱动实现本文将详细介绍如何在K210中驱动LCD1602/LCD2004 I2C显示屏。 1、LCD1602/LCD2004介绍 LCD1602液晶显示器是广泛使用的一种字符型液晶显示模块。它是由字符型液晶显示屏(LCD)、控制驱…...

AI 绘画Stable Diffusion 研究(七) 一文读懂 Stable Diffusion 工作原理

大家好&#xff0c;我是风雨无阻。 本文适合人群&#xff1a; 想要了解AI绘图基本原理的朋友。 对Stable Diffusion AI绘图感兴趣的朋友。 本期内容&#xff1a; Stable Diffusion 能做什么 什么是扩散模型 扩散模型实现原理 Stable Diffusion 潜扩散模型 Stable Diffu…...

URLSearchParams:JavaScript中的URL查询参数处理工具

文章目录 导言&#xff1a;一、URLSearchParams的来历二、URLSearchParams的作用三、URLSearchParams的方法和属性四、使用示例五、注意事项六、结论参考资料 导言&#xff1a; 在Web开发中&#xff0c;处理URL查询参数是一项常见的任务。为了简化这一过程&#xff0c;JavaScr…...

1.4 数据库管理与优化

数据库管理与优化 文章目录 数据库管理与优化1. 数据库概述1.1 数据库的定义和作用1.2 数据库管理系统&#xff08;DBMS&#xff09; 2. 数据库模型2.1 关系型数据库**2.2 非关系型数据库 3. 数据库设计3.1 数据库设计原则3.2 数据库设计步骤 4. 数据库优化4.1 数据库性能优化4…...

Cesium1.95中高性能加载1500个点

一、基本方式&#xff1a; 图标使用.png比.svg性能要好 <template><div id"cesiumContainer"></div><div class"toolbar"><button id"resetButton">重新生成点</button><span id"countDisplay&qu…...

YSYX学习记录(八)

C语言&#xff0c;练习0&#xff1a; 先创建一个文件夹&#xff0c;我用的是物理机&#xff1a; 安装build-essential 练习1&#xff1a; 我注释掉了 #include <stdio.h> 出现下面错误 在你的文本编辑器中打开ex1文件&#xff0c;随机修改或删除一部分&#xff0c;之后…...

dedecms 织梦自定义表单留言增加ajax验证码功能

增加ajax功能模块&#xff0c;用户不点击提交按钮&#xff0c;只要输入框失去焦点&#xff0c;就会提前提示验证码是否正确。 一&#xff0c;模板上增加验证码 <input name"vdcode"id"vdcode" placeholder"请输入验证码" type"text&quo…...

微信小程序 - 手机震动

一、界面 <button type"primary" bindtap"shortVibrate">短震动</button> <button type"primary" bindtap"longVibrate">长震动</button> 二、js逻辑代码 注&#xff1a;文档 https://developers.weixin.qq…...

【项目实战】通过多模态+LangGraph实现PPT生成助手

PPT自动生成系统 基于LangGraph的PPT自动生成系统&#xff0c;可以将Markdown文档自动转换为PPT演示文稿。 功能特点 Markdown解析&#xff1a;自动解析Markdown文档结构PPT模板分析&#xff1a;分析PPT模板的布局和风格智能布局决策&#xff1a;匹配内容与合适的PPT布局自动…...

基于Docker Compose部署Java微服务项目

一. 创建根项目 根项目&#xff08;父项目&#xff09;主要用于依赖管理 一些需要注意的点&#xff1a; 打包方式需要为 pom<modules>里需要注册子模块不要引入maven的打包插件&#xff0c;否则打包时会出问题 <?xml version"1.0" encoding"UTF-8…...

leetcodeSQL解题:3564. 季节性销售分析

leetcodeSQL解题&#xff1a;3564. 季节性销售分析 题目&#xff1a; 表&#xff1a;sales ---------------------- | Column Name | Type | ---------------------- | sale_id | int | | product_id | int | | sale_date | date | | quantity | int | | price | decimal | -…...

select、poll、epoll 与 Reactor 模式

在高并发网络编程领域&#xff0c;高效处理大量连接和 I/O 事件是系统性能的关键。select、poll、epoll 作为 I/O 多路复用技术的代表&#xff0c;以及基于它们实现的 Reactor 模式&#xff0c;为开发者提供了强大的工具。本文将深入探讨这些技术的底层原理、优缺点。​ 一、I…...

Angular微前端架构:Module Federation + ngx-build-plus (Webpack)

以下是一个完整的 Angular 微前端示例&#xff0c;其中使用的是 Module Federation 和 npx-build-plus 实现了主应用&#xff08;Shell&#xff09;与子应用&#xff08;Remote&#xff09;的集成。 &#x1f6e0;️ 项目结构 angular-mf/ ├── shell-app/ # 主应用&…...

并发编程 - go版

1.并发编程基础概念 进程和线程 A. 进程是程序在操作系统中的一次执行过程&#xff0c;系统进行资源分配和调度的一个独立单位。B. 线程是进程的一个执行实体,是CPU调度和分派的基本单位,它是比进程更小的能独立运行的基本单位。C.一个进程可以创建和撤销多个线程;同一个进程中…...