1.实现一个简单的分布式定时任务(应用篇)

一灰灰blogSpringBoot基础系列定时器应用篇Scheduled应用约 1728 字大约 6 分钟

在SpringBoot中,想使用定时器比较简单,一个注解@Scheduled配合上cron表达式即可支持各种定时任务了;

单机任务还是比较简单的,但是当我们的服务有多个实例在运行时,如果只希望一个实例上的定时任务执行,可以怎么出了呢?

I. 场景分析

1. 需求说明

简单来讲,就是我们的应用中,有一个定时任务(如每天八点给用户推送早报),为了保证我们的应用可靠性,这个应用部署了三台实例

因此当我们对定时任务不做任何处理时,每台实例上的定时任务到了八点就执行,用户就会接收到三次推送

所以为了避免上面的场景出现,我们希望,三个实例中只能有一个实例的定时任务生效,其他的两个不执行

2. 方案设计

从上面的场景描述,可以看成是一个分布式的任务调度的问题,当然我们接下来并不是要实现一个功能完备的分布式任务调度(已经有很多相关的优秀框架了... 如果感兴趣的话,也不是不可以再造一个轮子);

我们可以采用一个简单的方案来实现这种排他性,比如我们选择一个公用的配置信息用来存储允许执行定时任务的实例ip,当实例的定时任务执行之前,先判断一下实例ip与配置的ip是否相同,如果是,则可以执行定时任务;否则不执行

注意:上面这种方式可以实现排他性,但是有风险哦(如配置中的ip不对,或者这个ip对应的实例下线了,可能导致定时任务没有实例运行哦)

II. 分布式定时任务实现

0. 分析

在我们上面的设计中,有几个问题是需要我们关心的

  • 配置如何处理?
  • 如何拦截定时器的操作

配置存储

一般来讲,应用部署多个实例的情况下,多半实在微服务的架构下,如果我们有可用的配置中心,这也是一个比较好的存储配置的方案;

当然如果没有配置中心,使用redis/db进行配置的存储,也是ok的,具体的选择取决于我们的应用实际情况;唯一需要注意的是,这个配置对于所有的实例公用,仅存一份

定时器拦截

关于定时器的拦截,最容易想到的就是AOP,拦截目标方法,然后在切面中判断是否需要执行

直接使用AOP拦截所有的@Scheduled方法貌似可以,但是有以下缺陷

  • 定时任务的方法需要是public/package/protected,不然切面不会生效(请注意,定时任务注解@Scheduled是可以修饰private方法,并生效的哦,它的具体实现方式和我们常见的AOP有一些区别)
  • 当系统中存在分布式定时任务和普通定时任务(即每个实例的定时任务都需要正常执行)时,不太合适
    • 因此入股使用AOP,可以考虑新建一个注解,来实现

定时任务扩展

这里走的是另外一条思路,在Spring扫描定时任务的时候,我们扩展一下最终的任务逻辑,在具体的执行之前,做一个过滤规则

1. AOP实现方式

为了简化后续的测试逻辑,我们用一个简单的方式用来替代多实例的测试,将多实例的单任务执行,简化为单实例中,多个定时任务只允许一个执行,通过方法名来判断到底允许哪个任务执行

首先自定义一个注解

@Documented
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface DistributeTask {

    /**
     * lock key
     *
     * @return
     */
    String key();
}

写一个简单的获取配置的Mock类,用来模拟配置竞争

public class ExecutiveLock {

    private Map<String, String> lockMap = new ConcurrentHashMap<>();

    public static final ExecutiveLock instance = new ExecutiveLock();

    private ExecutiveLock() {
    }

    public boolean tryLock(String key, String lockVal) {
        String old = lockMap.computeIfAbsent(key, (s) -> lockVal);
        return lockVal.equalsIgnoreCase(old);
    }
}

核心的切面逻辑,

@Aspect
@Component
public class DemoAop {

    @Around("@annotation(DistributeTask)")
    public Object doAround(ProceedingJoinPoint joinPoint) throws Throwable {
        Method method = ((MethodSignature) joinPoint.getSignature()).getMethod();
        String lock = method.getName();
        DistributeTask task = method.getAnnotation(DistributeTask.class);
        if (ExecutiveLock.instance.tryLock(task.key(), lock)) {
            System.out.println("Aop allow: " + lock);
            return joinPoint.proceed();
        } else {
            System.out.println("Aop ignore : " + lock);
            return null;
        }
    }
}

写两个定时任务,请注意两个方法不能是private,否则启动会失败

/**
 * 每s执行一次
 *
 * @throws InterruptedException
 */
@Scheduled(cron = "0/5 * * * * ?")
@DistributeTask(key = "tt")
void scheduleAtFixRate() throws InterruptedException {
    System.out.println("Rate1: " + LocalDateTime.now() + " >>> " + Thread.currentThread().getName());
}


/**
 * 每s执行一次
 *
 * @throws InterruptedException
 */
@Scheduled(cron = "0/5 * * * * ?")
@DistributeTask(key = "tt")
void scheduleAtFixRate2() throws InterruptedException {
    System.out.println("Rate2: " + LocalDateTime.now() + " >>> " + Thread.currentThread().getName());
}

接下来是启动类,请注意看注解

@SpringBootApplication
@EnableScheduling
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class);
    }
}

2. 定时任务扩展

上面aop的方式除了要求定时任务方法不能是private之外,没有什么其他的问题;

但如果我就希望定时任务可以是private方法时,可以怎么办?

接下来我们通过自定义的定时任务来替换Spring提供的,使用的Spring版本为5.2.1.RELEASE(不同的版本实现方式会有些区别哦)

自定义ScheduledAnnotationBeanPostProcessor,重写生成定时任务的方法

public class MyScheduledAnnotationBeanPostProcessor extends ScheduledAnnotationBeanPostProcessor {
    @Override
    protected Runnable createRunnable(Object target, Method method) {
        Assert.isTrue(method.getParameterCount() == 0, "Only no-arg methods may be annotated with @Scheduled");
        Method invocableMethod = AopUtils.selectInvocableMethod(method, target.getClass());
        return new MyScheduledMethodRunnable(target, invocableMethod);
    }
}

在我们自定义的定时任务中,添加判断逻辑MyScheduledMethodRunnable

public class MyScheduledMethodRunnable extends ScheduledMethodRunnable {

    private final DistributeTask distributeTask;

    public MyScheduledMethodRunnable(Object target, Method method) {
        super(target, method);
        distributeTask = method.getAnnotation(DistributeTask.class);
    }

    @Override
    public void run() {
        if (distributeTask == null) {
            super.run();
        } else {
            String lock = getMethod().getName();
            if (ExecutiveLock.instance.tryLock(distributeTask.key(), lock)) {
                System.out.println("扩展 run: " + lock);
                super.run();
            } else {
                System.out.println("扩展 ignore: " + lock);
            }
        }
    }
}

引用我们自定义的定时任务处理类替换默认的实现

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Import(MyScheduledAnnotationBeanPostProcessor.class)
@Documented
public @interface EnableDistributeScheduling {
}

然后我们的启动类,需要修改一下,将@EnableScheduling替换为@EnableDistributeScheduling

@SpringBootApplication
@EnableDistributeScheduling
//@EnableScheduling
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class);
    }
}

然后将我们的定时任务,添加上private修饰,最终测试输出结果如下图

上面介绍的这种实现方式,主要是接入定时任务的生成逻辑,在这里我们有更多的操作空间,比如某些不满足条件的定时任务直接不生成,避免无用的调度

II. 其他

0. 项目

Loading...