class Concurrent::RubyThreadPoolExecutor

@!macro thread_pool_executor @!macro thread_pool_options @!visibility private

Constants

DEFAULT_MAX_POOL_SIZE

@!macro thread_pool_executor_constant_default_max_pool_size

DEFAULT_MAX_QUEUE_SIZE

@!macro thread_pool_executor_constant_default_max_queue_size

DEFAULT_MIN_POOL_SIZE

@!macro thread_pool_executor_constant_default_min_pool_size

DEFAULT_THREAD_IDLETIMEOUT

@!macro thread_pool_executor_constant_default_thread_timeout

Attributes

idletime[R]

@!macro thread_pool_executor_attr_reader_idletime

max_length[R]

@!macro thread_pool_executor_attr_reader_max_length

max_queue[R]

@!macro thread_pool_executor_attr_reader_max_queue

min_length[R]

@!macro thread_pool_executor_attr_reader_min_length

Public Class Methods

new(opts = {}) click to toggle source

@!macro thread_pool_executor_method_initialize

Calls superclass method Concurrent::RubyExecutorService.new
# File lib/concurrent/executor/ruby_thread_pool_executor.rb, line 39
# Creates the executor by delegating construction to the superclass.
#
# @param opts [Hash] configuration options, forwarded unchanged to the
#   superclass constructor
def initialize(opts = {})
  # Bare `super` forwards the method's own arguments (here: `opts`).
  super
end

Public Instance Methods

can_overflow?() click to toggle source

@!macro executor_service_method_can_overflow_question

# File lib/concurrent/executor/ruby_thread_pool_executor.rb, line 59
# Tells whether the task queue is bounded, as reported by
# `ns_limited_queue?`. The check runs inside `synchronize`.
#
# @return [Boolean] the result of `ns_limited_queue?`
def can_overflow?
  synchronize do
    ns_limited_queue?
  end
end
completed_task_count() click to toggle source

@!macro thread_pool_executor_attr_reader_completed_task_count

# File lib/concurrent/executor/ruby_thread_pool_executor.rb, line 54
# Reads the completed-task counter inside `synchronize`.
#
# @return [Integer] current value of `@completed_task_count`
def completed_task_count
  synchronize do
    @completed_task_count
  end
end
largest_length() click to toggle source

@!macro thread_pool_executor_attr_reader_largest_length

# File lib/concurrent/executor/ruby_thread_pool_executor.rb, line 44
# Reads the recorded high-water mark of the pool size inside `synchronize`.
#
# @return [Integer] current value of `@largest_length`
def largest_length
  synchronize do
    @largest_length
  end
end
length() click to toggle source

@!macro thread_pool_executor_attr_reader_length

# File lib/concurrent/executor/ruby_thread_pool_executor.rb, line 64
# Counts the workers currently held in `@pool`, inside `synchronize`.
#
# @return [Integer] number of entries in `@pool`
def length
  synchronize do
    @pool.size
  end
end
queue_length() click to toggle source

@!macro thread_pool_executor_attr_reader_queue_length

# File lib/concurrent/executor/ruby_thread_pool_executor.rb, line 69
# Counts the tasks currently waiting in `@queue`, inside `synchronize`.
#
# @return [Integer] number of entries in `@queue`
def queue_length
  synchronize do
    @queue.size
  end
end
ready_worker(worker) click to toggle source

@!visibility private

# File lib/concurrent/executor/ruby_thread_pool_executor.rb, line 90
# Synchronized entry point that forwards to `ns_ready_worker`.
#
# @param worker the worker reporting that it is ready
# @return whatever `ns_ready_worker` returns
def ready_worker(worker)
  synchronize do
    ns_ready_worker(worker)
  end
end
remaining_capacity() click to toggle source

@!macro thread_pool_executor_attr_reader_remaining_capacity

# File lib/concurrent/executor/ruby_thread_pool_executor.rb, line 74
# Computes how many more tasks the queue can accept, inside `synchronize`.
#
# @return [Integer] `@max_queue - @queue.length` when the queue is limited
#   (per `ns_limited_queue?`), otherwise -1
def remaining_capacity
  synchronize do
    ns_limited_queue? ? @max_queue - @queue.length : -1
  end
end
remove_busy_worker(worker) click to toggle source

@!visibility private

