Java多线程 - 利用Callable或CompletableFuture实现多线程异步任务执行
文章目录
- 1. Callable接口源码
- 2. Future接口的源码
- 3. RunnableFuture接口和FutureTask实现类
- 4. 利用线程池和Callable接口实现异步执行任务
- 5. 利用CompleteFutable实现多线程异步任务执行
1. Callable接口源码
@FunctionalInterface
public interface Callable<V> {// 这个call()方法有返回值,且声明了受检异常,可以直接抛出Exception异常V call() throws Exception;
}
2. Future接口的源码
public interface Future<V> {// 取消异步执行中的任务boolean cancel(boolean mayInterruptIfRunning);boolean isCancelled();// 判断异步任务是否执行成功boolean isDone();// 获取异步任务完成后的结果V get() throws InterruptedException, ExecutionException;V get(long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException;
}
Future接口中的方法:
get():获取异步任务执行的结果。注意,这个方法的调用是阻塞性的。如果异步任务没有执行完成,异步结果获取线程(调用线程)会一直被阻塞,一直阻塞到异步任务执行完成,其异步结果返回给调用线程。
get(Long timeout,TimeUnit unit):设置时限,(调用线程)阻塞性地获取异步任务执行的结果。该方法的调用也是阻塞性的,但是结果获取线程(调用线程)会有一个阻塞时长限制,不会无限制地阻塞和等待,如果其阻塞时间超过设定的timeout时间,该方法将抛出异常,调用线程可捕获此异常。
boolean isDone():获取异步任务的执行状态。如果任务执行结束,就返回true。
boolean isCancelled():获取异步任务的取消状态。如果任务完成前被取消,就返回true。
boolean cancel(boolean mayInterruptRunning):取消异步任务的执行。
3. RunnableFuture接口和FutureTask实现类
如何使用Callable接口创建线程呢?
public interface RunnableFuture<V> extends Runnable, Future<V> {void run();
}
RunnableFuture只是一个接口,无法直接创建对象,如果需要创建对象,就需用到它的实现类——FutureTask。
public class FutureTask<V> implements RunnableFuture<V> {private Callable<V> callable;// 构造方法public FutureTask(Callable<V> callable) {if (callable == null) throw new NullPointerException();// callable实例属性需要在FutureTask实例构造时进行初始化this.callable = callable;this.state = NEW; }
}
RunnableFuture接口实现了2个目标:
(1) RunnableFuture通过继承Runnable接口,从而保证了其实例可以作为Thread线程实例的target目标;
(2) RunnableFuture通过继承Future接口,从而保证了可以获取未来的异步执行结果。
首先,通过实现Runnable接口的方式创建一个异步执行任务:
public class CallableTaskDemo implements Callable {// call()方法有返回值,并且可以抛出Exception异常@Overridepublic String call() throws Exception {System.out.println("实现Callable接口来编写异步执行任务");Thread.sleep(1000);return "返回线程执行结果";}
}
方式1:通过Thread类创建线程执行异步任务
public class CallableDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {// 创建异步执任务实例Callable callable = new CallableTaskDemo();// 初始化callable实例属性FutureTask futureTask = new FutureTask(callable);// 创建线程执行异步任务Thread thread = new Thread(futureTask);thread.start();System.out.println("获取异步执行任务结果:"+futureTask.get());// 获取异步执行任务结果:返回线程执行结果}
}
方式2:通过线程池创建线程,并提交异步执行任务:
public class CallableDemo2 {// 通过线程池创建3个线程private static ExecutorService executorService = Executors.newFixedThreadPool(3);public static void main(String[] args) throws ExecutionException, InterruptedException {// 通过线程池的submit()方法提交异步执行任务,有返回结果Future submit = executorService.submit(new CallableTaskDemo());// 获取返回结果System.out.println(submit.get());}
}
对于Calleble来说,Future和FutureTask均可以用来获取任务执行结果,不过Future是个接口,FutureTask是Future的具体实现,而且FutureTask还间接实现了Runnable接口,也就是说FutureTask可以作为Runnable任务提交给线程池。
4. 利用线程池和Callable接口实现异步执行任务
1、定义3个线程执行任务:
public class FirstCallableTask implements Callable<String> {// call()方法有返回值,并且可以抛出Exception异常@Overridepublic String call() throws Exception {System.out.println("实现 First Callable接口来编写异步执行任务");Thread.sleep(1000);return "返回 First Callable 接口线程执行结果";}
}public class SecondCallableTask implements Callable<String> {@Overridepublic String call() throws Exception {System.out.println("实现 Second Callable接口来编写异步执行任务");Thread.sleep(1000);return "返回 Second Callable 接口线程执行结果";}
}public class ThirdCallableTask implements Callable<String> {@Overridepublic String call() throws Exception {System.out.println("实现 Third Callable接口来编写异步执行任务");Thread.sleep(1000);return "返回 Third Callable 接口线程执行结果";}
}
2、定义线程池提交线程任务:
@Service
@Slf4j
public class BeanLoadService implements ApplicationContextAware, ApplicationListener<ContextStoppedEvent> {private ApplicationContext applicationContext;// 定义一个线程池private static final ThreadPoolExecutor THREAD_POOL_EXECUTOR = new ThreadPoolExecutor(5,7,4,TimeUnit.SECONDS,new LinkedBlockingQueue<>(),new ThreadFactoryBuilder().setNamePrefix(BeanLoadService.class.getSimpleName() + "-pool-%d").setDaemon(true).build(),new ThreadPoolExecutor.DiscardOldestPolicy());public Map<String,Integer> richInfo() {Callable<String> firstCallable = new FirstCallableTask();Callable<String> secondCallable = new SecondCallableTask();Callable<String> thirdCallable = new ThirdCallableTask();List<Callable<String>> callableList = new ArrayList<>();callableList.add(firstCallable);callableList.add(secondCallable);callableList.add(thirdCallable);try {List<Future<String>> futures = THREAD_POOL_EXECUTOR.invokeAll(callableList, 1, TimeUnit.MINUTES);for (Future<String> future : futures) {try {System.out.println(future.get());} catch (Exception e) {log.warn("incident delay data,future get warn", e);}}} catch (Exception e) {log.warn("incident delay data warn ", e);}}@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {this.applicationContext = applicationContext;}@Overridepublic void onApplicationEvent(ContextStoppedEvent contextStoppedEvent) {try {THREAD_POOL_EXECUTOR.shutdown();} catch (Exception e) {log.error("停止线程池失败", e);}}
}
3、测试:
@SpringBootTest
@RunWith(SpringRunner.class)
public class BeanLoadServiceTest {@Autowiredprivate BeanLoadService beanLoadService;@Testpublic void test(){beanLoadService.richInfo();}
}
实现 First Callable接口来编写异步执行任务
实现 Second Callable接口来编写异步执行任务
实现 Third Callable接口来编写异步执行任务
返回 First Callable 接口线程执行结果
返回 Second Callable 接口线程执行结果
返回 Third Callable 接口线程执行结果
4、定义线程池提交线程任务(内部类):
@Service
@Slf4j
public class BeanLoadService implements ApplicationContextAware, ApplicationListener<ContextStoppedEvent> {private ApplicationContext applicationContext;// 定义一个线程池private static final ThreadPoolExecutor THREAD_POOL_EXECUTOR = new ThreadPoolExecutor(5,7,4,TimeUnit.SECONDS,new LinkedBlockingQueue<>(),new ThreadFactoryBuilder().setNamePrefix(BeanLoadService.class.getSimpleName() + "-pool-%d").setDaemon(true).build(),new ThreadPoolExecutor.DiscardOldestPolicy());public Map<String,Integer> richInfo() {Callable<String> firstCallable = getFirstCallable();Callable<String> secondCallable = getSecondCallable();Callable<String> thirdCallable = getThirdCallable();List<Callable<String>> callableList = new ArrayList<>();callableList.add(firstCallable);callableList.add(secondCallable);callableList.add(thirdCallable);try {List<Future<String>> futures = THREAD_POOL_EXECUTOR.invokeAll(callableList, 1, TimeUnit.MINUTES);for (Future<String> future : futures) {try {System.out.println(future.get());} catch (Exception e) {log.warn("incident delay data,future get warn", e);}}} catch (Exception e) {log.warn("incident delay data warn ", e);}}private Callable<String> getThirdCallable() {Callable<String> callable = new Callable<String>() {@Overridepublic String call() throws Exception {System.out.println("实现 Third Callable接口来编写异步执行任务");Thread.sleep(1000);return "返回 Third Callable 接口线程执行结果";}};}private Callable<String> getSecondCallable() {Callable<String> callable = new Callable<String>() {@Overridepublic String call() throws Exception {System.out.println("实现 Second Callable接口来编写异步执行任务");Thread.sleep(1000);return "返回 Second Callable 接口线程执行结果";};};return callable;}private Callable<String> getFirstCallable() {Callable<String> callable = new Callable<String>() {@Overridepublic String call() throws Exception {System.out.println("实现 First Callable接口来编写异步执行任务");Thread.sleep(1000);return "返回 First Callable 接口线程执行结果";}};return callable;}@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {this.applicationContext = applicationContext;}@Overridepublic void onApplicationEvent(ContextStoppedEvent contextStoppedEvent) {try {THREAD_POOL_EXECUTOR.shutdown();} catch (Exception e) {log.error("停止线程池失败", e);}}
}
5、定义线程池提交任务:(Java 8)
@Service
@Slf4j
public class BeanLoadService implements ApplicationContextAware, ApplicationListener<ContextStoppedEvent> {private ApplicationContext applicationContext;// 定义一个线程池private static final ThreadPoolExecutor THREAD_POOL_EXECUTOR = new ThreadPoolExecutor(5,7,4,TimeUnit.SECONDS,new LinkedBlockingQueue<>(),new ThreadFactoryBuilder().setNamePrefix(BeanLoadService.class.getSimpleName() + "-pool-%d").setDaemon(true).build(),new ThreadPoolExecutor.DiscardOldestPolicy());public void richInfo() {Callable<String> firstCallable = getFirstCallable();Callable<String> secondCallable = getSecondCallable();Callable<String> thirdCallable = getThirdCallable();List<Callable<String>> callableList = new ArrayList<>();callableList.add(firstCallable);callableList.add(secondCallable);callableList.add(thirdCallable);try {List<Future<String>> futures = THREAD_POOL_EXECUTOR.invokeAll(callableList, 1, TimeUnit.MINUTES);for (Future<String> future : futures) {try {System.out.println(future.get());} catch (Exception e) {log.warn("incident delay data,future get warn", e);}}} catch (Exception e) {log.warn("incident delay data warn ", e);}}private Callable<String> getThirdCallable() {Callable<String> callable = ()->{System.out.println("实现 Third Callable接口来编写异步执行任务");Thread.sleep(1000);return "返回 Third Callable 接口线程执行结果";};return callable;}private Callable<String> getSecondCallable() {Callable<String> callable = ()-> {System.out.println("实现 Second Callable接口来编写异步执行任务");Thread.sleep(1000);return "返回 Second Callable 接口线程执行结果";};return callable;}private Callable<String> getFirstCallable() {Callable<String> callable = () -> {System.out.println("实现 First Callable接口来编写异步执行任务");Thread.sleep(1000);return "返回 First Callable 接口线程执行结果";};return callable;}@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {this.applicationContext = applicationContext;}@Overridepublic void onApplicationEvent(ContextStoppedEvent contextStoppedEvent) {try {THREAD_POOL_EXECUTOR.shutdown();} catch (Exception e) {log.error("停止线程池失败", e);}}
}
5. 利用CompleteFutable实现多线程异步任务执行
在 Java 8 中, 新增加了一个包含 50 个方法左右的类: CompletableFuture, 提供了非常强大的Future 的扩展功能, 可以帮助我们简化异步编程的复杂性, 提供了函数式编程的能力, 可以通过回调的方式处理计算结果, 并且提供了转换和组合 CompletableFuture 的方法。CompletableFuture 类实现了 Future 接口, 所以你还是可以像以前一样通过get方法阻塞或者轮询的方式获得结果, 但是这种方式不推荐使用。
CompletableFuture 和 FutureTask 同属于 Future 接口的实现类, 都可以获取线程的执行结果。
CompletableFuture 提供了四个静态方法来创建一个异步操作 :
runXxxx 都是没有返回结果的, supplyXxx 都是可以获取返回结果的,可以传入自定义的线程池, 否则就用默认的线程池 。
//子任务包装一个Runnable实例,并调用ForkJoinPool.commonPool()线程池来执行
public static CompletableFuture<Void> runAsync(Runnable runnable)//子任务包装一个Runnable实例,并调用指定的executor线程池来执行
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)//子任务包装一个Supplier实例,并调用ForkJoinPool.commonPool()线程池来执行
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)//子任务包装一个Supplier实例,并使用指定的executor线程池来执行
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
@Service
@Slf4j
public class BeanLoadService {public void getInfo() {List<CompletableFuture<String>> futures = new ArrayList<>();CompletableFuture<String> firstFuture = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}return "返回 First Callable 接口线程执行结果";});futures.add(firstFuture);CompletableFuture<String> secondFuture = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}return "返回 Second Callable 接口线程执行结果";});futures.add(secondFuture);CompletableFuture<String> thirdFuture = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}return "返回 Third Callable 接口线程执行结果";});futures.add(thirdFuture);try {futures.forEach(future -> {try {System.out.println(future.get());} catch (Exception e) {log.warn("incident delay data,future get warn", e);}});} catch (Exception e) {log.warn("incident delay data warn ", e);}}
}
相关文章:
Java多线程 - 利用Callable或CompletableFuture实现多线程异步任务执行
文章目录1. Callable接口源码2. Future接口的源码3. RunnableFuture接口和FutureTask实现类4. 利用线程池和Callable接口实现异步执行任务5. 利用CompleteFutable实现多线程异步任务执行1. Callable接口源码 FunctionalInterface public interface Callable<V> {// 这个…...
【ts + webpack】贪吃蛇小游戏
目录 一、项目搭建 1.1 初始化项目 二、项目界面布局 三、完成Food类 四、完成记分牌类 五、初步完成snake类 六、创建游戏控制器类 - 键盘事件 七、GameControl - 使蛇移动 八、蛇撞墙和吃食检测 一、项目搭建 1.1 初始化项目 1.使用init命令生成package.json文件 …...
传统巨头生“变”,中国毫米波雷达市场战火再升级
进入2023年,中国车载毫米波雷达市场战火明显升级。 一方面,愈演愈烈的份额抢夺战不仅仅存在于几大传统巨头之间,也快速转移到与国产供应商之间;随着部分外资巨头的本土化战略深入落地,同时对国产供应商造成了压力。 …...
26岁曾月薪15K,现已失业3个月,我依然没有拿到offer......
我做测试5年,一线城市薪水拿到15K,中间还修了一个专升本,这个年限不说资深肯定也是配得上经验丰富的。今年行情不好人尽皆知,但我还是对我的薪水不是很满意,于是打算出去面试,希望可以搏一个高薪。 但真到面…...
华为OD机试 - 打印文件 | 机试题算法思路 【2023】
最近更新的博客 华为OD机试 - 简易压缩算法(Python) | 机试题算法思路 【2023】 华为OD机试题 - 获取最大软件版本号(JavaScript) 华为OD机试 - 猜字谜(Python) | 机试题+算法思路 【2023】 华为OD机试 - 删除指定目录(Python) | 机试题算法思路 【2023】 华为OD机试 …...
【前端】浏览器的渲染流程(完整)
本文主要包含以下内容:浏览器渲染整体流程解析 HTML样式计算布局分层生成绘制指令分块光栅化绘制常见面试题浏览器渲染整体流程浏览器,作为用户浏览网页最基本的一个入口,我们似乎认为在地址栏输入 URL 后网页自动就出来了。殊不知在用户输入…...
华为OD机试 - 有效子字符串 | 机试题算法思路 【2023】
最近更新的博客 华为OD机试 - 简易压缩算法(Python) | 机试题算法思路 【2023】 华为OD机试题 - 获取最大软件版本号(JavaScript) 华为OD机试 - 猜字谜(Python) | 机试题+算法思路 【2023】 华为OD机试 - 删除指定目录(Python) | 机试题算法思路 【2023】 华为OD机试 …...
抽象类和接口
抽象类和接口 抽象类和接口的定义 抽象类主要用来抽取子类的通用特性,作为子类的模板,它不能被实例化,只能被用作为子类的超类。 接口是抽象方法的集合,声明了一系列的方法操作,如果一个类实现了某个接口,…...
STM32DSP库汇总
前言 本文仅对stm32的DSP库进行汇总,具体函数使用方式持续更新…… 分类函数名描述 BasicMathFunctions 基础数学函数 abs绝对值add加法dot_prod向量点积mult乘法negate相反数offset 偏置 scale比例缩放shift移位sub减法 ComplexMathFunctions 复数数学函数 conj…...
C++类和对象----思想基础应用
类与对象的思想&基础应用一、类声明1.1、封装类的意义1.1.1、在设计类的时候,属性和行为写在一起,表现事物1.1.2、成员权限1.2、struct和class区别1.3、成员属性设置为私有二、对象的初始化和清理2.1、构造函数&析构函数2.2、构造函数分类方法一…...
力扣解法汇总1792. 最大平均通过率
目录链接: 力扣编程题-解法汇总_分享记录-CSDN博客 GitHub同步刷题项目: https://github.com/September26/java-algorithms 原题链接:力扣 描述: 一所学校里有一些班级,每个班级里有一些学生,现在每个班…...
动手学深度学习(第二版)学习笔记 第二章
官网:http://zh.d2l.ai/ 视频可以去b站找 记录的是个人觉得不太熟的知识 第二章 预备知识 代码地址:d2l-zh/pytorch/chapter_preliminaries 2.1 数据操作 2.1. 数据操作 — 动手学深度学习 2.0.0 documentation 如果只想知道张量中元素的总数&#…...
CMake构建静态库与动态库以及使用
CMake构建静态库与动态库一、任务二、准备工作三、编译共享库四、ADD_LIBRARY指令五、编译静态库5.1、SET_TARGET_PROPERTIES指令5.2、GET_TARGET_PROPERTY指令六、动态库版本号七、安装共享库和头文件八、使用外部共享库和头文件8.1、准备工作8.2、引入头文件搜索路径8.3、为 …...
Linux 系统目录结构
登录系统后,在当前命令窗口下输入命令: ls / 你会看到如下图所示: 树状目录结构: 以下是对这些目录的解释: /bin: bin 是 Binaries (二进制文件) 的缩写, 这个目录存放着最经常使用的命令。 /boot: 这里…...
stable diffusion webui安装与使用(官方超简单教程)
预备依赖 下载miniconda 教程参考:https://blog.csdn.net/weixin_43828245/article/details/124768518安装git 参考教程:https://blog.csdn.net/weixin_46474921/article/details/127091723 下载sd-webui 官网 https://github.com/AUTOMATIC1111/stab…...
机器学习:学习k-近邻(KNN)模型建立、使用和评价
机器学习:学习k-近邻(KNN)模型建立、使用和评价 文章目录机器学习:学习k-近邻(KNN)模型建立、使用和评价一、实验目的二、实验原理三、实验环境四、实验内容五、实验步骤1.数据读取2.数据理解3.数据准备4.算…...
Hive Sampling 抽样函数:Random随机抽样、Block 基于数据块抽样、Bucket table 基于分桶表抽样
Hive Sampling 抽样函数 文章目录Hive Sampling 抽样函数Random随机抽样Block 基于数据块抽样Bucket table 基于分桶表抽样语法在HQL中,可以通过三种方式采样数据:随机采样,存储桶表采样和块采样。Random随机抽样 随机抽样使用rand()函数确保…...
2023年中职网络安全竞赛跨站脚本渗透解析-1(超详细)
跨站脚本渗透 任务环境说明:需求环境可私信博主! 服务器场景:Server2125(关闭链接)服务器场景操作系统:未知访问服务器网站目录1,根据页面信息完成条件,将获取到弹框信息作为flag提交;访问服务器网站目录2,根据页面信息完成条件,将获取到弹框信息作为flag提交;访问服…...
虚拟 DOM 详解
什么是虚拟 dom? 虚拟 dom 本质上就是一个普通的 JS 对象,用于描述视图的界面结构 在vue中,每个组件都有一个render函数,每个render函数都会返回一个虚拟 dom 树,这也就意味着每个组件都对应一棵虚拟 DOM 树 查看虚拟…...
Delphi Http Https 最好的解决方法(一)
当前文章主要解决Delphi调用http、https的常见报错。 开发工具: Delphi XE 10.1 Berlin版本 可能所需的控件包: QDAC 请自行下载。 1. 接口描述 dll_init 接口初始化,程序启动时调用,主要是对工具类实例的创建 dll_post 发送post请求&am…...
第19节 Node.js Express 框架
Express 是一个为Node.js设计的web开发框架,它基于nodejs平台。 Express 简介 Express是一个简洁而灵活的node.js Web应用框架, 提供了一系列强大特性帮助你创建各种Web应用,和丰富的HTTP工具。 使用Express可以快速地搭建一个完整功能的网站。 Expre…...
XCTF-web-easyupload
试了试php,php7,pht,phtml等,都没有用 尝试.user.ini 抓包修改将.user.ini修改为jpg图片 在上传一个123.jpg 用蚁剑连接,得到flag...
使用VSCode开发Django指南
使用VSCode开发Django指南 一、概述 Django 是一个高级 Python 框架,专为快速、安全和可扩展的 Web 开发而设计。Django 包含对 URL 路由、页面模板和数据处理的丰富支持。 本文将创建一个简单的 Django 应用,其中包含三个使用通用基本模板的页面。在此…...
循环冗余码校验CRC码 算法步骤+详细实例计算
通信过程:(白话解释) 我们将原始待发送的消息称为 M M M,依据发送接收消息双方约定的生成多项式 G ( x ) G(x) G(x)(意思就是 G ( x ) G(x) G(x) 是已知的)࿰…...
Objective-C常用命名规范总结
【OC】常用命名规范总结 文章目录 【OC】常用命名规范总结1.类名(Class Name)2.协议名(Protocol Name)3.方法名(Method Name)4.属性名(Property Name)5.局部变量/实例变量(Local / Instance Variables&…...
Redis数据倾斜问题解决
Redis 数据倾斜问题解析与解决方案 什么是 Redis 数据倾斜 Redis 数据倾斜指的是在 Redis 集群中,部分节点存储的数据量或访问量远高于其他节点,导致这些节点负载过高,影响整体性能。 数据倾斜的主要表现 部分节点内存使用率远高于其他节…...
GC1808高性能24位立体声音频ADC芯片解析
1. 芯片概述 GC1808是一款24位立体声音频模数转换器(ADC),支持8kHz~96kHz采样率,集成Δ-Σ调制器、数字抗混叠滤波器和高通滤波器,适用于高保真音频采集场景。 2. 核心特性 高精度:24位分辨率,…...
根目录0xa0属性对应的Ntfs!_SCB中的FileObject是什么时候被建立的----NTFS源代码分析--重要
根目录0xa0属性对应的Ntfs!_SCB中的FileObject是什么时候被建立的 第一部分: 0: kd> g Breakpoint 9 hit Ntfs!ReadIndexBuffer: f7173886 55 push ebp 0: kd> kc # 00 Ntfs!ReadIndexBuffer 01 Ntfs!FindFirstIndexEntry 02 Ntfs!NtfsUpda…...
嵌入式学习之系统编程(九)OSI模型、TCP/IP模型、UDP协议网络相关编程(6.3)
目录 一、网络编程--OSI模型 二、网络编程--TCP/IP模型 三、网络接口 四、UDP网络相关编程及主要函数 编辑编辑 UDP的特征 socke函数 bind函数 recvfrom函数(接收函数) sendto函数(发送函数) 五、网络编程之 UDP 用…...
【Kafka】Kafka从入门到实战:构建高吞吐量分布式消息系统
Kafka从入门到实战:构建高吞吐量分布式消息系统 一、Kafka概述 Apache Kafka是一个分布式流处理平台,最初由LinkedIn开发,后成为Apache顶级项目。它被设计用于高吞吐量、低延迟的消息处理,能够处理来自多个生产者的海量数据,并将这些数据实时传递给消费者。 Kafka核心特…...
