当前位置: 首页 > news >正文

SpringBoot 处理 @KafkaListener 消息

消息监听容器

1、KafkaMessageListenerContainer

由spring提供用于监听以及拉取消息,并将这些消息按指定格式转换后交给由@KafkaListener注解的方法处理,相当于一个消费者;

看看其整体代码结构:

可以发现其入口方法为doStart(), 往上追溯到实现了SmartLifecycle接口,很明显,由spring管理其start和stop操作;

ListenerConsumer, 内部真正拉取消息消费的是这个结构,其 实现了Runable接口,简言之,它就是一个后台线程轮训拉取并处理消息(while true死循环拉取消息)。

在doStart方法中会创建ListenerConsumer并交给线程池处理

以上步骤就开启了消息监听过程。

KafkaMessageListenerContainer#doStart
protected void doStart() {if (isRunning()) {return;}ContainerProperties containerProperties = getContainerProperties();if (!this.consumerFactory.isAutoCommit()) {AckMode ackMode = containerProperties.getAckMode();if (ackMode.equals(AckMode.COUNT) || ackMode.equals(AckMode.COUNT_TIME)) {Assert.state(containerProperties.getAckCount() > 0, "'ackCount' must be > 0");}if ((ackMode.equals(AckMode.TIME) || ackMode.equals(AckMode.COUNT_TIME))&& containerProperties.getAckTime() == 0) {containerProperties.setAckTime(5000);}}Object messageListener = containerProperties.getMessageListener();Assert.state(messageListener != null, "A MessageListener is required");if (containerProperties.getConsumerTaskExecutor() == null) {SimpleAsyncTaskExecutor consumerExecutor = new SimpleAsyncTaskExecutor((getBeanName() == null ? "" : getBeanName()) + "-C-");containerProperties.setConsumerTaskExecutor(consumerExecutor);}Assert.state(messageListener instanceof GenericMessageListener, "Listener must be a GenericListener");this.listener = (GenericMessageListener<?>) messageListener;ListenerType listenerType = ListenerUtils.determineListenerType(this.listener);if (this.listener instanceof DelegatingMessageListener) {Object delegating = this.listener;while (delegating instanceof DelegatingMessageListener) {delegating = ((DelegatingMessageListener<?>) delegating).getDelegate();}listenerType = ListenerUtils.determineListenerType(delegating);}// 这里创建了监听消费者对象this.listenerConsumer = new ListenerConsumer(this.listener, listenerType);setRunning(true);// 将消费者对象放入到线程池中执行this.listenerConsumerFuture = containerProperties.getConsumerTaskExecutor().submitListenable(this.listenerConsumer);}
KafkaMessageListenerContainer.ListenerConsumer#run
public void run() {this.consumerThread = Thread.currentThread();if (this.genericListener instanceof ConsumerSeekAware) {((ConsumerSeekAware) this.genericListener).registerSeekCallback(this);}if (this.transactionManager != null) {ProducerFactoryUtils.setConsumerGroupId(this.consumerGroupId);}this.count = 0;this.last = System.currentTimeMillis();if (isRunning() && this.definedPartitions != null) {try {initPartitionsIfNeeded();}catch (Exception e) {this.logger.error("Failed to set initial offsets", e);}}long lastReceive = System.currentTimeMillis();long lastAlertAt = lastReceive;while (isRunning()) {try {if (!this.autoCommit && !this.isRecordAck) {processCommits();}processSeeks();if (!this.consumerPaused && isPaused()) {this.consumer.pause(this.consumer.assignment());this.consumerPaused = true;if (this.logger.isDebugEnabled()) {this.logger.debug("Paused consumption from: " + this.consumer.paused());}publishConsumerPausedEvent(this.consumer.assignment());}// 拉取信息ConsumerRecords<K, V> records = this.consumer.poll(this.containerProperties.getPollTimeout());this.lastPoll = System.currentTimeMillis();if (this.consumerPaused && !isPaused()) {if (this.logger.isDebugEnabled()) {this.logger.debug("Resuming consumption from: " + this.consumer.paused());}Set<TopicPartition> paused = this.consumer.paused();this.consumer.resume(paused);this.consumerPaused = false;publishConsumerResumedEvent(paused);}if (records != null && this.logger.isDebugEnabled()) {this.logger.debug("Received: " + records.count() + " records");if (records.count() > 0 && this.logger.isTraceEnabled()) {this.logger.trace(records.partitions().stream().flatMap(p -> records.records(p).stream())// map to same format as send metadata toString().map(r -> r.topic() + "-" + r.partition() + "@" + r.offset()).collect(Collectors.toList()));}}if (records != null && records.count() > 0) {if (this.containerProperties.getIdleEventInterval() != null) {lastReceive = System.currentTimeMillis();}invokeListener(records);}else {if (this.containerProperties.getIdleEventInterval() != null) {long now = System.currentTimeMillis();if (now > lastReceive + this.containerProperties.getIdleEventInterval()&& now > lastAlertAt + this.containerProperties.getIdleEventInterval()) {publishIdleContainerEvent(now - lastReceive, this.isConsumerAwareListener? this.consumer : null, this.consumerPaused);lastAlertAt = now;if (this.genericListener instanceof ConsumerSeekAware) {seekPartitions(getAssignedPartitions(), true);}}}}}catch (WakeupException e) {// Ignore, we're stopping}catch (NoOffsetForPartitionException nofpe) {this.fatalError = true;ListenerConsumer.this.logger.error("No offset and no reset policy", nofpe);break;}catch (Exception e) {handleConsumerException(e);}}ProducerFactoryUtils.clearConsumerGroupId();if (!this.fatalError) {if (this.kafkaTxManager == null) {commitPendingAcks();try {this.consumer.unsubscribe();}catch (WakeupException e) {// No-op. Continue process}}}else {ListenerConsumer.this.logger.error("No offset and no reset policy; stopping container");KafkaMessageListenerContainer.this.stop();}this.monitorTask.cancel(true);if (!this.taskSchedulerExplicitlySet) {((ThreadPoolTaskScheduler) this.taskScheduler).destroy();}this.consumer.close();this.logger.info("Consumer stopped");}

2、ConcurrentMessageListenerContainer

并发消息监听,相当于创建消费者;其底层逻辑仍然是通过KafkaMessageListenerContainer实现处理;从实现上看就是在KafkaMessageListenerContainer上做了层包装,有多少的concurrency就创建多个KafkaMessageListenerContainer,也就是concurrency个消费者。

	protected void doStart() {if (!isRunning()) {ContainerProperties containerProperties = getContainerProperties();TopicPartitionInitialOffset[] topicPartitions = containerProperties.getTopicPartitions();if (topicPartitions != null&& this.concurrency > topicPartitions.length) {this.logger.warn("When specific partitions are provided, the concurrency must be less than or "+ "equal to the number of partitions; reduced from " + this.concurrency + " to "+ topicPartitions.length);this.concurrency = topicPartitions.length;}setRunning(true);// 创建多个消费者for (int i = 0; i < this.concurrency; i++) {KafkaMessageListenerContainer<K, V> container;if (topicPartitions == null) {container = new KafkaMessageListenerContainer<>(this, this.consumerFactory,containerProperties);}else {container = new KafkaMessageListenerContainer<>(this, this.consumerFactory,containerProperties, partitionSubset(containerProperties, i));}String beanName = getBeanName();container.setBeanName((beanName != null ? beanName : "consumer") + "-" + i);if (getApplicationEventPublisher() != null) {container.setApplicationEventPublisher(getApplicationEventPublisher());}container.setClientIdSuffix("-" + i);container.setAfterRollbackProcessor(getAfterRollbackProcessor());container.start();this.containers.add(container);}}}

3、@KafkaListener底层监听原理

上面已经介绍了KafkaMessageListenerContainer的作用是拉取并处理消息,但还缺少关键的一步,即 如何将我们的业务逻辑与KafkaMessageListenerContainer的处理逻辑联系起来?

那么这个桥梁就是@KafkaListener注解

KafkaListenerAnnotationBeanPostProcessor, 从后缀BeanPostProcessor就可以知道这是Spring IOC初始化bean相关的操作,当然这里也是;此类会扫描带@KafkaListener注解的类或者方法,通过 KafkaListenerContainerFactory工厂创建对应的KafkaMessageListenerContainer,并调用start方法启动监听,也就是这样打通了这条路…

4、Spring Boot 自动加载kafka相关配置

1、KafkaAutoConfiguration
自动生成kafka相关配置,比如当缺少这些bean的时候KafkaTemplate、ProducerListener、ConsumerFactory、ProducerFactory等,默认创建bean实例

2、KafkaAnnotationDrivenConfiguration
主要是针对于spring-kafka提供的注解背后的相关操作,比如 @KafkaListener;

在开启了@EnableKafka注解后,spring会扫描到此配置并创建缺少的bean实例,比如当配置的工厂beanName不是kafkaListenerContainerFactory的时候,就会默认创建一个beanName为kafkaListenerContainerFactory的实例,这也是为什么在springboot中不用定义consumer的相关配置也可以通过@KafkaListener正常的处理消息

5、消息处理

1、单条消息处理

@Configuration
public class KafkaConsumerConfiguration {@BeanKafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaCustomizeContainerFactory() {ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());factory.setConcurrency(2);factory.getContainerProperties().setPollTimeout(3000);return factory;}private ConsumerFactory<Integer, String> consumerFactory() {return new DefaultKafkaConsumerFactory<>(consumerConfigs());}private Map<String, Object> consumerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bizConfig.getReconciliationInstanceKafkaServers());props.put(ConsumerConfig.GROUP_ID_CONFIG, bizConfig.getReconciliationInstanceKafkaConsumerGroupId());props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 300);// poll 一次拉取的阻塞的最大时长,单位:毫秒。这里指的是阻塞拉取需要满足至少 fetch-min-size 大小的消息props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 10000);return props;}}

这种方式的@KafkaLisener中的参数是单条的。

2、批量处理

@Configuration
@EnableKafka
public class KafkaConfig {@Bean
public KafkaListenerContainerFactory<?, ?> batchFactory() {ConcurrentKafkaListenerContainerFactory<Integer, String> factory =new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());// 增加开启批量处理factory.setBatchListener(true);  // <<<<<<<<<<<<<<<<<<<<<<<<<return factory;
}@Beanpublic ConsumerFactory<Integer, String> consumerFactory() {return new DefaultKafkaConsumerFactory<>(consumerConfigs());}@Beanpublic Map<String, Object> consumerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());...return props;}
}// 注意:这里接受的是集合类型
@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<String> list) {...
}

