Pipeline流水线组件
文章目录
- 1、新建pipeline流水线
- 2、定义处理器
- 3、定义处理器上下文
- 4、pipeline流水线实现
- 5、处理器抽象类实现
- 6、pipeline流水线构建者
- 7、具体处理器实现
- 8、流水线测试
- 9、运行结果
1、新建pipeline流水线
package com.summer.toolkit.model.chain;import java.util.List;
import java.util.concurrent.Executor;public interface Pipeline<T> {/*** 向pipeline中添加一个执行器** @param handler 执行器* @return 返回pipeline对象*/Pipeline<T> addLast(Handler<T> handler);/*** 向pipeline中添加一个执行器** @param name 执行器名称* @param handler 执行器* @return 返回pipeline对象*/Pipeline<T> addLast(String name, Handler<T> handler);/*** pipeline执行** @param list 数据集合* @return 返回值,执行完成返回true*/boolean execute(List<T> list);/*** pipeline并行执行** @param list 数据集合* @param executor 线程池* @return 返回值,执行完成返回true*/boolean parallelExecute(List<T> list, Executor executor);/*** pipeline执行** @param object 单个数据* @return 返回值,执行完成返回true*/boolean execute(T object);}
2、定义处理器
package com.summer.toolkit.model.chain;public interface Handler<T> {/*** 处理器处理方法** @param handlerContext 上下文* @param t 要处理的数据*/void doHandler(HandlerContext<T> handlerContext, T t);}
3、定义处理器上下文
package com.summer.toolkit.model.chain;import lombok.Data;@Data
public class HandlerContext<T> {/*** 执行器名称 */private String name;/*** 执行器 */private Handler<T> handler;/*** 链表的下一个节点,用来保存下一个执行器 */public HandlerContext<T> next;public HandlerContext(Handler<T> handler) {this.name = handler.getClass().getName();this.handler = handler;}public HandlerContext(String name, Handler<T> handler) {this.name = name;this.handler = handler;}/*** 调用该方法即调用上下文中处理器的执行方法** @param t 需要处理的数据*/public void handle(T t) {this.handler.doHandler(this, t);}/*** 执行下一个节点的处理器** @param t 待执行的数据*/public void runNext(T t) {if (this.next != null) {this.next.handle(t);}}
}
4、pipeline流水线实现
package com.summer.toolkit.model.chain;import com.summer.toolkit.util.CollectionUtils;
import com.summer.toolkit.util.StringUtils;
import lombok.extern.slf4j.Slf4j;import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;@Slf4j
public class DefaultPipeline<T> implements Pipeline<T> {/*** 默认pipeline中有一个处理器上下文的头结点* 头结点无处理逻辑,直接执行下一个节点的处理器*/HandlerContext<T> head = new HandlerContext<>(HandlerContext::runNext);@Overridepublic Pipeline<T> addLast(Handler<T> handler) {this.addLast(null, handler);return this;}@Overridepublic Pipeline<T> addLast(String name, Handler<T> handler) {if (handler == null) {log.warn("处理器为空,不进行添加!");return this;}if (StringUtils.isEmpty(name)) {name = handler.getClass().getName();}// 将处理器添加到处理器上下文的尾节点HandlerContext<T> context = head;while (context.next != null) {context = context.next;}context.next = new HandlerContext<T>(name, handler);return this;}@Overridepublic boolean execute(List<T> list) {List<Object> result = list.stream().peek(this::execute).collect(Collectors.toList());return true;}@Overridepublic boolean parallelExecute(List<T> list, Executor executor) {Map<String, List<T>> parts = this.split(list);List<CompletableFuture<Boolean>> results = new ArrayList<>();for (Map.Entry<String, List<T>> entry : parts.entrySet()) {CompletableFuture<Boolean> completableFuture = CompletableFuture// 提交任务.supplyAsync(() -> this.execute(entry.getValue()), executor)// 打印异常信息.exceptionally(e -> {log.error("并行处理数据时发生异常!{}", e.getMessage(), e);return Boolean.FALSE;});results.add(completableFuture);}CompletableFuture.allOf(results.toArray(new CompletableFuture[0])).join();return true;}@Overridepublic boolean execute(T t) {this.head.handle(t);return true;}/*** 对集合进行分组拆分** @param list 集合* @return 返回值*/private Map<String, List<T>> split(List<T> list) {Map<String, List<T>> parts = new HashMap<>(8);if (CollectionUtils.isEmpty(list)) {return parts;}// 如果集合数量过少,则不进行分组int limit = 10;if (list.size() < limit) {String key = String.valueOf(0);parts.put(key, list);return parts;}// 固定分五个分组int group = 5;for (int i = 0, length = list.size(); i < length; i++) {int key = i % group;List<T> part = parts.computeIfAbsent(String.valueOf(key), k -> new ArrayList<>());T t = list.get(i);part.add(t);}return parts;}}
5、处理器抽象类实现
package com.summer.toolkit.model.chain;import lombok.extern.slf4j.Slf4j;@Slf4j
public abstract class AbstractHandler<T> implements Handler<T> {/*** 开始处理数据,通用方法** @param handlerContext 上下文* @param t 要处理的数据*/@Overridepublic void doHandler(HandlerContext<T> handlerContext, T t) {long start = System.currentTimeMillis();String threadName = Thread.currentThread().getName();String handlerName = handlerContext.getName();log.info("====={} 开始处理:{}=====", threadName, handlerName);try {// 此处处理异常,如果执行过程失败,则继续执行下一个handlerthis.handle(t);} catch (Throwable throwable) {log.error("====={} 处理异常:{},异常原因:{}=====", threadName, handlerName, throwable.getMessage(), throwable);this.handleException(t, throwable);}long end = System.currentTimeMillis();log.info("====={} 处理完成:{},耗时:{} 毫秒=====", threadName, handlerName, (end - start));// 处理完该上下文中的处理器逻辑后,调用上下文中的下一个执行器的执行方法handlerContext.runNext(t);}/*** 处理数据抽象方法,由子类实现具体细节** @param t 对象*/public abstract void handle(T t);/*** 处理数据抽象方法,由子类实现具体细节** @param t 对象* @param throwable 异常对象*/public void handleException(T t, Throwable throwable) {log.error("=====处理数据发生异常:{}", throwable.getMessage(), throwable);}}
6、pipeline流水线构建者
package com.summer.toolkit.model.chain;public class DefaultPipelineBuilder<T> {private final Pipeline<T> pipeline;public DefaultPipelineBuilder() {this.pipeline = new DefaultPipeline<>();}/*** 向pipeline中添加一个执行器** @param handler 执行器* @return 返回pipeline对象*/public DefaultPipelineBuilder<T> addLast(Handler<T> handler) {pipeline.addLast(handler);return this;}/*** 向pipeline中添加一个执行器** @param name 执行器名称* @return 返回pipeline对象*/public DefaultPipelineBuilder<T> addLast(String name, Handler<T> handler) {pipeline.addLast(name, handler);return this;}/*** 返回pipeline对象** @return 返回值*/public Pipeline<T> build() {return this.pipeline;}}
7、具体处理器实现
package com.summer.toolkit.model.chain;import lombok.extern.slf4j.Slf4j;import java.util.Objects;@Slf4j
public class StringHandler extends AbstractHandler<String> {@Overridepublic void handle(String s) {log.info("入参:{}", s);}@Overridepublic void handleException(String s, Throwable throwable) {if (Objects.nonNull(throwable)) {log.error("异常:{}", throwable.getMessage());}}}
8、流水线测试
package com.summer.toolkit.model;import com.summer.toolkit.model.chain.DefaultPipelineBuilder;
import com.summer.toolkit.model.chain.Pipeline;
import com.summer.toolkit.model.chain.StringHandler;public class Processor {public static void main(String[] args) {DefaultPipelineBuilder<String> builder = new DefaultPipelineBuilder<>();Pipeline<String> pipeline = builder.addLast("字符串信息", new StringHandler()).addLast("寄件人信息", new StringHandler()).addLast("收件人信息", new StringHandler()).build();pipeline.execute("1");}}
9、运行结果
20:03:00.285 [main] INFO com.summer.toolkit.model.chain.AbstractHandler - =====main 开始处理:字符串信息=====
20:03:00.289 [main] INFO com.summer.toolkit.model.chain.StringHandler - 入参:1
20:03:00.289 [main] INFO com.summer.toolkit.model.chain.AbstractHandler - =====main 处理完成:字符串信息,耗时:5 毫秒=====
20:03:00.289 [main] INFO com.summer.toolkit.model.chain.AbstractHandler - =====main 开始处理:寄件人信息=====
20:03:00.289 [main] INFO com.summer.toolkit.model.chain.StringHandler - 入参:1
20:03:00.289 [main] INFO com.summer.toolkit.model.chain.AbstractHandler - =====main 处理完成:寄件人信息,耗时:0 毫秒=====
20:03:00.289 [main] INFO com.summer.toolkit.model.chain.AbstractHandler - =====main 开始处理:收件人信息=====
20:03:00.289 [main] INFO com.summer.toolkit.model.chain.StringHandler - 入参:1
20:03:00.289 [main] INFO com.summer.toolkit.model.chain.AbstractHandler - =====main 处理完成:收件人信息,耗时:0 毫秒=====
相关文章:
Pipeline流水线组件
文章目录 1、新建pipeline流水线2、定义处理器3、定义处理器上下文4、pipeline流水线实现5、处理器抽象类实现6、pipeline流水线构建者7、具体处理器实现8、流水线测试9、运行结果 1、新建pipeline流水线 package com.summer.toolkit.model.chain;import java.util.List; impo…...

闪灵CMS电子商城系统源码v5.0(自带微信小程序)
源码介绍 闪灵CMS电子商城系统源码,双语带手机版,PHPMYSQL进行开发,网站安装简单、快捷。 闪灵CMS系统更新日志 1.修复:修复了开启强制https后,说明文档重定向过多的问题 2.修复:修复了商品名称过长时无…...

基于SSM的旅游民宿预定系统【源码】【运行教程】
基于SSM的旅游民宿预定系统 一、项目介绍1. 游客功能2. 管理员功能3. 高级功能 二、项目技术栈三、项目运行四、项目演示总结 大家好,这里是程序猿代码之路!随着旅游业的快速发展,民宿作为一种独特的住宿方式越来越受到游客的喜爱。为了提升用…...

PgSQL技术内幕 - psql与服务端连接与交互机制
PgSQL技术内幕 - 客户端psql与服务端连接与交互机制 简单来说,PgSQL的psql客户端向服务端发起连接请求,服务端接收到请求后,fork出一个子进程,之后由该子进程和客户端进行交互,处理客户端的SQL等,并将结果返…...

实现开发板三盏灯点亮熄灭
实现开发板三盏灯点亮熄灭 typedef struct {volatile unsigned int MODER; // 0x00volatile unsigned int OTYPER; // 0x04volatile unsigned int OSPEEDR; // 0x08volatile unsigned int PUPDR; // 0x0Cvolatile unsigned int IDR; // 0x10volatile unsigned int OD…...

外汇天眼:盈透证券为客户提供更丰富的欧洲衍生品交易渠道
电子交易巨头盈透证券(纳斯达克代码:IBKR)今日宣布,通过Cboe欧洲期权交易所(CEDX)新增欧洲股票期权和欧洲指数期货及期权。这一新增功能使得盈透证券的客户可以在单一统一平台上,除了股票、期权…...

论文阅读Rolling-Unet,卷积结合MLP的图像分割模型
这篇论文提出了一种新的医学图像分割网络Rolling-Unet,目的是在不用Transformer的前提下,能同时有效提取局部特征和长距离依赖性,从而在性能和计算成本之间找到良好的平衡点。 论文地址:https://ojs.aaai.org/index.php/AAAI/article/view/2…...
Linux Shell命令vim使用
一、引例 以判断引出(学过C其他语言容易接受)。 简单命令说明: -e 测试文件是否存在 -f 测试文件是否为普通文件 -d 测试文件是否为目录 -r 测试当前用户对某文件是否具有“可读”权限 -w 测试当前用户对某文件是否具有“可写”权限…...

如何将 API 管理从 Postman 转移到 Apifox
上一篇推文讲到用 Swagger 管理的 API 怎么迁移到 Apifox,有许多同学反馈说能不能介绍一下 Postman 的迁移以及迁移过程中需要注意的事项。那么今天,它来了! 从 Postman 迁移到 Apifox 的方法有两种: 导出 Postman 集合 &#x…...
用链表实现的C语言队列
一、队列概述 在数据结构中,队列是一种先进先出(FIFO)的线性表。它在许多应用场景中非常有用,例如任务调度、进程管理、资源管理等。队列是一种重要的数据结构,其主要特点是先进先出(FIFO, First In First …...

国产SDI视频均衡驱动器,功能与 LMH0387/LMH0344 一致
视频均衡驱动器,功能与 LMH0387 一致、LMH0344。本期间支持 DVB-ASI,作为驱动器能够选择输出速率,作为均衡接收器能支持100m以上传输距离(线缆类型Belden 1694A)。最大支持3Gbps 速率的信号 2 产品特征 a)…...

如何用Xinstall CPS结算系统打破传统营销桎梏,实现用户增长?
在互联网流量红利逐渐衰退的今天,App推广和运营面临着前所未有的挑战。如何快速搭建起满足用户需求的运营体系,成为了众多企业急待解决的问题。而在这个关键时刻,Xinstall CPS结算系统应运而生,以其独特的优势帮助企业解决了一系列…...

(代数:解一元二次方程)可以使用下面的公式求一元二次方程 ax2+bx+c0 的两个根:
(代数:解一元二次方程)可以使用下面的公式求一元二次方程 ax2bxc0 的两个根: b2-4ac 称作一元二次方程的判别式。如果它是正值,那么一元二次方程就有两个实数根。 如果它为 0,方程式就只有一个根。如果它是负值,方程式无实根。 编写程序,提示…...

如何提高网站收录?
GSI服务就是专门干这个的,这个服务用的是光算科技自己研发的GPC爬虫池系统。这个系统通过建立一个庞大的站群和复杂的链接结构,来吸引谷歌的爬虫。这样一来,你的网站就能更频繁地被谷歌的爬虫访问,从而提高被收录的机会。 说到效…...

Docker 学习总结(83)—— 配置文件daemon.json介绍及优化建议
一、daemon.json 文件概述 daemon.json是Docker守护进程的配置文件,它允许系统管理员自定义Docker守护程序的行为。此文件通常位于/etc/docker/目录下。通过修改daemon.json,可以调整Docker守护进程的多种设置,包括网络配置、日志记录、存储驱动等。 二、daemon.json 文件结…...

Javaweb04-Servlet技术2(HttpServletResponse, HttpServletRequest)
Servlet技术基础 HttpServletResponse对象 HttpServletResponce对象是继承ServletResponse接口,专门用于封装Http请求 HttpServletResponce有关响应行的方法 方法说明功能描述void setStatus(int stauts)用于设置HTTP响应消息的状态码,并生成响应状态…...
chat gpt基本原理解读
chat gpt基本原理解读 ChatGPT是一种基于生成式预训练变换器(Generative Pre-trained Transformer, GPT)的对话模型,主要通过大量的文本数据训练生成自然语言回复。以下是ChatGPT的基本原理解读: 1. 基本架构 ChatGPT 是基于 GPT…...

单目标应用:基于蛇鹫优化算法SBOA的微电网优化(MATLAB代码)
一、微电网模型介绍 微电网多目标优化调度模型简介_vmgpqv-CSDN博客 参考文献: [1]李兴莘,张靖,何宇,等.基于改进粒子群算法的微电网多目标优化调度[J].电力科学与工程, 2021, 37(3):7 二、蛇鹫优化算法求解微电网 2.1算法简介 蛇鹫优化算法(Secre…...

MySQL系列-安装配置使用说明(MAC版本)
1、前言 本文将介绍MySQL的安装配置以及基本语法操作说明 环境:mac 版本:MySQL 8.0.28 之前电脑安装卸载过,后面在装的时候遇到一些问题,用了四五天才解决,主要是参考 https://blog.csdn.net/zz00008888/article/deta…...
vue elementui el-input 正则验证,限制只能输入数字和小数
vue elementui el-input 正则验证 限制只能输入数字和小数,以下两种方法都可以: 1、οninput“value value.replace(/[^0-9.]/g,‘’)” 2、οninput“value value.replace(/[^\d.]/g, ‘’)” 限制只能输入数字: 1、oninput “valuevalu…...

网络编程(Modbus进阶)
思维导图 Modbus RTU(先学一点理论) 概念 Modbus RTU 是工业自动化领域 最广泛应用的串行通信协议,由 Modicon 公司(现施耐德电气)于 1979 年推出。它以 高效率、强健性、易实现的特点成为工业控制系统的通信标准。 包…...
uni-app学习笔记二十二---使用vite.config.js全局导入常用依赖
在前面的练习中,每个页面需要使用ref,onShow等生命周期钩子函数时都需要像下面这样导入 import {onMounted, ref} from "vue" 如果不想每个页面都导入,需要使用node.js命令npm安装unplugin-auto-import npm install unplugin-au…...
基于Uniapp开发HarmonyOS 5.0旅游应用技术实践
一、技术选型背景 1.跨平台优势 Uniapp采用Vue.js框架,支持"一次开发,多端部署",可同步生成HarmonyOS、iOS、Android等多平台应用。 2.鸿蒙特性融合 HarmonyOS 5.0的分布式能力与原子化服务,为旅游应用带来…...

Android 之 kotlin 语言学习笔记三(Kotlin-Java 互操作)
参考官方文档:https://developer.android.google.cn/kotlin/interop?hlzh-cn 一、Java(供 Kotlin 使用) 1、不得使用硬关键字 不要使用 Kotlin 的任何硬关键字作为方法的名称 或字段。允许使用 Kotlin 的软关键字、修饰符关键字和特殊标识…...

智能分布式爬虫的数据处理流水线优化:基于深度强化学习的数据质量控制
在数字化浪潮席卷全球的今天,数据已成为企业和研究机构的核心资产。智能分布式爬虫作为高效的数据采集工具,在大规模数据获取中发挥着关键作用。然而,传统的数据处理流水线在面对复杂多变的网络环境和海量异构数据时,常出现数据质…...

网站指纹识别
网站指纹识别 网站的最基本组成:服务器(操作系统)、中间件(web容器)、脚本语言、数据厍 为什么要了解这些?举个例子:发现了一个文件读取漏洞,我们需要读/etc/passwd,如…...

Linux 中如何提取压缩文件 ?
Linux 是一种流行的开源操作系统,它提供了许多工具来管理、压缩和解压缩文件。压缩文件有助于节省存储空间,使数据传输更快。本指南将向您展示如何在 Linux 中提取不同类型的压缩文件。 1. Unpacking ZIP Files ZIP 文件是非常常见的,要在 …...

Windows安装Miniconda
一、下载 https://www.anaconda.com/download/success 二、安装 三、配置镜像源 Anaconda/Miniconda pip 配置清华镜像源_anaconda配置清华源-CSDN博客 四、常用操作命令 Anaconda/Miniconda 基本操作命令_miniconda创建环境命令-CSDN博客...

[大语言模型]在个人电脑上部署ollama 并进行管理,最后配置AI程序开发助手.
ollama官网: 下载 https://ollama.com/ 安装 查看可以使用的模型 https://ollama.com/search 例如 https://ollama.com/library/deepseek-r1/tags # deepseek-r1:7bollama pull deepseek-r1:7b改token数量为409622 16384 ollama命令说明 ollama serve #:…...

自然语言处理——文本分类
文本分类 传统机器学习方法文本表示向量空间模型 特征选择文档频率互信息信息增益(IG) 分类器设计贝叶斯理论:线性判别函数 文本分类性能评估P-R曲线ROC曲线 将文本文档或句子分类为预定义的类或类别, 有单标签多类别文本分类和多…...