微信号:QunarTL

介绍:Qunar技术沙龙是去哪儿网工程师小伙伴以及业界小伙伴们的学习交流平台.我们会分享Qunar和业界最前沿的热门技术趋势和话题;为中高端技术同学提供一个自由的技术交流和学习分享平台.

一个可靠的原创定时任务解决方案

2018-10-23 08:00 王铭鑫

点击上方蓝字关注带来好运哦!!

王铭鑫


个人介绍:王铭鑫,2017 年 7 月入职 Qunar,现任国际机票基础搜索部门 Java 开发工程师,熟悉并发编程和多态编程,对架构设计、性能调优略有涉猎。工作之余,参与修订 C++ 国际标准,发表过多篇 ISO C++ 提案,涉及并发编程和多态编程领域。其中一篇关于多态编程的提案被 C++ 创始人 Bjarne Stroustrup 评价为“has the potential for significantly change the way we write code”,目前正在持续推进其标准化进程。


优雅地写定时任务... 真的很难

在复杂度较高的大规模集群中,定时任务是一种常见的需求;在 Java 语言中,我们通常使用内置的 ScheduledThreadPoolExecutor 类来实现定时任务需求,也可以使用 Quartz 等第三方辅助工具。然而,我们发现在复杂的并发环境中灵活控制定时任务并不容易。

举个例子,假设我们有一个配置中心,提供一个读取配置的 RPC 接口如下:

 
           
  1. public interface RemoteConfigLoader {

  2.    ConfigData load();

  3. }

该接口返回的数据中可能同时存在某些很重要的数据和一些不那么重要的数据。根据需求,客户端程序需要调用该接口以使用远程的配置数据;但由于客户端集群庞大,使用该配置的频率远超过该接口可承受 QPS。所幸,配置中心中的重要数据更新不那么频繁,对于不那么重要的部分我们也不需要远程的实时数据,所以可以使用周期性定时任务的方式内拉取最新配置,例如:

 
           
  1. // 全局数据

  2. RemoteConfigLoader remoteConfigLoader = /*implementation defined*/;

  3. ScheduledExecutorService scheduledExecutorService = /*implementation defined*/;

  4. long updateSeconds = /*implementation defined*/;

  5. volatile ConfigData configData = remoteConfigLoader.load();

  6. // 初始化定时任务

  7. Runnable theTask = () -> configData = remoteConfigLoader.load();

  8. ScheduledFuture<?> scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(

  9.        theTask, updateSeconds, updateSeconds, TimeUnit.SECONDS);

对于某些重要的数据更新,我们可能更希望尽快同步到每一个客户端,这就需要依赖于分布式消息机制。当然,我们可以将配置作为消息体发送给各个客户端;但当配置数据量较大时,为了降低分布式消息维护开销,我们通常更倾向于让客户端重新拉取。那么,客户端接收到“立即更新”的消息后,应执行怎样的逻辑呢?在上面代码的基础上,可能写出这样的代码:

 
           
  1. // 取消之前的定时任务

  2. scheduledFuture.cancel(true);

  3. // 立即执行一次更新,并更新 Future 对象

  4. scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(

  5.        theTask, 0, updateSeconds, TimeUnit.SECONDS);

乍看上去,这段代码相对还算优雅,如果发布上线也基本可以正常工作;但在某些情况下,这段代码是不可靠的。比如,当接收到消息时,一次更新操作正在进行,此时启动了另一个定时任务就可能出现并发问题——假如 remoteConfigLoader 中保存了一些可变状态(例如一个调用次数计数器或其他数据),或者正在进行的更新操作在第一次新任务完成之后才结束,都可能导致程序运行异常,如下图所示:

另外,假设有多个消息同时到达(可能由于在短时间内更新了多次重要配置),任务的取消和创建也可能会有并发问题,甚至导致线程泄漏和内存泄漏,如下图所示:

