93、Redis 之 使用连接池管理Redis6.0以上的连接 及 消息的订阅与发布
★ 使用连接池管理Redis连接
从Redis 6.0开始,Redis可支持使用多线程来接收、处理客户端命令,因此应用程序可使用连接池来管理Redis连接。
上一章讲的是创建单个连接来操作redis数据库,这次使用连接池来操作redis数据库
Lettuce连接池 支持需要 Apache Commons Pool2 的支持,需要添加该依赖
接下来即可在程序中通过类似如下代码片段来创建连接池了。
var conf = new GenericObjectPoolConfig<StatefulRedisConnection<String, String>>();
conf.setMaxTotal(20); // 设置连接池中允许的最大连接数
// 创建连接池对象(其中连接由redisClient的connectPubSub方法创建)
pool = ConnectionPoolSupport.createGenericObjectPool(redisClient::connect, conf);
代码演示
创建连接池对象,创建两个消息订阅者和一个消息发布者,然后操作redis数据库
1、添加依赖

Subscriper 第一个消息订阅者



启动这个消息订阅者的程序

Subscriper 第二个消息订阅者
直接拷贝第一个消息订阅者,然后修改这个消息订阅者只订阅 c2 这个channel 主题

Publisher 消息发布者
也是拷贝消息订阅者的代码,因为创建连接池对象的代码都是一样的。
这里只需要把消息订阅的方法改成消息发布的方法就可以了,其他代码一样。

测试:
测试成功
消息发布者成功发布消息
消息订阅者也能接收到各自订阅的channel的消息
用小黑窗测试也没有问题

