How ReceiverSupervisorImpl Is Instantiated


Published: 2021-12-16 16:39:16 | Source: 高防服务器网 | Views: 104 | Author: iii | Category: Cloud Computing

This article walks through how ReceiverSupervisorImpl is instantiated in the Spark Streaming source, a step that many readers find confusing. The notes below follow the relevant source excerpts in order.

First, a recap of what actually runs on the Executor:

  1. Instantiate ReceiverSupervisorImpl.

  2. Call start(), then block in awaitTermination().

// ReceiverTracker.scala line 564
val startReceiverFunc: Iterator[Receiver[_]] => Unit =
  (iterator: Iterator[Receiver[_]]) => {
    if (!iterator.hasNext) {
      throw new SparkException(
        "Could not start receiver as object not found.")
    }
    if (TaskContext.get().attemptNumber() == 0) {
      val receiver = iterator.next()
      assert(iterator.hasNext == false)
      val supervisor = new ReceiverSupervisorImpl(
        receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
      supervisor.start()
      supervisor.awaitTermination()
    } else {
      // It's restarted by TaskScheduler, but we want to reschedule it again. So exit it.
    }
  }
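The start-then-wait shape of that function can be illustrated with a small standalone sketch. Everything here is hypothetical (ToySupervisor and its methods are not Spark classes); the only thing borrowed from the source is the idea of a one-shot latch, which the stopLatch field in the ReceiverSupervisor excerpt suggests. How Spark actually implements awaitTermination is not shown in this article, so treat the latch-based wait as an assumption.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}

// Hypothetical stand-in for a supervisor; not Spark's ReceiverSupervisor.
class ToySupervisor {
  // Released exactly once, when stop() is called.
  private val stopLatch = new CountDownLatch(1)
  @volatile private var started = false

  def isStarted: Boolean = started

  // Returns immediately; in a real system the work would run on other threads.
  def start(): Unit = { started = true }

  // Marks the supervisor as stopped and wakes up any waiting caller.
  def stop(): Unit = stopLatch.countDown()

  // Blocks the caller until stop() is called or the timeout elapses.
  // Returns true if stop() happened within the timeout.
  def awaitTermination(timeoutMs: Long): Boolean =
    stopLatch.await(timeoutMs, TimeUnit.MILLISECONDS)
}

object ToySupervisorDemo {
  def main(args: Array[String]): Unit = {
    val supervisor = new ToySupervisor
    supervisor.start()
    // Simulate a stop request arriving from another thread a little later.
    val stopper = new Thread(new Runnable {
      def run(): Unit = { Thread.sleep(100); supervisor.stop() }
    })
    stopper.start()
    val stopped = supervisor.awaitTermination(5000)
    println(s"stopped = $stopped")
  }
}
```

The point of the pattern is that the task thread stays parked inside awaitTermination for as long as the receiver is supposed to be alive, so the task only finishes when something calls stop().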

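Next, look at the constructor of ReceiverSupervisor, the parent class of ReceiverSupervisorImpl.

It assigns the member fields and attaches the current supervisor to the receiver via receiver.attachSupervisor(this).

The class comment is clear: the class is responsible for supervising a Receiver on the Worker, and it provides all the interfaces needed to handle the data the receiver receives.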

// ReceiverSupervisor.scala line 31
/**
 * Abstract class that is responsible for supervising a Receiver in the worker.
 * It provides all the necessary interfaces for handling the data received by the receiver.
 */
private[streaming] abstract class ReceiverSupervisor(
    receiver: Receiver[_],
    conf: SparkConf
  ) extends Logging {

  /** Enumeration to identify current state of the Receiver */
  object ReceiverState extends Enumeration {
    type CheckpointState = Value
    val Initialized, Started, Stopped = Value
  }
  import ReceiverState._

  // Attach the supervisor to the receiver
  receiver.attachSupervisor(this)               // associates the receiver with this supervisor

  private val futureExecutionContext = ExecutionContext.fromExecutorService(
    ThreadUtils.newDaemonCachedThreadPool("receiver-supervisor-future", 128))

  /** Receiver id */
  protected val streamId = receiver.streamId

  /** Has the receiver been marked for stop. */
  private val stopLatch = new CountDownLatch(1)

  /** Time between a receiver is stopped and started again */
  private val defaultRestartDelay = conf.getInt("spark.streaming.receiverRestartDelay", 2000)

  /** The current maximum rate limit for this receiver. */
  private[streaming] def getCurrentRateLimit: Long = Long.MaxValue

  /** Exception associated with the stopping of the receiver */
  @volatile protected var stoppingError: Throwable = null

  /** State of the receiver */
  @volatile private[streaming] var receiverState = Initialized

  // Some methods, which are essentially the data-handling interfaces
}
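To make the attach step concrete, here is a minimal sketch of a supervisor that registers itself with its receiver during construction and starts out in an Initialized state. All types (ToyReceiver, ToySupervisorBase, CollectingSupervisor, ToyReceiverState) are hypothetical and only mirror the shape of the excerpt; they are not Spark's classes, and the store/pushSingle methods are illustrative names.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical state enumeration, modelled on the three states in the excerpt.
object ToyReceiverState extends Enumeration {
  val Initialized, Started, Stopped = Value
}

// Hypothetical receiver: it forwards records to whichever supervisor is attached.
class ToyReceiver(val streamId: Int) {
  @volatile private var supervisor: Option[ToySupervisorBase] = None

  def attachSupervisor(s: ToySupervisorBase): Unit = { supervisor = Some(s) }

  def store(record: String): Unit = supervisor match {
    case Some(s) => s.pushSingle(record)
    case None    => throw new IllegalStateException("no supervisor attached")
  }
}

abstract class ToySupervisorBase(receiver: ToyReceiver) {
  // Runs as part of construction, before any subclass field is initialized,
  // so the receiver must not call back into the supervisor at this point.
  receiver.attachSupervisor(this)

  val streamId: Int = receiver.streamId

  // Constructing the supervisor does not start anything.
  @volatile var state: ToyReceiverState.Value = ToyReceiverState.Initialized

  def pushSingle(record: String): Unit

  def start(): Unit = { state = ToyReceiverState.Started }
}

// Concrete supervisor that simply collects what the receiver stores.
class CollectingSupervisor(receiver: ToyReceiver) extends ToySupervisorBase(receiver) {
  val received = new ArrayBuffer[String]
  def pushSingle(record: String): Unit = { received += record; () }
}
```

After `new CollectingSupervisor(receiver)`, the receiver can already hand data to the supervisor, yet the state is still Initialized until start() is called, which matches the article's observation that instantiation alone does not start the Receiver.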

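Instantiating ReceiverSupervisorImpl

  1. Creates the ReceivedBlockHandler. With the receiver write-ahead log disabled (the default), this is a BlockManagerBasedBlockHandler, which stores the data in the BlockManager; otherwise it is a WriteAheadLogBasedBlockHandler, which requires a checkpoint directory.

  2. Sets up an RpcEndpoint that receives messages from the ReceiverTracker on the driver.

  3. Creates the default BlockGeneratorListener.

  4. Creates the default BlockGenerator, passing it that listener.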

// ReceiverSupervisorImpl.scala line 43
/**
 * Concrete implementation of [[org.apache.spark.streaming.receiver.ReceiverSupervisor]]
 * which provides all the necessary functionality for handling the data received by
 * the receiver. Specifically, it creates a [[org.apache.spark.streaming.receiver.BlockGenerator]]
 * object that is used to divide the received data stream into blocks of data.
 */
private[streaming] class ReceiverSupervisorImpl(
    receiver: Receiver[_],
    env: SparkEnv,
    hadoopConf: Configuration,
    checkpointDirOption: Option[String]
  ) extends ReceiverSupervisor(receiver, env.conf) with Logging {

  private val host = SparkEnv.get.blockManager.blockManagerId.host
  private val executorId = SparkEnv.get.blockManager.blockManagerId.executorId

  private val receivedBlockHandler: ReceivedBlockHandler = {
    if (WriteAheadLogUtils.enableReceiverLog(env.conf)) {  // disabled by default
      if (checkpointDirOption.isEmpty) {
        throw new SparkException(
          "Cannot enable receiver write-ahead log without checkpoint directory set. " +
            "Please use streamingContext.checkpoint() to set the checkpoint directory. " +
            "See documentation for more details.")
      }
      new WriteAheadLogBasedBlockHandler(env.blockManager, receiver.streamId,
        receiver.storageLevel, env.conf, hadoopConf, checkpointDirOption.get)
    } else {
      new BlockManagerBasedBlockHandler(env.blockManager, receiver.storageLevel)
    }
  }

  /** Remote RpcEndpointRef for the ReceiverTracker */
  private val trackerEndpoint = RpcUtils.makeDriverRef("ReceiverTracker", env.conf, env.rpcEnv)

  /** RpcEndpointRef for receiving messages from the ReceiverTracker in the driver */
  private val endpoint = env.rpcEnv.setupEndpoint(
    "Receiver-" + streamId + "-" + System.currentTimeMillis(), new ThreadSafeRpcEndpoint {
      override val rpcEnv: RpcEnv = env.rpcEnv

      override def receive: PartialFunction[Any, Unit] = {
        case StopReceiver =>
          logInfo("Received stop signal")
          ReceiverSupervisorImpl.this.stop("Stopped by driver", None)
        case CleanupOldBlocks(threshTime) =>
          logDebug("Received delete old batch signal")
          cleanupOldBlocks(threshTime)
        case UpdateRateLimit(eps) =>
          logInfo(s"Received a new rate limit: $eps.")
          registeredBlockGenerators.foreach { bg =>
            bg.updateRate(eps)
          }
      }
    })

  /** Unique block ids if one wants to add blocks directly */
  private val newBlockId = new AtomicLong(System.currentTimeMillis())

  private val registeredBlockGenerators = new mutable.ArrayBuffer[BlockGenerator] // a typical trait mix-in
    with mutable.SynchronizedBuffer[BlockGenerator]

  /** Divides received data records into data blocks for pushing in BlockManager. */
  private val defaultBlockGeneratorListener = new BlockGeneratorListener {
    def onAddData(data: Any, metadata: Any): Unit = { }

    def onGenerateBlock(blockId: StreamBlockId): Unit = { }

    def onError(message: String, throwable: Throwable) {
      reportError(message, throwable)
    }

    def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_]) {
      pushArrayBuffer(arrayBuffer, None, Some(blockId))
    }
  }
  private val defaultBlockGenerator = createBlockGenerator(defaultBlockGeneratorListener)

  // ... some methods

  /** Store an ArrayBuffer of received data as a data block into Spark's memory. */
  def pushArrayBuffer(
      arrayBuffer: ArrayBuffer[_],
      metadataOption: Option[Any],
      blockIdOption: Option[StreamBlockId]
    ) {
    pushAndReportBlock(ArrayBufferBlock(arrayBuffer), metadataOption, blockIdOption)
  }

  /** Store block and report it to driver */
  def pushAndReportBlock(
      receivedBlock: ReceivedBlock,
      metadataOption: Option[Any],
      blockIdOption: Option[StreamBlockId]
    ) {
    val blockId = blockIdOption.getOrElse(nextBlockId)
    val time = System.currentTimeMillis
    val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock)
    logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms")
    val numRecords = blockStoreResult.numRecords
    val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)
    trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo))
    logDebug(s"Reported block $blockId")
  }

}
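The endpoint's receive method is an ordinary Scala PartialFunction that pattern-matches on the incoming message. The following standalone sketch shows that dispatch style in isolation. The message types, ToyRateLimited, and ToyEndpoint are hypothetical names loosely modelled on the three cases in the excerpt; nothing here uses Spark's RPC layer, and the deliver helper is an illustrative addition.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical message types modelled on the three cases in the excerpt.
sealed trait ToyTrackerMessage
case object ToyStopReceiver extends ToyTrackerMessage
final case class ToyCleanupOldBlocks(threshTimeMs: Long) extends ToyTrackerMessage
final case class ToyUpdateRateLimit(elementsPerSecond: Long) extends ToyTrackerMessage

// Stand-in for a block generator whose rate limit can be adjusted.
class ToyRateLimited { @volatile var rate: Long = Long.MaxValue }

class ToyEndpoint(generators: Seq[ToyRateLimited]) {
  // Records what the endpoint was asked to do, in place of real side effects.
  val log = new ArrayBuffer[String]

  // A PartialFunction is only defined for the messages it pattern-matches on.
  val receive: PartialFunction[Any, Unit] = {
    case ToyStopReceiver =>
      log += "stop"
    case ToyCleanupOldBlocks(t) =>
      log += s"cleanup before $t"
    case ToyUpdateRateLimit(eps) =>
      // Apply the new limit to every registered generator.
      generators.foreach(g => g.rate = eps)
  }

  // Delivers a message; returns false when no case handles it.
  def deliver(msg: Any): Boolean =
    if (receive.isDefinedAt(msg)) { receive(msg); true } else false
}
```

For example, `new ToyEndpoint(Seq(g)).deliver(ToyUpdateRateLimit(100L))` sets `g.rate` to 100, while delivering an unrelated value such as a plain string returns false because no case matches it.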

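Now look at BlockGenerator.

Its class comment is clear: it starts two threads.

  1. One periodically turns the previous batch of data into a block and starts a new batch. This is driven by the RecurringTimer class, which runs a Thread internally.

  2. The other pushes the blocks into the BlockManager.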

//
/**
 * Generates batches of objects received by a
 * [[org.apache.spark.streaming.receiver.Receiver]] and puts them into appropriately
 * named blocks at regular intervals. This class starts two threads,
 * one to periodically start a new batch and prepare the previous batch of as a block,
 * the other to push the blocks into the block manager.
 *
 * Note: Do not create BlockGenerator instances directly inside receivers. Use
 * `ReceiverSupervisor.createBlockGenerator` to create a BlockGenerator and use it.
 */
private[streaming] class BlockGenerator(
    listener: BlockGeneratorListener,
    receiverId: Int,
    conf: SparkConf,
    clock: Clock = new SystemClock()
  ) extends RateLimiter(conf) with Logging {

  private case class Block(id: StreamBlockId, buffer: ArrayBuffer[Any])

  /**
   * The BlockGenerator can be in 5 possible states, in the order as follows.
   *
   *  - Initialized: Nothing has been started
   *  - Active: start() has been called, and it is generating blocks on added data.
   *  - StoppedAddingData: stop() has been called, the adding of data has been stopped,
   *                       but blocks are still being generated and pushed.
   *  - StoppedGeneratingBlocks: Generating of blocks has been stopped, but
   *                             they are still being pushed.
   *  - StoppedAll: Everything has stopped, and the BlockGenerator object can be GCed.
   */
  private object GeneratorState extends Enumeration {
    type GeneratorState = Value
    val Initialized, Active, StoppedAddingData, StoppedGeneratingBlocks, StoppedAll = Value
  }
  import GeneratorState._

  private val blockIntervalMs = conf.getTimeAsMs("spark.streaming.blockInterval", "200ms")
  require(blockIntervalMs > 0, s"'spark.streaming.blockInterval' should be a positive value")

  private val blockIntervalTimer =
    new RecurringTimer(clock, blockIntervalMs, updateCurrentBuffer, "BlockGenerator")  // the periodic thread
  private val blockQueueSize = conf.getInt("spark.streaming.blockQueueSize", 10)
  private val blocksForPushing = new ArrayBlockingQueue[Block](blockQueueSize)
  private val blockPushingThread = new Thread() { override def run() { keepPushingBlocks() } } // pushes the blocks

  @volatile private var currentBuffer = new ArrayBuffer[Any]
  @volatile private var state = Initialized

  //...
}
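The two-thread design can be approximated with a short standalone sketch: one side swaps the current buffer out as a block and queues it, the other side drains a bounded queue. ToyBlockGenerator and its constructor parameters are hypothetical, and the sketch deliberately leaves out rate limiting, block ids, and the five-state lifecycle. Instead of a timer thread, the buffer swap is exposed as a method that the caller (or a scheduler of the caller's choosing) invokes, which is an assumption made to keep the example deterministic.

```scala
import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}
import scala.collection.mutable.ArrayBuffer

// Hypothetical toy, not Spark's BlockGenerator.
class ToyBlockGenerator(queueSize: Int, onPushBlock: Seq[Any] => Unit) {
  private var currentBuffer = new ArrayBuffer[Any]
  private val blocksForPushing = new ArrayBlockingQueue[Seq[Any]](queueSize)
  @volatile private var stopped = false

  // Called by the receiver for every record.
  def addData(data: Any): Unit = synchronized { currentBuffer += data; () }

  // What a periodic timer would call: swap in a fresh buffer and
  // queue the old one as a block if it contains anything.
  def updateCurrentBuffer(): Unit = {
    val block = synchronized {
      val full = currentBuffer
      currentBuffer = new ArrayBuffer[Any]
      full
    }
    // put() blocks when the queue is full, applying back-pressure.
    if (block.nonEmpty) blocksForPushing.put(block.toList)
  }

  private val blockPushingThread = new Thread(new Runnable {
    def run(): Unit = {
      // Keep draining until stop() was requested and the queue is empty.
      while (!stopped || !blocksForPushing.isEmpty) {
        val block = blocksForPushing.poll(10, TimeUnit.MILLISECONDS)
        if (block != null) onPushBlock(block)
      }
    }
  })
  blockPushingThread.setDaemon(true)

  def start(): Unit = blockPushingThread.start()

  // Flush the last partial buffer, then wait for the pusher to drain the queue.
  def stop(): Unit = {
    updateCurrentBuffer()
    stopped = true
    blockPushingThread.join()
  }
}
```

The bounded queue is what decouples the two sides: block generation proceeds at a fixed cadence, while pushing can lag behind by at most queueSize blocks before the generating side is forced to wait.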

At this point ReceiverSupervisorImpl has been fully instantiated. Note, however, that the Receiver itself has not been started yet.
