当前位置: 首页 > news >正文

基于本地缓存制作一个分库分表的分布式ID生成器

引言:

代码在 https://gitee.com/lbmb/mb-live-app 中 【mb-live-id-generate-provider】 模块里面 如果喜欢 希望大家给给star 项目还在持续更新中。

背景介绍

项目整体架构是 基于springboot 3.0 开发 rpc 调用采用 dubbo
注册配置中心 使用 nacos 采用sharding-jdbc 来实现分库分表。
基于以上情况 我想生成分布式id。再根据生成的分布式id 存到不同的表中
例如 id 1000 存在 user01表 id 1001 存到 user02表,然后sharding-jdbc会根据我们

基础成长

  1. 可以学习到多线程、线程池的使用和设计
  2. 分布式id器的优化策略(预加载、类似hashmap扩容)

首先我们需要设计一张id策略表

CREATE TABLE `t_id_generate_config` (`id` int NOT NULL AUTO_INCREMENT COMMENT '主键 id',`remark` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '描述',`next_threshold` bigint DEFAULT NULL COMMENT '当前 id 所在阶段的阈\n值',`init_num` bigint DEFAULT NULL COMMENT '初始化值',`current_start` bigint DEFAULT NULL COMMENT '当前 id 所在阶段的开始\n值',`step` int DEFAULT NULL COMMENT 'id 递增区间',`is_seq` tinyint DEFAULT NULL COMMENT '是否有序(0 无序,1 有序)',`id_prefix` varchar(60) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '业务前缀码,如果没有则返回\n时不携带',`version` int NOT NULL DEFAULT '0' COMMENT '乐观锁版本号',`create_time` datetime DEFAULT CURRENT_TIMESTAMP COMMENT '创建时\n间',`update_time` datetime DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=8 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;INSERT INTO `t_id_generate_config` (`id`, `remark`,
`next_threshold`, `init_num`, `current_start`, `step`, `is_seq`,`id_prefix`, `version`, `create_time`, `update_time`)
VALUES
(1, '用户 id 生成策略', 10050, 10000, 10000, 50, 0,
'user_id', 0, '2023-05-23 12:38:21', '2023-05-23 23:31:45');

定义全局变量
变量解析

  1. localSeqIdBOMap 缓存中可分配的分布式id(有序id)
  2. localUnSeqIdBOMap 缓存中可分配的分布式id(无序id)
  3. SEQ_ID = 1; 判断是否为有序id 的操作(扩容 存取 等)
  4. threadPoolExecutor 移步线程池(用来异步动态扩容缓存的可分配id 池)
  5. semaphoreMap 信号量存放map 防止多线程环境下 多次重复触发异步扩容线程池。参考 ConcurrentHashMap 的扩容 实现(ConcurrentHashMap : )。
  6. UNDATE_RATE:动态扩容阀值
private static final Logger LOGGER = LoggerFactory.getLogger(IdGenerateService.class);private static Map<Integer, LocalSeqIdBO> localSeqIdBOMap = new ConcurrentHashMap<Integer, LocalSeqIdBO>();private static Map<Integer, LocalUnSeqIdBO> localUnSeqIdBOMap = new ConcurrentHashMap<Integer, LocalUnSeqIdBO>();private static final Integer SEQ_ID = 1;private static ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(8, 16, 3, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1000),new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {Thread thread = new Thread(r);thread.setName("id-generate-thread-" + ThreadLocalRandom.current().nextInt(1000));return null;}});/*** 使用Semaphore 信号量来防止多线程并发 多次刷新id段*/private static Map<Integer, Semaphore> semaphoreMap = new ConcurrentHashMap<>();/*** id段刷新优化 阈值为0.75 达到百分之75 执行异步任务创建 优化*/private static final float UNDATE_RATE = 0.75f;

