当前位置: 首页 > news >正文

绝对全网首发,利用Disruptor EventHandler实现在多线程下顺序执行任务

disruptor有两种任务处理器,一个是EventHandler ,另一个是WorkHandler.
EventHandler可以彼此独立消费同一个队列中的任务,WorkHandler可以共同竞争消费同一个队列中的任务。也就是说,假设任务队列中有a、b、c、d三个事件,eventHandler1eventHandler2,这两个处理器都会去这4个事件,也就是每个事件会被处理两次,每个处理器处理一次;而如果有workHandlerworkHandler2这两个处理器,那么每个任务只会被其中一个处理器处理,这两处理器的处理时间总数加起来是这4个事件。
了解了这些,就可以能理解使用EventHandler为啥能实现多线程顺序处理了。
可以给事件定义一个hash函数,根据哈希取余的槽位下标和当前处理器的下标比较,判断出此事件是否应被当前的处理器处理,不该其处理的事件直接pass,避免重复消费

定义的哈希函数接口和统一的基础父类

public interface Hashed<K> {/*** 哈希码计算所需key** @return key值*/K getKey();/*** 计算哈希码** <p>*     默认实现是使用{@link  java.util.HashMap HashMap}的哈希计算公式</br>*     建议对此计算做个缓存,进一步提高性能* </p>** @param key 哈希码生成的输入key,此入参是{@link #getKey()}方法的返回值* @return 哈希码* @see #getKey()*/default int hash(K key) {if (key == null) {throw new NullPointerException("key must not be null");}int h;return (h = key.hashCode()) ^ (h >>> 16);}}public abstract class PartitionEventHandler<E extends Hashed<K>, K> implements EventHandler<E>, Cloneable {protected final Logger log = LoggerFactory.getLogger(getClass());private final static int NO_INIT_VALUE = -1;private int index;private int indexMask;public final int getIndex() {return index;}public final int getIndexMask() {return indexMask;}@Overridepublic final void onEvent(E event, long sequence, boolean endOfBatch) throws Exception {if (index == NO_INIT_VALUE || indexMask == NO_INIT_VALUE) {throw new IllegalStateException("this should be inited");}int position = event.hash(event.getKey()) & indexMask;if (index != position) {if (log.isTraceEnabled()) {log.trace("The event with key [{}] should be handled on slot [{}],but current slot is [{}], it will be ignored.",event.getKey(), position, index);}return;}if (log.isDebugEnabled()) {log.debug("The event with key [{}] is being handled on slot [{}].", event.getKey(), position);}doOnEvent(event, sequence, endOfBatch);}protected abstract void doOnEvent(E event, long sequence, boolean endOfBatch) throws Exception;@SafeVarargspublic static <E extends Hashed<K>, K> PartitionEventHandler<E, K>[] initHandlers(final PartitionEventHandler<E, K>... handlers) {if (handlers == null) {throw new NullPointerException("handlers must not be null");}if (handlers.length == 0) {throw new IllegalArgumentException("handlers length must be more than zero");}if (Integer.bitCount(handlers.length) != 1) {throw new IllegalArgumentException("handlers count must be a power of 2");}int indexMask = handlers.length - 1;for (int i = 0; i < handlers.length; i++) {PartitionEventHandler<E, K> handler = handlers[i];handler.indexMask = indexMask;handler.index = i;}return handlers;}@Overridepublic PartitionEventHandler<E, K> clone() {try {@SuppressWarnings("unchecked")PartitionEventHandler<E, K> handler = (PartitionEventHandler<E, K>) super.clone();handler.index = NO_INIT_VALUE;handler.indexMask = NO_INIT_VALUE;return handler;} catch (CloneNotSupportedException e) {//never occurthrow new RuntimeException("can not clone " + getClass(), e);}}public PartitionEventHandler<E, K>[] clones(int count) {if (count < 0) {throw new IllegalArgumentException("count must be more then zero");}@SuppressWarnings("unchecked")PartitionEventHandler<E, K>[] result = new PartitionEventHandler[count];for (int i = 0; i < count; i++) {result[i] = clone();}return result;}
}

测试代码

public class DisruptorTest05 {@Testpublic void test1() {CustomizableThreadFactory threadFactory = new CustomizableThreadFactory("event-handler-");threadFactory.setDaemon(false);Disruptor<ChatGroupEvent> disruptor = new Disruptor<>(new ChatGroupEventFactory(), 1024, threadFactory);ChatGroupEventHandler handler = new ChatGroupEventHandler();PartitionEventHandler<ChatGroupEvent, String>[] handlers = handler.clones(8);disruptor.handleEventsWith(PartitionEventHandler.initHandlers(handlers));disruptor.start();RingBuffer<ChatGroupEvent> ringBuffer = disruptor.getRingBuffer();Random rdm = new Random();long start = System.currentTimeMillis();for (int i = 0; i < 1000; i++) {long seq = ringBuffer.next();ChatGroupEvent chatGroupEvent = ringBuffer.get(seq);int rdmInt = rdm.nextInt(12);chatGroupEvent.setGroupId(String.format("groupId-%02d", rdmInt));chatGroupEvent.setGroupOwner(String.format("owner-%03d", i));String type = i % 2 == 0 ? "add" : "exit";chatGroupEvent.setChangeType(type);ringBuffer.publish(seq);}disruptor.shutdown();long spent = System.currentTimeMillis() - start;System.out.println("total time " + spent);System.out.println();}static class ChatGroupEventFactory implements EventFactory<ChatGroupEvent> {@Overridepublic ChatGroupEvent newInstance() {return new ChatGroupEvent();}}static class ChatGroupEventHandler extends PartitionEventHandler<ChatGroupEvent, String> {@Overrideprotected void doOnEvent(ChatGroupEvent event, long sequence, boolean endOfBatch) throws Exception {Thread.sleep(50);log.info("event key={},owner={},changeType={}, index={}", event.getKey(), event.getGroupOwner(), event.getChangeType(), getIndex());}}static class ChatGroupEvent implements Hashed<String> {private volatile  int hashcode=-1;private String groupId;private String changeType;private String groupOwner;@Overridepublic String getKey() {return groupId;}@Overridepublic int hash(String key) {if (hashcode == -1) {synchronized (this){if (hashcode == -1) {int h;hashcode = (h = key.hashCode()) ^ (h >>> 16);return hashcode;}}}return hashcode;}public String getGroupId() {return groupId;}public void setGroupId(String groupId) {this.groupId = groupId;}public String getChangeType() {return changeType;}public void setChangeType(String changeType) {this.changeType = changeType;}public String getGroupOwner() {return groupOwner;}public void setGroupOwner(String groupOwner) {this.groupOwner = groupOwner;}}
}

相关文章:

绝对全网首发,利用Disruptor EventHandler实现在多线程下顺序执行任务

disruptor有两种任务处理器&#xff0c;一个是EventHandler ,另一个是WorkHandler. EventHandler可以彼此独立消费同一个队列中的任务&#xff0c;WorkHandler可以共同竞争消费同一个队列中的任务。也就是说&#xff0c;假设任务队列中有a、b、c、d三个事件&#xff0c;eventHa…...

单例设计模式双重检查的作用

先看双重校验锁的写法 public class Singleton {/*volatile 修饰&#xff0c;singleton new Singleton() 可以拆解为3步&#xff1a;1、分配对象内存(给singleton分配内存)2、调用构造器方法&#xff0c;执行初始化&#xff08;调用 Singleton 的构造函数来初始化成员变量&am…...

NGINX_十二 nginx 地址重写 rewrite

十二 nginx 地址重写 rewrite 1 什么是Rewrite Rewrite对称URL Rewrite&#xff0c;即URL重写&#xff0c;就是把传入Web的请求重定向到其他URL的过程。URL Rewrite最常见的应用是URL伪静态化&#xff0c;是将动态页面显示为静态页面方式的一种技术。比如 http://www.123.com…...

react用ECharts实现组织架构图

找到ECharts中路径图。 然后开始爆改。 <div id{org- name} style{{ width: 100%, height: 650, display: flex, justifyContent: center }}></div> // data的数据格式 interface ChartData {name: string;value: number;children: ChartData[]; } const treeDep…...

坚持刷题|合并有序链表

文章目录 题目思考代码实现迭代递归 扩展实现k个有序链表合并方法一方法二 PriorityQueue基本操作Java示例注意事项 Hello&#xff0c;大家好&#xff0c;我是阿月。坚持刷题&#xff0c;老年痴呆追不上我&#xff0c;消失了一段时间&#xff0c;我又回来刷题啦&#xff0c;今天…...

SPI协议——对外部SPI Flash操作

目录 1. W25Q32JVSSIQ背景知识 1.1 64个可擦除块 1.2 1024个扇区&#xff08;每个块有16个扇区&#xff09; 1.3 页 1. W25Q32JVSSIQ背景知识 W25Q32JV阵列被组织成16,384个可编程页&#xff0c;每页有256字节。一次最多可以编程256个字节。页面可分为16组(4KB扇区清除&…...

kotlin类型检测与类型转换

一、is与!is操作符 1、使用 is 操作符或其否定形式 !is 在运行时检测对象是否符合给定类型。 fun main() {var a "1"if(a is String) {println("a是字符串类型:${a.length}")}// 或val b a is Stringprintln(b) } 二、"不安全的"转换操作符…...

【JDBC】Oracle数据库连接问题记录

Failed to load driver class oracle.jdbc.driver.OracleDriver in either of HikariConfig class oracle驱动包未正确加载&#xff0c;可以先尝试使用下面方式加载检查类是否存在&#xff0c;如果不存在需要手动下载odbc包 try {Class.forName("oracle.jdbc.driver.Ora…...

leetcode45 跳跃游戏II

题目 给定一个长度为 n 的 0 索引整数数组 nums。初始位置为 nums[0]。 每个元素 nums[i] 表示从索引 i 向前跳转的最大长度。换句话说&#xff0c;如果你在 nums[i] 处&#xff0c;你可以跳转到任意 nums[i j] 处: 0 < j < nums[i] i j < n 返回到达 nums[n - 1]…...

【数学】什么是方法矩估计?和最大似然估计是什么关系?

背景 方法矩估计&#xff08;Method of Moments Estimation&#xff09;和最大似然估计&#xff08;Maximum Likelihood Estimation, MLE&#xff09;是两种常用的参数估计方法。方法矩估计基于样本矩与总体矩的关系&#xff0c;通过样本数据计算样本矩来估计总体参数。最大似…...

C++初学者指南第一步---10.内存(基础)

C初学者指南第一步—10.内存&#xff08;基础&#xff09; 文章目录 C初学者指南第一步---10.内存&#xff08;基础&#xff09;1.内存模型1.1 纸上谈兵&#xff1a;C的抽象内存模型1.2 实践&#xff1a;内存的实际处理 2. 自动存储3.动态存储&#xff1a;std::vector3.1 动态内…...

扩散模型详细推导过程——编码与解码

符号表 符号含义 x ( i ) z 0 ( i ) \boldsymbol{x}^{(i)}\boldsymbol{z}_0^{(i)} x(i)z0(i)​第 i i i个训练数据&#xff0c;其为长度为 d d d的向量 z t ( i ) \boldsymbol{z}_t^{(i)} zt(i)​第 i i i个训练数据在第 t t t时刻的加噪版本 ϵ t ( i ) \boldsymbol{\epsilo…...

js如何实现开屏弹窗

开屏弹窗是什么&#xff0c;其实就是第一次登录后进入页面给你的一种公告提示&#xff0c;此后再回到当前这个页面时弹窗是不会再出现的。也就是说这个弹窗只会出现一次。 <!DOCTYPE html> <html><head><meta charset"utf-8"><title>…...

C#——文件读取Directory类详情

文件读取Directory类 Durectory提供了目录以及子目录进行创建移动和列举操作方法 Directory和Directorylnfo类(主要操作文件目录属性列如文件是否隐藏的 或者只读等这些属性) Directory对目录进行复制、移动、重命名、创建和删除等操作DirectoryInfo用于对目录属性执行操作 …...

Ruby on Rails Post项目设置网站初始界面

在构建了Ruby的Web服务器后&#xff0c;第三步就可以去掉框架的官方页面&#xff0c;设置自己的网页初始页了。 Linux系统安装Ruby语言-CSDN博客 、在Ubuntu中创建Ruby on Rails项目并搭建数据库-CSDN博客、 Ruby语言建立Web服务器-CSDN博客 了解Ruby onRails项目中的主要文件…...

03-QTWebEngine中使用qtvirtualkeyboard

qt提供了 virtualKeyboard 虚拟键盘模块&#xff0c;只需要在在main函数中最开始加入这样一句就可以了 qputenv("QT_IM_MODULE", QByteArray("qtvirtualkeyboard")); 但是在使用的时候遇到了一些问题&#xff1a; 1、中文输入的时候没有输入提示 Qvirt…...

leetcode3无重复字符的最长字串(重点讲滑动窗口)

本文主要讲解无重复字符的最长字串的要点与细节&#xff0c;根据步骤一步步走更方便理解 c与java代码如下&#xff0c;末尾 具体要点&#xff1a; 1. 区分一下子串和子序列 子串&#xff1a;要求元素在母串中是连续地出现 子序列&#xff1a;不要求连续 2. 题目中有两个核心…...

Gobject tutorial 八

The GObject base class Object memory management Gobject的内存管理相关的API很复杂&#xff0c;但其目标是提供一个基于引用计数的灵活的内存管理模式。 下面我们来介绍一下&#xff0c;与管理引用计数相关的函数。 Reference Count 函数g_object_ref和g_object_unref的…...

DDMA信号处理以及数据处理的流程---cfar检测

Hello,大家好,我是Xiaojie,好久不见,欢迎大家能够和Xiaojie一起学习毫米波雷达知识,Xiaojie准备连载一个系列的文章—DDMA信号处理以及数据处理的流程,本系列文章将从目标生成、信号仿真、测距、测速、cfar检测、测角、目标聚类、目标跟踪这几个模块逐步介绍,这个系列的…...

【机器学习】从理论到实践:决策树算法在机器学习中的应用与实现

&#x1f4dd;个人主页&#xff1a;哈__ 期待您的关注 目录 &#x1f4d5;引言 ⛓决策树的基本原理 1. 决策树的结构 2. 信息增益 熵的计算公式 信息增益的计算公式 3. 基尼指数 4. 决策树的构建 &#x1f916;决策树的代码实现 1. 数据准备 2. 决策树模型训练 3.…...

7.4.分块查找

一.分块查找的算法思想&#xff1a; 1.实例&#xff1a; 以上述图片的顺序表为例&#xff0c; 该顺序表的数据元素从整体来看是乱序的&#xff0c;但如果把这些数据元素分成一块一块的小区间&#xff0c; 第一个区间[0,1]索引上的数据元素都是小于等于10的&#xff0c; 第二…...

解决Ubuntu22.04 VMware失败的问题 ubuntu入门之二十八

现象1 打开VMware失败 Ubuntu升级之后打开VMware上报需要安装vmmon和vmnet&#xff0c;点击确认后如下提示 最终上报fail 解决方法 内核升级导致&#xff0c;需要在新内核下重新下载编译安装 查看版本 $ vmware -v VMware Workstation 17.5.1 build-23298084$ lsb_release…...

UDP(Echoserver)

网络命令 Ping 命令 检测网络是否连通 使用方法: ping -c 次数 网址ping -c 3 www.baidu.comnetstat 命令 netstat 是一个用来查看网络状态的重要工具. 语法&#xff1a;netstat [选项] 功能&#xff1a;查看网络状态 常用选项&#xff1a; n 拒绝显示别名&#…...

cf2117E

原题链接&#xff1a;https://codeforces.com/contest/2117/problem/E 题目背景&#xff1a; 给定两个数组a,b&#xff0c;可以执行多次以下操作&#xff1a;选择 i (1 < i < n - 1)&#xff0c;并设置 或&#xff0c;也可以在执行上述操作前执行一次删除任意 和 。求…...

屋顶变身“发电站” ,中天合创屋面分布式光伏发电项目顺利并网!

5月28日&#xff0c;中天合创屋面分布式光伏发电项目顺利并网发电&#xff0c;该项目位于内蒙古自治区鄂尔多斯市乌审旗&#xff0c;项目利用中天合创聚乙烯、聚丙烯仓库屋面作为场地建设光伏电站&#xff0c;总装机容量为9.96MWp。 项目投运后&#xff0c;每年可节约标煤3670…...

c#开发AI模型对话

AI模型 前面已经介绍了一般AI模型本地部署&#xff0c;直接调用现成的模型数据。这里主要讲述讲接口集成到我们自己的程序中使用方式。 微软提供了ML.NET来开发和使用AI模型&#xff0c;但是目前国内可能使用不多&#xff0c;至少实践例子很少看见。开发训练模型就不介绍了&am…...

Maven 概述、安装、配置、仓库、私服详解

目录 1、Maven 概述 1.1 Maven 的定义 1.2 Maven 解决的问题 1.3 Maven 的核心特性与优势 2、Maven 安装 2.1 下载 Maven 2.2 安装配置 Maven 2.3 测试安装 2.4 修改 Maven 本地仓库的默认路径 3、Maven 配置 3.1 配置本地仓库 3.2 配置 JDK 3.3 IDEA 配置本地 Ma…...

mac 安装homebrew (nvm 及git)

mac 安装nvm 及git 万恶之源 mac 安装这些东西离不开Xcode。及homebrew 一、先说安装git步骤 通用&#xff1a; 方法一&#xff1a;使用 Homebrew 安装 Git&#xff08;推荐&#xff09; 步骤如下&#xff1a;打开终端&#xff08;Terminal.app&#xff09; 1.安装 Homebrew…...

PHP 8.5 即将发布:管道操作符、强力调试

前不久&#xff0c;PHP宣布了即将在 2025 年 11 月 20 日 正式发布的 PHP 8.5&#xff01;作为 PHP 语言的又一次重要迭代&#xff0c;PHP 8.5 承诺带来一系列旨在提升代码可读性、健壮性以及开发者效率的改进。而更令人兴奋的是&#xff0c;借助强大的本地开发环境 ServBay&am…...

Python竞赛环境搭建全攻略

Python环境搭建竞赛技术文章大纲 竞赛背景与意义 竞赛的目的与价值Python在竞赛中的应用场景环境搭建对竞赛效率的影响 竞赛环境需求分析 常见竞赛类型&#xff08;算法、数据分析、机器学习等&#xff09;不同竞赛对Python版本及库的要求硬件与操作系统的兼容性问题 Pyth…...