5. 上下文信息传递

一灰灰blog技术组件trace-watch-dog约 1251 字大约 4 分钟

前面完成的TraceRecoder支持了异步代码块的调度,接下来我们就需要重点解决一下多线程下的数据传递问题,确保异步代码块的执行过程中,不会出现各种难以理解的并发问题

1. 并发问题复现

首先我们先来看一下,TraceRecoder 会在什么场景出现问题

1.1 上下文再线程池场景下的共享异常

既然我们的工具类是支持异步代码块封装,考虑到上下文的共享,我们第一想到就是使用InheritableThreadLocal 来替代 ThreadLocal 来存储上下文信息

但是有过了解的小伙伴会知道这个东西,在线程池的场景是可能出现共享异常的

我们可以构造一个简单的demo来验证一下

// 一个公共的随机休眠的方法
private Random random = new Random();
private void sleep(int max) {
    try {
        Thread.sleep(random.nextInt(max));
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
}

private ThreadLocal<String> local = new InheritableThreadLocal<>();
@Test
public void testConcurrent() throws InterruptedException {
    // 外部链路执行线程池
    ExecutorService executorService = Executors.newFixedThreadPool(2);

    // TraceRecoder的异步线程池
    ExecutorService traceExecutors = Executors.newFixedThreadPool(2);

    executorService.submit(() -> {
        local.set("t1");
        try (TraceRecorder recorder = new TraceRecorder(traceExecutors, "t1")) {
            recorder.async(() -> {
                sleep(10);
                System.out.println("1-1 上下文:" + Thread.currentThread().getName() + "_" + local.get());
            }, "1-1");
            recorder.async(() -> {
                sleep(10);
                System.out.println("1-2 上下文:" + Thread.currentThread().getName() + "_" + local.get());
            }, "1-2");
            recorder.async(() -> {
                sleep(100);
                System.out.println("1-3 上下文:" + Thread.currentThread().getName() + "_" + local.get());
            }, "1-3");
            recorder.sync(() -> sleep(100), "1-4同步");
        } finally {
            local.remove();
        }
    });
    executorService.submit(() -> {
        local.set("t2");
        try (TraceRecorder recorder = new TraceRecorder(traceExecutors, "t2")) {
            recorder.async(() -> {
                sleep(100);
                System.out.println("2-1 上下文:" + Thread.currentThread().getName() + "_" + local.get());
            }, "2-1");
            recorder.async(() -> {
                sleep(100);
                System.out.println("2-2 上下文:" + Thread.currentThread().getName() + "_" + local.get());
            }, "2-2");
            recorder.async(() -> {
                sleep(100);
                System.out.println("2-3 上下文:" + Thread.currentThread().getName() + "_" + local.get());
            }, "2-3");

            recorder.sync(() -> sleep(100), "2-4同步");
        } finally {
            local.remove();
        }
    });

    Thread.sleep(3000);
}

上面的实现,主要是模拟TraceRecoder的线程池中的线程复用场景,从而诱导线程池复用、导致引用其他任务的上下文出现的概率

当我们执行上面的测试用例,将会可能得到下面的输出

从上面的输出,也可以直观的看到 t2 执行链路中 获取到了t1执行链路的上下文;

那么怎么解决这种问题呢?

1.2 解决方案

借助阿里开源的transmittable-thread-local来替换默认的上下文,从而解决并发的上下文共享问题

<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>transmittable-thread-local</artifactId>
    <version>2.14.5</version>
</dependency>

2. 并发问题改造

2.1 异步工具类适配

首先我们对线程池进行改造,使用TtlExecutors进行包裹,先调整默认的线程池

public class AsyncUtil {
    public static ExecutorService executorService;

    static {
        executorService = initExecutorService(Runtime.getRuntime().availableProcessors() * 2, 50);
    }


    public static ExecutorService initExecutorService(int core, int max) {
        // 异步工具类的默认线程池构建
        max = Math.max(core, max);
        ThreadFactory THREAD_FACTORY = new ThreadFactory() {
            private final ThreadFactory defaultFactory = Executors.defaultThreadFactory();
            private final AtomicInteger threadNumber = new AtomicInteger(1);

            public Thread newThread(Runnable r) {
                Thread thread = this.defaultFactory.newThread(r);
                if (!thread.isDaemon()) {
                    thread.setDaemon(true);
                }

                thread.setName("trace-watch-dog-" + this.threadNumber.getAndIncrement());
                return thread;
            }
        };
        ExecutorService executorService = new ThreadPoolExecutor(core, max, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(), THREAD_FACTORY, new ThreadPoolExecutor.CallerRunsPolicy());
        // 包装一下线程池,避免出现上下文复用场景
        return TtlExecutors.getTtlExecutorService(executorService);
    }
}

接着对用户传递线程池进行保护

public TraceRecoder(ExecutorService executorService, String task) {
    this.traceName = task;
    list = new CopyOnWriteArrayList<>();
    // 支持排序的耗时记录
    cost = new ConcurrentSkipListMap<>();
    start(task);
    this.executorService = TtlExecutors.getTtlExecutorService(executorService);
    this.markExecuteOver = false;
}

2.2 要求使用TransmittableThreadLocal上下文

接下来就是对使用侧进行约束,对于有异步使用的场景,请使用TransmittableThreadLocal替换jdk的ThreadLocal

// 推荐
private ThreadLocal<String> local = new TransmittableThreadLocal<>();

// 不推荐
private ThreadLocal<String> local = new InheritableThreadLocal<>();

再调整之后,可以继续执行前面的测试用例,我们稍微调整一下,用于多次执行判断是否有异常情况

private ThreadLocal<String> transLocal = new TransmittableThreadLocal<>();
public void testConcurrentV2() throws InterruptedException {
    // 外部链路执行线程池
    ExecutorService executorService = Executors.newFixedThreadPool(2);

    // TraceRecoder的异步线程池
    ExecutorService traceExecutors = AsyncUtil.initExecutorService(2, 2);

    executorService.submit(() -> {
        local.set("t1");
        try (TraceRecorder recorder = new TraceRecorder(traceExecutors, "t1")) {
            recorder.async(() -> {
                sleep(10);
                Assert.assertTrue("t1上下文获取异常", "t1".equals(local.get()));
            }, "1-1");
            recorder.async(() -> {
                sleep(10);
                Assert.assertTrue("t1上下文获取异常", "t1".equals(local.get()));
            }, "1-2");
            recorder.async(() -> {
                sleep(100);
                Assert.assertTrue("t1上下文获取异常", "t1".equals(local.get()));
            }, "1-3");
            recorder.sync(() -> sleep(200), "1-4同步");
        } finally {
            local.remove();
        }
    });
    executorService.submit(() -> {
        local.set("t2");
        try (TraceRecorder recorder = new TraceRecorder(traceExecutors, "t2")) {
            recorder.async(() -> {
                sleep(100);
                Assert.assertTrue("t2上下文获取异常", "t2".equals(local.get()));
            }, "2-1");
            recorder.async(() -> {
                sleep(100);
                Assert.assertTrue("t2上下文获取异常", "t2".equals(local.get()));
            }, "2-2");
            recorder.async(() -> {
                sleep(100);
                Assert.assertTrue("t2上下文获取异常", "t2".equals(local.get()));
            }, "2-3");

            recorder.sync(() -> sleep(200), "2-4同步");
        } finally {
            local.remove();
        }
    });

    // 确保任务都执行完毕
    Thread.sleep(1000);
}

@Test
public void testCon() throws InterruptedException {
    for (int i = 0; i < 100; i++) {
        testConcurrentV2();
    }
    System.out.println("全部通过!");
}

2.3 小结

本文主要介绍了 TraceRecoder 在异步执行场景执行下可能出现的上下文共享问题,为了解决并发问题,在使用层,我们需要注意使用TransmittableThreadLocal 来存储上下文信息

本文中的相关代码,可以到这里查看 trace-watch-dogopen in new window

Loading...