有序 id 生成器

 /*** 有序id生成器** @param id* @return*/@Overridepublic Long geSeqId(Integer id) {if (id == null) {LOGGER.error("[geSeqId] id is error,id is{}", id);return null;}LocalSeqIdBO localSeqIdBO = localSeqIdBOMap.get(id);if (localSeqIdBO == null) {LOGGER.error("[geSeqId] localSeqIdBO is null,id is{}", id);return null;}/*** 异步 预执行刷新id段*/this.refreshLocalSeqId(localSeqIdBO);long andIncrement = localSeqIdBO.getCurrentNum().getAndIncrement();if (andIncrement> localSeqIdBO.getNextThreshold()) {LOGGER.error("[geSeqId] id  is over limit,id is{}", id);return null;}// 获取当前id 直增return andIncrement;}

代码解读 从数据库读取到对应的方案(有序id 和无序id 方案 会有当前 可用的 id段 开始值 和 结束值 以及步长等信息)
LocalSeqIdBO localSeqIdBO = localSeqIdBOMap.get(id);
预扩容:例如当前 可用id段是 1000-1500 判断 1000-1500 的id 被使用超过了 百分之75 就动态将id池 进行扩容

this.refreshLocalSeqId(localSeqIdBO);

取出当前已使用的id 最大值 并且进行+1
long andIncrement = localSeqIdBO.getCurrentNum().getAndIncrement();

// 优化逻辑 如果当前 已经用的id +1 后 超过了当前id池的最大值 则不会生成id。例: 当前id池最大是 1500 但是取出的当前已用的id 为 1500 则加一后是1501 超过了id池最大值1500 则不会生成id 继而下一次操作 会扩容id池
if (andIncrement> localSeqIdBO.getNextThreshold()) {
LOGGER.error(“[geSeqId] id is over limit,id is{}”, id);
return null;
}

异步刷新本地 id池

 /*** 刷新本地有序的id段** @param localSeqIdBO*/private void refreshLocalSeqId(LocalSeqIdBO localSeqIdBO) {// 当前 id字段区间值long step = localSeqIdBO.getNextThreshold() - localSeqIdBO.getCurrentStart();/*** 使用Semaphore 信号量来防止多线程并发 多次刷新id段* 防止没扩容完成的时候过多线程进入到 if里面*/if (localSeqIdBO.getCurrentNum().get() - localSeqIdBO.getCurrentStart() > step * UNDATE_RATE) {Semaphore semaphore = semaphoreMap.get(localSeqIdBO.getId());if (semaphore == null) {LOGGER.error("semaphore is null ,id is{}", localSeqIdBO.getId());return;}boolean acquireStatus = semaphore.tryAcquire();if (acquireStatus) {// 异步进行同步id字段的操作LOGGER.info("尝试开始进行同步id段的同步操作");threadPoolExecutor.execute(new Runnable() {@Overridepublic void run() {try {IdGeneratePO idGeneratePO = mapper.selectById(localSeqIdBO.getId());tryUpdateMysqlRecord(idGeneratePO);// 释放semaphore资源} catch (Exception e) {LOGGER.error("[refreshLocalSeqId] error is {}", e);} finally {semaphoreMap.get(localSeqIdBO.getId()).release();LOGGER.info("有序id段同步完成,id is {}", localSeqIdBO.getId());}}});}}}

初次落第一批id数据到id池

spring 容器启动的时候 在初始化Bean后 会回调这个方法

   //spring 启动的时候 bean 初始化的时候会回调这里@Overridepublic void afterPropertiesSet() throws Exception {List<IdGeneratePO> idGeneratePOList = mapper.selectAll();for (IdGeneratePO idGeneratePO : idGeneratePOList) {tryUpdateMysqlRecord(idGeneratePO);semaphoreMap.put(idGeneratePO.getId(), new Semaphore(1));}}

更新数据库里面的 id字段占用位置信息 并且尝试将 已更新的id 段写入到缓存

   /*** 更新mysql里面的分布式id的配置信息,占用对应id段** @param idGeneratePO*/private void tryUpdateMysqlRecord(IdGeneratePO idGeneratePO) {int updateResult = mapper.updateNewIdCountAndVersion(idGeneratePO.getId(), idGeneratePO.getVersion());if (updateResult > 0) {localIdBoHandler(idGeneratePO);return;}for (int i = 0; i < 3; i++) {IdGeneratePO newIdGeneratePO = mapper.selectById(idGeneratePO.getId());updateResult = mapper.updateNewIdCountAndVersion(idGeneratePO.getId(), idGeneratePO.getVersion());if (updateResult > 0) {localIdBoHandler(idGeneratePO);
//                LocalSeqIdBO localSeqIdBO = new LocalSeqIdBO();
//                AtomicLong atomicLong = new AtomicLong(idGeneratePO.getCurrentStart());
//                localSeqIdBO.setId(idGeneratePO.getId() );
//                localSeqIdBO.setCurrentNum(atomicLong );
//                localSeqIdBO.setCurrentStart(idGeneratePO.getCurrentStart() );
//                localSeqIdBO.setNextThreshold(idGeneratePO.getNextThreshold()  );
//                localSeqIdBO.setCurrentNum(atomicLong );
//                localSeqIdBOMap.put(localSeqIdBO.getId(),localSeqIdBO);return;}}throw new RuntimeException("表id字段占用失败, 竞争过于激烈 id is :" + idGeneratePO.getId());}

将更新的id段 实际落到缓存

/*** 专门处理如何将id对象放入本地缓存中** @param idGeneratePO*/private void localIdBoHandler(IdGeneratePO idGeneratePO) {long currentStart = idGeneratePO.getCurrentStart();long nextThreshold = idGeneratePO.getNextThreshold();long currentNum = currentStart;// 判断数据库取出来的id配置是有序还是无序 1 有序 非 1 无序if (idGeneratePO.getIsSeq() == SEQ_ID) {// 有序存储LocalSeqIdBO localSeqIdBO = new LocalSeqIdBO();AtomicLong atomicLong = new AtomicLong(currentStart);localSeqIdBO.setId(idGeneratePO.getId());localSeqIdBO.setCurrentStart(currentStart);localSeqIdBO.setNextThreshold(nextThreshold);localSeqIdBO.setCurrentNum(atomicLong);localSeqIdBOMap.put(localSeqIdBO.getId(), localSeqIdBO);} else {LocalUnSeqIdBO localUnSeqIdBO = new LocalUnSeqIdBO();localUnSeqIdBO.setId(idGeneratePO.getId());localUnSeqIdBO.setCurrentStart(currentStart);localUnSeqIdBO.setNextThreshold(nextThreshold);long begin = idGeneratePO.getCurrentStart();long end = idGeneratePO.getNextThreshold();ConcurrentLinkedQueue idQueue = new ConcurrentLinkedQueue();ArrayList<Long> idList = new ArrayList<>();for (long i = begin; i < end; i++) {idList.add(i);}// 无序操作将有序集合打乱Collections.shuffle(idList);idQueue.addAll(idList);localUnSeqIdBO.setIdQueue(idQueue);localUnSeqIdBOMap.put(localUnSeqIdBO.getId(), localUnSeqIdBO);}}

mapper 内容

@Mapper
public interface IdGenerateMapper extends BaseMapper<IdGeneratePO> {//    @Update("update t_id_generate_config set next_threshold = next_threshold + step,current_start=current_start + step , version = version + 1 where id = #{id} and version = #{version}")
//    int updateNewIdCountAndVersion(@Param("id") int id, @Param("version") int version);@Update("update t_id_generate_config set next_threshold=next_threshold+step," +"current_start=current_start+step,version=version+1 where id =#{id} and version=#{version}")int updateNewIdCountAndVersion(@Param("id") int id, @Param("version") int version);@Select("select * from t_id_generate_config")List<IdGeneratePO> selectAll();
}

相关文章:

基于本地缓存制作一个分库分表的分布式ID生成器

引言&#xff1a; 代码在 https://gitee.com/lbmb/mb-live-app 中 【mb-live-id-generate-provider】 模块里面 如果喜欢 希望大家给给star 项目还在持续更新中。 背景介绍 项目整体架构是 基于springboot 3.0 开发 rpc 调用采用 dubbo 注册配置中心 使用 nacos 采用shardin…...

美易平台:金融市场的晴雨表与创新服务的融合

在金融市场中&#xff0c;利率的微妙变动往往预示着经济活动的脉动&#xff0c;而美国纽约联储发布的最新数据显示&#xff0c;上个交易日&#xff08;1月25日&#xff09;担保隔夜融资利率&#xff08;SOFR&#xff09;小幅上升至5.32%&#xff0c;而同期有效的联邦基金利率保…...

文旅项目包括什么?

文旅项目是指与文化和旅游相结合的项目&#xff0c;旨在通过提供丰富的文化体验和旅游服务来吸引游客&#xff0c;促进地方经济发展。 文旅项目通常包括多个方面&#xff0c;以下是对每块内容的详细介绍&#xff1a; 文化旅游景区&#xff1a;这类项目以展示人类文化和历史遗产…...

Pointnet++改进优化器系列:全网首发AdamW优化器 |即插即用,实现有效涨点

简介:1.该教程提供大量的首发改进的方式,降低上手难度,多种结构改进,助力寻找创新点!2.本篇文章对Pointnet++特征提取模块进行改进,加入AdamW优化器,提升性能。3.专栏持续更新,紧随最新的研究内容。 目录 1.理论介绍 2.修改步骤 2.1 步骤一 2.2 步骤二 2.3 步...

stm32 FOC 电机介绍

今年开始学习foc控制无刷电机&#xff0c;这几天把所学整理一下&#xff0c;记录一下知识内容。 前言: 为什么要学习FOC? 1.电机控制是自动化控制领域重要一环。 2.目前直流无刷电机应用越来越广泛&#xff0c;如无人机、机械臂、云台、仿生机器人等等。 需要什么基础&…...

【Linux】进程通信——管道

欢迎来到Cefler的博客&#x1f601; &#x1f54c;博客主页&#xff1a;折纸花满衣 &#x1f3e0;个人专栏&#xff1a;题目解析 &#x1f30e;推荐文章&#xff1a;【LeetCode】winter vacation training 目录 &#x1f4cb;进程通信的目的&#x1f4cb;管道匿名管道pipe函数创…...

3d gaussian splatting笔记(paper部分翻译)

本文为3DGS paper的部分翻译。 基于点的&#x1d6fc;混合和 NeRF 风格的体积渲染本质上共享相同的图像形成模型。 具体来说&#xff0c;颜色 &#x1d436; 由沿射线的体积渲染给出&#xff1a; 其中密度 &#x1d70e;、透射率 &#x1d447; 和颜色 c 的样本是沿着射线以…...

TCP 三次握手以及滑动窗口

TCP 三次握手 简介&#xff1a; TCP 是一种面向连接的单播协议&#xff0c;在发送数据前&#xff0c;通信双方必须在彼此间建立一条连接。所谓的 “ 连接” &#xff0c;其实是客户端和服务器的内存里保存的一份关于对方的信息&#xff0c;如 IP 地址、端口号等。 TCP 可以…...

Vue3 Cli5按需导入ElementPlus

1、安装环境 node&#xff1a;16.20.0 vue&#xff1a;3.2.36 vue/cli&#xff1a;5.0.0 element-plus&#xff1a;2.2.25 element-plus/icons-vue&#xff1a;2.0.10 unplugin-auto-import&#xff1a;0.16.1 // 当前环境用这个包&#xff0c;不然会提示各种错误 unplugin-vu…...

playwright自动化项目搭建

具备功能 关键技术&#xff1a; pylaywright测试库pytest单元测试框架pytest-playwright插件 非关键技术&#xff1a; pytest-html插件pytest-rerunfailures插件seldom 测试框架 实现功能&#xff1a; 元素定位与操作分离失败自动截图并保存到HTML报告失败重跑可配置不同…...

mysql字符集

一、查看字符集 //查看数据库字符集 SHOW CREATE DATABASE databasename; //查看表字符集 SHOW CREATE TABLE tablename; //查看指定表全部字段字符集 show full columns from table; 二、修改字符集 将超出utf8字符集范围的字符比如&#x2aa27;插入到utf8字符集的字…...

Elasticsearch:聊天机器人、人工智能和人力资源:电信公司和企业组织的成功组合

作者&#xff1a;来自 Elastic Jrgen Obermann, Piotr Kobziakowski 让我们来谈谈大型企业人力资源领域中一些很酷且改变游戏规则的东西&#xff1a;生成式 AI 和 Elastic Stack 的绝佳组合。 现在&#xff0c;想象一下大型电信公司的典型人力资源部门 — 他们正在处理一百万件…...

[AIGC大数据基础] Flink: 大数据流处理的未来

Flink 是一个分布式流处理引擎&#xff0c;它被广泛应用于大数据领域&#xff0c;具有高效、可扩展和容错的特性。它是由 Apache 软件基金会开发和维护的开源项目&#xff0c;并且在业界中受到了广泛认可和使用。 文章目录 什么是 FlinkFlink 的特点真正的流处理高性能和低延迟…...

数据结构之线性表(一般的线性表)

前言 接下来就开始正式进入数据结构环节了&#xff0c;我们先从线性表开始。 线性表 线性表&#xff08;linear list&#xff09;也叫线性存储结构&#xff0c;即数据元素的逻辑结构为线性的数据表&#xff0c;它是数据结构中最简单和最常用的一种存储结构&#xff0c;专门存…...

uniapp安卓android离线打包本地打包整理

离线打包准备 下载Android studio 1.准备资源hbuilder 2.准备离线SDK 最新android平台SDK下载最新android平台SDK下载 3.离线打包key申请 4.直接导入HBuilder-Integrate-AS工程,直接运行simpleDemo项目即可 5.安装java 1.8 jdk-8u151-windows-x64 6.遇到这个报错报错Caus…...

vmware安装centos8-stream

VMware与CentOS8-stream的配置教程【2022-9-5】_centos stream 8-CSDN博客 启动进入后配置网络&#xff0c;/etc/sysconfig/network-scripts/网卡 vmware上的centos8没有网络_主机时wifi上网,centos 8 安装后无法连接网络 解决办法-CSDN博客 centos8配置网络_centos8网络配置…...

使用HttpServletRequestWrapper解决web项目request数据流无法重复读取的问题

在做web项目开发时&#xff0c;我们有时候需要做一些前置的拦截判断处理&#xff0c;比如非法参数校验&#xff0c;防攻击拦截&#xff0c;统一日志处理等&#xff0c;而请求参数如果是form表单提交还好处理&#xff1b;对于json这种输入流的数据就会有问题&#xff0c;统一处理…...

从CNN ,LSTM 到Transformer的综述

前情提要&#xff1a;文本大量参照了以下的博客&#xff0c;本文创作的初衷是为了分享博主自己的学习和理解。对于刚开始接触NLP的同学来说&#xff0c;可以结合唐宇迪老师的B站视频【【NLP精华版教程】强推&#xff01;不愧是的最完整的NLP教程和学习路线图从原理构成开始学&a…...

Git学习笔记:1 基础命令详解

文章目录 Git基础命令详解&#xff1a; Git基础命令详解&#xff1a; git commit 用法&#xff1a;git commit -m "commit message"功能&#xff1a;将暂存区&#xff08;stage&#xff09;中的所有更改提交到本地仓库的当前分支&#xff0c;同时提供一个简短的提交信…...

【服务器】安装宝塔面板

目录 &#x1f33a;【前言】 &#x1f33c;【前提】连接服务器 &#x1f337;方式一 使用工具登录服务器如Xshell &#x1f337;方式二 阿里云直接连接 &#x1f33c; 1. 安装宝塔 &#x1f337;获取安装脚本 方式一 使用下面提供的脚本安装 方式二 使用官网提供的脚本…...

开源模型应用落地-业务优化篇(一)

一、前言 通过参与“开源模型应用落地-业务整合系列篇”的学习,我们已经成功建立了基本的业务流程。然而,这只是迈出了万里长征的第一步。现在我们要对整个项目进行优化,以提高效率。我们计划利用线程池来加快处理速度,使用redis来实现排队需求,以及通过多级环境来减轻负载…...

【遥感专题系列】影像信息提取之——基于专家知识的决策树分类

可以将多源数据用于影像分类当中&#xff0c;这就是专家知识的决策树分类器&#xff0c;本专题以ENVI中Decision Tree为例来叙述这一分类器。 本专题包括以下内容&#xff1a; 专家知识分类器概述知识&#xff08;规则&#xff09;定义ENVI中Decision Tree的使用 概述 基于知…...

lqb日志08

一只小蒟蒻备考蓝桥杯的日志 文章目录 笔记坐标相遇判断工作调度问题&#xff08;抽象时间轴绘制&#xff09; 刷题心得小结 笔记 坐标相遇判断 我是小懒虫&#xff0c;碰了一下运气&#xff0c;开了个“恰当”的数&#xff08;7000&#xff09;如果&#xff0c;7000次还不能…...

SAP EXCEL上传如何实现指定读取某一个sheet页(ALSM_EXCEL_TO_INTERNAL_TABLE)

如何读取指定的EXCEL sheet 页签&#xff0c;比如要读取下图中第二个输出sheet页签 具体实现方法如下&#xff1a; 拷贝标准的函数ALSM_EXCEL_TO_INTERNAL_TABLE封装成一个自定义函数ZCALSM_EXCEL_TO_INTERNAL_TABLE 在自定义函数导入参数页签新增一个参数SHEET_NAME 在源代码…...

奇怪问题说 - 测试篇

文章目录 1.什么是软件测试2.软件测试和开发的区别3.软件测试的发展&#xff1a;4.软件测试岗位5.软件测试在不同类型公司的定位6.一个优秀的软件测试人员具备的素质6.1综合能力6.2掌握自动化测试技术6.3优秀的测试用例设计能力6.4探索性思维6.5有责任感和一定的压力 7.软件测试…...

中国新能源汽车持续跑出发展“加速度”,比亚迪迎来向上突破

2023年已经过去&#xff0c;对于汽车圈而言&#xff0c;2023年是中国车市的分水岭&#xff0c;在这一年&#xff0c;中国汽车工业70年以来首次进入全球序列&#xff0c;自主品牌强势霸榜&#xff0c;销量首次超过合资车。要知道&#xff0c;这是自大众于1984年进入中国市场成立…...

chatGPT辅助写硕士毕业论文

一、写作顺序 1.标题、研究问题、研究方法 2.文献综述&#xff08;占比1/5-1/6&#xff09; 3.论证章节 4.结论、不足、启示 5.处理图表、参考文献的格式 6.绪论或引言 7.摘要、关键词 8.查重、装订 http://【硕士毕业论文写不下去&#xff0c;多亏听了张博士的论文写…...

搭建nginx图片服务器

&#xff08;1&#xff09;将图片存储于/home/data/images目录&#xff1b; &#xff08;2&#xff09;配置nginx.conf user nginx; worker_processes 4;error_log /var/log/nginx/error.log notice; pid /var/run/nginx.pid;events {worker_connections 10000; }ht…...

大数据学习之Flink算子、了解DataStream API(基础篇一)

DataStream API &#xff08;基础篇&#xff09; 注&#xff1a; 本文只涉及DataStream 原因&#xff1a;随着大数据和流式计算需求的增长&#xff0c;处理实时数据流变得越来越重要。因此&#xff0c;DataStream由于其处理实时数据流的特性和能力&#xff0c;逐渐替代了DataSe…...

js中字符串string,遍历json/Object【匹配url、邮箱、电话,版本号,千位分割,判断回文】

目录 正则 合法的URL 邮箱、电话 字符串方法 千位分割&#xff1a;num.slice(render, len).match(/\d{3}/g).join(,) 版本号比较 判断回文 json/Object 遍历 自身属性 for...inhasOwnProperty(key) Object.获取数组(obj)&#xff1a;Object.keys&#xff0c;Object…...