MQ:kafka-消费者的三种语义
文章目录
- 前言
- (一) 创建topic
- (二) 生产者
- (三)消费者
- 1. At-most-once Kafka Consumer
- 2. At-least-once kafka consumer
- 3. 使用subscribe实现Exactly-once
- 4. 使用assign实现Exactly-once
前言
本文主要是以kafka 09的client为例子,详解kafka client的使用,包括kafka消费者的三种消费语义at-most-once, at-least-once, 和 exactly-once message ,生产者的使用等。
(一) 创建topic
bin/kafka-topics --zookeeper localhost:2181 --create --topic normal-topic --partitions 2 --replication-factor 1
(二) 生产者
public class ProducerExample {public static void main(String[] str) throws InterruptedException, IOException {System.out.println("Starting ProducerExample ...");sendMessages();}private static void sendMessages() throws InterruptedException, IOException {Producer<String, String> producer = createProducer();sendMessages(producer);// Allow the producer to complete sending of the messages before program exit.Thread.sleep(20);}private static Producer<String, String> createProducer() {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("acks", "all");props.put("retries", 0);// Controls how much bytes sender would wait to batch up before publishing to Kafka.props.put("batch.size", 10);props.put("linger.ms", 1);props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");return new KafkaProducer(props);}private static void sendMessages(Producer<String, String> producer) {String topic = "normal-topic";int partition = 0;long record = 1;for (int i = 1; i <= 10; i++) {producer.send(new ProducerRecord<String, String>(topic, partition, Long.toString(record),Long.toString(record++)));}}
}
(三)消费者
消费者注册到kafka有多种方式:
subscribe:这种方式在新增topic或者partition或者消费者增加或者消费者减少的时候,会进行消费者组内消费者的再平衡。
assign:这种方式注册的消费者不会进行rebalance。
上面两种方式都是可以实现,三种消费语义的。具体API的使用请看下文。
1. At-most-once Kafka Consumer
做多一次消费语义是kafka消费者的默认实现。配置这种消费者最简单的方式是
1). enable.auto.commit设置为true。
2). auto.commit.interval.ms设置为一个较低的时间范围。
3). consumer.commitSync()不要调用该方法。
由于上面的配置,就可以使得kafka有线程负责按照指定间隔提交offset。
但是这种方式会使得kafka消费者有两种消费语义:
a.最多一次语义->at-most-once
消费者的offset已经提交,但是消息还在处理,这个时候挂了,再重启的时候会从上次提交的offset处消费,导致上次在处理的消息部分丢失。
b. 最少一次消费语义->at-least-once
消费者已经处理完了,但是offset还没提交,那么这个时候消费者挂了,就会导致消费者重复消费消息处理。但是由于auto.commit.interval.ms设置为一个较低的时间范围,会降低这种情况出现的概率。
代码如下:
public class AtMostOnceConsumer {public static void main(String[] str) throws InterruptedException {System.out.println("Starting AtMostOnceConsumer ...");execute();}private static void execute() throws InterruptedException {KafkaConsumer<String, String> consumer = createConsumer();// Subscribe to all partition in that topic. 'assign' could be used here// instead of 'subscribe' to subscribe to specific partition.consumer.subscribe(Arrays.asList("normal-topic"));processRecords(consumer);}private static KafkaConsumer<String, String> createConsumer() {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");String consumeGroup = "cg1";props.put("group.id", consumeGroup);// Set this property, if auto commit should happen.props.put("enable.auto.commit", "true");// Auto commit interval, kafka would commit offset at this interval.props.put("auto.commit.interval.ms", "101");// This is how to control number of records being read in each pollprops.put("max.partition.fetch.bytes", "135");// Set this if you want to always read from beginning.// props.put("auto.offset.reset", "earliest");props.put("heartbeat.interval.ms", "3000");props.put("session.timeout.ms", "6001");props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");return new KafkaConsumer<String, String>(props);}private static void processRecords(KafkaConsumer<String, String> consumer) {while (true) {ConsumerRecords<String, String> records = consumer.poll(100);long lastOffset = 0;for (ConsumerRecord<String, String> record : records) {System.out.printf("\n\roffset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());lastOffset = record.offset();}System.out.println("lastOffset read: " + lastOffset);process();}}private static void process() throws InterruptedException {// create some delay to simulate processing of the message.Thread.sleep(20);}
}
2. At-least-once kafka consumer
实现最少一次消费语义的消费者也很简单。
1). 设置enable.auto.commit为false
2). 消息处理完之后手动调用consumer.commitSync()
这种方式就是要手动在处理完该次poll得到消息之后,调用offset异步提交函数consumer.commitSync()。建议是消费者内部实现密等,来避免消费者重复处理消息进而得到重复结果。最多一次发生的场景是消费者的消息处理完并输出到结果库(也可能是部分处理完),但是offset还没提交,这个时候消费者挂掉了,再重启的时候会重新消费并处理消息。
代码如下:
public class AtLeastOnceConsumer {public static void main(String[] str) throws InterruptedException {System.out.println("Starting AutoOffsetGuranteedAtLeastOnceConsumer ...");execute();}private static void execute() throws InterruptedException {KafkaConsumer<String, String> consumer = createConsumer();// Subscribe to all partition in that topic. 'assign' could be used here// instead of 'subscribe' to subscribe to specific partition.consumer.subscribe(Arrays.asList("normal-topic"));processRecords(consumer);}private static KafkaConsumer<String, String> createConsumer() {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");String consumeGroup = "cg1";props.put("group.id", consumeGroup);// Set this property, if auto commit should happen.props.put("enable.auto.commit", "true");// Make Auto commit interval to a big number so that auto commit does not happen,// we are going to control the offset commit via consumer.commitSync(); after processing // message.props.put("auto.commit.interval.ms", "999999999999");// This is how to control number of messages being read in each pollprops.put("max.partition.fetch.bytes", "135");props.put("heartbeat.interval.ms", "3000");props.put("session.timeout.ms", "6001");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");return new KafkaConsumer<String, String>(props);}private static void processRecords(KafkaConsumer<String, String> consumer) throws {while (true) {ConsumerRecords<String, String> records = consumer.poll(100);long lastOffset = 0;for (ConsumerRecord<String, String> record : records) {System.out.printf("\n\roffset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());lastOffset = record.offset();}System.out.println("lastOffset read: " + lastOffset);process();// Below call is important to control the offset commit. Do this call after you// finish processing the business process.consumer.commitSync();}}private static void process() throws InterruptedException {// create some delay to simulate processing of the record.Thread.sleep(20);}
}
3. 使用subscribe实现Exactly-once
使用subscribe实现Exactly-once 很简单,具体思路如下:
1). 将enable.auto.commit设置为false。
2). 不调用consumer.commitSync()。
3). 使用subcribe定于topic。
4). 实现一个ConsumerRebalanceListener,在该listener内部执行
consumer.seek(topicPartition,offset),从指定的topic/partition的offset处启动。
5). 在处理消息的时候,要同时控制保存住每个消息的offset。以原子事务的方式保存offset和处理的消息结果。传统数据库实现原子事务比较简单。但对于非传统数据库,比如hdfs或者nosql,为了实现这个目标,只能将offset与消息保存在同一行。
6). 实现密等,作为保护层。
代码如下:
public class ExactlyOnceDynamicConsumer {private static OffsetManager offsetManager = new OffsetManager("storage2");public static void main(String[] str) throws InterruptedException {System.out.println("Starting ExactlyOnceDynamicConsumer ...");readMessages();}private static void readMessages() throws InterruptedException {KafkaConsumer<String, String> consumer = createConsumer();// Manually controlling offset but register consumer to topics to get dynamically// assigned partitions. Inside MyConsumerRebalancerListener use// consumer.seek(topicPartition,offset) to control offset which messages to be read.consumer.subscribe(Arrays.asList("normal-topic"),new MyConsumerRebalancerListener(consumer));processRecords(consumer);}private static KafkaConsumer<String, String> createConsumer() {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");String consumeGroup = "cg3";props.put("group.id", consumeGroup);// Below is a key setting to turn off the auto commit.props.put("enable.auto.commit", "false");props.put("heartbeat.interval.ms", "2000");props.put("session.timeout.ms", "6001");// Control maximum data on each poll, make sure this value is bigger than the maximum // single message sizeprops.put("max.partition.fetch.bytes", "140");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");return new KafkaConsumer<String, String>(props);}private static void processRecords(KafkaConsumer<String, String> consumer) {while (true) {ConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord<String, String> record : records) {System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());// Save processed offset in external storage.offsetManager.saveOffsetInExternalStore(record.topic(), record.partition(), record.offset());}}}
}
public class MyConsumerRebalancerListener implements org.apache.kafka.clients.consumer.ConsumerRebalanceListener {private OffsetManager offsetManager = new OffsetManager("storage2");private Consumer<String, String> consumer;public MyConsumerRebalancerListener(Consumer<String, String> consumer) {this.consumer = consumer;}public void onPartitionsRevoked(Collection<TopicPartition> partitions) {for (TopicPartition partition : partitions) {offsetManager.saveOffsetInExternalStore(partition.topic(), partition.partition(), consumer.position(partition));}}public void onPartitionsAssigned(Collection<TopicPartition> partitions) {for (TopicPartition partition : partitions) {consumer.seek(partition, offsetManager.readOffsetFromExternalStore(partition.topic(), partition.partition()));}}
}
/**
* The partition offset are stored in an external storage. In this case in a local file system where
* program runs.
*/
public class OffsetManager {private String storagePrefix;public OffsetManager(String storagePrefix) {this.storagePrefix = storagePrefix;}/*** Overwrite the offset for the topic in an external storage.** @param topic - Topic name.* @param partition - Partition of the topic.* @param offset - offset to be stored.*/void saveOffsetInExternalStore(String topic, int partition, long offset) {try {FileWriter writer = new FileWriter(storageName(topic, partition), false);BufferedWriter bufferedWriter = new BufferedWriter(writer);bufferedWriter.write(offset + "");bufferedWriter.flush();bufferedWriter.close();} catch (Exception e) {e.printStackTrace();throw new RuntimeException(e);}}/*** @return he last offset + 1 for the provided topic and partition.*/long readOffsetFromExternalStore(String topic, int partition) {try {Stream<String> stream = Files.lines(Paths.get(storageName(topic, partition)));return Long.parseLong(stream.collect(Collectors.toList()).get(0)) + 1;} catch (Exception e) {e.printStackTrace();}return 0;}private String storageName(String topic, int partition) {return storagePrefix + "-" + topic + "-" + partition;}
}
4. 使用assign实现Exactly-once
使用assign实现Exactly-once 也很简单,具体思路如下:
1). 将enable.auto.commit设置为false。
2). 不调用consumer.commitSync()。
3). 调用assign注册kafka消费者到kafka
4). 初次启动的时候,调用consumer.seek(topicPartition,offset)来指定offset。
5). 在处理消息的时候,要同时控制保存住每个消息的offset。以原子事务的方式保存offset和处理的消息结果。传统数据库实现原子事务比较简单。但对于非传统数据库,比如hdfs或者nosql,为了实现这个目标,只能将offset与消息保存在同一行。
6). 实现密等,作为保护层。
代码如下:
public class ExactlyOnceStaticConsumer {private static OffsetManager offsetManager = new OffsetManager("storage1");public static void main(String[] str) throws InterruptedException, IOException {System.out.println("Starting ExactlyOnceStaticConsumer ...");readMessages();}private static void readMessages() throws InterruptedException, IOException {KafkaConsumer<String, String> consumer = createConsumer();String topic = "normal-topic";int partition = 1;TopicPartition topicPartition =registerConsumerToSpecificPartition(consumer, topic, partition);// Read the offset for the topic and partition from external storage.long offset = offsetManager.readOffsetFromExternalStore(topic, partition);// Use seek and go to exact offset for that topic and partition.consumer.seek(topicPartition, offset);processRecords(consumer);}private static KafkaConsumer<String, String> createConsumer() {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");String consumeGroup = "cg2";props.put("group.id", consumeGroup);// Below is a key setting to turn off the auto commit.props.put("enable.auto.commit", "false");props.put("heartbeat.interval.ms", "2000");props.put("session.timeout.ms", "6001");// control maximum data on each poll, make sure this value is bigger than the maximum // single message sizeprops.put("max.partition.fetch.bytes", "140");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");return new KafkaConsumer<String, String>(props);}/*** Manually listens for specific topic partition. But, if you are looking for example of how to * dynamically listens to partition and want to manually control offset then see* ExactlyOnceDynamicConsumer.java*/private static TopicPartition registerConsumerToSpecificPartition(KafkaConsumer<String, String> consumer, String topic, int partition) {TopicPartition topicPartition = new TopicPartition(topic, partition);List<TopicPartition> partitions = Arrays.asList(topicPartition);consumer.assign(partitions);return topicPartition;}/*** Process data and store offset in external store. Best practice is to do these operations* atomically. */private static void processRecords(KafkaConsumer<String, String> consumer) throws {while (true) {ConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord<String, String> record : records) {System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());offsetManager.saveOffsetInExternalStore(record.topic(), record.partition(), record.offset());}}}
}
相关文章:
MQ:kafka-消费者的三种语义
文章目录 前言(一) 创建topic(二) 生产者(三)消费者1. At-most-once Kafka Consumer2. At-least-once kafka consumer3. 使用subscribe实现Exactly-once4. 使用assign实现Exactly-once 前言 本文主要是以kafka 09的client为例子,详解kafka c…...
中国1km分辨率SSP119情景(SSP119、SSP245 SSP585),模式逐月降水量数据集(2021-2100)
目录 简介 摘要 代码 引用 网址推荐 知识星球 机器学习 干旱监测平台 中国1km分辨率SSP119情景EC-Earth3模式逐月降水量数据集(2021-2100) 简介 该数据集为中国多情景多模式逐月降水量数据,空间分辨率为0.0083333(约1km),时间为2021年1月-2100年…...
21天掌握javaweb-->第8天:前后端分离架构与Axios请求
前后端分离架构概念 前后端分离架构是一种现代Web应用开发模式,其中前端和后端分别独立开发和部署,通过API进行数据交互。这种架构使得前端专注于用户界面和用户体验,而后端则专注于业务逻辑和数据处理。 优势 开发效率高:前后端可以并行开发,减少了开发时间。技术栈灵活…...
基于阻塞队列的生产者消费者模型动画演示
一个基于阻塞队列的生产者消费者模型的动画演示: 这是打包好的程序。程序是用 QT 写的。 通过网盘分享的文件:CP模型.7z 链接: https://pan.baidu.com/s/1YjC7YiSqHGqdr6bbffaDWg?pwde6g5 提取码: e6g5 CP模型...
DHCP和BOOTP选项及DHCP协议操作详解
DHCP和BOOTP选项及DHCP协议操作详解 DHCP与BOOTP简介 1. BOOTP(Bootstrap Protocol) 功能:提供静态配置的IP分配。用途:在早期用于无盘工作站启动时获取IP地址和基本配置。缺点:只能提供静态IP配置,无法动…...
数据结构--链表和单链表详解及实现
一.前言 数据结构思维导图如下,灰色标记的是之前讲过的,本文将带你走近单链表(红色标记部分),希望大家有所收获🌹🌹 二.链表的定义和概念 在讲单链表之前,我们先学习一下链表 2.1 链表的定义 链表是一种…...
vue3基础知识
书接上文,这篇继续来学习vue3的核心语法,可以先看上一篇再来看这篇效果更好。 1. computed computed 用于创建 计算属性,即基于其他响应式数据的值动态计算并缓存的属性。它的主要作用是优化性能和提高代码的可维护性,避免不必要…...
【Linux系统】Ubuntu 缓冲区机制
在Ubuntu中,和其他操作系统有个不一样的机制:缓冲区。这篇文章是对与缓冲区的详细介绍。 在 Ubuntu 中(以及其他基于 Linux 的操作系统),缓冲区(Buffer)是内核用于优化 I/O 操作的重要机制。它…...
ChatGPT 最新推出的 Pro 订阅计划,具备哪些能力 ?
OpenAI 最近推出了 ChatGPT Pro,这是一个每月收费 200 美元的高级订阅计划,旨在为用户提供对 OpenAI 最先进模型和功能的高级访问。 以下是 ChatGPT Pro 的主要功能和能力: 高级模型访问: o1 模型:包括 o1 和 o1 Pro…...
数据结构理论
内容来源青岛大学数据结构与算法课程,链接:数据结构与算法基础(青岛大学-王卓)_哔哩哔哩_bilibili 绪论 数据结构概述 数据结构和算法的定义:我们如何把现实中大量而复杂的问题以特定的数据类型和特定的存储结构保存…...
es 3期 第14节-全文文本分词查询
#### 1.Elasticsearch是数据库,不是普通的Java应用程序,传统数据库需要的硬件资源同样需要,提升性能最有效的就是升级硬件。 #### 2.Elasticsearch是文档型数据库,不是关系型数据库,不具备严格的ACID事务特性ÿ…...
六安市第二届网络安全大赛复现
misc 听说你也喜欢俄罗斯方块? ppt拼接之后 缺三个角补上 flag{qfnh_wergh_wqef} 流量分析 流量包分离出来一个压缩包 出来一张图片 黑色代表0白色代表1 101010 1000 rab 反的压缩包 转一下 密码:拾叁拾陆叁拾贰陆拾肆 密文:4p4n5758…...
Sarcomere仿人灵巧手ARTUS,20个自由度拓宽机器人作业边界
Sarcomere Dynamics 是一家深度技术先驱,通过开发和商业化仿人机械来改变机器人行业。专注于为科研人员,系统集成商和制造商提供更实惠、更轻便且更灵活的末端执行器替代品。凭借创新的致动器技术,创造了一款紧凑、轻便且非常坚固的机械手Art…...
Django drf 基于serializers 快速使用
1. 安装: pip install djangorestframework 2. 添加rest_framework到您的INSTALLED_APPS设置。 settings.pyINSTALLED_APPS [...rest_framework, ] 3. 定义模型 models.pyfrom django.db import modelsclass BookModel(models.Model):name models.CharField(max_length64)…...
pycharm集成环境中关于安装sklearn库报错问题分析及解决
在输入pip install sklearn后,出现如下提示: pip install sklearn Collecting sklearn Using cached sklearn-0.0.post12.tar.gz (2.6 kB) Installing build dependencies ... done Getting requirements to build wheel ... error error: subprocess-…...
AI - 浅聊一下基于LangChain的AI Agent
AI - 浅聊一下基于LangChain的AI Agent 大家好,今天我们来聊聊一个很有意思的主题: AI Agent ,就是目前非常流行的所谓的AI智能体。AI的发展日新月异,都2024年末了,如果此时小伙伴们对这个非常火的概念还不清楚的话&a…...
《【Linux】深入理解进程管理与 fork 系统调用的实现原理》
一、引言 在 Linux 操作系统中,进程管理是核心功能之一。进程是操作系统进行资源分配和调度的基本单位。理解进程管理的原理以及 fork 系统调用的实现对于深入掌握 Linux 系统的运行机制至关重要。本文将深入探讨 Linux 中的进程管理以及 fork 系统调用的实现原理&a…...
docker-compose部署skywalking 8.1.0
一、下载镜像 #注意 skywalking-oap-server和skywalking java agent版本强关联,版本需要保持一致性 docker pull elasticsearch:7.9.0 docker pull apache/skywalking-oap-server:8.1.0-es7 docker pull apache/skywalking-ui:8.1.0二、部署文件docker-compose.yam…...
AI 总结的的 AI 学习路线
一、入门阶段:数学基础与编程语言 数学基础 线性代数 当年白纸黑字推演, 都是泪啊,草稿本都用了一卷。 学习向量、矩阵的基本概念,包括向量的加法、减法、点积和叉积,矩阵的乘法、转置等运算。例如,在计算…...
离散傅里叶级数(DFS)详解
1. 引言 离散傅里叶级数(Discrete Fourier Series, DFS)是信号处理领域中一项基础且重要的数学工具,用于分析和处理周期性的离散信号。它通过将离散时间信号表示为一组正弦和余弦的和,从而使得信号在频域上得到更清晰的描述。与连…...
理解 MCP 工作流:使用 Ollama 和 LangChain 构建本地 MCP 客户端
🌟 什么是 MCP? 模型控制协议 (MCP) 是一种创新的协议,旨在无缝连接 AI 模型与应用程序。 MCP 是一个开源协议,它标准化了我们的 LLM 应用程序连接所需工具和数据源并与之协作的方式。 可以把它想象成你的 AI 模型 和想要使用它…...
基于Uniapp开发HarmonyOS 5.0旅游应用技术实践
一、技术选型背景 1.跨平台优势 Uniapp采用Vue.js框架,支持"一次开发,多端部署",可同步生成HarmonyOS、iOS、Android等多平台应用。 2.鸿蒙特性融合 HarmonyOS 5.0的分布式能力与原子化服务,为旅游应用带来…...
全球首个30米分辨率湿地数据集(2000—2022)
数据简介 今天我们分享的数据是全球30米分辨率湿地数据集,包含8种湿地亚类,该数据以0.5X0.5的瓦片存储,我们整理了所有属于中国的瓦片名称与其对应省份,方便大家研究使用。 该数据集作为全球首个30米分辨率、覆盖2000–2022年时间…...
微信小程序 - 手机震动
一、界面 <button type"primary" bindtap"shortVibrate">短震动</button> <button type"primary" bindtap"longVibrate">长震动</button> 二、js逻辑代码 注:文档 https://developers.weixin.qq…...
将对透视变换后的图像使用Otsu进行阈值化,来分离黑色和白色像素。这句话中的Otsu是什么意思?
Otsu 是一种自动阈值化方法,用于将图像分割为前景和背景。它通过最小化图像的类内方差或等价地最大化类间方差来选择最佳阈值。这种方法特别适用于图像的二值化处理,能够自动确定一个阈值,将图像中的像素分为黑色和白色两类。 Otsu 方法的原…...
(二)原型模式
原型的功能是将一个已经存在的对象作为源目标,其余对象都是通过这个源目标创建。发挥复制的作用就是原型模式的核心思想。 一、源型模式的定义 原型模式是指第二次创建对象可以通过复制已经存在的原型对象来实现,忽略对象创建过程中的其它细节。 📌 核心特点: 避免重复初…...
【论文笔记】若干矿井粉尘检测算法概述
总的来说,传统机器学习、传统机器学习与深度学习的结合、LSTM等算法所需要的数据集来源于矿井传感器测量的粉尘浓度,通过建立回归模型来预测未来矿井的粉尘浓度。传统机器学习算法性能易受数据中极端值的影响。YOLO等计算机视觉算法所需要的数据集来源于…...
MODBUS TCP转CANopen 技术赋能高效协同作业
在现代工业自动化领域,MODBUS TCP和CANopen两种通讯协议因其稳定性和高效性被广泛应用于各种设备和系统中。而随着科技的不断进步,这两种通讯协议也正在被逐步融合,形成了一种新型的通讯方式——开疆智能MODBUS TCP转CANopen网关KJ-TCPC-CANP…...
【C语言练习】080. 使用C语言实现简单的数据库操作
080. 使用C语言实现简单的数据库操作 080. 使用C语言实现简单的数据库操作使用原生APIODBC接口第三方库ORM框架文件模拟1. 安装SQLite2. 示例代码:使用SQLite创建数据库、表和插入数据3. 编译和运行4. 示例运行输出:5. 注意事项6. 总结080. 使用C语言实现简单的数据库操作 在…...
智能分布式爬虫的数据处理流水线优化:基于深度强化学习的数据质量控制
在数字化浪潮席卷全球的今天,数据已成为企业和研究机构的核心资产。智能分布式爬虫作为高效的数据采集工具,在大规模数据获取中发挥着关键作用。然而,传统的数据处理流水线在面对复杂多变的网络环境和海量异构数据时,常出现数据质…...
