https://github.com/mperham/sidekiq
Raw File
Tip revision: 022c059c7b417d24cf1f892fa71f8e98d19ca93f authored by Mike Perham on 25 May 2023, 14:13:45 UTC
bump, prep
Tip revision: 022c059
sidekiqload
#!/usr/bin/env ruby

# Quiet some warnings we see when running in warning mode:
# RUBYOPT=-w bundle exec sidekiq
# NOTE(review): presumably code loaded below reads $TESTING, and assigning it
# first avoids an "uninitialized global" style warning under -w -- confirm.
$TESTING = false

# require "ruby-prof"
require "bundler/setup"
Bundler.require(:default, :load_test)

require_relative "../lib/sidekiq/cli"
require_relative "../lib/sidekiq/launcher"

# Opt in to the redis-client adapter when SIDEKIQ_REDIS_CLIENT is set in the
# environment; otherwise the adapter is left at whatever Sidekiq defaults to.
Sidekiq::RedisConnection.adapter = :redis_client if ENV["SIDEKIQ_REDIS_CLIENT"]

# Server-side settings used for the load test.
Sidekiq.configure_server do |cfg|
  cfg.options[:concurrency] = 10
  # Port 6380 is the address the Toxiproxy "redis" proxy in this script
  # listens on, so Redis traffic goes through the proxy.
  cfg.redis = {db: 13, port: 6380}
  # cfg.redis = { db: 13, port: 6380, driver: :hiredis}
  cfg.options[:queues].push("default")
  # Only errors are logged, which is why the script reports progress via
  # Sidekiq.logger.error.
  cfg.logger.level = Logger::ERROR
  cfg.average_scheduled_poll_interval = 2
  # Only applies when Sidekiq Pro has been loaded.
  cfg.reliable! if defined?(Sidekiq::Pro)
end

# Job used to generate load. It does no real work; when given an enqueue
# timestamp it prints how long the job took to reach the worker.
class LoadWorker
  include Sidekiq::Worker

  # A single retry, with the retry delay block always returning 1.
  sidekiq_options retry: 1
  sidekiq_retry_in { |_count| 1 }

  # idx - job index (only referenced by the commented-out failure injection).
  # ts  - optional enqueue time as a Float from Time.now.to_f; when present,
  #       the difference between now and ts is printed to stdout.
  #
  # Returns nil.
  def perform(idx, ts = nil)
    # raise idx.to_s if idx % 100 == 1
    return if ts.nil?

    puts(Time.now.to_f - ts)
  end
end

# Toxiproxy prerequisites:
#   brew tap shopify/shopify
#   brew install toxiproxy
#   then run `toxiproxy-server` in a separate terminal window.
require "toxiproxy"

# Simulate a non-localhost network for more realistic conditions:
# adding 1ms of network latency has an ENORMOUS impact on benchmarks.
# The "redis" proxy listens on 127.0.0.1:6380 and forwards to 127.0.0.1:6379
# (the default Redis port).
redis_proxy = {name: "redis", listen: "127.0.0.1:6380", upstream: "127.0.0.1:6379"}
Toxiproxy.populate([redis_proxy])

# Self-pipe: each trap handler only writes the signal's name into the pipe.
# The main loop later reads names back from self_read and reacts to them
# outside of the trap context.
self_read, self_write = IO.pipe

%w[INT TERM TSTP TTIN].each do |sig|
  begin
    Signal.trap(sig) { self_write.puts(sig) }
  rescue ArgumentError
    # Raised when the signal cannot be trapped here; report it and carry on.
    puts "Signal #{sig} not supported"
  end
end

# Start from an empty Redis database: FLUSHDB wipes whatever database the
# Sidekiq connection is pointed at.
Sidekiq.redis(&:flushdb)
# Reacts to a signal name that was read back from the self-pipe.
#
# launcher - object responding to #quiet (only called for TSTP).
# sig      - signal name as a String: "INT", "TERM", "TSTP" or "TTIN".
#
# Raises Interrupt for INT and TERM. Any other, unknown name is only
# debug-logged and nil is returned.
def handle_signal(launcher, sig)
  Sidekiq.logger.debug "Got #{sig} signal"

  case sig
  when "INT", "TERM"
    # INT: handle Ctrl-C in JRuby like MRI
    #      http://jira.codehaus.org/browse/JRUBY-4637
    # TERM: Heroku sends TERM and then waits 30 seconds for process to exit.
    raise Interrupt
  when "TSTP"
    Sidekiq.logger.info "Received TSTP, no longer accepting new work"
    launcher.quiet
  when "TTIN"
    # Dump an identifier, label and backtrace for every live thread.
    Thread.list.each do |thr|
      tid = (thr.object_id ^ ::Process.pid).to_s(36)
      Sidekiq.logger.warn "Thread TID-#{tid} #{thr["label"]}"
      trace = thr.backtrace
      Sidekiq.logger.warn(trace ? trace.join("\n") : "<no backtrace available>")
    end
  end