那么上述这些问题应该如何避免呢?如果仅使用 ScheduledThreadPoolExecutor 似乎是很难做到的,我们通常需要增加一些并发控制逻辑,例如在上述两个可能产生并发问题的地方分别加锁:

 
           
  1. // 增加两个全局互斥量

  2. final ReentrantLock futureMutex = new ReentrantLock();

  3. final ReentrantLock taskMutex = new ReentrantLock();

  4. // 重定义提交的任务

  5. Runnable theTask = () -> {

  6.    taskMutex.lock();

  7.    try {

  8.        configData = remoteConfigLoader.load();

  9.    } finally {

  10.        taskMutex.unlock();

  11.    }

  12. };

  13. // 触发新定时任务

  14. futureMutex.lock();

  15. try {

  16.    scheduledFuture.cancel(true);

  17.    scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(

  18.            theTask, 0, updateSeconds, TimeUnit.SECONDS);

  19. } finally {

  20.    futureMutex.unlock();

  21. }

可以看出,加入两个互斥量之后,代码逻辑比之前复杂很多,而且如果对于互斥量使用不当还可能导致死锁等问题,这样既不利于测试和维护,甚至可能产生性能瓶颈。注意,如果这里将两个互斥量合并为一个也可能引入并发问题——当定时任务正在执行时,任何触发操作将都被阻塞,从而降低系统吞吐率;假如有两个定时任务同时相互触发,则必然会导致死锁。如果我们借助第三方工具,例如 Quartz,是可以避免这类问题的,但仍需要我们仔细分析并发程序行为来做相应的配置,这里不展开叙述。

进一步,假如我们要新上线一个包含定时任务的功能,为了尽可能不影响线上之前的业务,我们希望为这个定时任务加入一个开关,在开关关闭时不运行定时任务,打开时才运行;如果开关开启后新的代码出现了问题,关闭开关后应可以及时停止定时任务;另外,我们在开关开启后希望根据实际的运行效果来调整定时任务的时间间隔。如果我们基于上面的代码继续开发,则逻辑将会更加复杂:

 
           
  1. // 触发新定时任务

  2. futureMutex.lock();

  3. try {

  4.    if (scheduledFuture != null) {

  5.        scheduledFuture.cancel(true);

  6.    }

  7.    scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(

  8.            theTask, 0, updateSeconds, TimeUnit.SECONDS);

  9. } finally {

  10.    futureMutex.unlock();

  11. }

  12. // 取消定时任务

  13. futureMutex.lock();

  14. try {

  15.    if (scheduledFuture != null) {

  16.        scheduledFuture.cancel(true);

  17.        scheduledFuture = null;

  18.    }

  19. } finally {

  20.    futureMutex.unlock();

  21. }

再举一个简单的例子,假如我们需要开发一个模拟用户操作的自动化测试工具,由于用户点击周期通常不固定,而是在某个区间内类似正态分布,这就要求任意两次定时任务的间隔需要通过更复杂的逻辑计算出来。

这种情况下我们在上面代码的基础上可能写出如下代码:

 
           
  1. // 计算两次任务间隔时间(毫秒)

  2. long getDelayMilliseconds();

  3. // 需要提交的任务

  4. Runnable theTask = () -> {

  5.    taskMutex.lock();

  6.    try {

  7.        runTimedTask();

  8.        futureMutex.lock();

  9.        try {

  10.            scheduledFuture = scheduledExecutorService.schedule(theTask, getDelayMilliseconds(), TimeUnit.MILLISECONDS);

  11.        } finally {

  12.            futureMutex.unlock();

  13.        }

  14.    } finally {

  15.        taskMutex.unlock();

  16.    }

  17. };

  18. // 触发新定时任务

  19. futureMutex.lock();

  20. try {

  21.    if (scheduledFuture != null) {

  22.        scheduledFuture.cancel(true);

  23.    }

  24.    scheduledFuture = scheduledExecutorService.schedule(theTask, 0, TimeUnit.MILLISECONDS);

  25. } finally {

  26.    futureMutex.unlock();

  27. }

此时需要在执行定时任务需要做两次加锁,于是逻辑变得更复杂了。

一站式定时任务解决方案——Circulation Trigger

