class Parallel::JobFactory
Public Class Methods
Source
# File lib/parallel.rb, line 90 def initialize(source, mutex) @lambda = (source.respond_to?(:call) && source) || queue_wrapper(source) @source = source.to_a unless @lambda # turn Range and other Enumerable-s into an Array @mutex = mutex @index = -1 @stopped = false end
Public Instance Methods
Source
# File lib/parallel.rb, line 98 def next if producer? # - index and item stay in sync # - do not call lambda after it has returned Stop item, index = @mutex.synchronize do return if @stopped item = @lambda.call @stopped = (item == Parallel::Stop) return if @stopped [item, @index += 1] end else index = @mutex.synchronize { @index += 1 } return if index >= size item = @source[index] end [item, index] end
Source
# File lib/parallel.rb, line 127 def pack(item, index) producer? ? [item, index] : index end
generate item that is sent to workers just index is faster + less likely to blow up with unserializable errors
Source
# File lib/parallel.rb, line 117 def size if producer? Float::INFINITY else @source.size end end
Source
# File lib/parallel.rb, line 132 def unpack(data) producer? ? data : [@source[data], data] end
unpack item that is sent to workers
Private Instance Methods
Source
# File lib/parallel.rb, line 142 def queue_wrapper(array) array.respond_to?(:num_waiting) && array.respond_to?(:pop) && lambda { array.pop(false) } end