
Gracefully Stopping a Spark Streaming Program

2016-07-01 17:32 Spark技术日报

1.  A Spark Streaming program can be stopped forcibly, by an exception, or in other ways. Let's start with StreamingContext's stop() method:


    def stop(
        stopSparkContext: Boolean = conf.getBoolean("spark.streaming.stopSparkContextByDefault", true)
      ): Unit = synchronized {
      stop(stopSparkContext, false)
    }


    Two values come into play here: stopSparkContext, whose default is read from the spark.streaming.stopSparkContextByDefault configuration, and stopGracefully, which is hard-coded to false. The call is delegated to the stop method that takes both parameters, shown below:


    /**
     * Stop the execution of the streams, with option of ensuring all received data
     * has been processed.
     *
     * @param stopSparkContext if true, stops the associated SparkContext. The underlying SparkContext
     *                         will be stopped regardless of whether this StreamingContext has been
     *                         started.
     * @param stopGracefully if true, stops gracefully by waiting for the processing of all
     *                       received data to be completed
     */
    def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit = {
      var shutdownHookRefToRemove: AnyRef = null
      if (AsynchronousListenerBus.withinListenerThread.value) {
        throw new SparkException("Cannot stop StreamingContext within listener thread of" +
          " AsynchronousListenerBus")
      }
      synchronized {
        try {
          state match {
            case INITIALIZED =>
              logWarning("StreamingContext has not been started yet")
            case STOPPED =>
              logWarning("StreamingContext has already been stopped")
            case ACTIVE =>
              scheduler.stop(stopGracefully)
              // Removing the streamingSource to de-register the metrics on stop()
              env.metricsSystem.removeSource(streamingSource)
              uiTab.foreach(_.detach())
              StreamingContext.setActiveContext(null)
              waiter.notifyStop()
              if (shutdownHookRef != null) {
                shutdownHookRefToRemove = shutdownHookRef
                shutdownHookRef = null
              }
              logInfo("StreamingContext stopped successfully")
          }
        } finally {
          // The state should always be Stopped after calling `stop()`, even if we haven't started yet
          state = STOPPED
        }
      }
      if (shutdownHookRefToRemove != null) {
        ShutdownHookManager.removeShutdownHook(shutdownHookRefToRemove)
      }
      // Even if we have already stopped, we still need to attempt to stop the SparkContext because
      // a user might stop(stopSparkContext = false) and then call stop(stopSparkContext = true).
      if (stopSparkContext) sc.stop()
    }


    The doc comment spells out the correct way to stop: all received data should be processed before shutting down. To get that behaviour we must pass stopGracefully = true, in which case stop waits for every pending batch to finish before tearing things down.
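    One common way to trigger such a graceful stop from outside the application is to have the driver poll for an external signal and call the two-argument stop itself. A minimal sketch, assuming a marker file on HDFS as the stop signal; the awaitStopSignal helper, the marker path, and the 10-second polling interval are all illustrative, not Spark API:

    import org.apache.hadoop.fs.{FileSystem, Path}
    import org.apache.spark.streaming.StreamingContext

    // Hypothetical helper: poll for a marker file and stop gracefully once it appears.
    def awaitStopSignal(ssc: StreamingContext, markerPath: String): Unit = {
      val fs = FileSystem.get(ssc.sparkContext.hadoopConfiguration)
      var stopped = false
      while (!stopped) {
        // Wait up to 10s for normal termination, then check for the marker file.
        stopped = ssc.awaitTerminationOrTimeout(10000)
        if (!stopped && fs.exists(new Path(markerPath))) {
          // Wait for all received data to be processed, then stop the SparkContext too.
          ssc.stop(stopSparkContext = true, stopGracefully = true)
          stopped = true
        }
      }
    }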


2. Spark Streaming also provides a built-in path to graceful shutdown: StreamingContext has a stopOnShutdown() method, shown below.


    private def stopOnShutdown(): Unit = {
      val stopGracefully = conf.getBoolean("spark.streaming.stopGracefullyOnShutdown", false)
      logInfo(s"Invoking stop(stopGracefully=$stopGracefully) from shutdown hook")
      // Do not stop SparkContext, let its own shutdown hook stop it
      stop(stopSparkContext = false, stopGracefully = stopGracefully)
    }


    What does stopOnShutdown() do? When the driver JVM shuts down, whether it exits normally or is terminated by a signal such as SIGTERM (note that kill -9 / SIGKILL bypasses JVM shutdown hooks entirely), stopOnShutdown() is called back and in turn calls stop(). Whether that stop is graceful is read from the spark.streaming.stopGracefullyOnShutdown configuration, which defaults to false and should be set to true in production.
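    A minimal sketch of turning this on from the application itself; the app name and batch interval are illustrative, and the same flag can equally be passed to spark-submit with --conf spark.streaming.stopGracefullyOnShutdown=true:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val conf = new SparkConf()
      .setAppName("graceful-shutdown-demo") // illustrative app name
      .set("spark.streaming.stopGracefullyOnShutdown", "true")

    // Illustrative 5-second batch interval; with the flag set, the shutdown hook
    // registered in start() will call stop(stopSparkContext = false, stopGracefully = true).
    val ssc = new StreamingContext(conf, Seconds(5))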


3.  How does stopOnShutdown() get invoked? StreamingContext's start method contains this line:


    shutdownHookRef = ShutdownHookManager.addShutdownHook(StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown)


    This registers the stopOnShutdown function with ShutdownHookManager. The addShutdownHook code is as follows:


    def addShutdownHook(priority: Int)(hook: () => Unit): AnyRef = {
      shutdownHooks.add(priority, hook)
    }


    Now look at what SparkShutdownHookManager contains; the comments in the code below explain what each piece does, so we won't walk through every method.


    private[util] class SparkShutdownHookManager {

      // Priority queue of hooks: the higher the priority, the earlier the hook runs.
      private val hooks = new PriorityQueue[SparkShutdownHook]()
      @volatile private var shuttingDown = false

      /**
       * Install a hook to run at shutdown and run all registered hooks in order. Hadoop 1.x does not
       * have `ShutdownHookManager`, so in that case we just use the JVM's `Runtime` object and hope for
       * the best.
       */
      // Wrap runAll() in a Runnable and register it as a JVM shutdown hook,
      // so it is only invoked when the JVM exits.
      def install(): Unit = {
        val hookTask = new Runnable() {
          override def run(): Unit = runAll()
        }
        Try(Utils.classForName("org.apache.hadoop.util.ShutdownHookManager")) match {
          case Success(shmClass) =>
            val fsPriority = classOf[FileSystem]
              .getField("SHUTDOWN_HOOK_PRIORITY")
              .get(null) // static field, the value is not used
              .asInstanceOf[Int]
            val shm = shmClass.getMethod("get").invoke(null)
            shm.getClass().getMethod("addShutdownHook", classOf[Runnable], classOf[Int])
              .invoke(shm, hookTask, Integer.valueOf(fsPriority + 30))
          case Failure(_) =>
            Runtime.getRuntime.addShutdownHook(new Thread(hookTask, "Spark Shutdown Hook"))
        }
      }

      // Called back by the JVM shutdown hook when the JVM exits.
      def runAll(): Unit = {
        shuttingDown = true
        var nextHook: SparkShutdownHook = null
        // Drain the priority queue and run each hook; higher priority runs first.
        while ({ nextHook = hooks.synchronized { hooks.poll() }; nextHook != null }) {
          Try(Utils.logUncaughtExceptions(nextHook.run()))
        }
      }

      def add(priority: Int, hook: () => Unit): AnyRef = {
        hooks.synchronized {
          if (shuttingDown) {
            throw new IllegalStateException("Shutdown hooks cannot be modified during shutdown.")
          }
          val hookRef = new SparkShutdownHook(priority, hook)
          hooks.add(hookRef)
          hookRef
        }
      }

      def remove(ref: AnyRef): Boolean = {
        hooks.synchronized { hooks.remove(ref) }
      }
    }
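    To make the ordering concrete, here is a small standalone sketch (not Spark code) of the same pattern: hooks compare so that a larger priority sorts first, and draining the queue with poll() therefore runs higher-priority hooks earlier. The priorities 51 and 50 mirror the streaming and SparkContext hooks discussed in this article; the temp-dir entry and its priority are purely illustrative.

    import java.util.PriorityQueue

    // Standalone illustration of priority-ordered shutdown hooks.
    case class Hook(priority: Int, run: () => Unit) extends Comparable[Hook] {
      // Reverse numeric order so that a higher priority is polled first,
      // matching the behaviour of SparkShutdownHook.
      override def compareTo(other: Hook): Int = Integer.compare(other.priority, this.priority)
    }

    object HookOrderDemo extends App {
      val hooks = new PriorityQueue[Hook]()
      hooks.add(Hook(51, () => println("stop StreamingContext (priority 51)")))
      hooks.add(Hook(50, () => println("stop SparkContext (priority 50)")))
      hooks.add(Hook(25, () => println("clean up temp dirs (illustrative priority 25)")))

      // Drain like runAll(): prints in the order 51, 50, 25.
      var next = hooks.poll()
      while (next != null) {
        next.run()
        next = hooks.poll()
      }
    }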


4. At this point the whole picture is clear: the stopOnShutdown() function is placed into SparkShutdownHookManager's priority queue hooks, with a default priority of 51 for StreamingContext. When the JVM exits, the registered shutdown-hook thread runs runAll(), which polls the hook functions off the queue one by one in decreasing priority order and executes them. That invokes stopOnShutdown(), which in turn calls stop(), and the application can shut down gracefully.
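Putting it together, here is a minimal end-to-end sketch; the socket source, host/port, and batch interval are illustrative. With the flag set, a plain kill <driver-pid> (SIGTERM) or yarn application -kill lets in-flight batches finish before the JVM exits, whereas kill -9 skips the shutdown hooks entirely.

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object GracefulStopDemo {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setAppName("GracefulStopDemo")
          .set("spark.streaming.stopGracefullyOnShutdown", "true")

        val ssc = new StreamingContext(conf, Seconds(5))

        // Illustrative job: count words from a socket stream.
        val lines = ssc.socketTextStream("localhost", 9999)
        lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()

        ssc.start()
        // Blocks until the context is stopped -- including by the shutdown hook
        // registered in start(), which calls stop(stopSparkContext = false,
        // stopGracefully = true) when the JVM begins to shut down.
        ssc.awaitTermination()
      }
    }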


Source: http://www.jianshu.com/p/18cd94b5c647

 