/**
 * Driver for the readers/writers demonstration.
 *
 * Starts six {@link Reader} threads, nine {@link Writer} threads (three per
 * regular slot) and two {@link auxWriter} threads that all share the static
 * data base below, coordinated through the single shared {@link Monitor}.
 */
public class RW {
    static int rsteps = 10; // read iterations performed by each reader
    static int wsteps = 3;  // two-phase write iterations performed by each writer
    // Update iterations performed by each auxiliary writer. An auxiliary
    // update is admitted once per completed phase-1 round, and there are
    // 3 writers per slot * wsteps such rounds, so this must equal that product.
    static int asteps = 9;
    static int slots = 3;   // number of regular slots (dbase[1] .. dbase[3])
    // Shared data base: indices 1..3 are the regular slots written by the
    // Writer threads, index 4 is the slot the auxiliary writers accumulate
    // into. Index 0 is not accessed by any thread in this file.
    static int dbase[] = new int[] {0, 9, 9, 9, 0};
    static Monitor monitor = new Monitor ();

    /**
     * Creates all worker threads, starts them in a fixed order, and waits
     * for every one of them to finish.
     *
     * @param a command-line arguments (unused)
     */
    public static void main (String[] a) {
        Reader r[] = new Reader[6]; // six readers, tids 1..6
        for (int j = 0; j < 6; j++) {
            r[j] = new Reader (j + 1);
        }
        Writer w1[] = new Writer[3]; // three writers of slot 1
        for (int j = 0; j < 3; j++) {
            w1[j] = new Writer (j + 1, j + 1, 1); // (): tid, value, slot
        }
        Writer w2[] = new Writer[3]; // three writers of slot 2
        for (int j = 0; j < 3; j++) {
            w2[j] = new Writer (j + 4, j + 4, 2); // (): tid, value, slot
        }
        Writer w3[] = new Writer[3]; // three writers of slot 3
        for (int j = 0; j < 3; j++) {
            w3[j] = new Writer (j + 7, j + 7, 3); // (): tid, value, slot
        }
        auxWriter aw1 = new auxWriter (1); // first aux. writer
        auxWriter aw2 = new auxWriter (2); // second aux. writer

        // Start order: slot-1 writers, then slot-3 writers interleaved with
        // readers 1..3, then slot-2 writers interleaved with readers 4..6,
        // and finally the two auxiliary writers.
        for (int j = 0; j < 3; j++) {
            w1[j].start ();
        }
        for (int j = 0; j < 3; j++) {
            w3[j].start ();
            r[j].start ();
        }
        for (int j = 0; j < 3; j++) {
            w2[j].start ();
            r[j + 3].start ();
        }
        aw1.start ();
        aw2.start ();
        // Nine regular writers plus two auxiliary writers were started.
        System.out.println ("Six readers and eleven writers started.");

        try {
            for (int j = 0; j < 3; j++) { // wait for all
                r[j].join ();
                r[j + 3].join ();
                w1[j].join ();
                w2[j].join ();
                w3[j].join ();
            }
            aw1.join ();
            aw2.join ();
        } catch (InterruptedException e) {
            // Do not claim a normal termination we did not observe: keep the
            // interrupt status for whoever called us and stop waiting.
            Thread.currentThread ().interrupt ();
            System.err.println ("Interrupted while waiting for the worker threads.");
            return;
        }
        System.out.println ("System terminates normally.");
    }
}
/**
 * Monitor coordinating readers, two-phase slot writers and auxiliary writers.
 *
 * A write round proceeds as follows: every regular slot is written once in
 * phase 1, then each auxiliary writer performs one update, then every regular
 * slot is written once in phase 2. Readers are kept out from the first
 * phase-1 request of a round until the last phase-2 write of that round ends,
 * and a round cannot begin while any reader is still reading.
 *
 * All waits are uninterruptible: an interrupt received while waiting does not
 * abort the wait, but the thread's interrupt status is restored on return.
 */
class Monitor
{
    // Number of regular slots; a phase is complete once this many writes ended.
    private final int slotCount = RW.slots;
    // True while a write round is in progress (see class comment).
    private boolean writing = false;
    // Number of readers currently between request_read and end_read. A count
    // (not a flag) is needed so that one reader finishing does not admit a
    // writer while other readers are still reading.
    private int readers = 0;
    // Highest phase already claimed for each slot in the current round (0 = none).
    private int[] slots = new int[slotCount];
    // phases[0] / phases[1]: number of phase-1 / phase-2 writes that have ended.
    private final int[] phases = new int[2];
    // auxWrite[tid-1] is true once that aux writer has claimed its update for this round.
    private boolean[] auxWrite = new boolean[2];
    // True while an aux writer is between request_auxWrite and end_auxWrite.
    // Aux updates are exclusive, so a finished update can never release
    // phase 2 while the other aux writer is still in progress.
    private boolean auxActive = false;

    /** Blocks until no write round is in progress, then registers a reader. */
    public synchronized void request_read ()
    {
        boolean interrupted = false;
        while (writing) {
            interrupted |= waitForChange ();
        }
        readers++;
        restoreInterrupt (interrupted);
    }

    /** Unregisters a reader and wakes up all waiting threads. */
    public synchronized void end_read ()
    {
        if (readers > 0) { // tolerate an unmatched call, as the old flag did
            readers--;
        }
        notifyAll();
    }