后文将为大家介绍一个原创的定时任务辅助工具,叫做 Circulation Trigger。在正式介绍其具体设计与实现之前,我们首先看一下如何用这个工具解决上面的那些问题。

首先,对于“定时拉取远程配置”,我们依然保留一些所需的全局数据:

 
           
  1. // 全局数据

  2. RemoteConfigLoader remoteConfigLoader = /*implementation defined*/;

  3. ScheduledExecutorService scheduledExecutorService = /*implementation defined*/;

  4. long updateSeconds = /*implementation defined*/;

  5. volatile ConfigData configData = remoteConfigLoader.load();

使用 Circulation Trigger 定时拉取配置的代码如下:

 
           
  1. // 以固定的周期更新

  2. Duration fixedUpdateDuration = Duration.ofSeconds(updateSeconds);

  3. // 创建 CirculationTrigger,但不启动

  4. final CirculationTrigger trigger = CirculationTrigger.bind(scheduledExecutorService, () -> {

  5.    configData = remoteConfigLoader.load();

  6.    return Optional.of(fixedUpdateDuration);

  7. });

  8. // 触发定时任务

  9. trigger.fire(fixedUpdateDuration);

  10. // 立即触发定时任务

  11. trigger.fire();

  12. // 终止定时任务

  13. trigger.suspend();

当客户端接收到“立即更新”的消息时,只需要调用一次 trigger.fire() 即可(如果需要延迟触发可在 fire 方法中传入一个代表时长的 Duration 对象),然后 Circulation Trigger 就会帮助你自动取消之前的定时计划并尽快执行一次定时任务而不引入任何并发问题

Circulation Trigger 保证多次触发的定时任务一定不会并发执行:假如在消息到来时正在执行一次定时任务,那么 Circulation Trigger 将会在该次定时任务结束后立即再执行一次,这样也保证加载的数据一定是最新的。假如有多个消息同时到达,并发调用 fire 方法时,只会有一次生效,而不会引起任何内存泄漏或线程泄漏的问题。

进一步,假如我们要新上线一个包含定时任务的功能,我们可以随时调用 trigger.fire() 和 trigger.suspend() 触发或终止定时任务;同样地,Circulation Trigger 保证不引入任何并发问题

另外,即使定时任务触发的周期不固定,Circulation Trigger 也提供了足够的扩展性——任何一次触发的周期都由用户指定,例如所述以类似正态分布的方式,周期性模拟用户的点击过程:

 
           
  1. // 触发器

  2. CirculationTrigger trigger = CirculationTrigger.bind(scheduledExecutorService, () -> {

  3.    runTimedTask();

  4.    return Optional.of(Duration.ofMillis(getDelayMilliseconds()));

  5. });

  6. // 触发新定时任务

  7. trigger.fire();

设计过程中的一些思考

设计这个工具之前我阅读过一些业务中实现定时任务的程序,但其中大多数都或多或少存在潜在的并发问题——不仅这些 bug 难以复现,而且相关并发控制代码可复用性不高。所以,这个工具设计的首要目标就是彻底杜绝并发问题,除此之外我还有一些其他的考量。

1.杜绝并发问题

这个目标说起来容易,但在实现的时候需要考虑很多边界情况。举个例子,假如我们顺序地做两次触发,第一次在 5 分钟后执行定时任务,第二次在 1 分钟后执行定时任务,那么通常情况下我们更希望以第二次为准;进一步,在一次定时任务运行时,并发地进行多次不同延时的触发操作,下一次任务应该在什么时候运行呢?根据需求调研,我们规定,当某次触发的任务尚未运行时,之后的触发信号应覆盖该信号,这样既可以避免不必要的触发,又保证了最新的有效触发一定成功。

2.支持主动终止

细心的同学应该会发现,构造 Circulation Trigger 时传入的 lambda 表达式的返回值类型是 Optional 。当其有值时,其语义是下一次运行定时任务的时间间隔;反之,当其没有值时则不进行下一次定时任务。这样设计 API 是为了让用户可以优雅地主动终止定时任务。

