7. 便捷的使用封装

一灰灰blog技术组件trace-watch-dog约 2836 字大约 9 分钟

接上文,我们现在实现的耗时分布记录工具路TraceRecoder已基本成型,接下来我们将重点放在使用层面的优化上,看一下如何给调用者提供简洁、舒适的使用体验

1. 使用侧的问题分析

现来看一下前面测试用例中的使用case

@Test
public void testCost2() {
    try (TraceRecorder recorder = new TraceRecorder()) {
        randSleep("前置", 20);
        recorder.sync(() -> randSleep("task1", 200), "task1");
        recorder.async(() -> randSleep("task2", 100), "task2");
    }
}

那么上面这种使用姿势,有什么问题呢?

对于单一方法的代码块而言,并没有什么问题,假设再randSleep方法中,我也希望统计某些代码块的执行耗时,那应该怎么做呢?

1.1 嵌套的耗时记录方式

再需要的地方再创建一个 TraceRecoder,实现链路内的代码块耗时记录

public void subRun(String task, int max) {
    try (TraceRecoder recorder = new TraceRecoder(AsyncUtil.executorService, task)) {
        recorder.async(() -> randSleep(task, max), task + "-1");
        recorder.async(() -> randSleep(task, max), task + "-2");
    }
}

@Test
public void testCost3() {
    try (TraceRecoder recorder = new TraceRecoder()) {
        randSleep("前置", 20);
        recorder.sync(() -> subRun("task1", 200), "task1");
        recorder.async(() -> randSleep("task2", 100), "task2");
    }
}

从上面的输出也可以看出,外层testCost3TraceRecoder 与内部方法subRunTraceRecoder 各打印了自己的输出

而我更希望的是将所有的耗时都集成在一个输出中

1.2 传参方式

既然我们希望所有的耗时都放在一个TraceRecoder中,那么可以考虑将这个对象透传给需要的地方,如

public void subRun(String task, int max, TraceRecoder traceRecoder) {
    traceRecoder.async(() -> randSleep(task, max), task + "-1");
    traceRecoder.async(() -> randSleep(task, max), task + "-2");
}

@Test
public void testCost3() {
    try (TraceRecoder recorder = new TraceRecoder()) {
        randSleep("前置", 20);
        recorder.sync(() -> subRun("task1", 200, recorder), "task1");
        recorder.async(() -> randSleep("task2", 100), "task2");
    }
}

使用同一个TraceRecoder作为参数传递给需要记录耗时的方法之后,我们可以看到所有的耗时都集成再一起了

虽然这种方式可以满足诉求,但是对业务代码的改动就有点大了,需要修改调用方法的传参,那么有其他的解决方法么?

1.3 上下文维护TraceRecoder

借助上下文来保存TraceRecoder,然后再需要的地方直接从上下文中获取

ThreadLocal<TraceRecoder> threadLocal = new TransmittableThreadLocal<>();


public void subRun(String task, int max) {
    TraceRecoder recorder = threadLocal.get();
    recorder.async(() -> randSleep(task, max), task + "-1");
    recorder.async(() -> randSleep(task, max), task + "-2");
}


@Test
public void testCost3() {
    try (TraceRecoder recorder = new TraceRecoder()) {
        threadLocal.set(recorder);
        randSleep("前置", 20);
        recorder.sync(() -> subRun("task1", 200), "task1");
        recorder.async(() -> randSleep("task2", 100), "task2");
    } finally {
        threadLocal.remove();
    }
}

上面是一个基于上下文的简单使用示例,再整个执行链路中,需要记录耗时的地方,直接从上下文中获取TraceRecoder即可

但是这里依然存在一个问题,如果上面的这个 subRun 方法,被另外一个入口调用,但是这个入口的调用链路中,没有开启耗时记录,即上下文中没有TraceRecoder,那岂不是就会导致NPE?

因此我们需要再上下文获取TraceRecoder时,做一个保护

public void subRun(String task, int max) {
    TraceRecoder recorder = threadLocal.get();
    if (recoder == null) {
        randSleep(task, max);
        randSleep(task, max);
    } else {
        recorder.async(() -> randSleep(task, max), task + "-1");
        recorder.async(() -> randSleep(task, max), task + "-2");
    }
}

2. 使用优化

基于上面的几个分析步骤,我们对使用侧的优化就有了一个基本的方向:

  1. 借助ThreadLocal来持有TraceRecoder
  2. 再开始记录的入口,初始化TraceRecoder,并保存到上下文
  3. 再链路的过程中,需要记录耗时的,可以直接从上下文中获取

2.1 抽象TraceRecoder