# File lib/concurrent/executor/ruby_thread_pool_executor.rb, line 85
# Synchronized entry point that forwards to `ns_remove_busy_worker`.
#
# @param worker the worker to remove
# @return whatever `ns_remove_busy_worker` returns
def remove_busy_worker(worker)
  synchronize do
    ns_remove_busy_worker(worker)
  end
end
scheduled_task_count() click to toggle source

@!macro thread_pool_executor_attr_reader_scheduled_task_count

# File lib/concurrent/executor/ruby_thread_pool_executor.rb, line 49
# Reads the scheduled-task counter inside `synchronize`.
#
# @return [Integer] current value of `@scheduled_task_count`
def scheduled_task_count
  synchronize do
    @scheduled_task_count
  end
end
worker_died(worker) click to toggle source

@!visibility private

# File lib/concurrent/executor/ruby_thread_pool_executor.rb, line 100
# Synchronized entry point that forwards to `ns_worker_died`.
#
# @param worker the worker that died
# @return whatever `ns_worker_died` returns
def worker_died(worker)
  synchronize do
    ns_worker_died(worker)
  end
end
worker_not_old_enough(worker) click to toggle source

@!visibility private

# File lib/concurrent/executor/ruby_thread_pool_executor.rb, line 95
# Synchronized entry point that forwards to `ns_worker_not_old_enough`.
#
# @param worker the worker to hand back
# @return whatever `ns_worker_not_old_enough` returns
def worker_not_old_enough(worker)
  synchronize do
    ns_worker_not_old_enough(worker)
  end
end
worker_task_completed() click to toggle source

@!visibility private

# File lib/concurrent/executor/ruby_thread_pool_executor.rb, line 105
# Increments the completed-task counter by one, inside `synchronize`.
#
# @return [Integer] the counter value after the increment
def worker_task_completed
  synchronize do
    @completed_task_count += 1
  end
end

Private Instance Methods

ns_add_busy_worker() click to toggle source

Creates a new worker, which must be given work to do after it is added. @return [nil, Worker] nil if max capacity is reached

@!visibility private

# File lib/concurrent/executor/ruby_thread_pool_executor.rb, line 224
# Adds a freshly created worker to `@pool` unless the pool already holds
# `@max_length` workers, and raises `@largest_length` when the pool has
# grown past the previous maximum.
#
# @return [Worker, nil] the new worker, or nil when the pool is full
def ns_add_busy_worker
  return unless @pool.size < @max_length

  new_worker = Worker.new(self)
  @pool.push(new_worker)

  current_size = @pool.length
  @largest_length = current_size if current_size > @largest_length
  new_worker
end
ns_assign_worker(*args, &task) click to toggle source

Tries to assign the task to a worker, either by taking one from @ready or by creating a new one. @return [true, false] whether the task was assigned to a worker

@!visibility private

# File lib/concurrent/executor/ruby_thread_pool_executor.rb, line 186
# Hands a task to a worker if one can be obtained.
#
# A ready worker is reused only once the pool has reached `@min_length`;
# below that size a new worker is requested so the pool keeps growing.
# When no ready worker is available, a new one is requested as well.
#
# @param args [Array] arguments to pass along with the task
# @param task [Proc] the block to execute
# @return [true, false] true when the task was passed to a worker; false
#   when no worker was available or a ThreadError was raised
def ns_assign_worker(*args, &task)
  idle_worker = @pool.size >= @min_length ? @ready.pop : nil
  chosen = idle_worker || ns_add_busy_worker
  return false unless chosen

  chosen << [task, args]
  true
rescue ThreadError
  # Raised when the operating system refuses to create the new thread
  false
end
ns_enqueue(*args, &task) click to toggle source

tries to enqueue task @return [true, false] if enqueued

@!visibility private

# File lib/concurrent/executor/ruby_thread_pool_executor.rb, line 204
# Appends the task to `@queue` unless the queue is limited and already
# holds `@max_queue` entries.
#
# @param args [Array] arguments to pass along with the task
# @param task [Proc] the block to execute
# @return [true, false] true when the task was queued, false otherwise
def ns_enqueue(*args, &task)
  return false if ns_limited_queue? && @queue.size >= @max_queue

  @queue.push([task, args])
  true
end
ns_execute(*args, &task) click to toggle source

@!visibility private

