6. 日志输出与全链路traceId透传
前面封装耗时任务分布工具类的输出,主要是通过System.out.println
进行控制台输出,这显然不符合实际的生产使用,接下来我们使用Slf4j
进行输出的替换,额外需要注意的就是异步场景下,避免出现全链路的traceId的丢失
1. 日志集成
1.1 slf4j日志输出
在项目中使用日志比较简单,先添加依赖
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>2.0.16</version>
</dependency>
然后就是更新下prettyPrint
的日志输出
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TraceRecoder implements Closeable {
private static final Logger log = LoggerFactory.getLogger(TraceRecoder.class);
public Map<String, Long> prettyPrint() {
// 在格式化输出时,要求所有任务执行完毕
if (!this.markExecuteOver) {
this.allExecuted();
}
StringBuilder sb = new StringBuilder();
sb.append('\n');
long totalCost = cost.remove(traceName);
sb.append("TraceWatch '").append(traceName).append("': running time = ").append(totalCost).append(" ms");
sb.append('\n');
if (cost.isEmpty()) {
sb.append("No task info kept");
} else {
sb.append("---------------------------------------------\n");
sb.append("ms % Task name\n");
sb.append("---------------------------------------------\n");
NumberFormat pf = NumberFormat.getPercentInstance();
pf.setMinimumIntegerDigits(2);
pf.setMinimumFractionDigits(2);
pf.setGroupingUsed(false);
for (Map.Entry<String, Long> entry : cost.entrySet()) {
sb.append(entry.getValue()).append("\t\t");
sb.append(pf.format(entry.getValue() / (double) totalCost)).append("\t\t");
sb.append(entry.getKey()).append("\n");
}
}
if (LoggerFactory.getILoggerFactory() instanceof NOPLoggerFactory) {
// 若项目中没有Slfj4的实现,则直接使用标准输出
System.out.printf("\n---------------------\n%s\n--------------------\n%n", sb);
} else if (log.isInfoEnabled()) {
log.info("\n---------------------\n{}\n--------------------\n", sb);
}
return cost;
}
}
重点关注下上面的实现,LoggerFactory.getILoggerFactory() instanceof NOPLoggerFactory
通过这一行来判断当前使用的项目中,是否已经集成了日志打印,如果是就利用 log.info()
打印日志,若没有集成类似logback/log4j
之类的日志框架,那就依然使用System.out.println
来输出耗时情况
1.2 全链路的日志集成
对全链路有过了解的小伙伴应该知晓,在整个执行链路中,即便是出现了异步(线程池调度)、跨进程(RPC)、跨服务等场景,仍然需要一个traceId从头到尾一直携带到最后
那么我们的工具类中的异步代码块执行,如何将这个traceId
携带进去呢?
- 借助
MDC
来实现
作为一个工具提供方,我们需要知晓如何从MDC中获取全链路的traceId
,如果没有自定义的全链路traceId生成携带策略,我们也可以提供一个默认的实现进行支持
因此我们先封装一个MdcUtil
工具类,来读写上下文中的traceId
public class MdcUtil {
private static final Logger log = LoggerFactory.getLogger(MdcUtil.class);
private static final String DEFAULT_TRACE_ID_TAG_KEY = "globalMsgId";
/**
* 生成msgId的方法
*/
private static Supplier<String> genIdFunc = null;
/**
* 获取MDC上下文中持有msgId的tagKey
*/
private static String traceIdTagKey = DEFAULT_TRACE_ID_TAG_KEY;
/**
* true 表示上下文中没有全链路traceId时,使用默认的生成策略来记录全链路id; 有则使用上下文中的全链路id
* false 表示只有上下文中存在traceId时,才进行子线程的透传,不会额外生成
*/
private static volatile Boolean traceIdAutoGen = false;
/**
* 注册traceId生成规则
*
* @param gen
*/
public static void registerIdGenFunc(Supplier<String> gen) {
genIdFunc = gen;
}
/**
* 注册全链路traceId存储的key
*
* @param tagKey
*/
public static void registerTraceTagKeyGetFunc(String tagKey) {
traceIdTagKey = tagKey;
}
/**
* 控制是否使用上下文的traceId
*
* @param traceIdAutoGen true 表示上下文中没有全链路traceId时,使用默认的生成策略来记录全链路id; 有则使用上下文中的全链路id
* false 表示只有上下文中存在traceId时,才进行子线程的透传,不会额外生成
*/
public static void setTraceIdAutoGen(Boolean traceIdAutoGen) {
MdcUtil.traceIdAutoGen = traceIdAutoGen;
}
private static void autoInit() {
if (genIdFunc == null) {
registerIdGenFunc(MdcUtil::defaultGenGlobalTraceId);
}
}
/**
* 根据配置,来判断没有traceId时,是直接返回还是新创建一个
*
* @return traceId
*/
public static String fetchGlobalMsgIdForTraceRecoder() {
if (Objects.equals(Boolean.TRUE, traceIdAutoGen)) {
return getOrInitGlobalTraceId();
} else {
return getGlobalTraceId();
}
}
private static String getGlobalTraceId() {
return MDC.get(traceIdTagKey);
}
/**
* 获取全局的traceId,若不存在,则进行初始化
*
* @return traceId
*/
private static String getOrInitGlobalTraceId() {
String traceId = getGlobalTraceId();
if (traceId == null || traceId.isEmpty()) {
return newGlobalTraceId();
}
return traceId;
}
public static void setGlobalTraceId(String msgId) {
if (msgId == null) {
return;
}
try {
MDC.put(traceIdTagKey, msgId);
} catch (Exception e) {
log.error("failed to init MDC globalMsgId! msgId:{}", msgId, e);
}
}
public static String newGlobalTraceId() {
autoInit();
String id = genIdFunc.get();
MDC.put(traceIdTagKey, id);
return id;
}
public static void clear() {
MDC.clear();
}
/**
* 默认的全链路id生成规则
*
* @return traceId
*/
public static String defaultGenGlobalTraceId() {
// fixme: 对于已经有自己的一套全链路的监控的场景,需要在这里进行替换
return UUID.randomUUID().toString().replace("-", "");
}
}
上面的实现,这里主要提供了一个全局的配置traceIdAutoGen
来控制,当上下文中拿不到traceId
时,我们应该是重新生成一个还是压根就不管它
然后我们就需要在之前的实现层TraceRecoder
,做一些改造,以实现异步执行时的traceId透传
- 在执行代码块的封装层,在业务代码执行前初始化
traceId
(需要注意,不要将获取traceId的逻辑放在代码块中了)
/**
* 封装一下执行业务逻辑,记录耗时时间
*
* @param run 执行的具体业务逻辑
* @param name 任务名
* @return
*/
private Runnable runWithTime(Runnable run, String name) {
String traceId = MdcUtil.fetchGlobalMsgIdForTraceRecoder();
return () -> {
MdcUtil.setGlobalTraceId(traceId);
start(name);
try {
run.run();
} finally {
end(name);
MdcUtil.clear();
}
};
}
/**
* 封装一下执行业务逻辑,记录耗时时间
*
* @param call 执行的具体业务逻辑
* @param name 任务名
* @return 返回结果
*/
private <T> Supplier<T> supplyWithTime(Supplier<T> call, String name) {
String traceId = MdcUtil.fetchGlobalMsgIdForTraceRecoder();
return () -> {
MdcUtil.setGlobalTraceId(traceId);
start(name);
try {
return call.get();
} finally {
end(name);
MdcUtil.clear();
}
};
}
1.3 全链路traceId测试
接下来就需要我们来验证一下集成情况了,首先再看一下完整修改后的TraceRecoder
工具类
public class TraceRecoder implements Closeable {
private static final Logger log = LoggerFactory.getLogger(TraceRecoder.class);
/**
* trace记录名
*/
private final String traceName;
/**
* 异步任务执行的结果
*/
private final List<CompletableFuture<?>> list;
/**
* 一个子任务的执行耗时
*/
private final Map<String, Long> cost;
/**
* 异步调度的线程池
*/
private final ExecutorService executorService;
/**
* 用于标记是否所有的任务执行完毕
* 执行完毕之后,不在支持继续添加记录
*/
private volatile boolean markExecuteOver;
public TraceRecoder() {
this(AsyncUtil.executorService, "TraceDog");
}
public TraceRecoder(ExecutorService executorService, String task) {
this.traceName = task;
list = new CopyOnWriteArrayList<>();
// 支持排序的耗时记录
cost = new ConcurrentSkipListMap<>();
start(task);
this.executorService = executorService;
this.markExecuteOver = false;
// ===> 这里是新增逻辑
MdcUtil.setGlobalTraceId(MdcUtil.fetchGlobalMsgIdForTraceRecoder());
}
/**
* 异步执行,带返回结果
*
* @param supplier 执行任务
* @param name 耗时标识
* @return
*/
public <T> CompletableFuture<T> async(Supplier<T> supplier, String name) {
CompletableFuture<T> ans = CompletableFuture.supplyAsync(supplyWithTime(supplier, name + "(异步)"), this.executorService);
list.add(ans);
return ans;
}
/**
* 同步执行,待返回结果
*
* @param supplier 执行任务
* @param name 耗时标识
* @param <T> 返回类型
* @return 任务的执行返回结果
*/
public <T> T sync(Supplier<T> supplier, String name) {
return supplyWithTime(supplier, name).get();
}
/**
* 异步执行,无返回结果
*
* @param run 执行任务
* @param name 耗时标识
* @return
*/
public CompletableFuture<Void> async(Runnable run, String name) {
// 添加一个标识,区分同步执行与异步执行
// 异步任务的执行,在整体的耗时占比只能作为参考
CompletableFuture<Void> future = CompletableFuture.runAsync(runWithTime(run, name + "(异步)"), this.executorService);
list.add(future);
return future;
}
/**
* 同步执行,无返回结果
*
* @param run 执行任务
* @param name 耗时标识
* @return
*/
public void sync(Runnable run, String name) {
runWithTime(run, name).run();
}
/**
* 封装一下执行业务逻辑,记录耗时时间
*
* @param run 执行的具体业务逻辑
* @param name 任务名
* @return
*/
private Runnable runWithTime(Runnable run, String name) {
// ===> 这里是新增逻辑
String traceId = MdcUtil.fetchGlobalMsgIdForTraceRecoder();
return () -> {
MdcUtil.setGlobalTraceId(traceId);
start(name);
try {
run.run();
} finally {
end(name);
}
};
}
/**
* 封装一下执行业务逻辑,记录耗时时间
*
* @param call 执行的具体业务逻辑
* @param name 任务名
* @return 返回结果
*/
private <T> Supplier<T> supplyWithTime(Supplier<T> call, String name) {
// ===> 这里是新增逻辑
String traceId = MdcUtil.fetchGlobalMsgIdForTraceRecoder();
return () -> {
MdcUtil.setGlobalTraceId(traceId);
start(name);
try {
return call.get();
} finally {
end(name);
}
};
}
/**
* 等待所有的任务执行完毕
*
* @return
*/
public TraceRecoder allExecuted() {
if (!list.isEmpty()) {
CompletableFuture.allOf(list.toArray(new CompletableFuture[]{})).join();
}
// 记录整体结束
end(this.traceName);
this.markExecuteOver = true;
return this;
}
private void start(String name) {
if (markExecuteOver) {
// 所有任务执行完毕,不再新增
System.out.println("allTask ExecuteOver ignore:" + name);
return;
}
cost.put(name, System.currentTimeMillis());
}
private void end(String name) {
long now = System.currentTimeMillis();
long last = cost.getOrDefault(name, now);
if (last >= now / 1000) {
// 之前存储的是时间戳,因此我们需要更新成执行耗时 ms单位
cost.put(name, now - last);
}
}
public Map<String, Long> prettyPrint() {
// 在格式化输出时,要求所有任务执行完毕
if (!this.markExecuteOver) {
this.allExecuted();
}
StringBuilder sb = new StringBuilder();
sb.append('\n');
long totalCost = cost.remove(traceName);
sb.append("TraceWatch '").append(traceName).append("': running time = ").append(totalCost).append(" ms");
sb.append('\n');
if (cost.isEmpty()) {
sb.append("No task info kept");
} else {
sb.append("---------------------------------------------\n");
sb.append("ms % Task name\n");
sb.append("---------------------------------------------\n");
NumberFormat pf = NumberFormat.getPercentInstance();
pf.setMinimumIntegerDigits(2);
pf.setMinimumFractionDigits(2);
pf.setGroupingUsed(false);
for (Map.Entry<String, Long> entry : cost.entrySet()) {
sb.append(entry.getValue()).append("\t\t");
sb.append(pf.format(entry.getValue() / (double) totalCost)).append("\t\t");
sb.append(entry.getKey()).append("\n");
}
}
if (LoggerFactory.getILoggerFactory() instanceof NOPLoggerFactory) {
// 若项目中没有Slfj4的实现,则直接使用标准输出
System.out.printf("\n---------------------\n%s\n--------------------\n%n", sb);
} else if (log.isInfoEnabled()) {
log.info("\n---------------------\n{}\n--------------------\n", sb);
}
return cost;
}
@Override
public void close() {
// 做一个兜底,避免业务侧没有手动结束,导致异步任务没有执行完就提前返回结果
this.allExecuted().prettyPrint();
}
}
上面将本次新增的逻辑标记了出来,对原有的改造较小,接下来,再写个测试用例,基本使用姿势与之前无异,唯一的区别在于我们再异步代码块中,使用日志输出看看traceId
是否能打印出来
要使用slf4j,先添加一个具体的日志实现,比如logback
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.4.14</version>
<scope>test</scope>
</dependency>
然后再资源目录下,添加配置文件 resources/logback.xml
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<!-- %m输出的信息,%p日志级别,%t线程名,%d日期,%c类的全名,%i索引【从数字0开始递增】,,, -->
<!-- appender是configuration的子节点,是负责写日志的组件。 -->
<!-- ConsoleAppender:把日志输出到控制台 -->
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d [%t] %-5level %mdc{globalMsgId} %logger{36}.%M\(%file:%line\) - %msg%n</pattern>
<!-- 控制台也要使用UTF-8,不要使用GBK,否则会中文乱码 -->
<charset>UTF-8</charset>
</encoder>
</appender>
<!-- 指定项目中某个包,当有日志操作行为时的日志记录级别 -->
<!-- 级别依次为【从高到低】:FATAL > ERROR > WARN > INFO > DEBUG > TRACE -->
<!-- additivity=false 表示匹配之后,不再继续传递给其他的logger-->
<logger name="com.github.liuyueyi.hhui" level="INFO" additivity="false">
<appender-ref ref="STDOUT"/>
</logger>
<!-- 控制台输出日志级别 -->
<root level="INFO">
<appender-ref ref="STDOUT"/>
</root>
</configuration>
然后就是测试用例
private static Random random = new Random();
/**
* 随机睡眠一段时间
*
* @param max
*/
private static void randSleep(String task, int max) {
randSleepAndRes(task, max);
}
private static int randSleepAndRes(String task, int max) {
int sleepMillSecond = random.nextInt(max);
try {
System.out.println(task + "==> 随机休眠 " + sleepMillSecond + "ms");
Thread.sleep(sleepMillSecond);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return sleepMillSecond;
}
@Test
public void testCost1() {
MdcUtil.setGlobalTraceId("666-666");
try (TraceRecoder recorder = new TraceRecoder()) {
randSleep("前置", 20);
int ans = recorder.sync(() -> {
int r = randSleepAndRes("task1", 200);
log.info("task1 内部执行 --> {}", r);
return r;
}, "task1");
recorder.async(() -> {
int r = randSleepAndRes("task2", 100);
log.info("task2 异步执行 --->{}", r);
}, "task2");
recorder.sync(() -> randSleep("task3", 40), "task3");
}
}
先看下,直接借助已有的traceId
场景
再看下,上下文中没有traceId
,使用默认的的traceId
生成策略
@Test
public void testCost2() {
MdcUtil.initTraceIdAutoGen(true);
try (TraceRecoder recorder = new TraceRecoder()) {
randSleep("前置", 20);
int ans = recorder.sync(() -> {
int r = randSleepAndRes("task1", 200);
log.info("task1 内部执行 --> {}", r);
return r;
}, "task1");
recorder.async(() -> {
int r = randSleepAndRes("task2", 100);
log.info("task2 异步执行 --->{}", r);
}, "task2");
recorder.sync(() -> randSleep("task3", 40), "task3");
}
}
1.4 小结
到这里我们就已经实现了trace-watch-dog
的日志集成了,并且为全链路的traceId
透传也提供了相应的解决方案
从上面的实现来看,我们还有有个更推荐的写法,再任务的执行前后各添加一个钩子,然后再钩子中进行任务执行前后的执行单元注册,将全链路的traceId
透传放在钩子中执行,这样也可以提供更强的扩展能力
至于这个如何设计后续再来介绍
本文中的相关代码,可以到这里查看 trace-watch-dog