// Concurrent Programming - CS 598
// Summer 2003 - Producer-Consumer using Exchanges.
/**
 * The token that the producer and the consumer hand back and forth through
 * the exchanger. It carries no payload, only a flag recording whether it is
 * currently considered filled.
 */
class Buffer {
    // Only one thread holds a reference to a given buffer at any time (the
    // one-reference rule), so this flag is read and written without any
    // synchronization.
    public boolean full = false;
}
/**
 * The consuming half of the producer-consumer pair. All of its work happens
 * after an exchange, on the buffer it has just received.
 */
class Consumer
        extends ProducerConsumerTemplate {

    /**
     * Creates a consumer that swaps buffers through the given exchanger.
     *
     * @param e the exchanger shared with the producer
     */
    Consumer(Exchanger e) {
        super(e);
    }

    /**
     * Hook run before each exchange. The consumer has nothing to prepare.
     *
     * @param b the buffer about to be handed over
     */
    protected void preExchangeOperation(Buffer b) {
        // Intentionally empty.
    }

    /**
     * Hook run after each exchange: prints a progress marker, checks that the
     * received buffer is full, simulates processing it and marks it empty.
     *
     * @param b the buffer received from the exchange
     */
    protected void postExchangeOperation(Buffer b) {
        System.out.print("c");
        assert b.full;

        // Simulated processing of the full buffer.
        raggedSleep();

        b.full = false;
    }
}
/**
 * A monitor-based rendezvous point at which two threads swap objects.
 *
 * <p>The first thread to arrive parks its object in {@code fst} and waits; the
 * second takes that object, leaves its own in {@code snd} and wakes the first.
 * All state is guarded by this object's monitor.
 *
 * <p>Interruption is sticky: once any caller is found interrupted, the
 * {@code interrupted} flag is set and never cleared, so every current and
 * future call to {@link #exchange(Object)} fails with
 * {@link InterruptedException}.
 */
class Exchanger {
    private Object fst = null, snd = null;
    private boolean interrupted = false;

    /**
     * Swaps {@code o} for the object offered by another thread, blocking until
     * a partner arrives.
     *
     * @param o the object to hand over; must not be {@code null}
     * @return the object offered by the partner thread
     * @throws InterruptedException if the calling thread is interrupted on
     *         entry or while waiting, or if the exchanger has already been
     *         poisoned by an interruption in any thread
     */
    synchronized Object
    exchange(Object o)
            throws InterruptedException {
        assert o != null;

        // The interrupt status is only inspected, not cleared, on this path.
        if (Thread.currentThread().isInterrupted()) {
            poison();
            throw new InterruptedException();
        }
        failIfPoisoned();

        try {
            // A previous pair may not have finished: its second party has set
            // snd but its first party has not yet collected it. Entering now
            // would let this thread pick up that stale hand-off (possibly its
            // own object), so wait until snd has been drained.
            while (snd != null && !interrupted) {
                wait();
            }
            failIfPoisoned();

            if (fst != null) {
                // Second to arrive: take the parked object, leave ours.
                Object parked = fst;
                fst = null;
                snd = o;
                notifyAll();
                return parked;
            }

            // First to arrive: park our object and wait for a partner. The
            // loop also ends when the exchanger is poisoned, otherwise a
            // waiter woken by a poisoning thread would sleep forever.
            fst = o;
            while (snd == null && !interrupted) {
                wait();
            }
            failIfPoisoned();

            Object received = snd;
            snd = null;
            // Release any thread held back until this hand-off was collected.
            notifyAll();
            return received;
        }
        catch (InterruptedException ie) {
            // Poisoning is idempotent, so re-poisoning on an already
            // poisoned exchanger is harmless.
            poison();
            throw ie;
        }
    }

    /**
     * Marks the exchanger as permanently interrupted and wakes every waiter.
     * Must be called with this object's monitor held.
     */
    private void poison() {
        interrupted = true;
        notifyAll();
    }