# File lib/concurrent/executor/ruby_thread_pool_executor.rb, line 146
# Schedules a task: resets state if the process forked, then tries to give
# the task to a worker, falling back to the queue, and finally to
# `handle_fallback` when neither accepts it. Afterwards triggers pool
# pruning when `@next_gc_time` has passed.
#
# @param args [Array] arguments to pass along with the task
# @param task [Proc] the block to execute
# @return the result of `ns_prune_pool` when pruning ran, otherwise nil
def ns_execute(*args, &task)
  ns_reset_if_forked

  accepted = ns_assign_worker(*args, &task) || ns_enqueue(*args, &task)
  if accepted
    @scheduled_task_count += 1
  else
    handle_fallback(*args, &task)
  end

  gc_due = @next_gc_time < Concurrent.monotonic_time
  ns_prune_pool if gc_due
end
ns_initialize(opts) click to toggle source

@!visibility private

# File lib/concurrent/executor/ruby_thread_pool_executor.rb, line 112
# Reads the configuration from `opts`, validates it, and sets up the
# executor's internal state.
#
# Recognised keys: :min_threads, :max_threads, :idletime, :max_queue,
# :fallback_policy, :auto_terminate and the undocumented :gc_interval.
# Numeric options are coerced with `to_i`.
#
# @param opts [Hash] configuration options
# @raise [ArgumentError] when the fallback policy is not listed in
#   FALLBACK_POLICIES, or when the thread limits are outside the default
#   bounds or inconsistent with each other
def ns_initialize(opts)
  @min_length = opts.fetch(:min_threads, DEFAULT_MIN_POOL_SIZE).to_i
  @max_length = opts.fetch(:max_threads, DEFAULT_MAX_POOL_SIZE).to_i
  @idletime = opts.fetch(:idletime, DEFAULT_THREAD_IDLETIMEOUT).to_i
  @max_queue = opts.fetch(:max_queue, DEFAULT_MAX_QUEUE_SIZE).to_i
  @fallback_policy = opts.fetch(:fallback_policy, :abort)

  unless FALLBACK_POLICIES.include?(@fallback_policy)
    raise ArgumentError, "#{@fallback_policy} is not a valid fallback policy"
  end

  if @max_length < DEFAULT_MIN_POOL_SIZE
    raise ArgumentError, "`max_threads` cannot be less than #{DEFAULT_MIN_POOL_SIZE}"
  end
  if @max_length > DEFAULT_MAX_POOL_SIZE
    raise ArgumentError, "`max_threads` cannot be greater than #{DEFAULT_MAX_POOL_SIZE}"
  end
  if @min_length < DEFAULT_MIN_POOL_SIZE
    raise ArgumentError, "`min_threads` cannot be less than #{DEFAULT_MIN_POOL_SIZE}"
  end
  if min_length > max_length
    raise ArgumentError, "`min_threads` cannot be more than `max_threads`"
  end

  self.auto_terminate = opts.fetch(:auto_terminate, true)

  # @ready or @queue is empty at all times
  @pool = []  # all workers
  @ready = [] # used as a stash (most idle worker is at the start)
  @queue = [] # used as queue

  @scheduled_task_count = 0
  @completed_task_count = 0
  @largest_length = 0
  @ruby_pid = $$ # detects if Ruby has forked

  # undocumented option; defaults to half the idle time, truncated to an integer
  @gc_interval = opts.fetch(:gc_interval, @idletime / 2.0).to_i
  @next_gc_time = Concurrent.monotonic_time + @gc_interval
end
ns_kill_execution() click to toggle source

@!visibility private

# File lib/concurrent/executor/ruby_thread_pool_executor.rb, line 174
# Kills every worker in `@pool`, then empties both `@pool` and `@ready`.
# `@queue` is not touched here.
#
# @return [Array] the (now empty) `@ready` array
def ns_kill_execution
  # TODO log out unprocessed tasks in queue
  # TODO try to shutdown first?
  @pool.each { |worker| worker.kill }
  @pool.clear
  @ready.clear
end
ns_limited_queue?() click to toggle source

@!visibility private

# File lib/concurrent/executor/ruby_thread_pool_executor.rb, line 141
# Tells whether the queue has a size limit; a `@max_queue` of zero means
# the queue is unlimited.
#
# @return [Boolean] true when `@max_queue` is not zero
def ns_limited_queue?
  !@max_queue.zero?