    /**
     * Blocks until the given slot may be written in the given phase, then
     * claims it.
     *
     * @param slot  1-based slot number
     * @param phase 1 or 2
     */
    public synchronized void request_write (int slot, int phase)
    {
        int index = slot - 1;
        boolean interrupted = false;
        // Wait while someone is reading, while the other phase still has
        // finished-but-unreleased writes, or while this slot has already
        // been claimed for this phase (or a later one) in the current round.
        while (readers > 0 || phases[phase % 2] > 0 || slots[index] >= phase) {
            interrupted |= waitForChange ();
        }
        writing = true;        // keep readers out for the rest of the round
        slots[index] = phase;  // no other writer may take this slot in this phase
        restoreInterrupt (interrupted);
    }

    /**
     * Records that one write of the given phase has ended. When all slots
     * have finished phase 1 the auxiliary writers are enabled; when all slots
     * have finished phase 2 the round is over and the monitor is reset.
     *
     * @param phase 1 or 2
     */
    public synchronized void end_write (int phase)
    {
        phases[phase - 1]++;
        if (phases[phase - 1] == slotCount) // every slot updated in this phase
        {
            if (phase == 1)
            {
                // Fresh claim flags: each aux writer may update once this round.
                auxWrite = new boolean[2];
            }
            if (phase == 2)
            {
                // Round complete: reset everything and let readers back in.
                writing = false;
                slots = new int[slotCount];
                phases[1] = 0;
            }
        }
        notifyAll();
    }

    /**
     * Blocks until phase 1 is complete, this auxiliary writer has not yet
     * updated in the current round, and no other auxiliary update is in
     * progress; then claims the update.
     *
     * @param tid 1-based auxiliary writer id (1 or 2)
     */
    public synchronized void request_auxWrite (int tid)
    {
        int index = tid - 1;
        boolean interrupted = false;
        while (phases[0] != slotCount || auxWrite[index] || auxActive) {
            interrupted |= waitForChange ();
        }
        auxWrite[index] = true; // this aux writer is done for the round
        auxActive = true;
        restoreInterrupt (interrupted);
    }

    /**
     * Ends an auxiliary update. Once both auxiliary writers have completed
     * their update for the round, phase 2 is released.
     *
     * @param tid 1-based auxiliary writer id (not used by the bookkeeping)
     */
    public synchronized void end_auxWrite (int tid)
    {
        auxActive = false;
        if (auxWrite[0] && auxWrite[1]) // both aux updates complete
        {
            // Clearing the phase-1 counter is what unblocks phase-2 requests.
            phases[0] = 0;
        }
        notifyAll();
    }

    // Waits for a notification; must be called with this monitor held.
    // Returns true if the wait was cut short by an interrupt.
    private boolean waitForChange ()
    {
        try {
            wait();
            return false;
        } catch (InterruptedException e) {
            return true;
        }
    }

    // Re-asserts the interrupt status that was consumed while waiting.
    private static void restoreInterrupt (boolean interrupted)
    {
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }
}
class Reader extends Thread {
private int tid, value1, value2, value3, value4;
Reader (int tid1) {
tid = tid1;
}
public void run () {
System.out.println ("Reader " + tid + " begins execution.");
yield ();
for( int j=0; j<RW.rsteps; j++ ) {
RW.monitor.request_read ();
value1 = RW.dbase[1]; value2 = RW.dbase[2];
value3 = RW.dbase[3]; value4 = RW.dbase[4];
System.out.println ("Reader " + tid + " reads: | " + value1 + " | "
+ value2 + " | " + value3 + " | " + value4 + " |.");
RW.monitor.end_read ();
yield ();
}
System.out.println ("Reader " + tid + " terminates.");
}
}
class Writer extends Thread {
private int tid, value, thread_value, slot;
Writer (int tid1, int value1, int slot1) {
tid = tid1;
value = value1;
slot = slot1;
}
public void run () {
System.out.println ("Writer " + tid + ", assigned to slot " + slot
+ ", begins execution with seed value " + value + ".");
yield ();
for( int j=0; j<RW.wsteps; j++ ) {
RW.monitor.request_write (slot, 1);
if( slot == 3 )
yield ();
thread_value = 100 * tid + value;
RW.dbase[slot] = thread_value;
System.out.println ("Writer " + tid + " writes " + thread_value
+ " in slot " + slot + " in phase 1.");
RW.monitor.end_write (1);
thread_value = thread_value + 10;
RW.monitor.request_write (slot, 2);
RW.dbase[slot] = thread_value;
System.out.println ("Writer " + tid + " writes " + thread_value
+ " in slot " + slot + " in phase 2.");
if( slot == 1 )
yield ();
++value;
RW.monitor.end_write (2);
yield ();
}
System.out.println ("Writer " + tid + " terminates.");
}
}
class auxWriter extends Thread {
private int tid;
private int sum;
auxWriter (int tid1) {
tid = tid1;
}
public void run () {
System.out.println ("Auxiliary writer begins execution.");
yield ();
for( int j=0; j<RW.asteps; j++ ) {
RW.monitor.request_auxWrite (tid);
if (tid == 1)
{
sum = RW.dbase[1] + RW.dbase[3];
}
else if (tid == 2)
{
sum = RW.dbase[2];
}
RW.dbase[4] += sum;
System.out.println ("Auxiliary writer " + tid + " adds " + sum + " to slot 4. (Run " + j + ")");
RW.monitor.end_auxWrite (tid);
yield ();
}
System.out.println ("Auxiliary writer " + tid + " terminates.");
}
} Powered by
GeSHi Syntax Highlighting software.
Author of all (other) material unless otherwise specified:
Loren Segal. Copyright 2005.