这种方式的@KafkaLisener中的参数是多条的。

6、线程池相关

如果没有额外给Kafka指定线程池,底层默认用的是SimpleAsyncTaskExecutor类,它不使用线程池,而是为每个任务创建新线程。相当于一个消费者用一个独立的线程来跑。

总结

spring为了将kafka融入其生态,方便在spring大环境下使用kafka,开发了spring-kafa这一模块,本质上是为了帮助开发者更好的以spring的方式使用kafka

@KafkaListener就是这么一个工具,在同一个项目中既可以有单条的消息处理,也可以配置多条的消息处理,稍微改变下配置即可实现,很是方便

当然,@KafkaListener单条或者多条消息处理仍然是spring自行封装处理,与kafka-client客户端的拉取机制无关;比如一次性拉取50条消息,对于单条处理来说就是循环50次处理,而多条消息处理则可以一次性处理50条;本质上来说这套逻辑都是spring处理的,并不是说单条消费就是通过kafka-client一次只拉取一条消息

在使用过程中需要注意spring自动的创建的一些bean实例,当然也可以覆盖其自动创建的实例以满足特定的需求场景。

                        
原文链接:https://blog.csdn.net/yuechuzhixing/article/details/124725713

相关文章:

SpringBoot 处理 @KafkaListener 消息

