5. 上下文信息传递
前面完成的TraceRecoder
支持了异步代码块的调度,接下来我们就需要重点解决一下多线程下的数据传递问题,确保异步代码块的执行过程中,不会出现各种难以理解的并发问题
1. 并发问题复现
首先我们先来看一下,TraceRecoder
会在什么场景出现问题
1.1 上下文再线程池场景下的共享异常
既然我们的工具类是支持异步代码块封装,考虑到上下文的共享,我们第一想到就是使用InheritableThreadLocal
来替代 ThreadLocal
来存储上下文信息
但是有过了解的小伙伴会知道这个东西,在线程池的场景是可能出现共享异常的
我们可以构造一个简单的demo来验证一下
// 一个公共的随机休眠的方法
private Random random = new Random();
private void sleep(int max) {
try {
Thread.sleep(random.nextInt(max));
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
private ThreadLocal<String> local = new InheritableThreadLocal<>();
@Test
public void testConcurrent() throws InterruptedException {
// 外部链路执行线程池
ExecutorService executorService = Executors.newFixedThreadPool(2);
// TraceRecoder的异步线程池
ExecutorService traceExecutors = Executors.newFixedThreadPool(2);
executorService.submit(() -> {
local.set("t1");
try (TraceRecorder recorder = new TraceRecorder(traceExecutors, "t1")) {
recorder.async(() -> {
sleep(10);
System.out.println("1-1 上下文:" + Thread.currentThread().getName() + "_" + local.get());
}, "1-1");
recorder.async(() -> {
sleep(10);
System.out.println("1-2 上下文:" + Thread.currentThread().getName() + "_" + local.get());
}, "1-2");
recorder.async(() -> {
sleep(100);
System.out.println("1-3 上下文:" + Thread.currentThread().getName() + "_" + local.get());
}, "1-3");
recorder.sync(() -> sleep(100), "1-4同步");
} finally {
local.remove();
}
});
executorService.submit(() -> {
local.set("t2");
try (TraceRecorder recorder = new TraceRecorder(traceExecutors, "t2")) {
recorder.async(() -> {
sleep(100);
System.out.println("2-1 上下文:" + Thread.currentThread().getName() + "_" + local.get());
}, "2-1");
recorder.async(() -> {
sleep(100);
System.out.println("2-2 上下文:" + Thread.currentThread().getName() + "_" + local.get());
}, "2-2");
recorder.async(() -> {
sleep(100);
System.out.println("2-3 上下文:" + Thread.currentThread().getName() + "_" + local.get());
}, "2-3");
recorder.sync(() -> sleep(100), "2-4同步");
} finally {
local.remove();
}
});
Thread.sleep(3000);
}
上面的实现,主要是模拟TraceRecoder
的线程池中的线程复用场景,从而诱导线程池复用、导致引用其他任务的上下文出现的概率
当我们执行上面的测试用例,将会可能得到下面的输出
从上面的输出,也可以直观的看到 t2
执行链路中 获取到了t1
执行链路的上下文;
那么怎么解决这种问题呢?
1.2 解决方案
借助阿里开源的transmittable-thread-local
来替换默认的上下文,从而解决并发的上下文共享问题
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>transmittable-thread-local</artifactId>
<version>2.14.5</version>
</dependency>
2. 并发问题改造
2.1 异步工具类适配
首先我们对线程池进行改造,使用TtlExecutors
进行包裹,先调整默认的线程池
public class AsyncUtil {
public static ExecutorService executorService;
static {
executorService = initExecutorService(Runtime.getRuntime().availableProcessors() * 2, 50);
}
public static ExecutorService initExecutorService(int core, int max) {
// 异步工具类的默认线程池构建
max = Math.max(core, max);
ThreadFactory THREAD_FACTORY = new ThreadFactory() {
private final ThreadFactory defaultFactory = Executors.defaultThreadFactory();
private final AtomicInteger threadNumber = new AtomicInteger(1);
public Thread newThread(Runnable r) {
Thread thread = this.defaultFactory.newThread(r);
if (!thread.isDaemon()) {
thread.setDaemon(true);
}
thread.setName("trace-watch-dog-" + this.threadNumber.getAndIncrement());
return thread;
}
};
ExecutorService executorService = new ThreadPoolExecutor(core, max, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(), THREAD_FACTORY, new ThreadPoolExecutor.CallerRunsPolicy());
// 包装一下线程池,避免出现上下文复用场景
return TtlExecutors.getTtlExecutorService(executorService);
}
}
接着对用户传递线程池进行保护
public TraceRecoder(ExecutorService executorService, String task) {
this.traceName = task;
list = new CopyOnWriteArrayList<>();
// 支持排序的耗时记录
cost = new ConcurrentSkipListMap<>();
start(task);
this.executorService = TtlExecutors.getTtlExecutorService(executorService);
this.markExecuteOver = false;
}
2.2 要求使用TransmittableThreadLocal上下文
接下来就是对使用侧进行约束,对于有异步使用的场景,请使用TransmittableThreadLocal
替换jdk的ThreadLocal
// 推荐
private ThreadLocal<String> local = new TransmittableThreadLocal<>();
// 不推荐
private ThreadLocal<String> local = new InheritableThreadLocal<>();
再调整之后,可以继续执行前面的测试用例,我们稍微调整一下,用于多次执行判断是否有异常情况
private ThreadLocal<String> transLocal = new TransmittableThreadLocal<>();
public void testConcurrentV2() throws InterruptedException {
// 外部链路执行线程池
ExecutorService executorService = Executors.newFixedThreadPool(2);
// TraceRecoder的异步线程池
ExecutorService traceExecutors = AsyncUtil.initExecutorService(2, 2);
executorService.submit(() -> {
local.set("t1");
try (TraceRecorder recorder = new TraceRecorder(traceExecutors, "t1")) {
recorder.async(() -> {
sleep(10);
Assert.assertTrue("t1上下文获取异常", "t1".equals(local.get()));
}, "1-1");
recorder.async(() -> {
sleep(10);
Assert.assertTrue("t1上下文获取异常", "t1".equals(local.get()));
}, "1-2");
recorder.async(() -> {
sleep(100);
Assert.assertTrue("t1上下文获取异常", "t1".equals(local.get()));
}, "1-3");
recorder.sync(() -> sleep(200), "1-4同步");
} finally {
local.remove();
}
});
executorService.submit(() -> {
local.set("t2");
try (TraceRecorder recorder = new TraceRecorder(traceExecutors, "t2")) {
recorder.async(() -> {
sleep(100);
Assert.assertTrue("t2上下文获取异常", "t2".equals(local.get()));
}, "2-1");
recorder.async(() -> {
sleep(100);
Assert.assertTrue("t2上下文获取异常", "t2".equals(local.get()));
}, "2-2");
recorder.async(() -> {
sleep(100);
Assert.assertTrue("t2上下文获取异常", "t2".equals(local.get()));
}, "2-3");
recorder.sync(() -> sleep(200), "2-4同步");
} finally {
local.remove();
}
});
// 确保任务都执行完毕
Thread.sleep(1000);
}
@Test
public void testCon() throws InterruptedException {
for (int i = 0; i < 100; i++) {
testConcurrentV2();
}
System.out.println("全部通过!");
}
2.3 小结
本文主要介绍了 TraceRecoder
在异步执行场景执行下可能出现的上下文共享问题,为了解决并发问题,在使用层,我们需要注意使用TransmittableThreadLocal
来存储上下文信息
本文中的相关代码,可以到这里查看 trace-watch-dog