3.时间表示方法

ScheduledThreadPoolExecutor 的 API 支持两种提交周期性任务方式,一种是按固定频率调度(对应 scheduleAtFixedRate 方法),另一种是按固定间隔时间调度(对应 scheduleWithFixedDelay 方法);当然,我们还可以选择按精确时刻调度,这样似乎更符合“定时任务”的定义。虽然这些表示方法都可以相互无损转化,但是经过调研,我们发现周期性定时任务通常更关注时间间隔,所以我们选择按照间隔时间调度的方式。

Java 8 之后已经有了标准的时间库,例如我们可以使用 Instant 表示一个精确时刻,使用 Duration 和 Period 表示一个时间段。由于 Duration 提供了更精确的时间表示方法,为了此工具更加通用,我们在设计 API 是选择了 Duration。

4.底层组件选取

为了让此工具更易使用,我们选取 Java 自带的 ScheduledExecutorService 接口作为本工具的底层组件。事实上,由于 ScheduledExecutorService 本身不支持定时任务的自提交以减少临界区的竞争,对于此工具而言仍存在性能可提升空间;但由于该开销不会产生性能瓶颈,为了降低学习成本,我们没有加入其它提升扩展性的中间层。但是为了充分测试此工具,我们也提供了一个相应的 C++ 实现,其线程池被设计为支持定时任务的自提交的运行模式;代码已经过充分测试,欢迎大家使用。

5.继承 VS 组合

在 API 设计上,我们考虑过使用继承的方式,即提供一个抽象类,包含一个承载定时任务逻辑的抽象方法。经分析,我们认为使用 Java 实现此机制时,两种方法的学习和使用成本相当;但在其他语言中,例如 C++,使用组合要明显优于继承的方式。为了增加可移植性,我们最终选择了组合而非继承的方式。

6.异常处理

异常是 Java 的核心语言机制,理论上任何方法的调用都可能抛出异常。然而,由于定时任务在线程池中异步运行,按照串行的方式捕获异常是不可行的。我们规定,当一次定时任务的运行抛出异常时,该异常应湮没以不影响其他运行在线程池中的定时任务运行;所以用户应尽量避免在执行定时任务时抛出异常。如果用户希望终止后续的定时任务,应显示返回有明确语义的 Optional.empty()。

API 设计

本工具共包含两个部件,分别是 Interface CirculatingRunnable 和 Class CirculationTrigger。

1.Interface CirculatingRunnable

public interface CirculatingRunnable

此接口语义是适用于 Circulation Trigger 的周期性定时任务。

   Optional runOneIteration()

  • 功能:执行一次定时任务。

  • 返回:若返回值存在,则表示当前时刻到下一次定时任务时间间隔;否则表示不再执行下一次定时任务。

2.Class CirculationTrigger

public final class CirculationTrigger

此类是定时任务触发器。

   public static CirculationTrigger bind(ScheduledExecutorService executor, CirculatingRunnable task)

  • 功能:将一个 ScheduledExecutorService 和定时任务实例绑定为一个定时任务触发器。

  • 后置条件:创建的触发器处于未触发状态。

   public void fire()

  • 功能:立即触发一次定时任务,等价于在 this 上调用 fire(Duration.ZERO)。

   public void fire(Duration delay)

  • 功能:以 delay 的延时触发一次定时任务。

  • 同步:所有对于 fire 和 suspend 方法的调用顺序执行,最后一次对 fire 或 suspend 方法的调用将无效化之前的所有在 this 上对 fire 或 suspend 方法的调用。

   public void suspend()

  • 功能:停止定时任务。若定时任务正在运行,则忽略该次定时任务对于下次执行时间间隔的指定。

  • 同步:所有对于 fire 和 suspend 方法的调用顺序执行,最后一次对 fire 或 suspend 方法的调用将无效化之前的所有在 this 上对 fire 或 suspend 方法的调用。

算法实现