消息监听容器 1、KafkaMessageListenerContainer 由spring提供用于监听以及拉取消息&#xff0c;并将这些消息按指定格式转换后交给由KafkaListener注解的方法处理&#xff0c;相当于一个消费者&#xff1b; 看看其整体代码结构&#xff1a; 可以发现其入口方法为doStart(),…...

Spring Boot-API版本控制问题

在现代软件开发中&#xff0c;API&#xff08;应用程序接口&#xff09;版本控制是一项至关重要的技术。随着应用的不断迭代&#xff0c;API 的改动不可避免&#xff0c;如何在引入新版本的同时保证向后兼容&#xff0c;避免对现有用户的影响&#xff0c;是每个开发者需要考虑的…...

Git 提取和拉取的区别在哪

1. 提取&#xff08;Fetch&#xff09; 操作说明&#xff1a;Fetch 操作会从远程仓库下载最新的提交、分支信息等&#xff0c;但不会将这些更改合并到你当前的分支中。它只是将远程仓库的更新信息存储在本地&#xff0c;并不会自动修改你当前的工作区。 使用场景&#xff1a; …...

【数据结构与算法 | 每日一题 | 力扣篇】力扣2390, 2848

1. 力扣2390&#xff1a;从字符串中删除星号 1.1 题目&#xff1a; 给你一个包含若干星号 * 的字符串 s 。 在一步操作中&#xff0c;你可以&#xff1a; 选中 s 中的一个星号。移除星号 左侧 最近的那个 非星号 字符&#xff0c;并移除该星号自身。 返回移除 所有 星号之…...