end

# Returns the `rss=` column that `ps` reports for the current process, as an
# Integer (0 when `ps` prints nothing parseable). Shells out to `ps`.
def Process.rss
  pid = Process.pid
  ps_output = %x(ps -o rss= -p #{pid})
  ps_output.strip.to_i
end

# Workload size: `iter` bulk pushes of `count` jobs each.
iter = 10
count = 10_000

# Enqueue everything up front, one push_bulk call per iteration. Each job's
# only argument is its index within the batch.
iter.times do
  job_args = (0...count).map { |idx| [idx] }
  Sidekiq::Client.push_bulk("class" => LoadWorker, "args" => job_args)
end
# Logged at error level because the logger is configured for Logger::ERROR.
Sidekiq.logger.error "Created #{count * iter} jobs"

# Reference point for the throughput measurement, taken after the jobs were
# enqueued. A monotonic clock is used so the reported duration cannot be
# skewed by wall-clock adjustments (NTP, manual changes) during the run.
start = Process.clock_gettime(Process::CLOCK_MONOTONIC)

# Background watcher: polls the length of "queue:default" every 0.2s. Once the
# queue is empty it reports the elapsed time, enqueues three timestamped jobs
# so LoadWorker prints their latency, and then exits the process.
Monitoring = Thread.new do
  loop do
    sleep 0.2
    pending = Sidekiq.redis { |conn| conn.llen("queue:default") }
    # Sidekiq.logger.error("RSS: #{Process.rss} Pending: #{pending}")
    next unless pending == 0

    elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start
    Sidekiq.logger.error("Done, #{iter * count} jobs in #{elapsed} sec")
    Sidekiq.logger.error("Now here's the latency for three jobs")

    # These stay on the wall clock: LoadWorker#perform subtracts the value
    # from Time.now.to_f, so both ends must use the same clock.
    LoadWorker.perform_async(1, Time.now.to_f)
    LoadWorker.perform_async(2, Time.now.to_f)
    LoadWorker.perform_async(3, Time.now.to_f)

    # Give the three jobs a moment to run before shutting down. `exit` raises
    # SystemExit, which the script's main block rescues as the normal way out.
    sleep 0.2
    exit(0)
  end
end

# Runs the given block, optionally behind simulated network latency.
#
# latency - latency in milliseconds. When greater than zero, the block runs
#           while a :latency toxic is applied downstream on the Toxiproxy
#           "redis" proxy; otherwise the block is simply yielded to.
#
# Returns whatever the executed branch returns (the block's value when no
# latency is applied).
def with_latency(latency, &block)
  Sidekiq.logger.error "Simulating #{latency}ms of latency between Sidekiq and redis"
  return yield unless latency > 0

  Toxiproxy[:redis].downstream(:latency, latency: latency).apply(&block)
end

# Main run: fire startup hooks, start the launcher behind the configured
# latency, then sit in the signal loop until the process exits.
begin
  # RubyProf::exclude_threads = [ Monitoring ]
  # RubyProf.start

  # Invoke the registered :startup lifecycle callbacks once, then drop them.
  startup_hooks = Sidekiq.options[:lifecycle_events][:startup]
  startup_hooks.each { |hook| hook.call }
  startup_hooks.clear

  # LATENCY (milliseconds, default 1) must parse as an Integer.
  latency_ms = Integer(ENV.fetch("LATENCY", "1"))
  with_latency(latency_ms) do
    launcher = Sidekiq::Launcher.new(Sidekiq)
    launcher.run

    # Block on the self-pipe and dispatch each signal name the traps wrote.
    while (ready = IO.select([self_read]))
      pipe = ready.first[0]
      handle_signal(launcher, pipe.gets.strip)
    end
  end
rescue SystemExit
  # Sidekiq.logger.error("Profiling...")
  # result = RubyProf.stop
  # printer = RubyProf::GraphHtmlPrinter.new(result)
  # printer.print(File.new("output.html", "w"), :min_percent => 1)
  # normal
rescue => err
  # Under $DEBUG re-raise for the full trace; otherwise print and fail.
  raise err if $DEBUG
  warn err.message
  warn err.backtrace.join("\n")
  exit 1
end
back to top