|
At its simplest, message passing (communication) comprises one process that is designed to send out a message, and another that is designed to receives it. Message passing using JavaSpaces is described as being loosely coupled in that senders and receivers communicate indirectly through the space. (The opposite of loose coupling is tight coupling.)
Channels (or streams) are "information conduits" that take a series of messages.
Synchronsisation is, of course, important when dealing with distributed systems that use some limitted but shared number of units of resource. Processes requiring these resources use these units if they are availble, if not such processes should wait till an appropriate unit becomes avaialble.
Semaphores are a mechansim for controlling units of resource. A semaphore, at its simplest, is an integer counter that represents the count of available units of resource. It is controlled by two operations:
A disadvantage of semaphores is that some processes, waiting for a resource, may be significantly less efficient at obtaining resources than others. In the worst case a process may entirely fail to obtain any resources --- a problem known as startvation. Two possible solutions to the startvation problem are:
A barrier is a point in a distributed computation which eavh process must reach prior to continuing on to the next phase of the computation. Typically a barrier comprises a shared variable which must equal the total number of processes before the next phase can be commenced. The variable is initialised with the value 0, as each process completes the first phase the varibale is incremebted.
When an entry is written into space it is serialized, i.e. it is turned into a series of bytes. Similarly when an entry is read from a space it is deserialized. There is thus a serialization overhead associated with writing, reading and taking entries.
If we are repeatedly reading the same entry then we can use the snapshot method to reduce this overhead. Given the HelloWorld entey shown in Table 1, we can write an application class as shown in Table 2.
We will illustrate the above by creating an application vomprising a Master process and a number of Client processes. Each process will process some data. When complete the Clients will send their processed data into a space where it can be collated by the Master process.
The Client processes will start processing when they receive a signal to do so from the Master process. This will be encoded as a Message entry of the form described earlier in the HelloWorld and the extended HelloWorld programmes. The Mesage class is presented in Table 1.
import net.jini.core.entry.*;
/* ----------------------------------------- */
/* */
/* MESSAGE CLASS */
/* */
/* ----------------------------------------- */
public class Message implements Entry {
// Fields
public String content;
// Constructors
public Message() {
}
public Message(String content) {
this.content = content;
}
}
|
Table 1: Message entry class
The Master process will also create a synchronisation array (one element per Client process) with all elements set to 0. When each Client completes processing and sending its data it will take the synchronsistaion element whose index corresponds with its Client number out of the space, replace the value with 1 signalling completion, and send the element back into the space. When the entire synchronisation array is "peopled" with 1s the Master process will read the data entries from the space and collate with its own entries. The sysnchronisastion array will be encoded as a distributed array structure as described in the distributed array application example. The Element class used is pesented in table 2.
import net.jini.core.entry.*;
/* ------------------------------------------------ */
/* */
/* ARRAY ELEMENT CLASS */
/* */
/* ------------------------------------------------ */
public class Element implements Entry {
/* --- FIELDS --- */
public String name;
public Integer index;
public Integer value;
/* --- CONSTRUCTORS --- */
public Element() {
}
public Element(String name) {
this.name = name;
}
public Element(String name, int index) {
this.name = name;
this.index = new Integer(index);
}
public Element(String name, int index, int value) {
this.name = name;
this.index = new Integer(index);
this.value = new Integer(value);
}
}
|
Table 2: Element entry class
The Master and Client processes are described by two sub-classes, MasterProcess and ClientProcess, of a common parent class (Process). These three classes are presented in Tables 3, 4 and 5.
/* ------------------------------------------- */
/* */
/* PROCESS */
/* */
/* ------------------------------------------- */
// JavaSpacesUtil package
import JavaSpacesUtils.SpaceAccessor;
// Jini core packages
import net.jini.core.lease.*;
// Jini extension package
import net.jini.space.JavaSpace;
public class Process {
/* ------ FIELDS ------ */
protected JavaSpace space;
protected int[] dataArray = new int[10];
/* ------ COMSTRUCTORS ------ */
public Process() {
// Get JavaSpace
try {
SpaceAccessor newSpaceAccessor = new
SpaceAccessor("/home/staff5/ra/frans/JavaProgs/" +
"JavaSpaces/JavaSpacesUtils/frans_space.prop");
space = newSpaceAccessor.getSpace();
}
// Catch block
catch(Exception e) {
e.printStackTrace();
}
}
/* ------ METHODS ------ */
/* PROCESS DATA */
protected void processData() {
System.out.println("Processing local data");
// Loop
for(int index=0;index < dataArray.length;index++)
dataArray[index] = index;
}
/* OUTPUT DATA */
protected void outputData() {
// Loop
for(int index=0;index < dataArray.length;index++) {
System.out.println("(" + index + ") = " + dataArray[index]);
}
}
}
|
Table 3: Process class
/* ------------------------------------------- */
/* */
/* MASTER PROCESS */
/* */
/* ------------------------------------------- */
// Java packages
import java.util.*;
import java.rmi.*;
// JavaSpacesUtil package
import JavaSpacesUtils.SpaceAccessor;
// Jini core packages
import net.jini.core.lease.*;
import net.jini.core.transaction.*;
import net.jini.core.entry.*;
// Jini extension package
import net.jini.space.JavaSpace;
public class MasterProcess extends Process {
/* ------ FIELDS ------ */
private int numClients;
/* ------ COMSTRUCTORS ------ */
/* Also calls parent constructor */
public MasterProcess(int clients) {
numClients = clients;
}
/* ------ METHODS ------ */
/* START MASTER PROCESS */
public void startMasterProcess() {
System.out.println("MASTER PROCESS STARTED");
// try block
try {
// Tell client processes to start
sendStartMessage();
// Send syncro array
sendSynchroArray();
// Process data
processData();
outputData();
// Collect data fromClients
collectData();
// Output data
outputData();
// End
System.out.println("Clear up");
clearUp();
System.out.println("MASTER PROCESS COMPLETE");
System.exit(0);
}
// Catch block
catch(Exception e) {
e.printStackTrace();
}
}
/* SEND START MESSAGE */
private void sendStartMessage() throws TransactionException,
RemoteException {
// Create Message object entry
Message msg = new Message("Start");
// Send message to space
space.write(msg,null,Lease.FOREVER);
}
/* SEND SYNCHRO ARRAY */
private void sendSynchroArray() throws TransactionException,
RemoteException {
System.out.println("Send synchronisation");
// Create synchro elements
for (int index=0;index < numClients;index++) {
Element newElement = new Element("Synchro",index,0);
space.write(newElement,null,Lease.FOREVER);
}
}
/* COLLECT DATA: Collect data from clients */
private void collectData() throws UnusableEntryException,RemoteException,
TransactionException,InterruptedException {
int index;
int value;
System.out.println("Collecting data from clients");
// Create a template
Element template = new Element("data");
// Loop
while(true) {
// Read entry
Element result = (Element) space.takeIfExists(template,null,
JavaSpace.NO_WAIT);
if (result == null) {
if (checkForEnd()) {
System.out.println("No more data");
break;
}
else Thread.sleep(1000); // Delay 1 second
}
else {
index = result.index.intValue();
value = result.value.intValue();
dataArray[index] = dataArray[index] + value;
}
}
System.out.println("Collection Complete");
}
/* CHECK FOR END */
private boolean checkForEnd() throws UnusableEntryException,RemoteException,
TransactionException,InterruptedException {
for(int index=0;index < numClients;index++) {
Element template = new Element("Synchro",index,1);
Element result = (Element) space.readIfExists(template,null,
space.NO_WAIT);
if (result == null) return(false);
}
// End
return(true);
}
/* CLEAR UP: Remove unwanted entries from space */
private void clearUp() throws UnusableEntryException,RemoteException,
TransactionException,InterruptedException {
// Remove message entry
Message template1 = new Message("Start");
space.takeIfExists(template1,null,space.NO_WAIT);
// Remove synchro array
for (int index=0;index < numClients;index++) {
Element template2 = new Element("Synchro",index);
space.takeIfExists(template2,null,space.NO_WAIT);
}
}
}
|
Table 4: MasterProcess class
/* ------------------------------------------- */
/* */
/* CLIENT PROCESS */
/* */
/* ------------------------------------------- */
// Java packages
import java.rmi.*;
// JavaSpacesUtil package
import JavaSpacesUtils.SpaceAccessor;
// Jini core packages
import net.jini.core.lease.*;
import net.jini.core.transaction.*;
import net.jini.core.entry.*;
// Jini extension package
import net.jini.space.JavaSpace;
public class ClientProcess extends Process {
/* ------ FIELDS ------ */
private int clientNum;
/* ------ COMSTRUCTORS ------ */
/* Also calls parent constructor */
public ClientProcess(int clientN) {
clientNum = clientN-1; // First index is 0 not 1!
}
/* ------ METHODS ------ */
/* MAIN */
public void startClientProcess() {
// try block
try {
// Wait for Master
getStartMessage();
System.out.println("CLIENT PROCESS " + (clientNum+1) + " STARTED");
// Process data
processData();
// Output data
outputData();
// Send data
sendData();
sendEnd();
// End
System.out.println("CLIENT PROCESS " + (clientNum+1) + " COMPLETE");
System.exit(0);
}
// Catch block
catch(Exception e) {
e.printStackTrace();
}
}
/* GET START MESSAGE */
private void getStartMessage() throws UnusableEntryException,
RemoteException,TransactionException,
InterruptedException {
// Create Message template
Message template = new Message("Start");
// Get message
Message result = (Message) space.read(template,null,
Long.MAX_VALUE);
}
/* SEND DATA */
private void sendData() throws TransactionException,RemoteException {
Element newElement;
System.out.println("Send data");
// Loop
for(int index=0;index < dataArray.length;index++) {
// Create element
newElement = new Element("data",index,dataArray[index]);
// Write element
space.write(newElement,null,Lease.FOREVER);
}
}
/* SEND END: Get element of "Synchro" array that represents the current
process, change its value to 1 and resent into space. */
private void sendEnd() throws UnusableEntryException,RemoteException,
TransactionException,InterruptedException {
// Create template
Element template = new Element("Synchro",clientNum);
// Take element
Element result = (Element) space.take(template,null,Long.MAX_VALUE);
// Revise value to 1
result.value = new Integer(1);
// Send back
space.write(result,null,Lease.FOREVER);
}
}
|
Table 5: ClientProcess class
We will also need Master and Client application classes, these are presented in Tables 6 and 7.
/* ------------------------------------------- */
/* */
/* MASTER PROCESS APPLICATION */
/* */
/* ------------------------------------------- */
public class MasterProcessApp {
/* ------ METHODS ------ */
public static void main(String[] args) {
int numClients = Integer.parseInt(args[0]);
// Create instance of MasterProcess
MasterProcess newMasterProcess = new MasterProcess(numClients);
// Commence master process
newMasterProcess.startMasterProcess();
}
}
|
Table 6: MasterProcessApp class
/* ------------------------------------------- */
/* */
/* CLIENT PROCESS APPLICATION */
/* */
/* ------------------------------------------- */
public class ClientProcessApp {
/* ------ METHODS ------ */
public static void main(String[] args) {
int clientNum = Integer.parseInt(args[0]);
// Create instance of ClientProcess
ClientProcess newClientProcess = new ClientProcess(clientNum);
// Commence client process
newClientProcess.startClientProcess();
}
}
|
Table 7: ClientProcessApp class
To run the above we should start the client processes before the Master process. Tables 8, 9 and 10 show the output from running two Client process and a Master process.
$ ./javaRun ClientProcessApp 1 jiniURL = jini://linux10 spaceName = frans_space CLIENT PROCESS 1 STARTED Processing local data (0) = 0 (1) = 1 (2) = 2 (3) = 3 (4) = 4 (5) = 5 (6) = 6 (7) = 7 (8) = 8 (9) = 9 Send data CLIENT PROCESS 1 COMPLETE |
Table 8: Output produced by Client process 1
$ ./javaRun ClientProcessApp 2 jiniURL = jini://linux10 spaceName = frans_space CLIENT PROCESS 2 STARTED Processing local data (0) = 0 (1) = 1 (2) = 2 (3) = 3 (4) = 4 (5) = 5 (6) = 6 (7) = 7 (8) = 8 (9) = 9 Send data CLIENT PROCESS 2 COMPLETE |
Table 9: Output produced by Client process 2
$ ./javaRun MasterProcessApp 2 jiniURL = jini://linux10 spaceName = frans_space MASTER PROCESS STARTED Send synchronisation Processing local data (0) = 0 (1) = 1 (2) = 2 (3) = 3 (4) = 4 (5) = 5 (6) = 6 (7) = 7 (8) = 8 (9) = 9 Collecting data from clients No more data Collection Complete (0) = 0 (1) = 3 (2) = 6 (3) = 9 (4) = 12 (5) = 15 (6) = 18 (7) = 21 (8) = 24 (9) = 27 Clear up MASTER PROCESS COMPLETE |
Table 10: Output produced by Master process