破解信息架构实施的密码:常见挑战与最佳解决方案全指南

信息架构的成功实施是企业数字化转型的关键步骤&#xff0c;但在实际操作中&#xff0c;企业往往会遇到各种复杂的挑战。这些挑战包括 技术整合的难度、数据管理的复杂性、合规性要求的变化 以及 资源限制 等。《信息架构&#xff1a;商业智能&分析与元数据管理参考模型》为…...

CodeChef Starters 151 (Div.2) A~D

codechef是真敢给分&#xff0c;上把刚注册&#xff0c;这把就div2了&#xff0c;再加上一周没打过还是有点不适应的&#xff0c;好在最后还是能够顺利上分 今天的封面是P3R的设置菜单 我抠出来做我自己的游戏主页了&#xff08; A - Convert string 题意 在01串里面可以翻转…...

Redis学习——数据不一致怎么办?更新缓存失败了又怎么办?

文章目录 引言正文读写缓存的数据一致性只读缓存的数据一致性删除和修改数据不一致问题操作执行失败导致数据不一致解决办法 多线程访问导致数据不一致问题总结 总结参考信息 引言 最近面试快手的时候被问到了缓存不一致怎么解决&#xff1f;一开始还是很懵的&#xff0c;因为…...

跨境电商代购新纪元:一键解锁全球好物,系统流程全揭秘

添加图片注释&#xff0c;不超过 140 字&#xff08;可选&#xff09; 在全球化日益加深的今天&#xff0c;跨境电商代购成为了连接消费者与世界各地优质商品的桥梁。本文将在CSDN平台上&#xff0c;深入剖析跨境电商代购系统的功能流程&#xff0c;带您一窥其背后的技术奥秘与…...

Mac 上终端使用 MySql 记录

文章目录 下载安装终端进入 MySql常用操作查看数据库选择一个数据库查看当前选择的数据库Navcat 打开提示报错参考文章 下载安装 先下载社区版的 MySql 安装的过程需要设置 root 的密码&#xff0c;这个是要进入数据库所设定的&#xff0c;所以要记住 终端进入 MySql 首先输…...

461. 汉明距离

一&#xff1a;题目&#xff1a; 两个整数之间的 汉明距离 指的是这两个数字对应二进制位不同的位置的数目。 给你两个整数 x 和 y&#xff0c;计算并返回它们之间的汉明距离。 示例 1&#xff1a; 输入&#xff1a;x 1, y 4 输出&#xff1a;2 解释&#xff1a; 1 (0 0…...

