当前位置: 首页 > news >正文

java 中多线程、 队列使用实例,处理大数据业务

场景: 从redis 订阅数据 调用线程来异步处理数据

直接上代码

定义线程管理类
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;import java.util.concurrent.*;/*** Created with IntelliJ IDEA.* @Description 线程池管理类*/
@Component
public class ThreadPoolManager implements BeanFactoryAware {private static Logger logger = LoggerFactory.getLogger(ThreadPoolManager.class);//用于从IOC里取对象private BeanFactory factory; //如果实现Runnable的类是通过spring的application.xml文件进行注入,可通过 factory.getBean()获取,这里只是提一下// 线程池维护线程的最少数量 (根据环境而定)private final static int CORE_POOL_SIZE = 10;// 线程池维护线程的最大数量 (根据环境而定)private final static int MAX_POOL_SIZE = 50;// 线程池维护线程所允许的空闲时间private final static int KEEP_ALIVE_TIME = 0;// 线程池所使用的缓冲队列大小 (此处队列设置 需要考虑处理数据的效率  内存的大小)private final static int WORK_QUEUE_SIZE = 99999;@Overridepublic void setBeanFactory(BeanFactory beanFactory) throws BeansException {factory = beanFactory;}// 消息队列public LinkedBlockingQueue<String> getMsgQueue() {return msgQueue;}LinkedBlockingQueue<String> msgQueue = new LinkedBlockingQueue<>();/*** 当线程池的容量满了,执行下面代码,将推送数据存入到缓冲队列*/final RejectedExecutionHandler handler = new RejectedExecutionHandler() {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {String  temp = ((MsgHandleThread) r).getRecord();if (StringUtils.isEmpty(temp)) {msgQueue.offer(temp);}}};/*** 创建线程池*/final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.SECONDS, new ArrayBlockingQueue(WORK_QUEUE_SIZE), this.handler);/*** 将任务加入线程池---执行数据处理*/public void addPushRecord(String  record) {MsgHandleThread subThread=new MsgHandleThread(record);threadPool.execute(subThread);}/*** 线程池的定时任务----> 称为(调度线程池)。此线程池支持 定时以及周期性执行任务的需求。*/final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(5);/*** 检查(调度线程池),每秒执行一次,查看订单的缓冲队列是否有 订单记录,则重新加入到线程池*/final ScheduledFuture scheduledFuture = scheduler.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {//判断缓冲队列是否存在记录if (!msgQueue.isEmpty()) {//当线程池的队列容量少于WORK_QUEUE_SIZE,则开始把缓冲队列的订单 加入到 线程池if (threadPool.getQueue().size() < WORK_QUEUE_SIZE) {String record = msgQueue.poll();MsgHandleThread subThread=new MsgHandleThread(record);threadPool.execute(subThread);}}}}, 0, 1, TimeUnit.SECONDS);/*** 终止订单线程池+调度线程池*/public void shutdown() {//true表示如果定时任务在执行,立即中止,false则等待任务结束后再停止scheduledFuture.cancel(false);scheduler.shutdown();threadPool.shutdown();}
}
任务处理类
/*** Created with IntelliJ IDEA.* @Description 订阅数据 处理*/
@Component
@Scope("prototype")//spring 多例
public class MsgHandleThread implements Runnable {private Logger logger = LoggerFactory.getLogger(SubCheckDataThread.class);private IDataHandleService _serviceprivate String record;public SubCheckDataThread(String  _record) {this.record = _record;}public String getRecord() {return record;}@Overridepublic void run() {try {if (StringUtils.isEmpty(this.record)) {return;}// 无法注入是采用此方法if (_service== null) {_service= ApplicationContextProvider.getBean(IDataHandleService .class);}//TODO 具体业务logger.info("消费完成",record);} catch (Exception e) {e.printStackTrace();}}
}
调用
import com.yicheng.common.properties.SetProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.concurrent.ExecutorService;/*** <p>* 订阅redis消息* </p>** @Author: zhuYaqiang* @Date: 2024/06/12*/
@Component
public class SubscribeCheckData {@Autowiredprivate ThreadPoolManager threadPoolManager;/**** @Description:  查岗信息订阅---redis* @Param: [message]* @return: void* @Author: zhuYaqiang* @Date: 2024/06/12*/public void receiveMessage(String message) {try {threadPoolManager.addPushRecord(message);} catch (Exception e) {e.printStackTrace();}}}
redis 订阅消息后调用线程池处理数据
package com.yicheng.subscribeRedis;import com.yicheng.common.properties.SetProperties;
import com.yicheng.subscribeRedis.alarm.SubscribeAlarmNoticeData;
import com.yicheng.subscribeRedis.check.SubscribeCheckData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;/*** @title RedisSubscribeCHeck* @description* @create 2024/6/12 19:30*/
@Configuration
public class RedisMessageListener {@Autowiredprivate SetProperties setProperties;@BeanRedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,MessageListenerAdapter listenerCheckAdapter, MessageListenerAdapter listenerAlarmNoticeAdapter) {RedisMessageListenerContainer container = new RedisMessageListenerContainer();container.setConnectionFactory(connectionFactory);listenerCheckAdapter.afterPropertiesSet();listenerAlarmNoticeAdapter.afterPropertiesSet();//订阅了的通道// 订阅查岗数据container.addMessageListener(listenerCheckAdapter, new PatternTopic(setProperties.getRedisCheckSub().getSubChannel()));//这个container 可以添加多个 messageListenerreturn container;}/*** 消息监听器适配器,绑定消息处理器,利用反射技术调用消息处理器的业务方法* 监听查岗消息* @param receiver* @return*/@BeanMessageListenerAdapter listenerCheckAdapter(SubscribeCheckData receiver) {return new MessageListenerAdapter(receiver, "receiveMessage");}/*** 消息监听器适配器,绑定消息处理器,利用反射技术调用消息处理器的业务方法*   监听报警通知信息* @param receiver* @return*/@BeanMessageListenerAdapter listenerAlarmNoticeAdapter(SubscribeAlarmNoticeData receiver) {return new MessageListenerAdapter(receiver, "receiveMessage");}}

以上代码已在实际项目中使用,觉得有用的点赞收藏评论

相关文章:

java 中多线程、 队列使用实例,处理大数据业务

场景&#xff1a; 从redis 订阅数据 调用线程来异步处理数据 直接上代码 定义线程管理类 import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.BeansException; import org.springframework.beans.factory.BeanFactory; import org…...

13.图形程序接口(Graphics API)

**图形程序接口&#xff08;Graphics API&#xff09;**是计算机图形学中的一个重要概念&#xff0c;可以理解为“程序员与GPU之间的桥梁”。用通俗易懂的方式来解释&#xff1a; 通俗解释&#xff1a;图形API就像翻译官 想象你是一个老板&#xff08;程序员&#xff09;&…...

PPT自动化 python-pptx -7: 占位符(placeholder)

占位符&#xff08;placeholder&#xff09;是演示文稿中用于容纳内容的预格式化容器。它们通过让模板设计者定义格式选项&#xff0c;简化了创建视觉一致幻灯片的过程&#xff0c;同时让最终用户专注于添加内容。这加快了演示文稿的开发速度&#xff0c;并确保幻灯片之间的外观…...

Pyecharts之图表组合与布局优化

在数据可视化中&#xff0c;我们经常需要将多个图表组合在一起&#xff0c;以展示不同维度的数据或者进行对比分析。同时&#xff0c;合理的布局能够提升图表的可读性和用户体验。Pyecharts 提供了强大的组件和方法&#xff0c;让我们可以轻松实现图表的组合和布局优化。本篇将…...

流行的开源高性能数据同步工具 - Apache SeaTunnel 整体架构运行原理

概述 背景 数据集成在现代企业的数据治理和决策支持中扮演着至关重要的角色。随着数据源的多样化和数据量的迅速增长&#xff0c;企业需要具备强大的数据集成能力来高效地处理和分析数据。SeaTunnel通过其高度可扩展和灵活的架构&#xff0c;帮助企业快速实现多源数据的采集、…...

Android vendor.img中文件执行权问题

问题 Android 9、11往vendor.img增加文件&#xff0c;烧写到设备后发现增加的可执行文件没有执行权限。经过漫长查找&#xff0c;终于找到了问题的根源&#xff0c;谨以此篇献给哪些脚踏实地的人们。 根本原因 system/core/libcutils/fs_config.cpp文件&#xff0c;fs_confi…...

关于使用微服务的注意要点总结

一、防止过度设计 微服务的拆分一定要结合团队人员规模来考虑&#xff0c;笔者就曾遇到过一个公司的项目&#xff0c;是从外部采购回来的&#xff0c;微服务划分为十几个应用&#xff0c;我们在此项目基础上进行自行维护和扩展。由于公司业务规模不大&#xff0c;而且二次开发的…...

C++17 新增属性详解

文章目录 1. [[fallthrough]]用途示例应用场景 2. [[maybe_unused]]用途示例应用场景 3. [[nodiscard]]用途示例应用场景 总结 C17标准引入了多个新的属性&#xff08;Attributes&#xff09;&#xff0c;这些属性为代码提供了更丰富的语义表达能力&#xff0c;同时帮助编译器生…...

使用python-docx包进行多文件word文字、字符批量替换

1、首先下载pycharm。 2、改为中文。 3、安装python-docx包。 搜索包名字&#xff0c;安装。 4、新建py文件&#xff0c;写程序。 from docx import Documentdef replace1(array1):# 替换词典&#xff08;标签值按实际情况修改&#xff09;dic {替换词1: array1[0], 替换…...

15_业务系统基类

创建脚本 SystemRoot.cs 因为 业务系统基类的子类 会涉及资源加载服务层ResSvc.cs 和 音乐播放服务层AudioSvc.cs 所以在业务系统基类 提取引用资源加载服务层ResSvc.cs 和 音乐播放服务层AudioSvc.cs 并调用单例初始化 using UnityEngine; // 功能 : 业务系统基类 public c…...

Pyecharts之散点图的视觉扩展

在数据可视化中&#xff0c;散点图是一种强大的工具&#xff0c;可用于展示数据点在二维平面上的分布情况。通过添加各种视觉组件&#xff0c;我们可以让散点图变得更加丰富和具有表现力&#xff0c;更能反映数据的多维度特征。本文将详细解读如何为散点图添加不同的视觉组件&a…...

Java学习教程,从入门到精通,JDBC删除数据库语法知识点(101)

一、JDBC删除数据库语法知识点 1. 概述 JDBC&#xff08;Java Database Connectivity&#xff09;是Java语言中用来规范客户端程序如何来访问数据库的应用程序接口&#xff0c;提供了诸如查询和更新数据库中数据的方法。 在JDBC中&#xff0c;删除数据库的操作主要是通过执行…...

Baklib如何推动企业知识管理的创新与转型探讨

内容概要 在当今快速发展的数字化时代&#xff0c;企业需要不断适应变化&#xff0c;以保持竞争优势。Baklib作为一款企业知识管理中台&#xff0c;扮演着推动数字化转型的重要角色。它通过提供一个集成的知识管理平台&#xff0c;帮助企业高效管理和共享内部及外部的知识资源…...

【算法】递归型枚举与回溯剪枝初识

递归型枚举与回溯剪枝初识 1.枚举子集2.组合型枚举3.枚举排列4.全排列问题 什么是搜索&#xff1f;搜索&#xff0c;是一种枚举&#xff0c;通过穷举所有的情况来找到最优解&#xff0c;或者统计合法解的个数。因此&#xff0c;搜索有时候也叫作暴搜。搜索一般分为深度优先搜索…...

无人机 PX4 飞控 | PX4源码添加自定义参数方法并用QGC显示与调整

无人机 PX4 飞控 | PX4源码添加自定义参数方法并用QGC显示与调整 0 前言 之前文章添加了一个自定义的模块&#xff0c;本篇文章在之前的自定义模块中&#xff0c;添加两个自定义参数 使用QGC显示出来&#xff0c;并通过QGC调整参数值&#xff0c;代码实现参数更新 新增的参…...

《CPython Internals》阅读笔记:p356-p359

《CPython Internals》学习第 19天&#xff0c;p356-p359 总结&#xff0c;总计 4 页。 一、技术总结 1.benchmark suite The benchmark suite is the tool to use when comparing the complete performance of Python. The Python Benchmark suite is a collection of Pyth…...

Linux--权限

Linux系统的权限管理是保障系统安全的重要机制&#xff0c;以下详细讲解权限相关概念及操作指令&#xff1a; 一、基础权限机制 1. 权限的三元组&#xff0c;读&#xff08;r&#xff09;、写&#xff08;w&#xff09;、执行&#xff08;x&#xff09; 每个文件或目录有三组…...

java后端之登录认证

基础登录功能&#xff1a;根据提供的用户名和密码判断是否存在于数据库 LoginController.java RestController Slf4j public class LoginController {Autowiredprivate UserService userService;PostMapping("/login")public Result login(RequestBody User user) {…...

【矩阵二分】力扣378. 有序矩阵中第 K 小的元素

给你一个 n x n 矩阵 matrix &#xff0c;其中每行和每列元素均按升序排序&#xff0c;找到矩阵中第 k 小的元素。 请注意&#xff0c;它是 排序后 的第 k 小元素&#xff0c;而不是第 k 个 不同 的元素。 你必须找到一个内存复杂度优于 O(n2) 的解决方案。 示例 1&#xff1…...

C语言-构造数据类型

1、构造数据类型 结构体、共用体、枚举。 2、结构体 1、结构体的定义 结构体是一个自定义的复合数据类型&#xff0c;它允许将不同类型的数据组合在一起。 struct 结构体名 {数据类型1 成员变量1;数据类型2 成员变量2;数据类型3 成员变量3;数据类型4 成员变量4; } 2、结构体变…...

多模态2025:技术路线“神仙打架”,视频生成冲上云霄

文&#xff5c;魏琳华 编&#xff5c;王一粟 一场大会&#xff0c;聚集了中国多模态大模型的“半壁江山”。 智源大会2025为期两天的论坛中&#xff0c;汇集了学界、创业公司和大厂等三方的热门选手&#xff0c;关于多模态的集中讨论达到了前所未有的热度。其中&#xff0c;…...

(二)原型模式

原型的功能是将一个已经存在的对象作为源目标,其余对象都是通过这个源目标创建。发挥复制的作用就是原型模式的核心思想。 一、源型模式的定义 原型模式是指第二次创建对象可以通过复制已经存在的原型对象来实现,忽略对象创建过程中的其它细节。 📌 核心特点: 避免重复初…...

跨链模式:多链互操作架构与性能扩展方案

跨链模式&#xff1a;多链互操作架构与性能扩展方案 ——构建下一代区块链互联网的技术基石 一、跨链架构的核心范式演进 1. 分层协议栈&#xff1a;模块化解耦设计 现代跨链系统采用分层协议栈实现灵活扩展&#xff08;H2Cross架构&#xff09;&#xff1a; 适配层&#xf…...

现代密码学 | 椭圆曲线密码学—附py代码

Elliptic Curve Cryptography 椭圆曲线密码学&#xff08;ECC&#xff09;是一种基于有限域上椭圆曲线数学特性的公钥加密技术。其核心原理涉及椭圆曲线的代数性质、离散对数问题以及有限域上的运算。 椭圆曲线密码学是多种数字签名算法的基础&#xff0c;例如椭圆曲线数字签…...

Android第十三次面试总结(四大 组件基础)

Activity生命周期和四大启动模式详解 一、Activity 生命周期 Activity 的生命周期由一系列回调方法组成&#xff0c;用于管理其创建、可见性、焦点和销毁过程。以下是核心方法及其调用时机&#xff1a; ​onCreate()​​ ​调用时机​&#xff1a;Activity 首次创建时调用。​…...

Fabric V2.5 通用溯源系统——增加图片上传与下载功能

fabric-trace项目在发布一年后,部署量已突破1000次,为支持更多场景,现新增支持图片信息上链,本文对图片上传、下载功能代码进行梳理,包含智能合约、后端、前端部分。 一、智能合约修改 为了增加图片信息上链溯源,需要对底层数据结构进行修改,在此对智能合约中的农产品数…...

嵌入式学习笔记DAY33(网络编程——TCP)

一、网络架构 C/S &#xff08;client/server 客户端/服务器&#xff09;&#xff1a;由客户端和服务器端两个部分组成。客户端通常是用户使用的应用程序&#xff0c;负责提供用户界面和交互逻辑 &#xff0c;接收用户输入&#xff0c;向服务器发送请求&#xff0c;并展示服务…...

从 GreenPlum 到镜舟数据库:杭银消费金融湖仓一体转型实践

作者&#xff1a;吴岐诗&#xff0c;杭银消费金融大数据应用开发工程师 本文整理自杭银消费金融大数据应用开发工程师在StarRocks Summit Asia 2024的分享 引言&#xff1a;融合数据湖与数仓的创新之路 在数字金融时代&#xff0c;数据已成为金融机构的核心竞争力。杭银消费金…...

Python 高效图像帧提取与视频编码:实战指南

Python 高效图像帧提取与视频编码:实战指南 在音视频处理领域,图像帧提取与视频编码是基础但极具挑战性的任务。Python 结合强大的第三方库(如 OpenCV、FFmpeg、PyAV),可以高效处理视频流,实现快速帧提取、压缩编码等关键功能。本文将深入介绍如何优化这些流程,提高处理…...

【Veristand】Veristand环境安装教程-Linux RT / Windows

首先声明&#xff0c;此教程是针对Simulink编译模型并导入Veristand中编写的&#xff0c;同时需要注意的是老用户编译可能用的是Veristand Model Framework&#xff0c;那个是历史版本&#xff0c;且NI不会再维护&#xff0c;新版本编译支持为VeriStand Model Generation Suppo…...