【JUC进阶】14. TransmittableThreadLocal
目录
1、前言
2、TransmittableThreadLocal
2.1、使用场景
2.2、基本使用
3、实现原理
4、小结
1、前言
书接上回《【JUC进阶】13. InheritableThreadLocal》,提到了InheritableThreadLocal虽然能进行父子线程的值传递,但是如果在线程池中,就无法达到预期的效果了。为了更好的解决该问题,TransmittableThreadLocal诞生了。
2、TransmittableThreadLocal
TransmittableThreadLocal 是Alibaba开源的、用于解决 “在使用线程池等会缓存线程的组件情况下传递ThreadLocal” 问题的 InheritableThreadLocal 扩展。既然是扩展,那么自然具备InheritableThreadLocal不同线程间值传递的能力。但是他也是专门为了解决InheritableThreadLocal在线程池中出现的问题的。
官网地址:https://github.com/alibaba/transmittable-thread-local
2.1、使用场景
- 分布式跟踪系统 或 全链路压测(即链路打标)
- 日志收集记录系统上下文
- Session级Cache
- 应用容器或上层框架跨应用代码给下层SDK传递信息
2.2、基本使用
我们拿《【JUC进阶】13. InheritableThreadLocal》文中最后的demo进行改造。这里需要配合TtlExecutors一起使用。这里先讲述使用方法,具体为什么下面细说。
首先,我们需要添加依赖:
<!-- https://mvnrepository.com/artifact/com.alibaba/transmittable-thread-local -->
<dependency><groupId>com.alibaba</groupId><artifactId>transmittable-thread-local</artifactId><version>2.14.2</version>
</dependency>
其次,ThreadLocal的实现改为TransmittableThreadLocal。
static ThreadLocal<String> threadLocal = new TransmittableThreadLocal<>();
最后创建线程池的时候,使用TTL装饰器:
static ExecutorService executorService = TtlExecutors.getTtlExecutorService(Executors.newSingleThreadExecutor());
完整代码如下:
// threadlocal改为TransmittableThreadLocal
static ThreadLocal<String> threadLocal = new TransmittableThreadLocal<>();// 线程池添加TtlExecutors
static ExecutorService executorService = TtlExecutors.getTtlExecutorService(Executors.newSingleThreadExecutor());public static void main(String[] args) throws InterruptedException {//threadLocal.set("我是主线程的threadlocal变量,变量值为:000000");// 线程池执行子线程executorService.submit(() -> {System.out.println("-----> 子线程" + Thread.currentThread() + " <----- 获取threadlocal变量:" + threadLocal.get());});// 主线程睡眠3s,模拟运行Thread.sleep(3000);// 将变量修改为11111,在InheritableThreadLocal中修改是无效的threadLocal.set("我是主线程的threadlocal变量,变量值为:11111");// 这里线程池重新执行线程任务executorService.submit(() -> {System.out.println("-----> 子线程" + Thread.currentThread() + " <----- 获取threadlocal变量:" + threadLocal.get());});// 线程池关闭executorService.shutdown();
}
执行看下效果:

已经成功获取到threadlocal变量。
该方式也解决了因为线程被重复利用,而threadlocal重新赋值失效的问题。
3、实现原理
首先可以看到TransmittableThreadLocal继承InheritableThreadLocal,同时实现了TtlCopier接口。TtlCopier接口只提供了一个方法copy()。看到这里,可能有人大概猜出来他的实现原理了,既然实现了copy()方法,那么大概率是将父线程的变量复制一份存起来,接着找个地方存起来,然后找个适当的时机再还回去。没错,其实就是这样。
public class TransmittableThreadLocal<T> extends InheritableThreadLocal<T> implements TtlCopier<T> {
}
知道了TransmittableThreadLocal类的定义之后,我们再来看一个重要的属性holder:
// Note about the holder:
// 1. holder self is a InheritableThreadLocal(a *ThreadLocal*).
// 2. The type of value in the holder is WeakHashMap<TransmittableThreadLocal<Object>, ?>.
// 2.1 but the WeakHashMap is used as a *Set*:
// the value of WeakHashMap is *always* null, and never used.
// 2.2 WeakHashMap support *null* value.
private static final InheritableThreadLocal<WeakHashMap<TransmittableThreadLocal<Object>, ?>> holder =new InheritableThreadLocal<WeakHashMap<TransmittableThreadLocal<Object>, ?>>() {@Overrideprotected WeakHashMap<TransmittableThreadLocal<Object>, ?> initialValue() {return new WeakHashMap<>();}@Overrideprotected WeakHashMap<TransmittableThreadLocal<Object>, ?> childValue(WeakHashMap<TransmittableThreadLocal<Object>, ?> parentValue) {return new WeakHashMap<>(parentValue);}};
这里存放的是一个全局的WeakMap(同ThreadLocal一样,weakMap也是为了解决内存泄漏的问题),里面存放了TransmittableThreadLocal对象并且重写了initialValue和childValue方法,尤其是childValue,可以看到在即将异步时父线程的属性是直接作为初始化值赋值给子线程的本地变量对象。引入holder变量后,也就不必对外暴露Thread中的 inheritableThreadLocals,保持ThreadLocal.ThreadLocalMap的封装性。
而TransmittableThreadLocal中的get()和set()方法,都是从该holder中获取或添加该map。
重点来了,前面不是提到了需要借助于TtlExecutors.getTtlExecutorService()包装线程池才能达到效果吗?我们来看看这里做了什么事。
我们从TtlExecutors.getTtlExecutorService()方法跟进可以发现一个线程池的ttl包装类ExecutorServiceTtlWrapper。其中包含了我们执行线程的方法submit()和execute()。我们进入submit()方法:
@NonNull
@Override
public <T> Future<T> submit(@NonNull Callable<T> task) {return executorService.submit(TtlCallable.get(task, false, idempotent));
}
可以发现在线程池进行任务执行时,对我们提交的任务进行了一层预处理,TtlCallable.get()。TtlCallable也是Callable的装饰类,同样还有TtlRunnable,也是同样道理。我们跟进该方法偷瞄一眼:
@Nullable
@Contract(value = "null, _, _ -> null; !null, _, _ -> !null", pure = true)
public static <T> TtlCallable<T> get(@Nullable Callable<T> callable, boolean releaseTtlValueReferenceAfterCall, boolean idempotent) {if (callable == null) return null;if (callable instanceof TtlEnhanced) {// avoid redundant decoration, and ensure idempotencyif (idempotent) return (TtlCallable<T>) callable;else throw new IllegalStateException("Already TtlCallable!");}return new TtlCallable<>(callable, releaseTtlValueReferenceAfterCall);
}
上面判断下当前线程的类型是否已经是TtlEnhanced,如果是直接返回,否则创建一个TtlCallable。接着进入new TtlCallable()方法:
private TtlCallable(@NonNull Callable<V> callable, boolean releaseTtlValueReferenceAfterCall) {this.capturedRef = new AtomicReference<>(capture());this.callable = callable;this.releaseTtlValueReferenceAfterCall = releaseTtlValueReferenceAfterCall;
}
可以看到在初始化线程的时候,调用了一个capture()方法,并将该方法得到的值存放在capturedRef中。没错,这里就是上面我们提到的将父线程的本地变量复制一份快照,存放起来。跟进capture():
@NonNull
public static Object capture() {final HashMap<Transmittee<Object, Object>, Object> transmittee2Value = newHashMap(transmitteeSet.size());for (Transmittee<Object, Object> transmittee : transmitteeSet) {try {transmittee2Value.put(transmittee, transmittee.capture());} catch (Throwable t) {if (logger.isLoggable(Level.WARNING)) {logger.log(Level.WARNING, "exception when Transmitter.capture for transmittee " + transmittee +"(class " + transmittee.getClass().getName() + "), just ignored; cause: " + t, t);}}}return new Snapshot(transmittee2Value);
}
这里的transmitteeSet是一个存放Transmitteedede 集合,在初始化中会将我们 前面提到的holder注册进去:
private static final Set<Transmittee<Object, Object>> transmitteeSet = new CopyOnWriteArraySet<>();static {registerTransmittee(ttlTransmittee);registerTransmittee(threadLocalTransmittee);
}@SuppressWarnings("unchecked")
public static <C, B> boolean registerTransmittee(@NonNull Transmittee<C, B> transmittee) {return transmitteeSet.add((Transmittee<Object, Object>) transmittee);
}
跟进transmittee.capture()方法,该方法由静态内部类Transmitter实现并重写,com.alibaba.ttl.TransmittableThreadLocal.Transmitter.Transmittee#capture
private static final Transmittee<HashMap<TransmittableThreadLocal<Object>, Object>, HashMap<TransmittableThreadLocal<Object>, Object>> ttlTransmittee =new Transmittee<HashMap<TransmittableThreadLocal<Object>, Object>, HashMap<TransmittableThreadLocal<Object>, Object>>() {@NonNull@Overridepublic HashMap<TransmittableThreadLocal<Object>, Object> capture() {final HashMap<TransmittableThreadLocal<Object>, Object> ttl2Value = newHashMap(holder.get().size());for (TransmittableThreadLocal<Object> threadLocal : holder.get().keySet()) {ttl2Value.put(threadLocal, threadLocal.copyValue());}return ttl2Value;}
}
transmittee.capture()扫描holder里目前存放的k-v里的key,就是需要传给子线程的TTL对象,其中调用的threadLocal.copyValue()便是前面看到的TtlCopier接口提供的方法。
看到这里已经大致符合我们前面的猜想,将变量复制一份存起来。那么不出意外接下来应该就是要找个适当的机会还回去。我们接着看。
接下来我们看真正执行线程的时候,也就是call()方法。由于前面线程被TtlCallable包装过,以为这里的call()方法肯定是TtlCallable.call():
@Override
@SuppressFBWarnings("THROWS_METHOD_THROWS_CLAUSE_BASIC_EXCEPTION")
public V call() throws Exception {// 获取由之前捕获到的父线程变量集final Object captured = capturedRef.get();if (captured == null || releaseTtlValueReferenceAfterCall && !capturedRef.compareAndSet(captured, null)) {throw new IllegalStateException("TTL value reference is released after call!");}// 这里的backup是当前线程原有的变量,这里进行备份,等线程执行完毕后,会将该变量进行恢复final Object backup = replay(captured);try {// 任务执行return callable.call();} finally {// 恢复上述提到的backup原有变量restore(backup);}
}
果然,在执行线程时,先获取之前存放起来的变量。然后调用replay():
@NonNull
public static Object replay(@NonNull Object captured) {final Snapshot capturedSnapshot = (Snapshot) captured;final HashMap<Transmittee<Object, Object>, Object> transmittee2Value = newHashMap(capturedSnapshot.transmittee2Value.size());for (Map.Entry<Transmittee<Object, Object>, Object> entry : capturedSnapshot.transmittee2Value.entrySet()) {Transmittee<Object, Object> transmittee = entry.getKey();try {Object transmitteeCaptured = entry.getValue();transmittee2Value.put(transmittee, transmittee.replay(transmitteeCaptured));} catch (Throwable t) {if (logger.isLoggable(Level.WARNING)) {logger.log(Level.WARNING, "exception when Transmitter.replay for transmittee " + transmittee +"(class " + transmittee.getClass().getName() + "), just ignored; cause: " + t, t);}}}return new Snapshot(transmittee2Value);
}
继续跟进transmittee.replay(transmitteeCaptured):
@NonNull
@Override
public HashMap<TransmittableThreadLocal<Object>, Object> replay(@NonNull HashMap<TransmittableThreadLocal<Object>, Object> captured) {final HashMap<TransmittableThreadLocal<Object>, Object> backup = newHashMap(holder.get().size());for (final Iterator<TransmittableThreadLocal<Object>> iterator = holder.get().keySet().iterator(); iterator.hasNext(); ) {TransmittableThreadLocal<Object> threadLocal = iterator.next();// 这里便是所有原生的本地变量都暂时存储在backup里,用于之后恢复用backup.put(threadLocal, threadLocal.get());// clear the TTL values that is not in captured// avoid the extra TTL values after replay when run task// 这里检查如果当前变量不存在于捕获到的线程变量,那么就将他清除掉,对应线程的本地变量也清理掉// 为什么要清除?因为从使用这个子线程做异步那里,捕获到的本地变量并不包含原生的变量,当前线程// 在做任务时的首要目标,是将父线程里的变量完全传递给任务,如果不清除这个子线程原生的本地变量,// 意味着很可能会影响到任务里取值的准确性。这也就是为什么上面需要做备份的原因。if (!captured.containsKey(threadLocal)) {iterator.remove();threadLocal.superRemove();}}// set TTL values to capturedsetTtlValuesTo(captured);// call beforeExecute callbackdoExecuteCallback(true);return backup;
}
继续跟进setTtlValuesTo(captured),这里就是把父线程本地变量赋值给当前线程了:
private static void setTtlValuesTo(@NonNull HashMap<TransmittableThreadLocal<Object>, Object> ttlValues) {for (Map.Entry<TransmittableThreadLocal<Object>, Object> entry : ttlValues.entrySet()) {TransmittableThreadLocal<Object> threadLocal = entry.getKey();threadLocal.set(entry.getValue());}
}
到这里基本的实现原理也差不多了,基本和我们前面猜想的一致。但是这里还少了前面提到的backup变量如何恢复的步骤,既然到这里了,一起看一下,跟进restore(backup):
public static void restore(@NonNull Object backup) {for (Map.Entry<Transmittee<Object, Object>, Object> entry : ((Snapshot) backup).transmittee2Value.entrySet()) {Transmittee<Object, Object> transmittee = entry.getKey();try {Object transmitteeBackup = entry.getValue();transmittee.restore(transmitteeBackup);} catch (Throwable t) {if (logger.isLoggable(Level.WARNING)) {logger.log(Level.WARNING, "exception when Transmitter.restore for transmittee " + transmittee +"(class " + transmittee.getClass().getName() + "), just ignored; cause: " + t, t);}}}
}
继续看transmittee.restore(transmitteeBackup):
@Override
public void restore(@NonNull HashMap<TransmittableThreadLocal<Object>, Object> backup) {// call afterExecute callbackdoExecuteCallback(false);for (final Iterator<TransmittableThreadLocal<Object>> iterator = holder.get().keySet().iterator(); iterator.hasNext(); ) {TransmittableThreadLocal<Object> threadLocal = iterator.next();// clear the TTL values that is not in backup// avoid the extra TTL values after restoreif (!backup.containsKey(threadLocal)) {iterator.remove();threadLocal.superRemove();}}// restore TTL valuessetTtlValuesTo(backup);
}
与replay类似,只是重复进行了将backup赋给当前线程的步骤。到此基本结束。附上官网的时序图帮助理解:

4、小结
所以总结下来,TransmittableThreadLocal的实现原理主要就是依赖于TtlRunnable或TtlCallable装饰类的预处理方法,TtlExecutors是将普通线程转换成Ttl包装的线程,而ttl包装的线程会进行本地变量的预处理,也就是capture()拷贝一份快照到内存中,然后通过replay方法将父线程的变量赋值给当前线程。
相关文章:
【JUC进阶】14. TransmittableThreadLocal
目录 1、前言 2、TransmittableThreadLocal 2.1、使用场景 2.2、基本使用 3、实现原理 4、小结 1、前言 书接上回《【JUC进阶】13. InheritableThreadLocal》,提到了InheritableThreadLocal虽然能进行父子线程的值传递,但是如果在线程池中&#x…...
基于C++的ORM框架sqlpp11入门介绍(附MySQL运行实例)
基本介绍 sqlpp11 是 C 的类型安全的 SQL 模版库。 Sqlpp11的官方下载地址是, GitHub - rbock/sqlpp11: A type safe SQL template library for C 在这里,可以找到官方的详细介绍文档, https://github.com/rbock/sqlpp11/tree/main/docs…...
对写文章的想法
一些思考 思考初心现在错觉想说的话 最后 思考 在CSDN里面写文章已经快半年了啊,虽然更得不多,但每一篇都花费很多时间,写的时候能帮自己查漏补缺,这边找找资料补充一下,都能去拓展自己的知识面,让自己的文…...
Istio安装和基础原理
1、Istio简介 Istio 是一个开源服务网格,它透明地分层到现有的分布式应用程序上。 Istio 强大的特性提供了一种统一和更有效的方式来保护、连接和监视服务。 Istio 是实现负载平衡、服务到服务身份验证和监视的路径——只需要很少或不需要更改服务代码。它强大的控…...
C++核心编程——基于多态的企业职工系统
本专栏记录C学习过程包括C基础以及数据结构和算法,其中第一部分计划时间一个月,主要跟着黑马视频教程,学习路线如下,不定时更新,欢迎关注。 当前章节处于: ---------第1阶段-C基础入门 ---------第2阶段实战…...
Nginx服务安装
Nginx(发音为[engine x])专为性能优化而开发,其最知名的优点是它的稳定性和低系统资源消 耗,以及对HTTP并发连接的高处理能力(单台物理服务器可支持30000~50000个并发请求)。正因 为如此,大量提供社交网络、…...
微信小程序canvas画布实现矩形元素自由缩放、移动功能
一、获取画布信息并绘制背景 .whml <canvas class="canvas" type="2d" id="myCanvas" bindtouchstart="get_rect_touch_position" bindtouchmove="move_or_scale" bind:tap="finish_edit_check"/> 定义c…...
一文搞懂 Python 3 中的数据类型
介绍 在 Python 中,与所有编程语言一样,数据类型用于对一种特定类型的数据进行分类。这很重要,因为您使用的特定数据类型将决定您可以为其分配哪些值以及您可以对其执行哪些操作(包括可以对其执行哪些操作)。 1. 数字…...
学习笔记之——3D Gaussian Splatting源码解读
之前博客对3DGS进行了学习与调研 学习笔记之——3D Gaussian Splatting及其在SLAM与自动驾驶上的应用调研-CSDN博客文章浏览阅读450次。论文主页3D Gaussian Splatting是最近NeRF方面的突破性工作,它的特点在于重建质量高的情况下还能接入传统光栅化,优…...
Flink standalone集群部署配置
文章目录 简介软件依赖部署方案二、安装1.下载并解压2.ssh免密登录3.修改配置文件3.启动集群4.访问 Web UI 简介 Flink独立模式(Standalone)是部署 Flink 最基本也是最简单的方式:所需要的所有 Flink 组件, 都只是操作系统上运行…...
Python: + 运算符、append() 方法和 extend()方法的区别和用法
在Python中,有几种常见的方式可以向列表中添加元素,其中包括使用 运算符、append() 方法和 extend() 方法。 使用 运算符: 运算符用于合并两个列表。 通过创建一个新列表,包含两个被合并的列表的元素。不会修改原始列表&…...
【MySQL】mysql集群
文章目录 一、mysql日志错误日志查询日志二进制日志慢查询日志redo log和undo log 二、mysql集群主从复制原理介绍配置命令 读写分离原理介绍配置命令 三、mysql分库分表垂直拆分水平拆分 一、mysql日志 MySQL日志 是记录 MySQL 数据库系统运行过程中不同事件和操作的信息的文件…...
zabbix监控windows主机
下载安装zabbix agent安装包 Zabbix官网下载地址: https://www.zabbix.com/cn/download_agents?version5.0LTS&release5.0.40&osWindows&os_versionAny&hardwareamd64&encryptionOpenSSL&packagingMSI&show_legacy0 这里使用zabbix agent2 安装 …...
单例模式的八种写法、单例和并发的关系
文章目录 1.单例模式的作用2.单例模式的适用场景3.饿汉式静态常量(可用)静态代码块(可用) 4.懒汉式线程不安全(不可用)同步方法(线程安全,但不推荐用)同步代码块…...
基于实时Linux+FPGA实现NI CompactRIO系统详解
利用集成的软件工具链,结合信号调理I/O模块,轻松构建和部署实时应用程序。 什么是CompactRIO? CompactRIO系统提供了高处理性能、传感器专用I/O和紧密集成的软件工具,使其成为工业物联网、监测和控制应用的理想之选。实时处理器提…...
Webhook端口中的自定义签名身份认证
概述 如果需要通过 Webhook 端口从交易伙伴处接收数据,但该交易伙伴可能对于安全性有着较高的要求,而不仅仅是用于验证入站 Webhook 要求的基本身份验证用户名/密码,或者用户可能只想在入站 Webhook 消息上增加额外的安全层。 使用 Webhook…...
用Linux的视角来理解缓冲区概念
缓冲区的认识 缓冲区(buffer)是存储数据的临时存储区域。当我们用C语言向文件中写入数据时,数据并不会直接的写到文件中,中途还经过了缓冲区,而我们需要对缓冲区的数据进行刷新,那么数据才算写到文件当中。…...
C#中Enumerable.Range(Int32, Int32) 方法用于计算
目录 一、关于Enumerable.Range(Int32, Int32) 方法 1.定义 2.Enumerable.Range()用于数学计算的操作方法 3.实例1:显示整型数1~9的平方 4.实例2:显示整型数0~9 5.实例3:Enumerable.Range()vs for循环 &#x…...
Linux和windows进程同步与线程同步那些事儿(四):windows 下进程同步
Linux和windows进程同步与线程同步那些事儿(一) Linux和windows进程同步与线程同步那些事儿(二): windows线程同步详解示例 Linux和windows进程同步与线程同步那些事儿(三): Linux线…...
1. Logback介绍
Logback介绍 Logback旨在成为流行的log4j项目的继任者。它由Ceki Glc设计,他是log4j的创始人。它基于十年在设计工业级日志系统方 面的经验。结果产品,即logback,比所有现有的日志系统更快,具有更小的占用空间,有时差距…...
XML Group端口详解
在XML数据映射过程中,经常需要对数据进行分组聚合操作。例如,当处理包含多个物料明细的XML文件时,可能需要将相同物料号的明细归为一组,或对相同物料号的数量进行求和计算。传统实现方式通常需要编写脚本代码,增加了开…...
cf2117E
原题链接:https://codeforces.com/contest/2117/problem/E 题目背景: 给定两个数组a,b,可以执行多次以下操作:选择 i (1 < i < n - 1),并设置 或,也可以在执行上述操作前执行一次删除任意 和 。求…...
Spring Boot面试题精选汇总
🤟致敬读者 🟩感谢阅读🟦笑口常开🟪生日快乐⬛早点睡觉 📘博主相关 🟧博主信息🟨博客首页🟫专栏推荐🟥活动信息 文章目录 Spring Boot面试题精选汇总⚙️ **一、核心概…...
laravel8+vue3.0+element-plus搭建方法
创建 laravel8 项目 composer create-project --prefer-dist laravel/laravel laravel8 8.* 安装 laravel/ui composer require laravel/ui 修改 package.json 文件 "devDependencies": {"vue/compiler-sfc": "^3.0.7","axios": …...
docker 部署发现spring.profiles.active 问题
报错: org.springframework.boot.context.config.InvalidConfigDataPropertyException: Property spring.profiles.active imported from location class path resource [application-test.yml] is invalid in a profile specific resource [origin: class path re…...
Elastic 获得 AWS 教育 ISV 合作伙伴资质,进一步增强教育解决方案产品组合
作者:来自 Elastic Udayasimha Theepireddy (Uday), Brian Bergholm, Marianna Jonsdottir 通过搜索 AI 和云创新推动教育领域的数字化转型。 我们非常高兴地宣布,Elastic 已获得 AWS 教育 ISV 合作伙伴资质。这一重要认证表明,Elastic 作为 …...
在Spring Boot中集成RabbitMQ的完整指南
前言 在现代微服务架构中,消息队列(Message Queue)是实现异步通信、解耦系统组件的重要工具。RabbitMQ 是一个流行的消息中间件,支持多种消息协议,具有高可靠性和可扩展性。 本博客将详细介绍如何在 Spring Boot 项目…...
java 局域网 rtsp 取流 WebSocket 推送到前端显示 低延迟
众所周知 摄像头取流推流显示前端延迟大 传统方法是服务器取摄像头的rtsp流 然后客户端连服务器 中转多了,延迟一定不小。 假设相机没有专网 公网 1相机自带推流 直接推送到云服务器 然后客户端拉去 2相机只有rtsp ,边缘服务器拉流推送到云服务器 …...
前端打包工具简单介绍
前端打包工具简单介绍 一、Webpack 架构与插件机制 1. Webpack 架构核心组成 Entry(入口) 指定应用的起点文件,比如 src/index.js。 Module(模块) Webpack 把项目当作模块图,模块可以是 JS、CSS、图片等…...
JVM——对象模型:JVM对象的内部机制和存在方式是怎样的?
引入 在Java的编程宇宙中,“Everything is object”是最核心的哲学纲领。当我们写下new Book()这样简单的代码时,JVM正在幕后构建一个复杂而精妙的“数据实体”——对象。这个看似普通的对象,实则是JVM内存管理、类型系统和多态机制的基石。…...
