Class MpscRelaxedAtomicArrayQueue<E>

All Implemented Interfaces:
Iterable<E>, Collection<E>, Queue<E>, MessagePassingQueue<E>

public final class MpscRelaxedAtomicArrayQueue<E> extends MpscRelaxedAtomicArrayQueueL3Pad<E> implements MessagePassingQueue<E>
  • Field Details

    • mask

      private final long mask
    • capacity

      private final int capacity
    • cycleLength

      private final int cycleLength
    • cycleLengthLog2

      private final int cycleLengthLog2
    • buffer

      private final AtomicReferenceArray<E> buffer
    • producerCycleClaim

      private final AtomicLongArray producerCycleClaim
    • positionOnCycleMask

      private final int positionOnCycleMask
    • cycleIdBitShift

      private final int cycleIdBitShift
    • maxCycleId

      private final long maxCycleId
  • Constructor Details

    • MpscRelaxedAtomicArrayQueue

      public MpscRelaxedAtomicArrayQueue(int capacity)
  • Method Details

    • iterator

      public Iterator<E> iterator()
      Specified by:
      iterator in interface Collection<E>
      Specified by:
      iterator in interface Iterable<E>
      Specified by:
      iterator in class AbstractCollection<E>
    • positionOnCycle

      private static int positionOnCycle(long producerCycleClaim, int positionOnCycleMask)
    • cycleId

      private static long cycleId(long producerCycleClaim, int cycleIdBitShift)
    • producerPosition

      private static long producerPosition(long cycleId, int positionWithinCycle, int cycleLengthLog2)
    • calcElementOffset

      private static int calcElementOffset(int cycle, int positionWithinCycle, int cycleLengthLog2)
    • calcElementOffset

      private static int calcElementOffset(long position, long mask)
    • isBackPressured

      private boolean isBackPressured(long producerPosition)
    • rotateCycle

      private void rotateCycle(AtomicLongArray producerCycleClaims, int activeCycle, long producerCycleClaim, int cycleIdBitShift, long maxCycleId)
    • validateSlowProducerClaim

      private void validateSlowProducerClaim(long cycleId, int positionOnCycle, int cycleLengthLog2)
    • offer

      public boolean offer(E e)
      Description copied from interface: MessagePassingQueue
      Called from a producer thread subject to the restrictions appropriate to the implementation and according to the Queue.offer(Object) interface.
      Specified by:
      offer in interface MessagePassingQueue<E>
      Specified by:
      offer in interface Queue<E>
      Parameters:
      e - not null, will throw NPE if it is
      Returns:
      true if element was inserted into the queue, false iff full
    • poll

      public E poll()
      Description copied from interface: MessagePassingQueue
      Called from the consumer thread subject to the restrictions appropriate to the implementation and according to the Queue.poll() interface.
      Specified by:
      poll in interface MessagePassingQueue<E>
      Specified by:
      poll in interface Queue<E>
      Returns:
      a message from the queue if one is available, null iff empty
    • pollMaybeEmpty

      private E pollMaybeEmpty(AtomicReferenceArray<E> buffer, int offset, long consumerPosition)
    • peek

      public E peek()
      Description copied from interface: MessagePassingQueue
      Called from the consumer thread subject to the restrictions appropriate to the implementation and according to the Queue.peek() interface.
      Specified by:
      peek in interface MessagePassingQueue<E>
      Specified by:
      peek in interface Queue<E>
      Returns:
      a message from the queue if one is available, null iff empty
    • size

      public int size()
      Description copied from interface: MessagePassingQueue
      This method's accuracy is subject to concurrent modifications happening as the size is estimated and as such is a best effort rather than absolute value. For some implementations this method may be O(n) rather than O(1).
      Specified by:
      size in interface Collection<E>
      Specified by:
      size in interface MessagePassingQueue<E>
      Specified by:
      size in class AbstractCollection<E>
      Returns:
      number of messages in the queue, between 0 and Integer.MAX_VALUE but less or equals to capacity (if bounded).
    • clear

      public void clear()
      Description copied from interface: MessagePassingQueue
      Removes all items from the queue. Called from the consumer thread subject to the restrictions appropriate to the implementation and according to the Collection.clear() interface.
      Specified by:
      clear in interface Collection<E>
      Specified by:
      clear in interface MessagePassingQueue<E>
      Overrides:
      clear in class AbstractQueue<E>
    • isEmpty

      public boolean isEmpty()
      Description copied from interface: MessagePassingQueue
      This method's accuracy is subject to concurrent modifications happening as the observation is carried out.
      Specified by:
      isEmpty in interface Collection<E>
      Specified by:
      isEmpty in interface MessagePassingQueue<E>
      Overrides:
      isEmpty in class AbstractCollection<E>
      Returns:
      true if empty, false otherwise
    • capacity

      public int capacity()
      Specified by:
      capacity in interface MessagePassingQueue<E>
      Returns:
      the capacity of this queue or MessagePassingQueue.UNBOUNDED_CAPACITY if not bounded
    • relaxedOffer

      public boolean relaxedOffer(E e)
      Description copied from interface: MessagePassingQueue
      Called from a producer thread subject to the restrictions appropriate to the implementation. As opposed to Queue.offer(Object) this method may return false without the queue being full.
      Specified by:
      relaxedOffer in interface MessagePassingQueue<E>
      Parameters:
      e - not null, will throw NPE if it is
      Returns:
      true if element was inserted into the queue, false if unable to offer
    • relaxedPoll

      public E relaxedPoll()
      Description copied from interface: MessagePassingQueue
      Called from the consumer thread subject to the restrictions appropriate to the implementation. As opposed to Queue.poll() this method may return null without the queue being empty.
      Specified by:
      relaxedPoll in interface MessagePassingQueue<E>
      Returns:
      a message from the queue if one is available, null if unable to poll
    • relaxedPeek

      public E relaxedPeek()
      Description copied from interface: MessagePassingQueue
      Called from the consumer thread subject to the restrictions appropriate to the implementation. As opposed to Queue.peek() this method may return null without the queue being empty.
      Specified by:
      relaxedPeek in interface MessagePassingQueue<E>
      Returns:
      a message from the queue if one is available, null if unable to peek
    • drain

      public int drain(MessagePassingQueue.Consumer<E> c)
      Description copied from interface: MessagePassingQueue
      Remove all available item from the queue and hand to consume. This should be semantically similar to:
       M m;
       while((m = relaxedPoll()) != null){
       c.accept(m);
       }
       
      There's no strong commitment to the queue being empty at the end of a drain. Called from a consumer thread subject to the restrictions appropriate to the implementation.

      WARNING: Explicit assumptions are made with regards to MessagePassingQueue.Consumer.accept(T) make sure you have read and understood these before using this method.

      Specified by:
      drain in interface MessagePassingQueue<E>
      Returns:
      the number of polled elements
    • fill

      public int fill(MessagePassingQueue.Supplier<E> s)
      Description copied from interface: MessagePassingQueue
      Stuff the queue with elements from the supplier. Semantically similar to:
       while(relaxedOffer(s.get());
       
      There's no strong commitment to the queue being full at the end of a fill. Called from a producer thread subject to the restrictions appropriate to the implementation.

      Unbounded queues will fill up the queue with a fixed amount rather than fill up to oblivion. WARNING: Explicit assumptions are made with regards to MessagePassingQueue.Supplier.get() make sure you have read and understood these before using this method.

      Specified by:
      fill in interface MessagePassingQueue<E>
      Returns:
      the number of offered elements
    • drain

      public int drain(MessagePassingQueue.Consumer<E> c, int limit)
      Description copied from interface: MessagePassingQueue
      Remove up to limit elements from the queue and hand to consume. This should be semantically similar to:

      
         M m;
         int i = 0;
         for(;i < limit && (m = relaxedPoll()) != null; i++){
           c.accept(m);
         }
         return i;
       

      There's no strong commitment to the queue being empty at the end of a drain. Called from a consumer thread subject to the restrictions appropriate to the implementation.

      WARNING: Explicit assumptions are made with regards to MessagePassingQueue.Consumer.accept(T) make sure you have read and understood these before using this method.

      Specified by:
      drain in interface MessagePassingQueue<E>
      Returns:
      the number of polled elements
    • fill

      public int fill(MessagePassingQueue.Supplier<E> s, int limit)
      Description copied from interface: MessagePassingQueue
      Stuff the queue with up to limit elements from the supplier. Semantically similar to:

      
         for(int i=0; i < limit && relaxedOffer(s.get()); i++);
       

      There's no strong commitment to the queue being full at the end of a fill. Called from a producer thread subject to the restrictions appropriate to the implementation. WARNING: Explicit assumptions are made with regards to MessagePassingQueue.Supplier.get() make sure you have read and understood these before using this method.

      Specified by:
      fill in interface MessagePassingQueue<E>
      Returns:
      the number of offered elements
    • drain

      Description copied from interface: MessagePassingQueue
      Remove elements from the queue and hand to consume forever. Semantically similar to:

        int idleCounter = 0;
        while (exit.keepRunning()) {
            E e = relaxedPoll();
            if(e==null){
                idleCounter = wait.idle(idleCounter);
                continue;
            }
            idleCounter = 0;
            c.accept(e);
        }
       

      Called from a consumer thread subject to the restrictions appropriate to the implementation.

      WARNING: Explicit assumptions are made with regards to MessagePassingQueue.Consumer.accept(T) make sure you have read and understood these before using this method.

      Specified by:
      drain in interface MessagePassingQueue<E>
    • fill

      Description copied from interface: MessagePassingQueue
      Stuff the queue with elements from the supplier forever. Semantically similar to:

       
        int idleCounter = 0;
        while (exit.keepRunning()) {
            E e = s.get();
            while (!relaxedOffer(e)) {
                idleCounter = wait.idle(idleCounter);
                continue;
            }
            idleCounter = 0;
        }
       
       

      Called from a producer thread subject to the restrictions appropriate to the implementation. The main difference being that implementors MUST assure room in the queue is available BEFORE calling MessagePassingQueue.Supplier.get(). WARNING: Explicit assumptions are made with regards to MessagePassingQueue.Supplier.get() make sure you have read and understood these before using this method.

      Specified by:
      fill in interface MessagePassingQueue<E>
    • toString

      public String toString()
      Overrides:
      toString in class AbstractCollection<E>