基于上面提到的几点实现思路,面临一个现实的问题就是从上下文获取TraceRecoder记录耗时时,若返回null,需要再使用侧做一个兼容,为了不让业务代码变得恶心,我们就需要考虑再使用侧做一个保护

我们抽象一个ITraceRecoder的接口类,将前面的工具类作为一个具体实现DefaultTraceRecoder,然后再提供一个同步的实现类SyncTraceRecoder,用于从上下文中获取不到DefaultTraceRecoder时,就返回SyncTraceRecoder,这样对于使用侧而言就不需要做if/elsenull保护了

接口定义:

public interface ITraceRecoder extends Closeable {
    /**
     * 待返回结果的同步执行
     *
     * @param supplier 执行内容
     * @param name     耗时标记
     * @param <T>      返回类型
     * @return 返回结果
     */
    <T> T sync(Supplier<T> supplier, String name);

    /**
     * 无返回结果的同步执行
     *
     * @param run  执行内容
     * @param name 耗时标记
     */
    void sync(Runnable run, String name);

    /**
     * 异步执行
     *
     * @param supplier 异步任务
     * @param name     耗时标记
     * @param <T>      返回类型
     * @return 返回结果
     */
    <T> CompletableFuture<T> async(Supplier<T> supplier, String name);


    /**
     * 异步执行
     *
     * @param run  异步任务
     * @param name 耗时标记
     * @return 返回结果
     */
    CompletableFuture<Void> async(Runnable run, String name);

    /**
     * 等待全部任务执行完毕
     *
     * @return 返回结果
     */
    default ITraceRecoder allExecuted() {
        return this;
    }

    /**
     * 日志打印
     *
     * @return 各任务耗时情况
     */
    default Map<String, Long> prettyPrint() {
        return Collections.emptyMap();
    }

    @Override
    default void close() {
    }
}

兜底的同步实现SyncTraceRecoder

public class SyncTraceRecoder implements ITraceRecoder {
    public static SyncTraceRecoder SYNC_RECODER = new SyncTraceRecoder();

    /**
     * 待返回结果的同步执行
     *
     * @param supplier 执行内容
     * @param name     耗时标记
     * @param <T>      返回类型
     * @return 返回结果
     */
    @Override
    public <T> T sync(Supplier<T> supplier, String name) {
        return supplier.get();
    }

    /**
     * 无返回结果的同步执行
     *
     * @param run  执行内容
     * @param name 耗时标记
     */
    @Override
    public void sync(Runnable run, String name) {
        run.run();
    }

    /**
     * 依然是同步执行,会直接返回结果
     *
     * @param supplier 异步任务
     * @param name     耗时标记
     * @param <T>      返回类型
     * @return 返回结果
     */
    @Override
    public <T> CompletableFuture<T> async(Supplier<T> supplier, String name) {
        return CompletableFuture.completedFuture(supplier.get());
    }


    /**
     * 依然是同步执行,会直接返回结果
     *
     * @param run  异步任务
     * @param name 耗时标记
     * @return 返回结果
     */
    @Override
    public CompletableFuture<Void> async(Runnable run, String name) {
        run.run();
        return CompletableFuture.completedFuture(null);
    }
}

前面封装的耗时统计实现DefaultTraceRecoder

public class DefaultTraceRecoder implements ITraceRecoder {
    private static final Logger log = LoggerFactory.getLogger(DefaultTraceRecoder.class);

    /**
     * trace记录名
     */
    private final String traceName;

    /**
     * 异步任务执行的结果
     */
    private final List<CompletableFuture<?>> list;
    /**
     * 一个子任务的执行耗时
     */
    private final Map<String, Long> cost;

    /**
     * 异步调度的线程池
     */
    private final ExecutorService executorService;

    /**
     * 用于标记是否所有的任务执行完毕
     * 执行完毕之后,不在支持继续添加记录
     */
    private volatile boolean markExecuteOver;

    /**
     * 控制是否打印日志的条件
     */
    private Supplier<Boolean> logCondition;

    /**
     * 结束的回调钩子
     */
    private Runnable endHook;

    public DefaultTraceRecoder() {
        this(AsyncUtil.executorService, "TraceDog", () -> true);
    }

    public DefaultTraceRecoder(ExecutorService executorService, String task, Supplier<Boolean> condition) {
        this.traceName = task;
        list = new CopyOnWriteArrayList<>();
        // 支持排序的耗时记录
        cost = new ConcurrentSkipListMap<>();
        this.executorService = TtlExecutors.getTtlExecutorService(executorService);
        this.markExecuteOver = false;
        this.logCondition = condition;
        start(task);
        MdcUtil.setGlobalTraceId(MdcUtil.fetchGlobalMsgIdForTraceRecoder());
    }