    /**
     * Throws if the exchanger has been poisoned.
     * Must be called with this object's monitor held.
     *
     * @throws InterruptedException if the poison flag is set
     */
    private void failIfPoisoned() throws InterruptedException {
        if (interrupted) {
            throw new InterruptedException();
        }
    }
}
/**
 * Shared skeleton for the producer and the consumer. Each repeatedly runs a
 * pre-exchange hook, swaps its buffer with its partner through the exchanger,
 * then runs a post-exchange hook on the buffer it received. The loop ends the
 * first time the exchange reports an interruption.
 */
abstract class ProducerConsumerTemplate
        implements Runnable {
    private final Exchanger exch;

    /**
     * @param e the exchanger through which buffers are swapped
     */
    ProducerConsumerTemplate(Exchanger e) {
        exch = e;
    }

    /**
     * Hook invoked after each successful exchange.
     *
     * @param b the buffer just received from the exchange
     */
    protected abstract void postExchangeOperation(Buffer b);

    /**
     * Hook invoked before each exchange.
     *
     * @param b the buffer about to be offered to the exchange
     */
    protected abstract void preExchangeOperation(Buffer b);

    /**
     * Sleeps for a random 1-20 ms and then, roughly one time in ten,
     * interrupts the calling thread.
     */
    protected final void
    raggedSleep() {
        try {
            Thread.sleep(rint(20) + 1);
        }
        catch (InterruptedException e) {
            // Thread.sleep cleared the interrupt status when it threw.
            // Restore it so the request is not silently lost.
            Thread.currentThread().interrupt();
        }
        if (rint(100) < 10)
            Thread.currentThread().interrupt();
    }

    /**
     * Returns a pseudo-random value in {@code [0, max)}.
     *
     * @param max exclusive upper bound; asserted to satisfy 0 &lt; max &lt; 1000
     */
    private long rint(int max) {
        assert (0 < max) && (max < 1000);
        return (long) ((Math.random()*1000) % max);
    }

    /**
     * Runs the pre-hook / exchange / post-hook cycle, starting with a fresh
     * buffer, until the exchange throws {@link InterruptedException}.
     */
    @Override
    public void
    run() {
        Buffer buff = new Buffer();
        while (true) {
            preExchangeOperation(buff);
            try {
                buff = (Buffer) exch.exchange(buff);
            }
            catch (InterruptedException ie) {
                // Interruption ends this worker.
                break;
            }
            postExchangeOperation(buff);
        }
    }
}
/**
 * The producing half of the producer-consumer pair. All of its work happens
 * before an exchange, on the buffer it is about to hand over.
 */
class Producer
        extends ProducerConsumerTemplate {

    /**
     * Creates a producer that swaps buffers through the given exchanger.
     *
     * @param e the exchanger shared with the consumer
     */
    Producer(Exchanger e) {
        super(e);
    }

    /**
     * Hook run before each exchange: checks that the buffer is empty,
     * simulates filling it, marks it full and prints a progress marker.
     *
     * @param b the buffer about to be handed over
     */
    protected void preExchangeOperation(Buffer b) {
        assert !b.full;

        // Simulated filling of the empty buffer.
        raggedSleep();

        b.full = true;
        System.out.print("p");
    }

    /**
     * Hook run after each exchange. The producer has nothing to do here.
     *
     * @param b the buffer received from the exchange
     */
    protected void postExchangeOperation(Buffer b) {
        // Intentionally empty.
    }
}
/**
 * Program entry point: wires one producer and one consumer to a shared
 * exchanger and starts each on its own thread.
 */
class ProducerConsumerExchanger {

    /**
     * Starts the producer thread, then the consumer thread.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        final Exchanger shared = new Exchanger();

        Thread producerThread = new Thread(new Producer(shared));
        producerThread.start();

        Thread consumerThread = new Thread(new Consumer(shared));
        consumerThread.start();
    }
}
// $Log:$
// syntax highlighted by Code2HTML, v. 0.9