完整代码
Subscriper
package cn.ljh.app;import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisURI;
import io.lettuce.core.ScoredValue;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.sync.RedisCommands;
import io.lettuce.core.pubsub.RedisPubSubAdapter;
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
import io.lettuce.core.pubsub.api.async.RedisPubSubAsyncCommands;
import io.lettuce.core.pubsub.api.sync.RedisPubSubCommands;
import io.lettuce.core.support.ConnectionPoolSupport;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;import java.time.Duration;//使用 Lettuce ,这个类是消息订阅者
//通过连接池操作redis数据库
public class Subscriper
{private RedisClient redisClient;//连接池pool对象private GenericObjectPool<StatefulRedisPubSubConnection<String, String>> pool;public void init(){//1、定义RedisURIRedisURI uri = RedisURI.builder().withHost("127.0.0.1").withPort(6379)//选择redis 16个数据库中的哪个数据库.withDatabase(0).withPassword(new char[]{'1', '2', '3', '4', '5', '6'}).withTimeout(Duration.ofMinutes(5)).build();//2、创建 RedisClient 客户端this.redisClient = RedisClient.create(uri);//创建连接池的配置对象//GenericObjectPoolConfig<StatefulRedisConnection<String, String>> conf = new GenericObjectPoolConfig<StatefulRedisConnection<String, String>>();var conf = new GenericObjectPoolConfig<StatefulRedisPubSubConnection<String, String>>();//设置连接池允许的最大连接数conf.setMaxTotal(20);//3、创建连接池对象(其中连接由 redisClient 的 connectPubSub 方法创建)pool = ConnectionPoolSupport.createGenericObjectPool(this.redisClient::connectPubSub, conf);}//关闭资源public void closeResource(){//关闭连接池--先开后关this.pool.close();//关闭RedisClient 客户端------最先开的最后关this.redisClient.shutdown();}//订阅消息的方法public void subscribe() throws Exception{//从连接池中取出连接StatefulRedisPubSubConnection<String, String> conn = this.pool.borrowObject();//4、创建 RedisPubSubCommands -- 作用相当与 RedisTemplate 这种,有各种操作redis的方法RedisPubSubCommands cmd = conn.sync();//监听消息:消息到来时,是通过监听器来实现的conn.addListener(new RedisPubSubAdapter<>(){//匿名内部类重写这3个方法:收到消息、订阅主题、取消订阅主题//接收来自普通的channel的消息,就用这个方法(就是没带模式的,比如那些主从、集群模式,点进RedisPubSubAdapter类里面看)//接收消息的方法@Overridepublic void message(String channel, String message){System.err.printf("从 %s 收到消息 : %s\n " , channel , message);}//订阅普通channel激发的方法,//订阅主题的方法--下面有这个订阅的方法cmd.subscribe("c1", "c2");//不太清楚这个 subscribed方法 和 下面的 cmd.subscribe 方法的关联 todo@Overridepublic void subscribed(String channel, long count){System.err.println("完成订阅 :" + count);}//不订阅普通的channel所使用方法--取消订阅//取消订阅的方法@Overridepublic void unsubscribed(String channel, long count){System.err.println("取消订阅");}});//订阅消息------订阅了 c1 和 c2 这两个主题 channelcmd.subscribe("c1", "c2");}public static void main(String[] args) throws Exception{Subscriper subscriper = new Subscriper();subscriper.init();subscriper.subscribe();//改程序只订阅了60分钟,超过60分钟就程序就退出不订阅了Thread.sleep(600000);//关闭资源subscriper.closeResource();}
}
Subscriper2
package cn.ljh.app;import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisURI;
import io.lettuce.core.pubsub.RedisPubSubAdapter;
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
import io.lettuce.core.pubsub.api.sync.RedisPubSubCommands;
import io.lettuce.core.support.ConnectionPoolSupport;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;import java.time.Duration;//使用 Lettuce ,这个类是消息订阅者2
//通过连接池操作redis数据库
public class Subscriper2
{private RedisClient redisClient;//连接池pool对象private GenericObjectPool<StatefulRedisPubSubConnection<String, String>> pool;public void init(){//1、定义RedisURIRedisURI uri = RedisURI.builder().withHost("127.0.0.1").withPort(6379)//选择redis 16个数据库中的哪个数据库.withDatabase(0).withPassword(new char[]{'1', '2', '3', '4', '5', '6'}).withTimeout(Duration.ofMinutes(5)).build();//2、创建 RedisClient 客户端this.redisClient = RedisClient.create(uri);//创建连接池的配置对象//GenericObjectPoolConfig<StatefulRedisConnection<String, String>> conf = new GenericObjectPoolConfig<StatefulRedisConnection<String, String>>();var conf = new GenericObjectPoolConfig<StatefulRedisPubSubConnection<String, String>>();//设置连接池允许的最大连接数conf.setMaxTotal(20);//3、创建连接池对象(其中连接由 redisClient 的 connectPubSub 方法创建)pool = ConnectionPoolSupport.createGenericObjectPool(this.redisClient::connectPubSub, conf);}//关闭资源public void closeResource(){//关闭连接池--先开后关this.pool.close();//关闭RedisClient 客户端------最先开的最后关this.redisClient.shutdown();}//订阅消息的方法public void subscribe() throws Exception{//从连接池中取出连接StatefulRedisPubSubConnection<String, String> conn = this.pool.borrowObject();//4、创建 RedisPubSubCommands -- 作用相当与 RedisTemplate 这种,有各种操作redis的方法RedisPubSubCommands cmd = conn.sync();//监听消息:消息到来时,是通过监听器来实现的conn.addListener(new RedisPubSubAdapter<>(){//接收来自普通的channel的消息,就用这个方法(就是没带模式的,比如那些主从、集群模式,点进RedisPubSubAdapter类里面看),@Overridepublic void message(String channel, String message){System.err.printf("从 %s 收到消息 : %s\n " , channel , message);}//订阅普通channel激发的方法,@Overridepublic void subscribed(String channel, long count){System.err.println("完成订阅 :" + count);}//不订阅普通的channel所使用方法@Overridepublic void unsubscribed(String channel, long count){System.err.println("取消订阅");}});//订阅消息------订阅了 c2 这个主题 channelcmd.subscribe( "c2");}public static void main(String[] args) throws Exception{Subscriper2 subscriper2 = new Subscriper2();subscriper2.init();subscriper2.subscribe();//改程序只订阅了60分钟,超过60分钟就程序就退出不订阅了Thread.sleep(600000);//关闭资源subscriper2.closeResource();}}
Publisher
package cn.ljh.app;import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisURI;
import io.lettuce.core.pubsub.RedisPubSubAdapter;
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
import io.lettuce.core.pubsub.api.sync.RedisPubSubCommands;
import io.lettuce.core.support.ConnectionPoolSupport;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;import java.time.Duration;//消息发布者//通过连接池操作redis数据库
public class Publisher
{private RedisClient redisClient;//连接池pool对象private GenericObjectPool<StatefulRedisPubSubConnection<String, String>> pool;public void init(){//1、定义RedisURIRedisURI uri = RedisURI.builder().withHost("127.0.0.1").withPort(6379)//选择redis 16个数据库中的哪个数据库.withDatabase(0).withPassword(new char[]{'1', '2', '3', '4', '5', '6'}).withTimeout(Duration.ofMinutes(5)).build();//2、创建 RedisClient 客户端this.redisClient = RedisClient.create(uri);//创建连接池的配置对象//GenericObjectPoolConfig<StatefulRedisConnection<String, String>> conf = new GenericObjectPoolConfig<StatefulRedisConnection<String, String>>();var conf = new GenericObjectPoolConfig<StatefulRedisPubSubConnection<String, String>>();//设置连接池允许的最大连接数conf.setMaxTotal(20);//3、创建连接池对象(其中连接由 redisClient 的 connectPubSub 方法创建)pool = ConnectionPoolSupport.createGenericObjectPool(this.redisClient::connectPubSub, conf);}//关闭资源public void closeResource(){//关闭连接池--先开后关this.pool.close();//关闭RedisClient 客户端------最先开的最后关this.redisClient.shutdown();}//订阅消息的方法public void publish() throws Exception{//从连接池中取出连接StatefulRedisPubSubConnection<String, String> conn = this.pool.borrowObject();//4、创建 RedisPubSubCommands -- 作用相当与 RedisTemplate 这种,有各种操作redis的方法RedisPubSubCommands cmd = conn.sync();//向这两个channel主题各自发布了一条消息cmd.publish("c2","c2 c2 c2 这是一条来自 c2 这个channel 里面的消息");cmd.publish("c1","c1 c1 c1 这是一条来自 c1 这个channel 里面的消息");//关闭资源redisClient.shutdown();}//发送消息,消息发出去,程序就退出了public static void main(String[] args) throws Exception{Publisher subscriper2 = new Publisher();subscriper2.init();subscriper2.publish();subscriper2.closeResource();}}
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>cn.ljh</groupId><artifactId>Lettucepool</artifactId><version>1.0.0</version><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding></properties><dependencies><!-- 引入 Lettuce 这个操作redis的框架的依赖 --><dependency><groupId>io.lettuce</groupId><artifactId>lettuce-core</artifactId><version>6.1.4.RELEASE</version></dependency><!-- 创建连接池对象的依赖 --><dependency><groupId>org.apache.commons</groupId><artifactId>commons-pool2</artifactId><version>2.9.0</version></dependency></dependencies>
</project>
相关文章:
93、Redis 之 使用连接池管理Redis6.0以上的连接 及 消息的订阅与发布
★ 使用连接池管理Redis连接 从Redis 6.0开始,Redis可支持使用多线程来接收、处理客户端命令,因此应用程序可使用连接池来管理Redis连接。 上一章讲的是创建单个连接来操作redis数据库,这次使用连接池来操作redis数据库 Lettuce连接池 支持…...
doris动态分区开启历史分区
举例说明: CREATE TABLE tbl1 (k1 DATE,... ) PARTITION BY RANGE(k1) () DISTRIBUTED BY HASH(k1) PROPERTIES ("dynamic_partition.enable" "true","dynamic_partition.time_unit" "DAY","dynamic_partition.sta…...
Linux用户与权限(认知root用户、修改权限控制 - chmod、修改权限控制 - chown)
目录 1. 认知root用户 1.1 什么是root用户(超级管理员) 1.2 用户切换命令 1.3 sudo命令 1.3.1 为普通用户配置sudo认证 2. 用户、用户组管理 2.1 理解用户、用户组的概念 2.2 掌握用户、用户组管理的相关命令 2.2.1 用户组管理 2.2.2 …...
处理conda安装工具的动态库问题——解决记录 libssl.1.0.0 系统中所有openssl位置全览 whereis openssl
处理conda安装工具的动态库问题——解决记录 处理conda安装工具的动态库问题——解决记录 - 简书 解决libssl.so.1.0.0: cannot open shared object file: No such file or directory问题 - 简书 openssl 默认版本问题(Anaconda相关)_anaconda openssl-…...
如何在Go中格式化字符串
由于字符串通常由书面文本组成,在很多情况下,我们可能希望通过标点符号、换行和缩进来更好地控制字符串的外观,以使其更易于阅读。 在本教程中,我们将介绍一些处理go字符串的方法,以确保所有输出文本的格式正确。 字…...
C程序设计内容与例题讲解 -- 第四章--选择结构程序设计第二部分(第五版)谭浩强
前言:在前面我们学习了选择结构和条件判断,用if语句实现选择结构,关系运算符和关系表达式,逻辑运算符和逻辑表达式等知识。今天我们将接着上一篇未讲完的继续讲解。 鸡汤:种一棵树最好的时间是十年前,其次是现在!加油各…...
接雨水问题
接雨水问题 问题背景 LeetCode 42. 接雨水 接雨水问题是一个经典的计算雨水滞留量的问题,通常使用柱状图来表示不同高度的柱子。在下雨的情况下,柱子之间的凹陷部分能够存储雨水,问题的目标是计算这些柱子所能接收的雨水总量。 相关知识 …...
小谈设计模式(9)—工厂方法模式
小谈设计模式(9)—工厂方法模式 专栏介绍专栏地址专栏介绍 工厂方法模式角色分类抽象产品(Abstract Product)具体产品(Concrete Product)抽象工厂(Abstract Factory)具体工厂&#x…...
Android etc1tool之png图片转换pkm 和 zipalign简介
关于作者:CSDN内容合伙人、技术专家, 从零开始做日活千万级APP。 专注于分享各领域原创系列文章 ,擅长java后端、移动开发、商业变现、人工智能等,希望大家多多支持。 目录 一、导读二、etc1tool2.1、用法 三、zipalign3.1 使用 四…...
Spring Boot快速入门:构建简单的Web应用
SpringBoot Spring Boot是一个用于简化Spring应用程序开发的框架,它通过提供开箱即用的配置和一组常用的功能,使得构建高效、可维护的应用变得非常容易。在本篇博客中,我们将一步步地介绍如何快速入门Spring Boot,并构建一个简单的…...
JAVA 泛型、序列化和复制
泛型提供了编译时类型安全检测机制,该机制允许程序员在编译时检测到非法的类型。泛型的本质是参数化类型,也就是说所操作的数据类型被指定为一个参数。比如我们要写一个排序方法,能够对整型数组、字符串数组甚至其他任何类型的数组进行排序&a…...
以太网基础学习(二)——ARP协议
一、什么是MAC地址 MAC地址(英语:Media Access Control Address),直译为媒体访问控制位址,也称为局域网地址(LAN Address),MAC位址,以太网地址(Ethernet Addr…...
【Java-LangChain:使用 ChatGPT API 搭建系统-4】评估输入-分类
第三章,评估输入-分类 如果您正在构建一个允许用户输入信息的系统,首先要确保人们在负责任地使用系统,以及他们没有试图以某种方式滥用系统,这是非常重要的。 在本章中,我们将介绍几种策略来实现这一目标。 我们将学习…...
嵌入式Linux应用开发-驱动大全-第一章同步与互斥③
嵌入式Linux应用开发-驱动大全-第一章同步与互斥③ 第一章 同步与互斥③1.4 Linux锁的介绍与使用1.4.1 锁的类型1.4.1.1 自旋锁1.4.1.2 睡眠锁 1.4.2 锁的内核函数1.4.2.1 自旋锁1.4.2.2 信号量1.4.2.3 互斥量1.4.2.4 semaphore和 mutex的区别 1.4.3 何时用何种锁1.4.4 内核抢占…...
树的存储结构以及树,二叉树,森林之间的转换
目录 1.双亲表示法 2.孩子链表 3.孩子兄弟表示法 4.树与二叉树的转换 (1)树转换为二叉树 (2)二叉树转换成树 5.二叉树与森林的转化 (1)森林转换为二叉树 以下树为例 1.双亲表示法 双亲表示法定义了…...
【AI视野·今日NLP 自然语言处理论文速览 第四十二期】Wed, 27 Sep 2023
AI视野今日CS.NLP 自然语言处理论文速览 Wed, 27 Sep 2023 Totally 50 papers 👉上期速览✈更多精彩请移步主页 Daily Computation and Language Papers Attention Satisfies: A Constraint-Satisfaction Lens on Factual Errors of Language Models Authors Mert …...
华为云云耀云服务器L实例评测|部署个人在线电子书库 calibre
华为云云耀云服务器L实例评测|部署个人在线电子书库 calibre 一、云耀云服务器L实例介绍1.1 云服务器介绍1.2 应用场景1.3 支持镜像 二、云耀云服务器L实例配置2.1 重置密码2.2 服务器连接2.3 安全组配置 三、部署 calibre3.1 calibre 介绍3.2 Docker 环境搭建3.3 c…...
代码随想录刷题 Day28
216.组合总和III 和前一个题一样,照着自己就能写出来,就多了一个判断结果是不是等于n的逻辑。有两个地方可以剪纸,一个是当和已经大于要找的时候直接返回,另一个是当剩余元素少于三个的时候直接返回(第一层递归是少于…...
【生命周期】
生命周期 1 引出生命周期2 分析生命周期3 总结生命周期 1 引出生命周期 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta http-equiv"X-UA-Compatible" content"IEedge"><meta …...
【C语言 模拟实现memcpy函数、memcpy函数】
C语言程序设计笔记---027 C语言之模拟实现memcpy函数、memcpy函数1、介绍memcpy函数1.1、模拟实现memcpy函数 2、介绍memmove函数2.1、模拟实现memmove函数 3、结语 C语言之模拟实现memcpy函数、memcpy函数 前言: 通过C语言内存函数的知识,这篇将对memc…...
基于MCP协议的Shopify数据AI分析:自动化广告优化实战指南
1. 项目概述:用AI打通Shopify数据与广告投放的任督二脉 如果你在运营一个Shopify独立站,并且正在为Google、Meta(Facebook/Instagram)或TikTok广告投放而头疼,那么你很可能正经历着所有电商卖家的共同困境:…...
从音频处理到IoT数据:用scipy.signal.resample_poly搞定实际项目中的采样率转换
从音频处理到IoT数据:用scipy.signal.resample_poly搞定实际项目中的采样率转换 采样率转换是数字信号处理中的常见需求,无论是音频处理、传感器数据分析还是通信系统仿真,都会遇到不同采样率设备间的数据交互问题。想象一下,当你…...
Vite+React+TypeScript构建个人作品集网站:从技术选型到GitHub Pages自动化部署
1. 项目概述:一个现代开发者如何构建自己的技术名片最近刚把自己的个人作品集网站重构上线,地址是https://yucco-k.github.io。这不仅仅是一个展示作品的静态页面,更是一个我用来实践和整合现代前端技术栈的“游乐场”。对于开发者而言&#…...
别再只用memcpy了!手把手教你用memcpy_s写出更安全的C语言代码(附VS2022实战)
从memcpy到memcpy_s:现代C语言安全编程实战指南 在Visual Studio 2022的编译输出窗口中,那个刺眼的C4996警告已经成为许多C语言开发者的"老朋友"。当看到"error C4996: memcpy: This function or variable may be unsafe"时…...
德国工业4.0:从顶层设计到车间实践的制造业数字化转型
1. 工业4.0浪潮下的欧洲:一场由德国引领的深度变革提到德国制造,很多人脑海里蹦出来的词是“严谨”、“保守”甚至“刻板”。没错,德国人对于工业流程、制造工艺和质量标准的执着,有时近乎偏执。但正是这种对“传统”的极致坚守&a…...
除了综合,DC Shell还能这么用:快速搭建一个轻量级RTL/Netlist查看与调试环境
DC Shell的隐藏技能:打造高效RTL/Netlist交互式调试环境 在数字芯片设计流程中,工程师们经常需要快速查看和分析RTL或网表文件。传统方法要么启动完整的综合流程耗时费力,要么依赖第三方工具可能面临兼容性问题。实际上,Synopsys …...
从‘仿真’到‘半虚拟化’:一文读懂VMware虚拟网卡(E1000/E1000E/VMXNET3)的工作原理与演进史
从仿真到半虚拟化:虚拟网卡技术演进与设计哲学深度解析 虚拟化技术已经成为现代计算架构的基石,而网络虚拟化则是其中最为关键的组成部分之一。在虚拟化环境中,虚拟网卡作为连接虚拟机与外部世界的桥梁,其设计理念直接影响着整个…...
当三维基因组“打结”:从罕见病到癌症,那些被折叠改变的生命密码
当三维基因组“打结”:从罕见病到癌症,那些被折叠改变的生命密码 想象一下,如果把人类基因组比作一条长达两米的毛线,它需要被精巧地折叠进直径仅几微米的细胞核中。这种看似不可能的折叠并非随机——它遵循着严格的拓扑规则&…...
Claude 3.5 Sonnet重磅升级(开发者必看的3个隐藏API调用技巧)
更多请点击: https://intelliparadigm.com 第一章:Claude 3.5 Sonnet重磅升级概览 Anthropic 正式发布 Claude 3.5 Sonnet,作为当前推理模型中响应速度与智能水平的全新标杆,其在多模态理解、长上下文处理及代码生成能力上实现显…...
C++项目集成Tesseract 5.x踩坑实录:从编译选项到内存管理的完整避坑指南
C项目集成Tesseract 5.x踩坑实录:从编译选项到内存管理的完整避坑指南 在计算机视觉和文档处理领域,Tesseract OCR引擎以其开源免费、多语言支持和较高的识别准确率,成为众多C项目的首选集成方案。然而,从源码编译到生产环境部署&…...