    /**
     * 异步执行,带返回结果
     *
     * @param supplier 执行任务
     * @param name     耗时标识
     * @return 异步执行返回结果
     */
    @Override
    public <T> CompletableFuture<T> async(Supplier<T> supplier, String name) {
        CompletableFuture<T> ans = CompletableFuture.supplyAsync(supplyWithTime(supplier, name + "(异步)"), this.executorService);
        list.add(ans);
        return ans;
    }

    /**
     * 同步执行,待返回结果
     *
     * @param supplier 执行任务
     * @param name     耗时标识
     * @param <T>      返回类型
     * @return 任务的执行返回结果
     */
    @Override
    public <T> T sync(Supplier<T> supplier, String name) {
        return supplyWithTime(supplier, name).get();
    }

    /**
     * 异步执行,无返回结果
     *
     * @param run  执行任务
     * @param name 耗时标识
     * @return
     */
    @Override
    public CompletableFuture<Void> async(Runnable run, String name) {
        // 添加一个标识,区分同步执行与异步执行
        // 异步任务的执行,在整体的耗时占比只能作为参考
        CompletableFuture<Void> future = CompletableFuture.runAsync(runWithTime(run, name + "(异步)"), this.executorService);
        list.add(future);
        return future;
    }

    /**
     * 同步执行,无返回结果
     *
     * @param run  执行任务
     * @param name 耗时标识
     * @return
     */
    @Override
    public void sync(Runnable run, String name) {
        runWithTime(run, name).run();
    }

    /**
     * 封装一下执行业务逻辑,记录耗时时间
     *
     * @param run  执行的具体业务逻辑
     * @param name 任务名
     * @return
     */
    private Runnable runWithTime(Runnable run, String name) {
        String traceId = MdcUtil.fetchGlobalMsgIdForTraceRecoder();
        return () -> {
            // 将父线程的msgId设置到当前这个执行线程
            MdcUtil.setGlobalTraceId(traceId);
            start(name);
            try {
                run.run();
            } finally {
                end(name);
            }
        };
    }

    /**
     * 封装一下执行业务逻辑,记录耗时时间
     *
     * @param call 执行的具体业务逻辑
     * @param name 任务名
     * @return 返回结果
     */
    private <T> Supplier<T> supplyWithTime(Supplier<T> call, String name) {
        String traceId = MdcUtil.fetchGlobalMsgIdForTraceRecoder();
        return () -> {
            // 将父线程的msgId设置到当前这个执行线程
            MdcUtil.setGlobalTraceId(traceId);
            start(name);
            try {
                return call.get();
            } finally {
                end(name);
            }
        };
    }


    private void start(String name) {
        if (markExecuteOver) {
            // 所有任务执行完毕,不再新增
            if (log.isDebugEnabled()) {
                log.debug("allTask ExecuteOver ignore: {}", name);
            }
            return;
        }
        cost.put(name, System.currentTimeMillis());
    }

    private void end(String name) {
        long now = System.currentTimeMillis();
        long last = cost.getOrDefault(name, now);
        if (last >= now / 1000) {
            // 之前存储的是时间戳,因此我们需要更新成执行耗时 ms单位
            cost.put(name, now - last);
        }
    }


    /**
     * 等待所有的任务执行完毕
     *
     * @return
     */
    @Override
    public DefaultTraceRecoder allExecuted() {
        if (!list.isEmpty()) {
            CompletableFuture.allOf(list.toArray(new CompletableFuture[]{})).join();
        }
        // 记录整体结束
        end(this.traceName);
        this.markExecuteOver = true;
        return this;
    }

    @Override
    public Map<String, Long> prettyPrint() {
        // 在格式化输出时,要求所有任务执行完毕
        if (!this.markExecuteOver) {
            this.allExecuted();
        }

        if (!logCondition.get()) {
            return cost;
        }

        StringBuilder sb = new StringBuilder();
        sb.append('\n');
        long totalCost = cost.remove(traceName);
        sb.append("TraceWatch '").append(traceName).append("': running time = ").append(totalCost).append(" ms");
        sb.append('\n');
        if (cost.isEmpty()) {
            sb.append("No task info kept");
        } else {
            sb.append("---------------------------------------------\n");
            sb.append("ms         %     Task name\n");
            sb.append("---------------------------------------------\n");
            NumberFormat pf = NumberFormat.getPercentInstance();
            pf.setMinimumIntegerDigits(2);
            pf.setMinimumFractionDigits(2);
            pf.setGroupingUsed(false);
            for (Map.Entry<String, Long> entry : cost.entrySet()) {
                sb.append(entry.getValue()).append("\t\t");
                sb.append(pf.format(entry.getValue() / (double) totalCost)).append("\t\t");
                sb.append(entry.getKey()).append("\n");
            }
        }

        if (LoggerFactory.getILoggerFactory() instanceof NOPLoggerFactory) {
            // 若项目中没有Slfj4的实现,则直接使用标准输出
            System.out.printf("\n---------------------\n%s\n--------------------\n%n", sb);
        } else if (log.isInfoEnabled()) {
            log.info("\n---------------------\n{}\n--------------------\n", sb);
        }
        return cost;
    }