开发指南061-nexus权限管理

平台后台服务的核心是组件&#xff0c;管理组件的软件有&#xff1a; Apache的Archiva、JFrog的Artifactory、Sonatype的Nexus。 本平台选择nexus。nexus的权限模型是用户-角色-权限体系&#xff1a;通过组合权限定义角色&#xff0c;通过给用户赋角色来赋权限。有关nexus的权…...

Qt 弹出菜单右键菜单 QMenu 设置不同颜色的子项

概述 在Qt中&#xff0c;可以使用样式表&#xff08;StyleSheet&#xff09;来自定义 QMenu 的外观&#xff0c;包括其子项&#xff08;如菜单项QAction&#xff09;的颜色。但是&#xff0c;这通常可以设置 QMenu 的整体样式&#xff0c;而不能单独设置某个子项的颜色。不过&…...

Git换行符自动转换参数core.autocrlf的用法

core.autocrlf 是 Git 中用于控制换行符自动转换的配置选项。它有以下几个可能的值&#xff1a; 1. true 作用&#xff1a;在 checkin 时将 CRLF 转换为 LF&#xff0c;在 checkout 时将 LF 转换为 CRLF。适用场景&#xff1a;适用于 Windows 用户&#xff0c;希望在本地文件…...

C语言的结构体类型

在我们使用C语言进行编写代码时&#xff0c;常常会使用已经给定的类型来创建变量&#xff0c;比如int型&#xff0c;char型&#xff0c;double型等&#xff0c;而当我们想创建一些较为复杂的东西时&#xff0c;单单用一个类型变量是没办法做到的&#xff0c;比如我们想创建一个…...

illustrator 收集字体插件VBscript

这是早些年从俄罗斯网站上看到的一个收集字体插件,语言是用VBscript写的,能用,但个别字体不能收集完成,现在Adobe也在illustrator中加入了收集字体打包功能,所以这个也很少用啦。 使用方法: 下好插件,或把下面的代码存入到本地侯后缀名改为.vbs,然后把.ai文件往.vbs文…...

【LLM多模态】文生视频评测基准VBench

note VBench的16个维度自动化评估指标代码实践&#xff08;待完成&#xff09;16个维度的prompt举例人类偏好标注&#xff1a;计算VBench评估结果与人类偏好之间的相关性、用于DPO微调 文章目录 note一、相关背景二、VBench评测基准概述&#xff1a;论文如何解决这个问题&…...

通过覆写 url_for 将 flask 应用部署到子目录下

0. 缘起 最近用 flask 写了一个 web 应用&#xff0c;需要部署到服务器上。而服务器主域名已经被使用了&#xff0c;只能给主域名加个子目录进行部署&#xff0c;比如主域名 example.org &#xff0c;我需要在 example.org/flask 下部署。这时 flask 应用里的内部连接们就出现…...

攻防世界---->埃尔隆德32

做题笔记。 下载 查壳。 32ida 打开。 发现就一个判断。 跟进看看。 // 首次a20 int __cdecl sub_8048414(_BYTE *a1, int a2) {int result; // eaxswitch ( a2 ){case 0:if ( *a1 105 )goto LABEL_19;result 0;break;case 1:if ( *a1 101 ) // e…...

redis短信登录模型

基于Session实现登录 &#xff0c;...

【React】React18.2.0核心源码解读

前言 本文使用 React18.2.0 的源码&#xff0c;如果想回退到某一版本执行git checkout tags/v18.2.0即可。如果打开源码发现js文件报ts类型错误请看本人另一篇文章&#xff1a;VsCode查看React源码全是类型报错如何解决。 阅读源码的过程&#xff1a; 下载源码 观察 package…...

ESP32读取DHT11温湿度数据

芯片&#xff1a;ESP32 环境&#xff1a;Arduino 一、安装DHT11传感器库 红框的库&#xff0c;别安装错了 二、代码 注意&#xff0c;DATA口要连接在D15上 #include "DHT.h" // 包含DHT库#define DHTPIN 15 // 定义DHT11数据引脚连接到ESP32的GPIO15 #define D…...