在实现一个并发机制之前,我们通常都会先提取其在不同上下文中的状态。对于这个工具来说,显然至少需要三个维度的状态:

  1. 定时任务运行过程中,新触发的定时任务是否被阻塞;

  2. 定时任务是否在运行;

  3. 最新的定时任务标识。

对于任务标识,我们可以直接使用 Java 的引用,但这样就不可避免使用互斥量或者 Immutable 模式,开销较大;如果我们采用版本号的方式,就可以使用无锁的方式实现这个算法。由于触发定时任务的操作通常不会特别频繁,我们认为使用一个 32 位整数(后文称之为“状态”)来存储这三个维度的信息就足够了;具体地,1 和 2 可以分别使用一个比特位,剩余 30 位存储版本号,空间为 0x00000000~0x3FFFFFFF,可循环使用。为了便于叙述,我们后文将上述三个维度分别称为:预约位、运行位和最新版本号,依次排列,如下图所示:

为便于大家理解此算法,此处提取出三个原子核心过程:

过程 1,更新版本号:

此过程较为简单,如果存在预约则清除预约位,然后将版本号循环加一即可。注意,版本号的最大值为 0x3FFFFFFF,加一之后应该变为 0x00000000。伪代码如下:

 
           
  1. 原子地执行 [

  2.  var 新版本号 = (状态 + 1) & 0x3FFFFFFF

  3.  状态 = (状态 & 0x40000000) | 新版本号

  4.  return 新版本号

  5. ]

过程 2,定时任务预处理:

此过程有两个独立步骤,首先检查版本号是否是最新的,然后设置运行位。令该过程输出“该次定时任务是否执行”,伪代码如下:

 
           
  1. 原子地执行 [

  2.  var 最新版本号 = 状态 & 0x3FFFFFFF

  3.  if (当前定时任务版本号 != 最新版本号)

  4.  then

  5.    return false

  6.  endif

  7.  var 当前运行位 = 状态 & 0x40000000

  8.  if (当前运行位 != 0)

  9.  then

  10.    状态 = 状态 | 0x80000000

  11.    return false

  12.  else

  13.    状态 = 状态 | 0x40000000

  14.    return true

  15.  endif

  16. ]

过程 3,定时任务后处理:

此过程需检查是否存在新版本定时任务预约。令此过程输出“{是否继续运行, 新版本号}”,伪代码如下:

 
           
  1. 原子地执行 [

  2.  var 预约位 = 状态 & 0x80000000

  3.  if (预约位 != 0)

  4.  then

  5.    状态 = 状态 & 0x7FFFFFFF

  6.    var 新版本号 = 状态 & 0x3FFFFFFF

  7.    return {true, 新版本号}

  8.  else

  9.    状态 = 状态 & 0x3FFFFFFF

  10.    return {false, 无}

  11.  endif

  12. ]

