Class Bucket
- All Implemented Interfaces:
Serializable
Description
A bucket is a non-deterministic cousin of a Barrier. A bucket is somewhere to fallInto when a process needs somewhere to park itself. There is no limit on the number of processes that can fallInto a bucket - and all are blocked when they do. Release happens when a process (and it will have to be another process) chooses to flush that bucket. When that happens, all processes in the bucket (which may be none) are rescheduled for execution.
Buckets are a non-deterministic primitive, since the decision to flush is a free (internal) choice of the process concerned and the scheduling of that flush impacts on the semantics. Usually, only one process is given responsibility for flushing a bucket (or set of buckets). Flushing a bucket does not block the flusher.
Note: this notion of bucket corresponds to the BUCKET synchronisation primitive added to the KRoC occam language system.
Implementation Note
The fallInto and flush methods of Bucket are just a re-badging of the wait and notifyAll methods of Object - but without the need to gain a monitor lock and without the need to look out for the wait being interrupted.
Currently, though, this is how they are implemented. Beware that a notifyAll carries an O(n) overhead (where n is the number of processes being notified), since each notified process must regain the monitor lock before it can exit the synchronized region. Future JCSP implementations of Bucket will look to follow occam kernels and reduce the overheads of both fallInto and flush to O(1).
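As an illustration of the wait/notifyAll approach described above, here is a minimal sketch using only the standard library. It is not the JCSP source: the names SketchBucket and SketchBucketDemo are hypothetical, and the details (a cycle counter compared in a loop, interrupts remembered and restored) are assumptions about one reasonable way to do it.

```java
// Hypothetical illustration only: SketchBucket is not the JCSP Bucket class.
final class SketchBucket {
    private final Object lock = new Object();
    private int holding = 0;  // threads currently parked in the bucket
    private int cycle = 0;    // incremented on every flush

    /** Parks the calling thread until the next flush. */
    public void fallInto() {
        synchronized (lock) {
            final int myCycle = cycle;
            holding++;
            boolean interrupted = false;
            // Only a change of cycle (a flush) releases the thread, so
            // spurious wake-ups and interrupts do not let it escape early.
            while (cycle == myCycle) {
                try {
                    lock.wait();
                } catch (InterruptedException e) {
                    interrupted = true;  // remember it and keep waiting
                }
            }
            if (interrupted) Thread.currentThread().interrupt();
        }
    }

    /** Releases every parked thread and returns how many were released. */
    public int flush() {
        synchronized (lock) {
            final int n = holding;
            holding = 0;
            cycle++;
            lock.notifyAll();  // each woken thread must regain the lock: O(n)
            return n;
        }
    }

    /** Returns the number of threads currently parked (a snapshot only). */
    public int holding() {
        synchronized (lock) {
            return holding;
        }
    }
}

final class SketchBucketDemo {
    /** Parks nWorkers threads, flushes once and returns the flush count. */
    static int run(int nWorkers) {
        final SketchBucket bucket = new SketchBucket();
        final Thread[] workers = new Thread[nWorkers];
        for (int i = 0; i < nWorkers; i++) {
            workers[i] = new Thread(bucket::fallInto);
            workers[i].start();
        }
        try {
            while (bucket.holding() < nWorkers) Thread.sleep(1);  // wait until all have fallen in
            final int n = bucket.flush();
            for (Thread w : workers) w.join();
            return n;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        }
    }
}
```

The flusher never blocks on the parked threads: flush only takes the lock briefly, which matches the statement that flushing does not block the flusher.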
A Simple Example
This consists of 10 workers, one bucket and one flusher:
import org.jcsp.lang.*;
public class BucketExample1 {
public static void main (String[] args) {
final int nWorkers = 10;
final int second = 1000;
// JCSP timer units are milliseconds
final int interval = 5*second;
final int maxWork = 10*second;
final long seed = new CSTimer ().read ();
// for the random number generators
final Bucket bucket = new Bucket ();
final Flusher flusher = new Flusher (interval, bucket);
final Worker[] workers = new Worker[nWorkers];
for (int i = 0; i < workers.length; i++) {
workers[i] = new Worker (i, i + seed, maxWork, bucket);
}
System.out.println ("*** Flusher: interval = " + interval
+ " milliseconds");
new Parallel (
new CSProcess[] {
flusher,
new Parallel (workers)
}
).run ();
}
}
A Worker cycle consists of one shift (which takes a random amount of time)
and, then, falling into the bucket:
import org.jcsp.lang.*;
import java.util.*;
public class Worker implements CSProcess {
    private final int id;
    private final long seed;
    private final int maxWork;
    private final Bucket bucket;
    public Worker (int id, long seed, int maxWork, Bucket bucket) {
        this.id = id;
        this.seed = seed;
        this.maxWork = maxWork;
        this.bucket = bucket;
    }
    public void run () {
        final Random random = new Random (seed);
        // each process gets a different seed
        final CSTimer tim = new CSTimer ();
        final int second = 1000;
        // JCSP timer units are milliseconds
        final String working = "\t... Worker " + id + " working ...";
        final String falling = "\t\t\t ... Worker " + id + " falling ...";
        final String flushed = "\t\t\t\t\t\t ... Worker " + id + " flushed ...";
        while (true) {
            System.out.println (working);
            tim.sleep (random.nextInt (maxWork));  // these lines represent one unit of work
            System.out.println (falling);
            bucket.fallInto ();
            System.out.println (flushed);
        }
    }
}
The Flusher just flushes the bucket at preset time intervals:
import org.jcsp.lang.*;
import java.util.*;
public class Flusher implements CSProcess {
    private final int interval;
    private final Bucket bucket;
    public Flusher (int interval, Bucket bucket) {
        this.interval = interval;
        this.bucket = bucket;
    }
    public void run () {
        final CSTimer tim = new CSTimer ();
        long timeout = tim.read () + interval;
        while (true) {
            tim.after (timeout);
            System.out.println ("*** Flusher: about to flush ...");
            final int n = bucket.flush ();
            System.out.println ("*** Flusher: number flushed = " + n);
            timeout += interval;
        }
    }
}
The Flying Dingbats
This consists of many buckets, a single bucket keeper (responsible for flushing the buckets) and a flock of Dingbats (who regularly fall into various buckets).
[Figure: system diagram of the BucketKeeper, the buckets and the Dingbats]
import org.jcsp.lang.*;
public class BucketExample2 {
public static void main (String[] args) {
final int minDingbat = 2;
final int maxDingbat = 10;
final int nDingbats = (maxDingbat - minDingbat) + 1;
final int nBuckets = 2*maxDingbat;
final Bucket[] bucket = Bucket.create (nBuckets);
final int second = 1000;
// JCSP timer units are milliseconds
final int tick = second;
final BucketKeeper bucketKeeper = new BucketKeeper (tick, bucket);
final Dingbat[] dingbats = new Dingbat[nDingbats];
for (int i = 0; i < dingbats.length; i++) {
dingbats[i] = new Dingbat (i + minDingbat, bucket);
}
new Parallel (
new CSProcess[] {
bucketKeeper,
new Parallel (dingbats)
}
).run ();
}
}
The BucketKeeper keeps time, flushing buckets in sequence at a steady rate.
When the last one has been flushed, it starts again with the first:
import org.jcsp.lang.*;
class BucketKeeper implements CSProcess {
private final long interval;
private final Bucket[] bucket;
public BucketKeeper (long interval, Bucket[] bucket) {
this.interval = interval;
this.bucket = bucket;
}
public void run () {
String[] spacer = new String[bucket.length];
spacer[0] = "";
for (int i = 1; i < spacer.length; i++) {
spacer[i] = spacer[i - 1] + " ";
}
final CSTimer tim = new CSTimer ();
long timeout = tim.read ();
int index = 0;
while (true) {
final int n = bucket[index].flush ();
if (n == 0) {
System.out.println (spacer[index] + "*** bucket " +
index + " was empty ...");
}
index = (index + 1) % bucket.length;
timeout += interval;
tim.after (timeout);
}
}
}
So the buckets represent time values. A process falling into one of them will sleep
until the prescribed time when the BucketKeeper next flushes it.
Dingbats live the following cycle. First, they do some work (rather brief in the following code). Then, they work out which bucket to fall into and fall into it - that is all. In this case, each Dingbat flies on id buckets from the one it was just flushed from (where id is the Dingbat number indicated in the above network diagram):
import org.jcsp.lang.*;
public class Dingbat implements CSProcess {
    private final int id;
    private final Bucket[] bucket;
    public Dingbat (int id, Bucket[] bucket) {
        this.id = id;
        this.bucket = bucket;
    }
    public void run () {
        int logicalTime = 0;
        String[] spacer = new String[bucket.length];
        spacer[0] = "";
        for (int i = 1; i < spacer.length; i++) {
            spacer[i] = spacer[i - 1] + " ";
        }
        String message = "Hello world from " + id + " ==> time = ";
        while (true) {
            logicalTime += id;
            final int slot = logicalTime % bucket.length;  // assume: id <= bucket.length
            bucket[slot].fallInto ();
            System.out.println (spacer[slot] + message + logicalTime);  // one unit of work
        }
    }
}
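The slot arithmetic in the Dingbat loop can be checked in isolation. The sketch below is a plain sequential helper (the class DingbatSlots and its method are hypothetical names, not part of JCSP) that lists the bucket indices a Dingbat visits, given its id and the number of buckets.

```java
// Hypothetical helper: reproduces only the arithmetic of the Dingbat loop.
final class DingbatSlots {
    /** Returns the first `steps` bucket indices visited by the Dingbat `id`. */
    static int[] slots(int id, int nBuckets, int steps) {
        final int[] result = new int[steps];
        int logicalTime = 0;
        for (int k = 0; k < steps; k++) {
            logicalTime += id;                     // fly on id buckets
            result[k] = logicalTime % nBuckets;    // wrap round the bucket array
        }
        return result;
    }
}
```

With the 20 buckets used in BucketExample2, Dingbat 3 visits buckets 3, 6, 9, 12, 15, 18 and then wraps round to 1 and 4, while Dingbat 10 alternates between buckets 10 and 0.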
Danger - Race Hazard
This example contains a race hazard whose elimination is left as an exercise for the reader. The problem is to ensure that all flushed Dingbats have settled in their next chosen buckets before the BucketKeeper flushes those buckets. If a Dingbat is a bit slow, it may fall into its chosen bucket too late: the BucketKeeper has already flushed it and the Dingbat will have to remain there until the next cycle.
With the rate of flushing used in the above system, this is unlikely to happen. But it is just possible: if something suspended execution of the system for a few seconds immediately following a flush, then the BucketKeeper could be rescheduled before the flying Dingbats.
Acknowledgement
This example is from a discrete event modelling approach due to Jon Kerridge (Napier University, Scotland). The Dingbats could easily model their own timeouts for themselves. However, setting a timeout is an O(n) operation (where n is the number of processes setting them). Here, there is only one process setting timeouts (the BucketKeeper) and the bucket operations fallInto and flush have O(1) costs (at least, that is the case for occam).
Of course, removal of the above race hazard means that the timeout by the BucketKeeper can also be eliminated. The buckets can be flushed just as soon as the BucketKeeper knows that the previously flushed Dingbats have settled. In this way, the event model can proceed at full speed, maintaining correct simulated time - each bucket representing one time unit - without needing any timeouts in the simulation itself.
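One possible way to let the keeper know that the flushed Dingbats have settled is sketched below, using only the standard library rather than JCSP. This is an assumption-laden illustration, not the intended solution to the exercise: TickBucket and SettledSimulation are hypothetical names, and the keeper simply polls the held counts, which a real solution would probably replace with a proper synchronisation. The check relies on the fact that nobody leaves a bucket between flushes, so once the held total equals the number of Dingbats, all of them are parked.

```java
// Hypothetical stand-in for a bucket, built on Object.wait/notifyAll.
final class TickBucket {
    private int holding = 0;
    private int cycle = 0;

    synchronized void fallInto() {
        final int myCycle = cycle;
        holding++;
        boolean interrupted = false;
        while (cycle == myCycle) {           // released only by a flush
            try { wait(); } catch (InterruptedException e) { interrupted = true; }
        }
        if (interrupted) Thread.currentThread().interrupt();
    }

    synchronized int flush() {
        final int n = holding;
        holding = 0;
        cycle++;
        notifyAll();
        return n;
    }

    synchronized int holding() { return holding; }
}

final class SettledSimulation {
    /** Runs `ticks` flushes with no timeouts; returns wake counts indexed by Dingbat id. */
    static int[] run(int minId, int maxId, int ticks) {
        final int nBuckets = 2 * maxId;
        final int nDingbats = maxId - minId + 1;
        final TickBucket[] bucket = new TickBucket[nBuckets];
        for (int i = 0; i < nBuckets; i++) bucket[i] = new TickBucket();
        final int[] wakes = new int[maxId + 1];
        for (int id = minId; id <= maxId; id++) {
            final int myId = id;
            final Thread t = new Thread(() -> {
                int logicalTime = 0;
                while (true) {
                    logicalTime += myId;
                    bucket[logicalTime % nBuckets].fallInto();
                    synchronized (wakes) { wakes[myId]++; }  // one unit of work
                }
            });
            t.setDaemon(true);  // parked Dingbats must not keep the JVM alive
            t.start();
        }
        for (int tick = 0; tick <= ticks; tick++) {
            // Settle check: poll until every Dingbat is parked in some bucket.
            int held;
            do {
                held = 0;
                for (TickBucket b : bucket) held += b.holding();
                if (held < nDingbats) Thread.yield();
            } while (held < nDingbats);
            if (tick < ticks) bucket[tick % nBuckets].flush();  // last pass only settles
        }
        synchronized (wakes) { return wakes.clone(); }
    }
}
```

Because each flush waits for the previous flight to land, the Dingbat with a given id is woken exactly at ticks id, 2*id, 3*id and so on, however the threads are scheduled. Running 41 ticks with ids 2 to 10 therefore wakes Dingbat 2 twenty times and Dingbat 10 four times.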
Field Summary
Fields
Modifier and Type | Field | Description
private int | bucketCycle | Barrier uses an even/odd flag because the barrier cannot sync without every process. Bucket can happily keep working while old processes are waiting around, so a flag is not enough. Instead, a count must be used.
private final Object | bucketLock | The monitor lock used for synchronization.
private int | nHolding | The number of processes currently enrolled on this bucket.
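Constructor Summary
Constructors
Bucket()
Method Summary
Modifier and Type | Method | Description
static Bucket[] | create(int n) | Creates an array of Buckets.
void | fallInto() | Fall into the bucket.
int | flush() | Flush the bucket.
int | holding() | This returns the number of processes currently held in the bucket.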
Field Details
nHolding
private int nHolding
The number of processes currently enrolled on this bucket.
bucketLock
private final Object bucketLock
The monitor lock used for synchronization.
bucketCycle
private int bucketCycle
Barrier uses an even/odd flag because the barrier cannot sync without every process. Bucket can happily keep working while old processes are waiting around, so a flag is not enough. Instead, a count must be used. Theoretically this is unsafe, but the likelihood of the bucket completing 4 *billion* cycles before the process wakes up is somewhat slim.
Constructor Details
Bucket
public Bucket()
Method Details
fallInto
public void fallInto()
Fall into the bucket. The process doing this will be blocked until the next flush().
flush
public int flush()
Flush the bucket. All held processes will be released. It returns the number that were released.
- Returns: the number of processes flushed.
holding
public int holding()
This returns the number of processes currently held in the bucket. Note that this number is volatile - for information only! By the time the invoker of this method receives it, it might have changed (because of further processes falling into the bucket or someone flushing it).
- Returns: the number of processes currently held in the bucket.
create
public static Bucket[] create(int n)
Creates an array of Buckets.
- Parameters: n - the number of Buckets to create in the array
- Returns: the array of Buckets