Class MpscRelaxedArrayQueue<E>

All Implemented Interfaces:
Iterable<E>, Collection<E>, Queue<E>, MessagePassingQueue<E>

public class MpscRelaxedArrayQueue<E> extends MpscRelaxedArrayQueueL4Pad<E> implements MessagePassingQueue<E>
This class is still work in progress, please do not pick up for production use just yet.
  • Field Details

    • mask

      private final long mask
      Note on terminology: - position/id: overall progress indicator, not an array index or offset at which to lookup/write. - index: for looking up within an array (including the inlined producerCycleClaim array) - offset: for pointer like access using Unsafe The producer in this queue operates on cycleId and the producerCycleClaim array: - The cycleId grow monotonically, and the parity bit (cycleIndex) indicated which claim to use - The producerCycleClaim indicate position in a cycle as well as the originating cycleId. From a claim we can calculate the producer overall position as well as the position within a cycle. The buffer is split into 2 cycles (matching cycleIndex 0 and 1), allowing the above indicators to control producer progress on separate counters while maintaining the appearance of a contiguous buffer to the consumer.
    • cycleLength

      private final int cycleLength
    • cycleLengthLog2

      private final int cycleLengthLog2
    • buffer

      private final E[] buffer
    • positionWithinCycleMask

      private final int positionWithinCycleMask
    • cycleIdBitShift

      private final int cycleIdBitShift
    • maxCycleId

      private final long maxCycleId
  • Constructor Details

    • MpscRelaxedArrayQueue

      public MpscRelaxedArrayQueue(int capacity)
  • Method Details

    • iterator

      public Iterator<E> iterator()
      Specified by:
      iterator in interface Collection<E>
      Specified by:
      iterator in interface Iterable<E>
      Specified by:
      iterator in class AbstractCollection<E>
    • offer

      public boolean offer(E e)
      Description copied from interface: MessagePassingQueue
      Called from a producer thread subject to the restrictions appropriate to the implementation and according to the Queue.offer(Object) interface.
      Specified by:
      offer in interface MessagePassingQueue<E>
      Specified by:
      offer in interface Queue<E>
      Parameters:
      e - not null, will throw NPE if it is
      Returns:
      true if element was inserted into the queue, false iff full
    • isFull

      private boolean isFull(long producerPosition)
      Given the nature of getAndAdd progress on producerPosition and given the potential risk for over claiming it is quite possible for this method to report a queue which is not full as full.
    • rotateCycle

      private void rotateCycle(long claimCycleId, int cycleIdBitShift, long maxCycleId)
    • detectSlowRotation

      private long detectSlowRotation(long claimCycleId, long nextCycleId)
    • validateProducerClaim

      private boolean validateProducerClaim(int activeCycleIndex, long producerCycleClaim, long cycleId, int positionOnCycle, int cycleLengthLog2, boolean slowProducer)
      Validate a producer claim to find out if is an overclaim (beyond the producer limit).
      Returns:
      true if the claim is valid, false otherwise.
    • fixProducerOverClaim

      private boolean fixProducerOverClaim(int activeCycleIndex, long producerCycleClaim, boolean slowProducer)
      It tries to fix a producer overclaim.
      Returns:
      true if the claim is now safe to be used,false otherwise and is needed to retry the claim.
    • validateSlowProducerOverClaim

      private void validateSlowProducerOverClaim(int activeCycleIndex, long producerCycleClaim)
      Validates a slow producer over-claim throwing IllegalStateException if the offer on it can't continue.
    • soCycleElement

      private void soCycleElement(E[] buffer, E e, int activeCycleIndex, int positionWithinCycle, int cycleLengthLog2)
    • poll

      public E poll()
      Description copied from interface: MessagePassingQueue
      Called from the consumer thread subject to the restrictions appropriate to the implementation and according to the Queue.poll() interface.
      Specified by:
      poll in interface MessagePassingQueue<E>
      Specified by:
      poll in interface Queue<E>
      Returns:
      a message from the queue if one is available, null iff empty
    • signalConsumerProgress

      private void signalConsumerProgress(long consumerPosition, E[] buffer, long offset)
    • pollSlowPath

      private E pollSlowPath(E[] buffer, long offset, long consumerPosition)
    • peek

      public E peek()
      Description copied from interface: MessagePassingQueue
      Called from the consumer thread subject to the restrictions appropriate to the implementation and according to the Queue.peek() interface.
      Specified by:
      peek in interface MessagePassingQueue<E>
      Specified by:
      peek in interface Queue<E>
      Returns:
      a message from the queue if one is available, null iff empty
    • peekSlowPath

      private E peekSlowPath(E[] buffer, long consumerPosition, long offset)
    • spinForElement

      private E spinForElement(E[] buffer, long offset)
    • size

      public int size()
      Description copied from interface: MessagePassingQueue
      This method's accuracy is subject to concurrent modifications happening as the size is estimated and as such is a best effort rather than absolute value. For some implementations this method may be O(n) rather than O(1).
      Specified by:
      size in interface Collection<E>
      Specified by:
      size in interface MessagePassingQueue<E>
      Specified by:
      size in class AbstractCollection<E>
      Returns:
      number of messages in the queue, between 0 and Integer.MAX_VALUE but less or equals to capacity (if bounded).
    • clear

      public void clear()
      Description copied from interface: MessagePassingQueue
      Removes all items from the queue. Called from the consumer thread subject to the restrictions appropriate to the implementation and according to the Collection.clear() interface.
      Specified by:
      clear in interface Collection<E>
      Specified by:
      clear in interface MessagePassingQueue<E>
      Overrides:
      clear in class AbstractQueue<E>
    • isEmpty

      public boolean isEmpty()
      Description copied from interface: MessagePassingQueue
      This method's accuracy is subject to concurrent modifications happening as the observation is carried out.
      Specified by:
      isEmpty in interface Collection<E>
      Specified by:
      isEmpty in interface MessagePassingQueue<E>
      Overrides:
      isEmpty in class AbstractCollection<E>
      Returns:
      true if empty, false otherwise
    • capacity

      public int capacity()
      Specified by:
      capacity in interface MessagePassingQueue<E>
      Returns:
      the capacity of this queue or MessagePassingQueue.UNBOUNDED_CAPACITY if not bounded
    • relaxedOffer

      public boolean relaxedOffer(E e)
      Description copied from interface: MessagePassingQueue
      Called from a producer thread subject to the restrictions appropriate to the implementation. As opposed to Queue.offer(Object) this method may return false without the queue being full.
      Specified by:
      relaxedOffer in interface MessagePassingQueue<E>
      Parameters:
      e - not null, will throw NPE if it is
      Returns:
      true if element was inserted into the queue, false if unable to offer
    • relaxedPoll

      public E relaxedPoll()
      Description copied from interface: MessagePassingQueue
      Called from the consumer thread subject to the restrictions appropriate to the implementation. As opposed to Queue.poll() this method may return null without the queue being empty.
      Specified by:
      relaxedPoll in interface MessagePassingQueue<E>
      Returns:
      a message from the queue if one is available, null if unable to poll
    • relaxedPeek

      public E relaxedPeek()
      Description copied from interface: MessagePassingQueue
      Called from the consumer thread subject to the restrictions appropriate to the implementation. As opposed to Queue.peek() this method may return null without the queue being empty.
      Specified by:
      relaxedPeek in interface MessagePassingQueue<E>
      Returns:
      a message from the queue if one is available, null if unable to peek
    • drain

      public int drain(MessagePassingQueue.Consumer<E> c)
      Description copied from interface: MessagePassingQueue
      Remove all available item from the queue and hand to consume. This should be semantically similar to:
       M m;
       while((m = relaxedPoll()) != null){
       c.accept(m);
       }
       
      There's no strong commitment to the queue being empty at the end of a drain. Called from a consumer thread subject to the restrictions appropriate to the implementation.

      WARNING: Explicit assumptions are made with regards to MessagePassingQueue.Consumer.accept(T) make sure you have read and understood these before using this method.

      Specified by:
      drain in interface MessagePassingQueue<E>
      Returns:
      the number of polled elements
    • fill

      public int fill(MessagePassingQueue.Supplier<E> s)
      Description copied from interface: MessagePassingQueue
      Stuff the queue with elements from the supplier. Semantically similar to:
       while(relaxedOffer(s.get());
       
      There's no strong commitment to the queue being full at the end of a fill. Called from a producer thread subject to the restrictions appropriate to the implementation.

      Unbounded queues will fill up the queue with a fixed amount rather than fill up to oblivion. WARNING: Explicit assumptions are made with regards to MessagePassingQueue.Supplier.get() make sure you have read and understood these before using this method.

      Specified by:
      fill in interface MessagePassingQueue<E>
      Returns:
      the number of offered elements
    • drain

      public int drain(MessagePassingQueue.Consumer<E> c, int limit)
      Description copied from interface: MessagePassingQueue
      Remove up to limit elements from the queue and hand to consume. This should be semantically similar to:

      
         M m;
         int i = 0;
         for(;i < limit && (m = relaxedPoll()) != null; i++){
           c.accept(m);
         }
         return i;
       

      There's no strong commitment to the queue being empty at the end of a drain. Called from a consumer thread subject to the restrictions appropriate to the implementation.

      WARNING: Explicit assumptions are made with regards to MessagePassingQueue.Consumer.accept(T) make sure you have read and understood these before using this method.

      Specified by:
      drain in interface MessagePassingQueue<E>
      Returns:
      the number of polled elements
    • fill

      public int fill(MessagePassingQueue.Supplier<E> s, int limit)
      Description copied from interface: MessagePassingQueue
      Stuff the queue with up to limit elements from the supplier. Semantically similar to:

      
         for(int i=0; i < limit && relaxedOffer(s.get()); i++);
       

      There's no strong commitment to the queue being full at the end of a fill. Called from a producer thread subject to the restrictions appropriate to the implementation. WARNING: Explicit assumptions are made with regards to MessagePassingQueue.Supplier.get() make sure you have read and understood these before using this method.

      Specified by:
      fill in interface MessagePassingQueue<E>
      Returns:
      the number of offered elements
    • drain

      Description copied from interface: MessagePassingQueue
      Remove elements from the queue and hand to consume forever. Semantically similar to:

        int idleCounter = 0;
        while (exit.keepRunning()) {
            E e = relaxedPoll();
            if(e==null){
                idleCounter = wait.idle(idleCounter);
                continue;
            }
            idleCounter = 0;
            c.accept(e);
        }
       

      Called from a consumer thread subject to the restrictions appropriate to the implementation.

      WARNING: Explicit assumptions are made with regards to MessagePassingQueue.Consumer.accept(T) make sure you have read and understood these before using this method.

      Specified by:
      drain in interface MessagePassingQueue<E>
    • fill

      Description copied from interface: MessagePassingQueue
      Stuff the queue with elements from the supplier forever. Semantically similar to:

       
        int idleCounter = 0;
        while (exit.keepRunning()) {
            E e = s.get();
            while (!relaxedOffer(e)) {
                idleCounter = wait.idle(idleCounter);
                continue;
            }
            idleCounter = 0;
        }
       
       

      Called from a producer thread subject to the restrictions appropriate to the implementation. The main difference being that implementors MUST assure room in the queue is available BEFORE calling MessagePassingQueue.Supplier.get(). WARNING: Explicit assumptions are made with regards to MessagePassingQueue.Supplier.get() make sure you have read and understood these before using this method.

      Specified by:
      fill in interface MessagePassingQueue<E>
    • positionWithinCycle

      private static int positionWithinCycle(long producerCycleClaim, int positionOnCycleMask)
    • producerClaimCycleId

      private static long producerClaimCycleId(long producerCycleClaim, int cycleIdBitShift)
    • producerPositionFromClaim

      private static long producerPositionFromClaim(long producerCycleClaim, int positionOnCycleMask, int cycleIdBitShift, int cycleLengthLog2)
    • producerPosition

      private static long producerPosition(int positionWithinCycle, long cycleId, int cycleLengthLog2)
      Convert position in cycle and cycleId into a producer position (monotonically increasing reflection of offers that is comparable with the consumerPosition to determine size/empty/full)
    • calcElementIndexInBuffer

      private static int calcElementIndexInBuffer(int positionWithinCycle, int cycleIndex, int cycleLengthLog2)
      Convert [position within cycle, cycleIndex] to index in buffer.
    • toString

      public String toString()
      Overrides:
      toString in class AbstractCollection<E>