Home

Awesome

Parallel

Gem Version Build Status

Run any code in parallel Processes(> use all CPUs), Threads(> speedup blocking operations), or Ractors(> use all CPUs).<br/> Best suited for map-reduce or e.g. parallel downloads/uploads.

Install

gem install parallel

Usage

# 2 CPUs -> work in 2 processes (a,b + c)
results = Parallel.map(['a','b','c']) do |one_letter|
  SomeClass.expensive_calculation(one_letter)
end

# 3 Processes -> finished after 1 run
results = Parallel.map(['a','b','c'], in_processes: 3) { |one_letter| SomeClass.expensive_calculation(one_letter) }

# 3 Threads -> finished after 1 run
results = Parallel.map(['a','b','c'], in_threads: 3) { |one_letter| SomeClass.expensive_calculation(one_letter) }

# 3 Ractors -> finished after 1 run
results = Parallel.map(['a','b','c'], in_ractors: 3, ractor: [SomeClass, :expensive_calculation])

Same can be done with each

Parallel.each(['a','b','c']) { |one_letter| ... }

or each_with_index, map_with_index, flat_map

Produce one item at a time with lambda (anything that responds to .call) or Queue.

items = [1,2,3]
Parallel.each( -> { items.pop || Parallel::Stop }) { |number| ... }

Also supports any? or all?

Parallel.any?([1,2,3,4,5,6,7]) { |number| number == 4 }
# => true

Parallel.all?([1,2,nil,4,5]) { |number| number != nil }
# => false

Processes/Threads are workers, they grab the next piece of work when they finish.

Processes

Threads

Ractors

ActiveRecord

Connection Lost

# reproducibly fixes things (spec/cases/map_with_ar.rb)
Parallel.each(User.all, in_processes: 8) do |user|
  user.update_attribute(:some_attribute, some_value)
end
User.connection.reconnect!

# maybe helps: explicitly use connection pool
Parallel.each(User.all, in_threads: 8) do |user|
  ActiveRecord::Base.connection_pool.with_connection do
    user.update_attribute(:some_attribute, some_value)
  end
end

# maybe helps: reconnect once inside every fork
Parallel.each(User.all, in_processes: 8) do |user|
  @reconnected ||= User.connection.reconnect! || true
  user.update_attribute(:some_attribute, some_value)
end

NameError: uninitialized constant

A race happens when ActiveRecord models are autoloaded inside parallel threads in environments that lazy-load, like development, test, or migrations.

To fix, autoloaded classes before the parallel block with either require '<modelname>' or ModelName.class.

Break

Parallel.map([1, 2, 3]) do |i|
  raise Parallel::Break # -> stops after all current items are finished
end
Parallel.map([1, 2, 3]) { |i| raise Parallel::Break, i if i == 2 } == 2

Kill

Only use if whatever is executing in the sub-command is safe to kill at any point

Parallel.map([1,2,3]) do |x|
  raise Parallel::Kill if x == 1# -> stop all sub-processes, killing them instantly
  sleep 100 # Do stuff
end

Progress / ETA

# gem install ruby-progressbar

Parallel.map(1..50, progress: "Doing stuff") { sleep 1 }

# Doing stuff | ETA: 00:00:02 | ====================               | Time: 00:00:10

Use :finish or :start hook to get progress information.

They are called on the main process and protected with a mutex. (To just get the index, use the more performant Parallel.each_with_index)

Parallel.map(1..100, finish: -> (item, i, result) { ... do something ... }) { sleep 1 }

Set finish_in_order: true to call the :finish hook in the order of the input (will take longer to see initial output).

Parallel.map(1..9, finish: -> (item, i, result) { puts "#{item} ok" }, finish_in_order: true) { sleep rand }

Worker number

Use Parallel.worker_number to determine the worker slot in which your task is running.

Parallel.each(1..5, in_processes: 2) { |i| puts "Item: #{i}, Worker: #{Parallel.worker_number}" }
Item: 1, Worker: 1
Item: 2, Worker: 0
Item: 3, Worker: 1
Item: 4, Worker: 0
Item: 5, Worker: 1

Dynamically generating jobs

Example: wait for work to arrive or sleep

queue = []
Thread.new { loop { queue << rand(100); sleep 2 } } # job producer
Parallel.map(Proc.new { queue.pop }, in_processes: 3) { |f| f ? puts("#{f} received") : sleep(1) }

Tips

TODO

Authors

Contributors

Michael Grosser<br/> michael@grosser.it<br/> License: MIT<br/>