https://github.com/mperham/sidekiq
Raw File
Tip revision: 179d30e712fc23cc00ab91f23896e814c7c05e92 authored by Mike Perham on 01 November 2016, 18:41:54 UTC
latest changes
Tip revision: 179d30e
manager.rb
# frozen_string_literal: true
# encoding: utf-8
require 'sidekiq/util'
require 'sidekiq/processor'
require 'sidekiq/fetch'
require 'thread'
require 'set'

module Sidekiq

  ##
  # The Manager is the central coordination point in Sidekiq, controlling
  # the lifecycle of the Processors and feeding them jobs as necessary.
  #
  # Tasks:
  #
  # 1. start: Spin up Processors.
  # 3. processor_died: Handle job failure, throw away Processor, create new one.
  # 4. quiet: shutdown idle Processors.
  # 5. stop: hard stop the Processors by deadline.
  #
  # Note that only the last task requires its own Thread since it has to monitor
  # the shutdown process.  The other tasks are performed by other threads.
  #
  class Manager
    include Util

    attr_reader :workers
    attr_reader :options

    def initialize(options={})
      logger.debug { options.inspect }
      @options = options
      @count = options[:concurrency] || 25
      raise ArgumentError, "Concurrency of #{@count} is not supported" if @count < 1

      @done = false
      @workers = Set.new
      @count.times do
        @workers << Processor.new(self)
      end
      @plock = Mutex.new
    end

    def start
      @workers.each do |x|
        x.start
      end
    end

    def quiet
      return if @done
      @done = true

      logger.info { "Terminating quiet workers" }
      @workers.each { |x| x.terminate }
      fire_event(:quiet, true)
    end

    # hack for quicker development / testing environment #2774
    PAUSE_TIME = STDOUT.tty? ? 0.1 : 0.5

    def stop(deadline)
      quiet
      fire_event(:shutdown, true)

      # some of the shutdown events can be async,
      # we don't have any way to know when they're done but
      # give them a little time to take effect
      sleep PAUSE_TIME
      return if @workers.empty?

      logger.info { "Pausing to allow workers to finish..." }
      remaining = deadline - Time.now
      while remaining > PAUSE_TIME
        return if @workers.empty?
        sleep PAUSE_TIME
        remaining = deadline - Time.now
      end
      return if @workers.empty?

      hard_shutdown
    end

    def processor_stopped(processor)
      @plock.synchronize do
        @workers.delete(processor)
      end
    end

    def processor_died(processor, reason)
      @plock.synchronize do
        @workers.delete(processor)
        unless @done
          p = Processor.new(self)
          @workers << p
          p.start
        end
      end
    end

    def stopped?
      @done
    end

    private

    def hard_shutdown
      # We've reached the timeout and we still have busy workers.
      # They must die but their jobs shall live on.
      cleanup = nil
      @plock.synchronize do
        cleanup = @workers.dup
      end

      if cleanup.size > 0
        jobs = cleanup.map {|p| p.job }.compact

        logger.warn { "Terminating #{cleanup.size} busy worker threads" }
        logger.warn { "Work still in progress #{jobs.inspect}" }

        # Re-enqueue unfinished jobs
        # NOTE: You may notice that we may push a job back to redis before
        # the worker thread is terminated. This is ok because Sidekiq's
        # contract says that jobs are run AT LEAST once. Process termination
        # is delayed until we're certain the jobs are back in Redis because
        # it is worse to lose a job than to run it twice.
        strategy = (@options[:fetch] || Sidekiq::BasicFetch)
        strategy.bulk_requeue(jobs, @options)
      end

      cleanup.each do |processor|
        processor.kill
      end
    end

  end
end
back to top