更多实现细节请参照该算法的完整实现:

 
           
  1. import java.time.Duration;

  2. import java.util.Optional;

  3. import java.util.concurrent.ScheduledExecutorService;

  4. import java.util.concurrent.TimeUnit;

  5. import java.util.concurrent.atomic.AtomicInteger;

  6. public final class CirculationTrigger {

  7.    private static final int VERSION_MASK = 0x3FFFFFFF;

  8.    private static final int RUNNING_FLAG_MASK = 0x40000000;

  9.    private static final int RESERVATION_FLAG_MASK = 0x80000000;

  10.    private final AtomicInteger state;

  11.    private final ScheduledExecutorService executor;

  12.    private final CirculatingRunnable task;

  13.    public static CirculationTrigger bind(ScheduledExecutorService executor, CirculatingRunnable task) {

  14.        return new CirculationTrigger(new AtomicInteger(0), executor, task);

  15.    }

  16.    public void fire() {

  17.        fire(Duration.ZERO);

  18.    }

  19.    public void fire(Duration delay) {

  20.        schedule(new Task(advanceVersion()), delay);

  21.    }

  22.    public void suspend() {

  23.        advanceVersion();

  24.    }

  25.    private CirculationTrigger(AtomicInteger state, ScheduledExecutorService executor, CirculatingRunnable task) {

  26.        this.state = state;

  27.        this.executor = executor;

  28.        this.task = task;

  29.    }

  30.    private int advanceVersion() {

  31.        int s, v;

  32.        do {

  33.            s = state.get();

  34.            v = s + 1 & VERSION_MASK;

  35.        } while (!state.weakCompareAndSet(s, s & RUNNING_FLAG_MASK | v));  // Cancel any reservation

  36.        return v;

  37.    }

  38.    private void schedule(Task task, Duration duration) {

  39.        executor.schedule(task, duration.toNanos(), TimeUnit.NANOSECONDS);

  40.    }

  41.    private final class Task implements Runnable {

  42.        private int version;

  43.        private Task(int version) {

  44.            this.version = version;

  45.        }

  46.        @Override

  47.        public void run() {

  48.            int s;

  49.            for (;;) {

  50.                s = state.get();

  51.                if ((s & VERSION_MASK) != version) {

  52.                    return;

  53.                }

  54.                if ((s & RUNNING_FLAG_MASK) == 0) {  // No other instances are running

  55.                    if (state.weakCompareAndSet(s, s | RUNNING_FLAG_MASK)) {

  56.                        break;

  57.                    }

  58.                } else {

  59.                    if (state.weakCompareAndSet(s, s | RESERVATION_FLAG_MASK)) {

  60.                        return;

  61.                    }

  62.                }

  63.            }

  64.            for (;;) {

  65.                Duration nextDelay;

  66.                try {

  67.                    Optional<Duration> nextDelayOptional = task.runOneIteration();

  68.                    nextDelay = nextDelayOptional.orElse(null);

  69.                } catch (Throwable ignore) {

  70.                    nextDelay = null;

  71.                }

  72.                for (;;) {

  73.                    s = state.get();

  74.                    if ((s & RESERVATION_FLAG_MASK) == 0) {

  75.                        if (state.weakCompareAndSet(s, s & ~RUNNING_FLAG_MASK)) {  // Reset running state

  76.                            if (version == (s & VERSION_MASK) && nextDelay != null) {

  77.                                schedule(this, nextDelay);

  78.                            }

  79.                            return;

  80.                        }

  81.                    } else {

  82.                        if (state.weakCompareAndSet(s, s & ~RESERVATION_FLAG_MASK)) {  // Reset reservation

  83.                            version = s & VERSION_MASK;  // Update version

  84.                            break;

  85.                        }

  86.                    }

  87.                }

  88.            }

  89.        }

  90.    }

  91. }

其中,接口 CirculatingRunnable 代码如下:

 
           
  1. import java.time.Duration;

  2. import java.util.Optional;

  3. @FunctionalInterface

  4. public interface CirculatingRunnable {

  5.    Optional<Duration> runOneIteration();

  6. }

以上代码可以使用 Java 8 及以上版本编译。事实上,在 Java 9 之后,Java 对原子操作的 API 做了一些升级,允许用户指定原子操作的内存顺序(Memory Order)。上述代码中所使用的 weakCompareAndSet 方法已被弃用,但为了兼容 Java 8 的程序,我给出的实现中并没有使用 Java 9 之后的的 API。理论上,上述程序中只需要指定两个操作的内存顺序即可,不需要保持所有操作的串行一致性,感兴趣的同学可以参照我们给出的此工具 C++ 实现(https://github.com/wmx16835/wang/blob/master/src/main/experimental/concurrent.h#L393-L517),该实现中已经使用了最优的内存同步配置。

 
Qunar技术沙龙 更多文章 【基本功】 前端安全系列之二:如何防止 CSRF 攻击? 【基本功】 前端安全系列之一:如何防止XSS攻击? Jest 前端单元测试框架 react-router v4 源码分析 机器学习之 scikit-learn 开发入门(5)
猜您喜欢 面试编程题:单链表和之恋;和海枯石烂题之分析 探究如何给Python程序做hotfix log4net 写数据到sql数据库 Redis Cluster 架构优化 游戏是怎么赚钱的 - 聊聊留存