end
ns_prune_pool() click to toggle source

Tests the oldest worker to see whether it has been idle for long enough; if it has not, it is returned to the start of @ready.

@!visibility private

# File lib/concurrent/executor/ruby_thread_pool_executor.rb, line 270
# Sends `:idle_test` to the worker at the front of `@ready` (if any) and
# schedules the next pruning time. Does nothing, and leaves
# `@next_gc_time` unchanged, while the pool is not larger than
# `@min_length`.
#
# @return [Numeric, nil] the new `@next_gc_time`, or nil when skipped
def ns_prune_pool
  return unless @pool.size > @min_length

  stalest = @ready.shift
  stalest << :idle_test if stalest

  now = Concurrent.monotonic_time
  @next_gc_time = now + @gc_interval
end
ns_ready_worker(worker, success = true) click to toggle source

handle ready worker, giving it new job or assigning back to @ready

@!visibility private

# File lib/concurrent/executor/ruby_thread_pool_executor.rb, line 235
# Gives a ready worker the next queued task, or parks/stops it when the
# queue is empty.
#
# @param worker the worker that is ready for more work
# @param success [Boolean] accepted but not used by this method
# @return the result of `worker << task`, `@ready.push` or `worker.stop`,
#   depending on the branch taken
def ns_ready_worker(worker, success = true)
  pending = @queue.shift
  if pending
    worker << pending
  elsif running?
    @ready.push(worker)
  else
    # stop workers when !running?, do not return them to @ready
    worker.stop
  end
end
ns_remove_busy_worker(worker) click to toggle source

removes a worker which is not tracked in @ready

@!visibility private

# File lib/concurrent/executor/ruby_thread_pool_executor.rb, line 261
# Deletes the worker from `@pool` and sets `stopped_event` once the pool
# is empty and the executor is no longer running.
#
# @param worker the worker to remove
# @return [true] always
def ns_remove_busy_worker(worker)
  @pool.delete(worker)

  pool_drained = @pool.empty?
  stopped_event.set if pool_drained && !running?
  true
end
ns_reset_if_forked() click to toggle source
# File lib/concurrent/executor/ruby_thread_pool_executor.rb, line 279
# Resets the executor's collections and counters when the current process
# id differs from the one recorded in `@ruby_pid`, then records the new
# pid. Does nothing when the pid is unchanged.
#
# @return [Integer, nil] the current pid after a reset, otherwise nil
def ns_reset_if_forked
  return if $$ == @ruby_pid

  [@queue, @ready, @pool].each(&:clear)
  @scheduled_task_count = 0
  @completed_task_count = 0
  @largest_length = 0
  @ruby_pid = $$
end
ns_shutdown_execution() click to toggle source

@!visibility private

# File lib/concurrent/executor/ruby_thread_pool_executor.rb, line 159
# Begins an orderly shutdown: resets state if the process forked, sets
# `stopped_event` right away when there are no workers, and stops every
# worker when no tasks are left in the queue.
#
# @return [Array, nil] `@pool` when the workers were told to stop,
#   otherwise nil
def ns_shutdown_execution
  ns_reset_if_forked

  # nothing to do
  stopped_event.set if @pool.empty?

  # no more tasks will be accepted, just stop all workers
  @pool.each(&:stop) if @queue.empty?
end
ns_worker_died(worker) click to toggle source

@!visibility private

# File lib/concurrent/executor/ruby_thread_pool_executor.rb, line 214
# Removes a dead worker from the pool and, when a replacement can be
# created, passes the replacement to `ns_ready_worker` with `success`
# set to false.
#
# @param worker the worker that died
# @return the result of `ns_ready_worker`, or nil when no replacement was
#   created
def ns_worker_died(worker)
  ns_remove_busy_worker(worker)

  substitute = ns_add_busy_worker
  return unless substitute

  ns_ready_worker(substitute, false)
end
ns_worker_not_old_enough(worker) click to toggle source

returns a worker that has not been idle for long enough back to @ready

@!visibility private

# File lib/concurrent/executor/ruby_thread_pool_executor.rb, line 252
# Puts a worker back at the front of `@ready`.
#
# @param worker the worker to put back
# @return [true] always
def ns_worker_not_old_enough(worker)
  # let's put workers coming from idle_test back to the start (as the oldest worker)
  @ready.insert(0, worker)
  true
end