蓝牙 BLE 扫描面试题大全(2):进阶面试题与实战演练

前文覆盖了 BLE 扫描的基础概念与经典问题蓝牙 BLE 扫描面试题大全(1)&#xff1a;从基础到实战的深度解析-CSDN博客&#xff0c;但实际面试中&#xff0c;企业更关注候选人对复杂场景的应对能力&#xff08;如多设备并发扫描、低功耗与高发现率的平衡&#xff09;和前沿技术的…...

Mac软件卸载指南,简单易懂!

刚和Adobe分手&#xff0c;它却总在Library里给你写"回忆录"&#xff1f;卸载的Final Cut Pro像电子幽灵般阴魂不散&#xff1f;总是会有残留文件&#xff0c;别慌&#xff01;这份Mac软件卸载指南&#xff0c;将用最硬核的方式教你"数字分手术"&#xff0…...

Android第十三次面试总结(四大 组件基础)

Activity生命周期和四大启动模式详解 一、Activity 生命周期 Activity 的生命周期由一系列回调方法组成&#xff0c;用于管理其创建、可见性、焦点和销毁过程。以下是核心方法及其调用时机&#xff1a; ​onCreate()​​ ​调用时机​&#xff1a;Activity 首次创建时调用。​…...

深入浅出深度学习基础:从感知机到全连接神经网络的核心原理与应用

文章目录 前言一、感知机 (Perceptron)1.1 基础介绍1.1.1 感知机是什么&#xff1f;1.1.2 感知机的工作原理 1.2 感知机的简单应用&#xff1a;基本逻辑门1.2.1 逻辑与 (Logic AND)1.2.2 逻辑或 (Logic OR)1.2.3 逻辑与非 (Logic NAND) 1.3 感知机的实现1.3.1 简单实现 (基于阈…...

莫兰迪高级灰总结计划简约商务通用PPT模版

莫兰迪高级灰总结计划简约商务通用PPT模版&#xff0c;莫兰迪调色板清新简约工作汇报PPT模版&#xff0c;莫兰迪时尚风极简设计PPT模版&#xff0c;大学生毕业论文答辩PPT模版&#xff0c;莫兰迪配色总结计划简约商务通用PPT模版&#xff0c;莫兰迪商务汇报PPT模版&#xff0c;…...

如何更改默认 Crontab 编辑器 ?

在 Linux 领域中&#xff0c;crontab 是您可能经常遇到的一个术语。这个实用程序在类 unix 操作系统上可用&#xff0c;用于调度在预定义时间和间隔自动执行的任务。这对管理员和高级用户非常有益&#xff0c;允许他们自动执行各种系统任务。 编辑 Crontab 文件通常使用文本编…...

uniapp手机号一键登录保姆级教程(包含前端和后端)

目录 前置条件创建uniapp项目并关联uniClound云空间开启一键登录模块并开通一键登录服务编写云函数并上传部署获取手机号流程(第一种) 前端直接调用云函数获取手机号&#xff08;第三种&#xff09;后台调用云函数获取手机号 错误码常见问题 前置条件 手机安装有sim卡手机开启…...

给网站添加live2d看板娘

给网站添加live2d看板娘 参考文献&#xff1a; stevenjoezhang/live2d-widget: 把萌萌哒的看板娘抱回家 (ノ≧∇≦)ノ | Live2D widget for web platformEikanya/Live2d-model: Live2d model collectionzenghongtu/live2d-model-assets 前言 网站环境如下&#xff0c;文章也主…...

Scrapy-Redis分布式爬虫架构的可扩展性与容错性增强:基于微服务与容器化的解决方案

在大数据时代&#xff0c;海量数据的采集与处理成为企业和研究机构获取信息的关键环节。Scrapy-Redis作为一种经典的分布式爬虫架构&#xff0c;在处理大规模数据抓取任务时展现出强大的能力。然而&#xff0c;随着业务规模的不断扩大和数据抓取需求的日益复杂&#xff0c;传统…...