https://github.com/mperham/sidekiq
Raw File
Tip revision: 16e170cc1a6b7d5fc928be2915057aa80f3a4448 authored by Mike Perham on 06 February 2015, 06:03:07 UTC
Merge branch 'master' into 4_0
Tip revision: 16e170c
job.rb
require 'sidekiq/client'
require 'sidekiq/core_ext'

module Sidekiq

  ##
  # Include this module in your job class and you can easily create
  # asynchronous jobs:
  #
  # class HardJob
  #   include Sidekiq::Job
  #
  #   def perform(*args)
  #     # do some work
  #   end
  # end
  #
  # Then in your Rails app, you can do this:
  #
  #   HardJob.perform_async(1, 2, 3)
  #
  # Note that perform_async is a class method, perform is an instance method.
  module Job
    attr_accessor :jid

    def self.included(base)
      base.extend(ClassMethods)
      base.class_attribute :sidekiq_options_hash
      base.class_attribute :sidekiq_retry_in_block
      base.class_attribute :sidekiq_retries_exhausted_block
    end

    def logger
      Sidekiq.logger
    end

    module ClassMethods

      def perform_async(*args)
        client_push('class' => self, 'args' => args)
      end

      def perform_in(interval, *args)
        int = interval.to_f
        now = Time.now.to_f
        ts = (int < 1_000_000_000 ? now + int : int)

        item = { 'class' => self, 'args' => args, 'at' => ts }

        # Optimization to enqueue something now that is scheduled to go out now or in the past
        item.delete('at'.freeze) if ts <= now

        client_push(item)
      end
      alias_method :perform_at, :perform_in

      ##
      # Allows customization for this type of Job.
      # Legal options:
      #
      #   :queue - use a named queue for this Job, default 'default'
      #   :retry - enable the RetryJobs middleware for this Job, default *true*
      #   :backtrace - whether to save any error backtrace in the retry payload to display in web UI,
      #      can be true, false or an integer number of lines to save, default *false*
      #   :pool - use the given Redis connection pool to push this type of job to a given shard.
      def sidekiq_options(opts={})
        self.sidekiq_options_hash = get_sidekiq_options.merge((opts || {}).stringify_keys)
      end

      def sidekiq_retry_in(&block)
        self.sidekiq_retry_in_block = block
      end

      def sidekiq_retries_exhausted(&block)
        self.sidekiq_retries_exhausted_block = block
      end

      def get_sidekiq_options # :nodoc:
        self.sidekiq_options_hash ||= Sidekiq.default_job_options
      end

      def client_push(item) # :nodoc:
        pool = Thread.current[:sidekiq_via_pool] || get_sidekiq_options['pool'] || Sidekiq.redis_pool
        Sidekiq::Client.new(pool).push(item.stringify_keys)
      end

    end
  end

  # Backwards compatibility
  Worker = Job
end
back to top