    @Override
    public void close() {
        try {
            // 做一个兜底,避免业务侧没有手动结束,导致异步任务没有执行完就提前返回结果
            this.allExecuted().prettyPrint();
        } catch (Exception e) {
            log.error("释放耗时上下文异常! {}", traceName, e);
        } finally {
            Optional.ofNullable(endHook).ifPresent(Runnable::run);
        }
    }

    /**
     * 设置结束的回调钩子
     *
     * @param endHook
     * @return
     */
    public DefaultTraceRecoder setEndHook(Runnable endHook) {
        this.endHook = endHook;
        return this;
    }
}

2.2 使用门面

接下来我们就需要对上面的实现做一个使用的门面封装TraceWatch,对外提供统一的访问姿势

我们可以考虑在TraceWatch中,持有上下文信息

public class TraceWatch {

    private static final TransmittableThreadLocal<ITraceRecoder> THREAD_LOCAL = new TransmittableThreadLocal<>();


    public static ITraceRecoder startTrace(String name) {
        return startTrace(name, () -> true);
    }

    public static ITraceRecoder startTrace(String name, Supplier<Boolean> condition) {
        return startTrace(AsyncUtil.executorService, name, condition);
    }

    /**
     * 开始trace记录
     *
     * @param executorService 线程池
     * @param name            任务名
     * @return
     */
    public static ITraceRecoder startTrace(ExecutorService executorService, String name, Supplier<Boolean> condition) {
        DefaultTraceRecoder bridge = new DefaultTraceRecoder(executorService, name, condition).setEndHook(TraceWatch::endTrace);
        THREAD_LOCAL.set(bridge);
        return bridge;
    }

    /**
     * 在使用时,请确保先调用了 startTrace, 一定可以拿到 TraceRecoder,否则请使用 getRecoderOrElseSync() 方法
     *
     * @return
     */
    public static ITraceRecoder getRecoder() {
        return THREAD_LOCAL.get();
    }

    /**
     * 获取记录器
     * - 如果在请求链路中,有调用过 startTrace,则返回 DefaultTraceRecoder,可以实现链路的耗时统计;
     * - 若之前没有调用过 startTrace, 则返回 SyncTraceRecoder, 被记录的函数代码块和直接调用没有区别,不会执行异步、也不会记录耗时
     * <p>
     * 主要的应用场景就是,同一个方法,会被多个入口调用,当只想记录某几个入口的耗时情况时,使用下面这个方法,就可以保证不会影响其他的调用方
     *
     * @return
     */
    public static ITraceRecoder getRecoderOrElseSync() {
        ITraceRecoder recoder = getRecoder();
        if (recoder != null) {
            return recoder;
        }
        return SyncTraceRecoder.SYNC_RECODER;
    }

    public static void endTrace() {
        THREAD_LOCAL.remove();
    }
}

2.3 使用示例

public void subExecute(String task, int max) {
    ITraceRecoder recoder = TraceWatch.getRecoderOrElseSync();
    recoder.async(() -> randSleep(task, max), task + "-1");
    recoder.async(() -> randSleep(task, max), task + "-2");
}

@Test
public void basicUse() {
    try (ITraceRecoder recoder = TraceWatch.startTrace("basicUse")) {
        recoder.sync(() -> subExecute("task1", 200), "task1");
        recoder.async(() -> {
            log.info("task2 内部执行---");
            try { Thread.sleep(1000); } catch (InterruptedException e) { throw new RuntimeException(e); }
        }, "task2");
    }

    System.out.println("\n======== 上下文中没有Recoder | 开始 ===========\n");
    subExecute("tt", 10);
    System.out.println("\n======== 上下文中没有Recoder | 结束 ===========\n");
}

本文中的过程代码,可以到这里查看 trace-watch-dogopen in new window

本文中的实现对应的是 trace-watch-dogopen in new window 核心实现

Loading...