7. 便捷的使用封装
接上文,我们现在实现的耗时分布记录工具路TraceRecoder
已基本成型,接下来我们将重点放在使用层面的优化上,看一下如何给调用者提供简洁、舒适的使用体验
1. 使用侧的问题分析
现来看一下前面测试用例中的使用case
@Test
public void testCost2() {
try (TraceRecorder recorder = new TraceRecorder()) {
randSleep("前置", 20);
recorder.sync(() -> randSleep("task1", 200), "task1");
recorder.async(() -> randSleep("task2", 100), "task2");
}
}
那么上面这种使用姿势,有什么问题呢?
对于单一方法的代码块而言,并没有什么问题,假设再randSleep
方法中,我也希望统计某些代码块的执行耗时,那应该怎么做呢?
1.1 嵌套的耗时记录方式
再需要的地方再创建一个 TraceRecoder
,实现链路内的代码块耗时记录
public void subRun(String task, int max) {
try (TraceRecoder recorder = new TraceRecoder(AsyncUtil.executorService, task)) {
recorder.async(() -> randSleep(task, max), task + "-1");
recorder.async(() -> randSleep(task, max), task + "-2");
}
}
@Test
public void testCost3() {
try (TraceRecoder recorder = new TraceRecoder()) {
randSleep("前置", 20);
recorder.sync(() -> subRun("task1", 200), "task1");
recorder.async(() -> randSleep("task2", 100), "task2");
}
}
从上面的输出也可以看出,外层testCost3
的 TraceRecoder
与内部方法subRun
的 TraceRecoder
各打印了自己的输出
而我更希望的是将所有的耗时都集成在一个输出中
1.2 传参方式
既然我们希望所有的耗时都放在一个TraceRecoder
中,那么可以考虑将这个对象透传给需要的地方,如
public void subRun(String task, int max, TraceRecoder traceRecoder) {
traceRecoder.async(() -> randSleep(task, max), task + "-1");
traceRecoder.async(() -> randSleep(task, max), task + "-2");
}
@Test
public void testCost3() {
try (TraceRecoder recorder = new TraceRecoder()) {
randSleep("前置", 20);
recorder.sync(() -> subRun("task1", 200, recorder), "task1");
recorder.async(() -> randSleep("task2", 100), "task2");
}
}
使用同一个TraceRecoder
作为参数传递给需要记录耗时的方法之后,我们可以看到所有的耗时都集成再一起了
虽然这种方式可以满足诉求,但是对业务代码的改动就有点大了,需要修改调用方法的传参,那么有其他的解决方法么?
1.3 上下文维护TraceRecoder
借助上下文来保存TraceRecoder
,然后再需要的地方直接从上下文中获取
ThreadLocal<TraceRecoder> threadLocal = new TransmittableThreadLocal<>();
public void subRun(String task, int max) {
TraceRecoder recorder = threadLocal.get();
recorder.async(() -> randSleep(task, max), task + "-1");
recorder.async(() -> randSleep(task, max), task + "-2");
}
@Test
public void testCost3() {
try (TraceRecoder recorder = new TraceRecoder()) {
threadLocal.set(recorder);
randSleep("前置", 20);
recorder.sync(() -> subRun("task1", 200), "task1");
recorder.async(() -> randSleep("task2", 100), "task2");
} finally {
threadLocal.remove();
}
}
上面是一个基于上下文的简单使用示例,再整个执行链路中,需要记录耗时的地方,直接从上下文中获取TraceRecoder
即可
但是这里依然存在一个问题,如果上面的这个 subRun
方法,被另外一个入口调用,但是这个入口的调用链路中,没有开启耗时记录,即上下文中没有TraceRecoder
,那岂不是就会导致NPE
?
因此我们需要再上下文获取TraceRecoder
时,做一个保护
public void subRun(String task, int max) {
TraceRecoder recorder = threadLocal.get();
if (recoder == null) {
randSleep(task, max);
randSleep(task, max);
} else {
recorder.async(() -> randSleep(task, max), task + "-1");
recorder.async(() -> randSleep(task, max), task + "-2");
}
}
2. 使用优化
基于上面的几个分析步骤,我们对使用侧的优化就有了一个基本的方向:
- 借助ThreadLocal来持有
TraceRecoder
- 再开始记录的入口,初始化
TraceRecoder
,并保存到上下文 - 再链路的过程中,需要记录耗时的,可以直接从上下文中获取
2.1 抽象TraceRecoder
基于上面提到的几点实现思路,面临一个现实的问题就是从上下文获取TraceRecoder
记录耗时时,若返回null,需要再使用侧做一个兼容,为了不让业务代码变得恶心,我们就需要考虑再使用侧做一个保护
我们抽象一个ITraceRecoder
的接口类,将前面的工具类作为一个具体实现DefaultTraceRecoder
,然后再提供一个同步的实现类SyncTraceRecoder
,用于从上下文中获取不到DefaultTraceRecoder
时,就返回SyncTraceRecoder
,这样对于使用侧而言就不需要做if/else
的null
保护了
接口定义:
public interface ITraceRecoder extends Closeable {
/**
* 待返回结果的同步执行
*
* @param supplier 执行内容
* @param name 耗时标记
* @param <T> 返回类型
* @return 返回结果
*/
<T> T sync(Supplier<T> supplier, String name);
/**
* 无返回结果的同步执行
*
* @param run 执行内容
* @param name 耗时标记
*/
void sync(Runnable run, String name);
/**
* 异步执行
*
* @param supplier 异步任务
* @param name 耗时标记
* @param <T> 返回类型
* @return 返回结果
*/
<T> CompletableFuture<T> async(Supplier<T> supplier, String name);
/**
* 异步执行
*
* @param run 异步任务
* @param name 耗时标记
* @return 返回结果
*/
CompletableFuture<Void> async(Runnable run, String name);
/**
* 等待全部任务执行完毕
*
* @return 返回结果
*/
default ITraceRecoder allExecuted() {
return this;
}
/**
* 日志打印
*
* @return 各任务耗时情况
*/
default Map<String, Long> prettyPrint() {
return Collections.emptyMap();
}
@Override
default void close() {
}
}
兜底的同步实现SyncTraceRecoder
public class SyncTraceRecoder implements ITraceRecoder {
public static SyncTraceRecoder SYNC_RECODER = new SyncTraceRecoder();
/**
* 待返回结果的同步执行
*
* @param supplier 执行内容
* @param name 耗时标记
* @param <T> 返回类型
* @return 返回结果
*/
@Override
public <T> T sync(Supplier<T> supplier, String name) {
return supplier.get();
}
/**
* 无返回结果的同步执行
*
* @param run 执行内容
* @param name 耗时标记
*/
@Override
public void sync(Runnable run, String name) {
run.run();
}
/**
* 依然是同步执行,会直接返回结果
*
* @param supplier 异步任务
* @param name 耗时标记
* @param <T> 返回类型
* @return 返回结果
*/
@Override
public <T> CompletableFuture<T> async(Supplier<T> supplier, String name) {
return CompletableFuture.completedFuture(supplier.get());
}
/**
* 依然是同步执行,会直接返回结果
*
* @param run 异步任务
* @param name 耗时标记
* @return 返回结果
*/
@Override
public CompletableFuture<Void> async(Runnable run, String name) {
run.run();
return CompletableFuture.completedFuture(null);
}
}
前面封装的耗时统计实现DefaultTraceRecoder
public class DefaultTraceRecoder implements ITraceRecoder {
private static final Logger log = LoggerFactory.getLogger(DefaultTraceRecoder.class);
/**
* trace记录名
*/
private final String traceName;
/**
* 异步任务执行的结果
*/
private final List<CompletableFuture<?>> list;
/**
* 一个子任务的执行耗时
*/
private final Map<String, Long> cost;
/**
* 异步调度的线程池
*/
private final ExecutorService executorService;
/**
* 用于标记是否所有的任务执行完毕
* 执行完毕之后,不在支持继续添加记录
*/
private volatile boolean markExecuteOver;
/**
* 控制是否打印日志的条件
*/
private Supplier<Boolean> logCondition;
/**
* 结束的回调钩子
*/
private Runnable endHook;
public DefaultTraceRecoder() {
this(AsyncUtil.executorService, "TraceDog", () -> true);
}
public DefaultTraceRecoder(ExecutorService executorService, String task, Supplier<Boolean> condition) {
this.traceName = task;
list = new CopyOnWriteArrayList<>();
// 支持排序的耗时记录
cost = new ConcurrentSkipListMap<>();
this.executorService = TtlExecutors.getTtlExecutorService(executorService);
this.markExecuteOver = false;
this.logCondition = condition;
start(task);
MdcUtil.setGlobalTraceId(MdcUtil.fetchGlobalMsgIdForTraceRecoder());
}
/**
* 异步执行,带返回结果
*
* @param supplier 执行任务
* @param name 耗时标识
* @return 异步执行返回结果
*/
@Override
public <T> CompletableFuture<T> async(Supplier<T> supplier, String name) {
CompletableFuture<T> ans = CompletableFuture.supplyAsync(supplyWithTime(supplier, name + "(异步)"), this.executorService);
list.add(ans);
return ans;
}
/**
* 同步执行,待返回结果
*
* @param supplier 执行任务
* @param name 耗时标识
* @param <T> 返回类型
* @return 任务的执行返回结果
*/
@Override
public <T> T sync(Supplier<T> supplier, String name) {
return supplyWithTime(supplier, name).get();
}
/**
* 异步执行,无返回结果
*
* @param run 执行任务
* @param name 耗时标识
* @return
*/
@Override
public CompletableFuture<Void> async(Runnable run, String name) {
// 添加一个标识,区分同步执行与异步执行
// 异步任务的执行,在整体的耗时占比只能作为参考
CompletableFuture<Void> future = CompletableFuture.runAsync(runWithTime(run, name + "(异步)"), this.executorService);
list.add(future);
return future;
}
/**
* 同步执行,无返回结果
*
* @param run 执行任务
* @param name 耗时标识
* @return
*/
@Override
public void sync(Runnable run, String name) {
runWithTime(run, name).run();
}
/**
* 封装一下执行业务逻辑,记录耗时时间
*
* @param run 执行的具体业务逻辑
* @param name 任务名
* @return
*/
private Runnable runWithTime(Runnable run, String name) {
String traceId = MdcUtil.fetchGlobalMsgIdForTraceRecoder();
return () -> {
// 将父线程的msgId设置到当前这个执行线程
MdcUtil.setGlobalTraceId(traceId);
start(name);
try {
run.run();
} finally {
end(name);
}
};
}
/**
* 封装一下执行业务逻辑,记录耗时时间
*
* @param call 执行的具体业务逻辑
* @param name 任务名
* @return 返回结果
*/
private <T> Supplier<T> supplyWithTime(Supplier<T> call, String name) {
String traceId = MdcUtil.fetchGlobalMsgIdForTraceRecoder();
return () -> {
// 将父线程的msgId设置到当前这个执行线程
MdcUtil.setGlobalTraceId(traceId);
start(name);
try {
return call.get();
} finally {
end(name);
}
};
}
private void start(String name) {
if (markExecuteOver) {
// 所有任务执行完毕,不再新增
if (log.isDebugEnabled()) {
log.debug("allTask ExecuteOver ignore: {}", name);
}
return;
}
cost.put(name, System.currentTimeMillis());
}
private void end(String name) {
long now = System.currentTimeMillis();
long last = cost.getOrDefault(name, now);
if (last >= now / 1000) {
// 之前存储的是时间戳,因此我们需要更新成执行耗时 ms单位
cost.put(name, now - last);
}
}
/**
* 等待所有的任务执行完毕
*
* @return
*/
@Override
public DefaultTraceRecoder allExecuted() {
if (!list.isEmpty()) {
CompletableFuture.allOf(list.toArray(new CompletableFuture[]{})).join();
}
// 记录整体结束
end(this.traceName);
this.markExecuteOver = true;
return this;
}
@Override
public Map<String, Long> prettyPrint() {
// 在格式化输出时,要求所有任务执行完毕
if (!this.markExecuteOver) {
this.allExecuted();
}
if (!logCondition.get()) {
return cost;
}
StringBuilder sb = new StringBuilder();
sb.append('\n');
long totalCost = cost.remove(traceName);
sb.append("TraceWatch '").append(traceName).append("': running time = ").append(totalCost).append(" ms");
sb.append('\n');
if (cost.isEmpty()) {
sb.append("No task info kept");
} else {
sb.append("---------------------------------------------\n");
sb.append("ms % Task name\n");
sb.append("---------------------------------------------\n");
NumberFormat pf = NumberFormat.getPercentInstance();
pf.setMinimumIntegerDigits(2);
pf.setMinimumFractionDigits(2);
pf.setGroupingUsed(false);
for (Map.Entry<String, Long> entry : cost.entrySet()) {
sb.append(entry.getValue()).append("\t\t");
sb.append(pf.format(entry.getValue() / (double) totalCost)).append("\t\t");
sb.append(entry.getKey()).append("\n");
}
}
if (LoggerFactory.getILoggerFactory() instanceof NOPLoggerFactory) {
// 若项目中没有Slfj4的实现,则直接使用标准输出
System.out.printf("\n---------------------\n%s\n--------------------\n%n", sb);
} else if (log.isInfoEnabled()) {
log.info("\n---------------------\n{}\n--------------------\n", sb);
}
return cost;
}
@Override
public void close() {
try {
// 做一个兜底,避免业务侧没有手动结束,导致异步任务没有执行完就提前返回结果
this.allExecuted().prettyPrint();
} catch (Exception e) {
log.error("释放耗时上下文异常! {}", traceName, e);
} finally {
Optional.ofNullable(endHook).ifPresent(Runnable::run);
}
}
/**
* 设置结束的回调钩子
*
* @param endHook
* @return
*/
public DefaultTraceRecoder setEndHook(Runnable endHook) {
this.endHook = endHook;
return this;
}
}
2.2 使用门面
接下来我们就需要对上面的实现做一个使用的门面封装TraceWatch
,对外提供统一的访问姿势
我们可以考虑在TraceWatch
中,持有上下文信息
public class TraceWatch {
private static final TransmittableThreadLocal<ITraceRecoder> THREAD_LOCAL = new TransmittableThreadLocal<>();
public static ITraceRecoder startTrace(String name) {
return startTrace(name, () -> true);
}
public static ITraceRecoder startTrace(String name, Supplier<Boolean> condition) {
return startTrace(AsyncUtil.executorService, name, condition);
}
/**
* 开始trace记录
*
* @param executorService 线程池
* @param name 任务名
* @return
*/
public static ITraceRecoder startTrace(ExecutorService executorService, String name, Supplier<Boolean> condition) {
DefaultTraceRecoder bridge = new DefaultTraceRecoder(executorService, name, condition).setEndHook(TraceWatch::endTrace);
THREAD_LOCAL.set(bridge);
return bridge;
}
/**
* 在使用时,请确保先调用了 startTrace, 一定可以拿到 TraceRecoder,否则请使用 getRecoderOrElseSync() 方法
*
* @return
*/
public static ITraceRecoder getRecoder() {
return THREAD_LOCAL.get();
}
/**
* 获取记录器
* - 如果在请求链路中,有调用过 startTrace,则返回 DefaultTraceRecoder,可以实现链路的耗时统计;
* - 若之前没有调用过 startTrace, 则返回 SyncTraceRecoder, 被记录的函数代码块和直接调用没有区别,不会执行异步、也不会记录耗时
* <p>
* 主要的应用场景就是,同一个方法,会被多个入口调用,当只想记录某几个入口的耗时情况时,使用下面这个方法,就可以保证不会影响其他的调用方
*
* @return
*/
public static ITraceRecoder getRecoderOrElseSync() {
ITraceRecoder recoder = getRecoder();
if (recoder != null) {
return recoder;
}
return SyncTraceRecoder.SYNC_RECODER;
}
public static void endTrace() {
THREAD_LOCAL.remove();
}
}
2.3 使用示例
public void subExecute(String task, int max) {
ITraceRecoder recoder = TraceWatch.getRecoderOrElseSync();
recoder.async(() -> randSleep(task, max), task + "-1");
recoder.async(() -> randSleep(task, max), task + "-2");
}
@Test
public void basicUse() {
try (ITraceRecoder recoder = TraceWatch.startTrace("basicUse")) {
recoder.sync(() -> subExecute("task1", 200), "task1");
recoder.async(() -> {
log.info("task2 内部执行---");
try { Thread.sleep(1000); } catch (InterruptedException e) { throw new RuntimeException(e); }
}, "task2");
}
System.out.println("\n======== 上下文中没有Recoder | 开始 ===========\n");
subExecute("tt", 10);
System.out.println("\n======== 上下文中没有Recoder | 结束 ===========\n");
}
本文中的过程代码,可以到这里查看 trace-watch-dog
本文中的实现对应的是 trace-watch-dog 核心实现