Previous Contents Next

Chapter 2   Distributed programming

This chapter presents the distributed and mobile features of JoCaml. JoCaml is specifically designed to provide a simple and well-defined model of distributed programming. Since the language entirely relies on asynchronous message-passing, programs can either be used on a single machine (as described in the previous chapter), or they can be executed in a distributed manner on several machines.

In this chapter, we describe support for execution on several machines and new primitives that control locality, migration, and failure. To this end, we interleave a description of the model with a series of examples that illustrate the use of these primitives.

2.1   The Distributed Model

The execution of JoCaml programs can be distributed among numerous machines, possibly running different systems; new machines may join or quit the computation. At any time, every process or expression is running on a given machine. However, they may migrate from one machine to another, under the control of the language. In this implementation, the runtime support consists of several system-level processes that communicate using TCP/IP over the network.

In JoCaml, the execution of a process (or an expression) does not usually depend on its localization. Indeed, it is equivalent to run processes P and Q on two different machines, or to run the compound process { P | Q } on a single machine. In particular, the scope for defined names and values does not depend on their localization: whenever a port name appears in a process, it can be used to form messages (using the name as the address, or as the message contents) without knowing whether this port name is locally- or remotely-defined. So far, locality is transparent, and programs can be written independently of their run-time distribution.

Of course, locality matters in some circumstances: side-effects such as printing values on the local terminal depend on the current machine; besides, efficiency can be affected because message-sending over the network takes much longer than local calls; finally, the termination of some underlying runtime will affect all its local processes. For all these reasons, locality is explicitly controlled within JoCaml; it can be adjusted using migration. In contrast, resources such as definitions and processes are never silently relocated by the system.

An important issue when passing messages in a distributed system is whether the message contents is replicated or passed by reference. This is the essential difference between functions and synchronous channels. When a function is sent to a remote machine, its code and the values for its local variables are also sent there, and any invocation will be executed locally on the remote machine. When a synchronous port name is sent to a remote machine, only the name is sent and invocations on this name will forward the invocation to the machine were the name is defined, much as in a remote procedure call.

The name-server

Since JoCaml has lexical scoping, programs being executed on different machines do not initially share any port name; therefore, they would normally not be able to interact with one another. To bootstrap a distributed computation, it is necessary to exchange a few names, and this is achieved using a built-in library called the name server. Once this is done, these first names can be used to communicate some more names and to build more complex communication patterns.

The interface of the name server mostly consists of two functions to register and look up arbitrary values in a ``global table'' indexed by plain strings (see 3.2.3 for reference). To use the name server, do not forget to first launch it, using the command jocns in a shell.

For instance, the following program contains two processes running in parallel. One of them locally defines some resource (a function f that squares integers) and registers it under the string ``square''. The other process is not within the scope of f; it looks up for the value registered under the same string, locally binds it to sqr, then uses it to print something.

# spawn{ let def f x = reply x*x in Ns.register "square" f vartype; }
# ;;
# spawn{ let sqr = Ns.lookup "square" vartype in print_int (sqr 2); exit
# 0;} 
# ;;

File "ex1.ml", line 3, characters 36-43:
Warning: VARTYPE replaced by type 
( int ->  int) metatype
File "ex1.ml", line 1, characters 57-64:
Warning: VARTYPE replaced by type 
( int ->  int) metatype
-> 4
The vartype keyword stands for the type of the value that is being registered or looked up, and it is supplied by the compiler. When a value is registered, its type is explicitely stored with it. When a value is looked up, the stored type is compared with the inferred type in the receiving context; if these types do not match, an exception TypeMismatch is raised. This limited form of dynamic typing is necessary to ensure type safety; to prevent run-time TypeMismatch exceptions, the compiler also provides the inferred vartype at both ends of the name server, here int -> int.

Of course, using the name server makes sense only when the two processes are running as part of stand-alone programs on different machines, and when these processes use the same conventional strings to access the name server. To avoid name clashes when using the same name server for unrelated computations, a local identifier Ns.user is appended to the index string; by default, Ns.user contains the local user name.

Running several programs in concert

The runtimes that participate to a distributed computation are launched as independent executables, e.g. bytecode executables generated by the compiler and linked to the distributed runtime. Each runtime is given its own JoCaml program.

When lookup or register is first used, the runtime attempts to communicate with its name server, using by default the IP address and port number provided in two environment variables JNSNAME and JNSPORT. If no name server is running, the lookup or register call fails. The IP address for the server (or its port number) can also be supplied as command-line parameters when the runtime is started.

The following example illustrates this with two machines. Let us assume that we have a single machine ``here.inria.fr'' that is particularly good at computing squares of integers; on this machine, we define a square function that also prints something when it is called (so that we can keep track of what is happening), and we register this function with key ``square'':

# let def f x = 
#   print_string ("["^string_of_int(x)^"] "); flush stdout;
#   reply x*x in
# Ns.register "square" f vartype
# ;;
# 
# Join.server ()
The ``Join.server'' primitive tells the program to keep running after the completion of all local statements, so that it can serve remote calls.

On machine here.inria.fr, we compile the previous program (p.ml) and we execute it:
here> joc p.ml -o p.out
here> ./p.out
We also write a program that relies on the previous machine to compute squares; this program first looks up for the name registered by here.inria.fr, then performs some computations and reports their results.

# let sqr = Ns.lookup "square" vartype
# ;;
# 
# let def log (s,x) = 
#   print_string ("q: "^s^"= "^string_of_int(x)^"\n"); flush stdout; reply
# ;;
# 
# let def sum (s,n) = reply (if n = 0 then s else sum (s+sqr(n),n-1))
# ;;
# 
# log ("sqr 3",sqr 3);
# log ("sum 5",sum (0,5));
# exit 0
On another machine there.inria,fr, we compile and run our second program (q.ml), after setting the address of our name server (we here use bash syntax to set the environment variable):
there> joc  q.ml -o q.out
there> export JNSNAME=here.inria.fr
there> ./q.out
What is the outcome of this computation? Whenever a process defines new port names, this is done locally, that is, their guarded processes will be executed at the same place as the defining process. Here, every call to square in sqr 3 and within sum 5 will be evaluated as a remote function call to here.inria.fr. The actual localization of processes is revealed by the print_int statements: f (aliased to sqr on there.inria.fr) always prints on machine here, and log always prints on machine there, no matter where the messages are posted.

The result on machine here.inria.fr is:
-> [3] [5] [4] [3] [2] [1] 
while the result on machine there.inria.fr is:
-> q: sqr 3= 9
-> q: sum 5= 55

Polymorphism and the name server

Typing the previous programs in the toplevel joctop gives an interesting error. Indeed, everything goes well until the following line:
# let sqr = Ns.lookup "square" vartype
# ;;
which results in the error:
-> Failure: Failure("vartype: no polymorphism is allowed")
The current implementation of the name server only deals with monomorphic types. Since there is no indication on the type of sqr when it is looked up, the lookup fails. A solution would be to give the type of sqr, as in:
# let sqr : int -> int = Ns.lookup "square" vartype
# ;;
This error does not occur when compiling the file q.ml because later use of sqr make its type precise enough.

2.2   Locations and Mobility

So far, the localization of processes and expressions is entirely static. In some cases, however, a finer control is called for. To compute the sum of squares in the previous example, each call to sqr within the loop resulted in two messages on the network (one for the request, and another one for the answer). It would be better to run the whole loop on the machine that actually computes squares. Yet, we would prefer not to modify the program running on the server every time we need to run a different kind of loop that involves numerous squares.

To this end, we introduce a unit of locality called ``location''. A location contains a bunch of definitions and running processes ``at the same place''. Every location is given a name, and these location names are first-class values. They can be communicated as contents of messages, registered to the name server, ...much as port names. These location names can also be used as arguments to primitives that dynamically control the relations between locations.

2.2.1   Basic examples

Locations can be declared either locally or as a top-level statement. For instance, we create a new location named this_location:

# let loc this_location
#   def   square x = reply x*x
#     and  cubic x = reply (square x)*x
#   do { 
#     print_int (square 2);
#   }
# ;;
# print_int (cubic 2)
# ;;
val this_location : Join.location
val cubic : int -> int
val square : int -> int
-> 48
The let loc declaration binds a location name this_location, and two port names square and cubic whose scope extends to the location and to the following statements. Here, the declaration also has an init part that starts a process {print_int (square 2);}. This process runs within the location, in parallel with the remaining part of the program. As a result, we can obtain either 84 or 48.

Distributed computations are organized as trees of nested locations; every definition and every process is permanently attached to the location where it appears in the source program; a process can create new sublocations with an initial content (bindings and processes) and a fresh location name. Once created, there is no way to place new bindings and processes in the location from outside the location.

For instance, the following program defines three locations such that the locations named kitchen and living_room are sublocations of house. As regards the scopes of names, the locations kitchen, living_room and the ports cook, switch, on, off all have the same scope, which extends to the whole house location (betweeen the first do { and the last }); the location name house has a larger scope that would include whatever follows in the source file.
# let loc house 
#   do {
#     let loc kitchen 
#       def cook () = print_string "cooking...\n"; reply
#       do {}
#     and living_room
#       def  switch () | off! () = print_string "music on\n"; on () | reply
#        or  switch () | on! ()  = print_string "music off\n"; off() | reply
#       do { off () }
#     in
#     switch (); cook (); switch ();
#   }
# ;;
val house : Join.location
-> music on
-> cooking...
-> music off

2.2.2   Mobile Agents

While processes and definitions are statically attached to their location, locations can move from one enclosing location to another. Such migrations are triggered by a process inside of the moving location. As a result of the migration, the moving location becomes a sublocation of its destination location. Notice that locations can be used for several purposes: as a destination addresses, as mobile agents, or as a combination of the two.

Our next example is an agent-based variant of the square example above. On the squaring side, we create a new empty location ``here'', and we register it on the name-server; its name will be used as the destination address for our mobile agent.

# let loc here do {}
# ;;
# 
# Ns.register "here" here vartype;
# Join.server ()
On the client side, we create another location ``mobile'' that wraps the loop computation that should be executed on the square side; the process within mobile first gets the name here, then migrates its location inside of ``here''. Once this is done, it performs the actual computation.

# let loc mobile 
#   do {
#     let here = Ns.lookup "here" vartype in
#     go here;
#     let sqr = Ns.lookup "square" vartype in
#     let def sum (s,n) = 
#       reply (if n = 0 then s else sum (s+sqr n, n-1)) in
#     let result = sum (0,5) in
#     print_string ("q: sum 5= "^string_of_int result^"\n"); flush stdout;
#   }
The go here primitive migrates the mobile location with its current contents to the machine here.inria.fr, as a sub-location of location here, then it returns. Afterwards, the whole computation (calls to the name server, to sqr and to sum) is local to here.inria.fr. There are only three messages exchanged between the machines: one for the lookup here request, one for the answer, and one for the migration.

Let us consider a variant of mobile that combines migration and remote communication:

# let sqr  = Ns.lookup "square" vartype
# let here = Ns.lookup "here" vartype
# 
# let def done1! () | done2! () = exit 0;
# ;;
# 
# let def log (s,x) =
#       let r = string_of_int x in
#       print_string ("agent: "^s^" is "^r^"\n"); flush stdout;
#       reply 
# ;;
# let loc mobile
#   def quadric x = reply sqr(sqr x)
#   and sum (s,n,f) = reply (if n = 0 then s else sum(s+f n, n-1, f))
#   do { 
#     go here; 
#     log ("sum ( i^2 , i= 1..10 )",sum (0,10,sqr)); 
#     done1 () }
# ;;
# spawn {log ("sum ( i^4 , i= 1..10 )", sum (0,10,quadric)); done2 ()}
As before, the mobile agent contains a process that first controls the migration, then performs some computation. Here, the location mobile is also used as a container for the definitions of quadric and sum. The scoping rules are never affected by migrations, so quadric and sum can be used in the following expressions that remain on machine there.

Once the agent arrives on the square machine here, the calls to sum and quadric become remote calls, as if both functions had been defined and registered on machine here. Conversely, messages sent to log from the body of the mobile agent arrive on machine there where the result is printed. As we run this program on machine there, we thus obtain the local output:
-> agent: sum ( i^4 , i= 1..10 ) is 25333
-> agent: sum ( i^2 , i= 1..10 ) is 385
As regards locality, every repeated use of sqr is now performed on machine here. In the example, the computation of the two sums is entirely local once the agent arrives on machine here (one network datagram), which is much more efficient than the equivalent RPC-based program, which would send over sixty network datagrams. (The done1 and done2 messages are signals used to ensure the termination of the program in the script that generates the tutorial.)

Remember that localization and scopes are independent in JoCaml: an agent can perform exactly the same actions no matter where it is actually positioned in the location tree. If we forget about the difference between what is local and what is remote, our program produces the same result as a plain program where the locations boundaries and the migration have been removed:
# let sqr  = Ns.lookup "square" vartype
# 
# let def done1! () | done2! () = exit 0;
# and log (s,x) =
#       let r = string_of_int x in
#       print_string ("agent: "^s^" is "^r^"\n"); flush stdout;
#       reply  
# and quadric x = reply sqr(sqr x)
# and sum (s,n,f) = reply (if n = 0 then s else sum(s+f n, n-1, f)) ;;
# 
# spawn { log ("sum ( i^2 , i= 1..10 )", sum (0,10,sqr)); done1 () } ;;
# spawn { log ("sum ( i^4 , i= 1..10 )", sum (0,10,quadric)); done2 () } ;;
Apart from the performances, both styles are equivalent. In particular, we can first write and test programs, then refine them to get a better tuning of locality.

Applets

The next example shows how to define ``applets''. An applet is a program that is downloaded from a remote server, then used locally. As compared to the previous examples, this migration operates the other way round. Here, the applet defines a reference cell with destructive reading:

# let def cell there = 
#   let def log s = print_string ("cell "^s^"\n"); flush stdout; reply in
# 
#   let loc applet
#     def get () | some! x = log ("is empty"); none () | reply x
#     and  put x | none! () = log ("contains "^x); some x | reply
#     do { go there; none () } in 
# 
#   reply get, put
# ;;
# 
# Ns.register "cell" cell vartype;
# Join.server ()
Our applet has two states: either none () or some s where s is a string, and two methods get and put. Each time cell is called, it creates a new applet in its own location. Thus, numerous independent cells can be created and shipped to callers. The name cell takes as argument the location (there) where the new cell should reside. The relocation is controlled by the process go there; none () that first performs the migration, then sends an internal message to activate the cell. Besides, cell defines a log function outside of the applet. The latter therefore remains on the server and, when called from within the applet on the client machine, keeps track of the usage of its cell. This is in contrast with applets à la Java: the location migrates with its code, but also with its communication capabilities unaffected.

We complement our example with a simplistic user that allocates and uses a local cell:

# let cell = Ns.lookup "cell" vartype
# 
# let loc user
#   do {
#     let get, (put : string -> unit) = cell user in
#     put "world";
#     put ("hello, "^get ()); 
#     print_string (get ());
#     exit 0;
#   } 
-> hello, world
On the server side, we get the trace:

-> cell contains world
-> cell is empty
-> cell contains hello, world
-> cell is empty
On the client side, there are no more go primitives in the applet after its arrival, and this instance of the location name applet does not appear anywhere. As a result, the contents of the applet can be considered part of the host location, as if this contents had been defined locally in the beginning. (Some other host location may still move, but then it would carry the cell applet as a sublocation.)

Data-driven Migration

In the following examples, we consider ``large'' data structures distributed between several machines. We are interested in defining a general iterator that takes a distributed data structure and applies a function to each of its basic component. Because of their relative sizes, it is better to have agents move from site to site as they use the data, rather than move the data or, even worse, access the data one piece at a time.

In practice, we use arrays as the building blocs of our data structure; the basic functions to allocate arrays and fill them (make), and to create a general function that applies a function over every value in the array (iter) could be defined by the following module table.ml:

# let def make (n,f) =
#   let a = Array.create n 0 in
#   for m = 0 to n - 1 do Array.set a m (f m) done; 
#   reply a
# ;;
# let def iter a = 
#   let def i f = Array.iter f a; reply in
#   reply i
# ;;
We now need to ``glue'' together several arrays. More precisely, we define an iterator that is consistent with locality: for each array, we move an agent that contains the function to apply inside of the array location, then we apply it, then we move to the next array, ...

Now, each part of the data structure is a pair (host location, iterator), and the mobility protocol consists in (1) migrate the function to apply inside of the host location, (2) call the iterator with this function as argument.

For instance, the module statistics.ml collects data using this protocol and keeps its partial results as an internal message:
# let def collect! (loc_data, iter_data) =
# 
#   let loc agent 
#     def state! (n,s,s2) | f x = state (n+1,s+x,s2+x*x) | reply
#     or  state! (n,s,s2) | finished! () | result () = reply n,s,s2
#   do {
#     go loc_data; 
#     { state (0,0,0) | iter_data f; finished () }
#   } in
# 
#   let n,s,s2 = result () in
#   print_string "the size is "; print_int n;
#   print_string ", the average is "; print_int (s/n);
#   print_string ", the variance is "; print_int ((n*s2-s*s)/(n*n));
#   print_newline ();
#   exit 0;
Here is the definition of a basic data structure that consists of one array:
# let loc here  ;;
# 
# let iter = 
#   let def f x = reply 2*x+5 in
#   Table.iter (Table.make (100,f)) ;;
# 
# Ns.register "loc_a" here vartype;
# Ns.register "iter_a" (iter : (int -> unit) -> unit) vartype;
# Join.server ()
In order to build our structure in a compositional way, we use a merge function. This function takes two (location, iterator) pairs and returns a new such pair standing for the compound data structure (module merge.ml):

# let def merge (loc1,iter1,loc2,iter2) =
#   let loc mobile 
#     def iter f = go loc1; iter1 f; go loc2 ; iter2 f; reply in
#   reply mobile,iter
Thereby, we can assemble data by repeatedly calling the merge functions on miscellaneous chunks; this defines a structural tree whose leaves are basic arrays. At the same time, merge sets up the locations that are needed to traverse it locally. In our example, the locations of arrays are stationary, but this is not the case of the locations of compound structures: these locations move to each of the subcomponents in turn before applying their iterator to them.

For instance, if we consider the data structure built from three arrays in the following program,

# let (itr_a : (int -> unit) -> unit) = Ns.lookup "iter_a" vartype 
# let loc_a = Ns.lookup "loc_a" vartype
# let (itr_b : (int -> unit) -> unit) = Ns.lookup "iter_b" vartype 
# let loc_b = Ns.lookup "loc_b" vartype
# let (itr_c : (int -> unit) -> unit) = Ns.lookup "iter_c" vartype 
# let loc_c = Ns.lookup "loc_c" vartype
# 
# open Merge
# let loc_ab,  itr_ab   = merge (loc_a ,itr_a ,loc_b, itr_b)
# let loc_ab_c,itr_ab_c = merge (loc_ab,itr_ab,loc_c, itr_c) 
# ;;
# 
# spawn {Statistics.collect (loc_ab_c, itr_ab_c)}
We obtain the results on the machine that runs the program:

-> the size is 230, the average is 77, the variance is 3109
The successive nestings of locations during the computation are:
 iter on a     ---->  iter on b  ---->  iter on c
-----------------------------------------------------
 a     b     c   |  a     b     c  |  a     b     c
 ab -->          |       ab        |       ab
ab_c             |      ab_c -->   |            ab_c
agent            |      agent      |            agent
Notice that migrations are delegated at each level of nesting: as we apply a function f, we put it in its own location (agent) and migrate it inside the data structure location (ab_c), which is a leaf of the location tree. Then, we repeatedly apply the function and migrate some location whose branch contains the location of f. At any stage, the branch of the location tree that contains the leaf agent is the inverse of a branch in the structural tree, and the subpart locations are the successive superlocations of the compound location.

2.2.3   Mobile objects

In JoCaml, all objects are also locations. Thus they can migrate to a location or be the target of a migration. Object methods are also channels, thus a method call will be run on the machine where the object is, not necessarily the same machine as where the call is made.

An example of a mobile object

For instance we can create an object that has methods to make it migrate, and print some information to see where the object is:

# let home = Join.here
# ;;
# 
# class migrant () =
#   method hello () =
#     print_string "hello\n"
#   method go_home () = 
#     Join.go home;
#     print_string "I'm home\n"
#   method go_there l =
#     Join.go l;
#     print_string "I'm not home\n"
# end
# ;;
# 
# let obj = new migrant () in
# let def finished! () = { exit 0; } in
# Ns.user := "mobile_object";
# Ns.register "migrant" obj vartype;
# Ns.register "finished" finished vartype;
# Join.server ()
To use this object, we get it from the name server (it could also be sent in a message). In both cases, we only get a reference to the object, not the object itself.

# let here = Join.here
# ;;
# 
# Ns.user := "mobile_object"
# ;;
# 
# let (obj : < hello : unit -> unit;
#              go_home : unit -> unit;
#              go_there : Join.location -> unit > ) =
#   Ns.lookup "migrant" vartype
# ;;
# 
# let (finished : << unit >> ) = Ns.lookup "finished" vartype
# ;;
# 
# obj#hello ();
# obj#go_there here;
# obj#go_home ();
# spawn{finished ()};
# exit 0
The output on the first machine is:

-> hello
-> I'm home
Even though the ``hello'' call was made from the second machine, the message is printed on the machine where the object is.

The output on the second machine is:

-> I'm not home
It is clear here that the object has migrated to the second machine, before printing it is there.

Objects as migration targets

Since objects can be considered as locations, it is legitimate to have an objct as a migration target. However, the ``Join.go'' primitive expects a location:
# let f a = Join.go a
# ;;
val f : Join.location -> unit
This is why there is another primitive to migrate to an object, called ``Join.goo'':
# let g a = Join.goo a
# ;;
val g : <  > -> unit
There is however a small issue because of explicit subtyping and objects. Consider for instance the following object:
# class foo () =
#   method bar () = print_string "hello\n"
# end
# ;;
# 
# let obj = new foo()
# ;;
class foo (unit) = method bar : unit -> unit end
val obj : foo
Let us now create a location that migrates to this object:
# let loc test do {
#   Join.goo obj;
# }
# ;;
File "ex7.ml", line 4, characters 11-14:
This expression has type Ex6.foo = < bar : unit -> unit >
but is here used with type <  >
There is a typing problem, because subtyping between objects is explicit in Objective Caml. Thus we need to constraint the type of the object the location is migrating to:
# let loc test do {
#   Join.goo (obj :> < >);
# }
# ;;
val test : Join.location

Mobility and object creation

For the time being, a missing feature in JoCaml is the migration of the capability to create objects. For instance, consider the following code:

# let home = Join.here
# ;;
# 
# class migrant () =
#   method hello () =
#     print_string "hello\n"
#   method go_home () = 
#     Join.go home;
#     print_string "I'm home\n"
#   method go_there l =
#     Join.go l;
#     print_string "I'm not home\n"
# end
# ;;
# 
# let f () = new migrant ()
# ;;
# 
# Ns.register "f" f vartype
# ;;
val home : Join.location
class migrant (unit) =
  method go_home : unit -> unit
  method go_there : Join.location -> unit
  method hello : unit -> unit
end
val f : unit -> migrant
File "ex9.ml", line 19, characters 18-25:
Warning: VARTYPE replaced by type 
( unit -> <go_home:( unit ->  unit) ; go_there:( Join.location ->  unit) ; hello:( unit ->  unit) ; >) metatype
If another runtime looks the function ``f'' up and uses it (or gets it by other means, as in a message), the object returned may not work. Similarly, if a location or an object containing a function that creates some object migrates to another runtime, the object returned by the call to such a function may not be correct. This is a limitation of the current implementation.

2.3   Termination, Failures and Failure Recovery

As a matter of fact, some parts of a distributed computation may fail (e.g., because a machine is abruptly switched off). The simplest solution would be to abort the whole computation whenever this is detected, but this is not realistic in case numerous machines are involved. Rather, we would like our programs to detect such failures and take adequate measures, such as cleanly report the problem, abort related parts of the computation, or make another attempt on a different machine. To this end, JoCaml provides an abstract model of failure and failure detection expressed in terms of locations:

In the current implementation, halting is detected only when the halt () primitive is issued in the same runtime, or when the runtime containing the location actually stops. Thus, simply issuing a halt () will not trigger a matching fail in another runtime. This fail will be triggered when the runtime hosting the watched location terminates or becomes unreachable.

The semantics of JoCaml guarantees that this is the only reason why parts of the computation may stop. Now, an executable running the program P on a fallible machine can be thought as a system-defined location let loc machine do { P | {crash (); exit 0;} } where crash may return at any time, with exit 0 terminating the runtime.

In the model, a machine can in particular remotely detect that another machine has stopped, once it knows the name of a location there. In practice, it is difficult to provide reliable failure detection, as this requires further assumptions on the network.

In the prototype implementation, error detection is only partially implemented, hence there no guarantee that when a runtime terminates abnormally, the failure of its locations is detected. (Still, detected failures provide the expected negative guarantees: the failed location is not visible anymore to any part of the computation.)

Since locations fail only as a whole, the programmer can define suitable units of failure, and even use the halt/fail primitives to control the computation. In the current implementation, the control is not this fine-grained when spanning over several runtimes, and require to use exit 0/fail primitives. Notice that no silent recovery mechanism is provided; the programmer must figure out what to do in case of problems.

2.3.1   Basic examples

To begin with, we use simple examples that attempt to use a port name say inside of a fallible location to get messages printed. Because these calls may never return in case the locations stopped, we spawn them instead of waiting for their completion.

In this first example, location agent can stop at any time. After the failure occurred, we print some report message. We know for sure that the first say can only print something before the failure report, and that the second say cannot print anything.
# let loc agent
#   def say s = print_string s; reply
#   do { halt (); } ;;
# 
# spawn { say "it may work before.\n"; }
# ;;
# spawn { fail agent; 
#         print_string "the location stopped\n";
#         say "it never works after\n"; } 
# ;;
val agent : Join.location
val say : string -> unit
-> the location stopped
The following example is more tricky. First, the agent does not halt itself; however, it migrates within a location that stops and this is a deadly move. Second, the halt () process can be triggered only from the outside by a normal message kill (). Thus we know that the first say always prints its message. Finally, as there is no halt in location agent, it can only stop because location fallible halted, so that fail agent; implies that fallible also stopped.
# let loc fallible
#   def kill! () = halt (); 
# ;;
# 
# let loc agent
#   def say s = print_string s; reply
#   do { go fallible; }
# ;;
# 
# spawn { say "it always works.\n"; kill () } 
# ;;
# spawn { say "it may work before.\n"; }
# ;;
# spawn { fail agent; 
#         print_string "both locations stopped.\n";
#         say "it never works after.\n"; }
# ;;
val fallible : Join.location
val kill : <<unit>>
val agent : Join.location
val say : string -> unit
-> it always works.
-> it may work before.
-> both locations stopped.

2.3.2   Watching for Failures

We now move on to some more realistic use of failure-detection; we first consider a function that encapsulates a session with mobility and potential failures.

There is usually no need to halt locations that completed their task explicitly (the garbage-collector should take care of them). However, in some case we would like to be sure that no immigrant location is still running locally.

Let us assume that job is a remote function within location there that may create mobile sublocations and migrate them to the caller's site. To this end, the caller should supply a host location, as in the previous examples. How can we make sure that job is not using this location to run other agents after the call completes ? This is handled using a new temporary location box for each call, and halting it once the function call has completed.

# let def safe! (job,arg,success,failure) =
# 
#   let loc box
#     def  kill! ()  = halt();
#     and  start () = reply job (box,arg) in
# 
#   let def finished! x | running! () = finished x | kill()
#       or  finished! x | failed! ()  = success x
#       or  running! () | failed! ()  = failure () in
# 
#   finished (start ()) | running () | fail box; failed ()
# ;;
val safe : <<((Join.location * 'a -> 'b) * 'a * <<'b>> * <<unit>>)>>
Our supervising protocol either returns a result on success, or a signal on failure. In either case, the message guarantees that no alien computation may take place afterward.

Initially, there is a message running (), and the control definition waits for either some result on finished x or some failure detection on failed (). Whatever its definition is, the job process can create and move locations inside of the box, and eventually return some value to the start process within the box. Once this occurs, done forwards the reply to the control process, and the first join-pattern is triggered. In this case, the running () message is consumed and eventually replaced by a failed () message (once the kill () message is handled, the box gets closed, and the fail guard in the control process is triggered, releasing a message on failed).

At this stage, we know for sure that no binding or computation introduced by job remains on the caller's machine, and we can return the value as if a plain RPC had occurred.

This ``wrapper'' is quite general. Once a location-passing convention is chosen, the safe function does not depend on the actual computation performed by job (its arguments, its results, and even the way it uses locations are parametric here). We could refine this example further to transform unduly long calls to job into failure (by sending a failed () message after an external timeout), to give some more control to the caller (adding an abort message),...

2.3.3   Recovering from partial failures

We finally come back to distributed loops. We give an example of a program that uses the CPU of whatever machine is available to compute the sum 1 + 2 + ... + 999. Basically, we only assume that the iteration could be computed in any order, we cut the loop in small chunks, and we distribute a chunk to every available machine. The program takes care of potential failure. When a machine failes, any chunk that was distributed to that the machine is taken back and given to another machine.

# let size = 1000
# let chunk = 200
# 
# let def join! (name,there) =
#   let loc mobile 
#   do {
#     let def start! (i,finished) = 
#       let def loop! (u,s) = if u<(i+1)*chunk then loop (u+1,s+u) else finished s in
#       loop (i*chunk,0) in
#     go there; 
#     worker (name,mobile,start) } in
#   print_string (name^" joins the party\n"); flush stdout;
# 
# and job! i | worker! (name,there,start) =
#   print_string (name^","^string_of_int(i*chunk)^"\n"); flush stdout;
#   let def once! () | finished! s  = add s | worker (name,there,start)
#   or  once! () | failed! () = print_string (name^" went down\n"); job i in
#   once () | start (i,finished) | fail there;failed()
# 
# and result! (n,s) | add! ds =
#   let s' = s + ds in
#   if n > 0 then result (n-1,s')
#   else {print_string("The sum is "^string_of_int s'^"\n"); exit 0;} ;;
# 
# spawn { result (size/chunk-1,0)
#       | let def jobs! n = job n | if n>0 then jobs (n-1) in jobs (size/chunk-1) };;
# 
# Ns.register "join" join vartype ; Join.server ()
The actual work is performed in the mobile locations, once they reach the locations there provided by joining machines. Messages job i partition the work to be done. Each remote computation concludes either with a finised s or failed () message; in the latter case, the aborted job is re-issued. The resulting sum is accumulated as a message on result.

The client is not specific to our computation at all; indeed, its only contribution is a location where others may place their sublocations.

# let loc worker do {
#   let join =  Ns.lookup "join" vartype in 
#   join ("reliable",worker) }
# 
In the following, we explicitly model an unreliable task force, as a variant of the previous program that also has a ``time bomb'' which eventually stops the joining location:

# let delay = int_of_string(Sys.getenv "DELAY") 
# let name  = Sys.getenv "NAME"
# 
# let loc unreliable_worker do {
#   let join =  Ns.lookup "join" vartype in
#   let def tictac! n = if n = 0 then {exit 0;} else tictac (n-1) in 
#   join (name,unreliable_worker) | tictac delay } 
We start three of them with various parameters, for instance with the command lines:

DELAY=1000  ; NAME=fallible; ./work.out &
DELAY=2000  ; NAME=dubious ; ./work.out &
DELAY=30000 ; NAME=reliable; ./work.out &
./distributed_loop.out
and we observe the output of the last command (the main loop program):
-> dubious joins the party
-> dubious,0
-> fallible joins the party
-> dubious,200
-> reliable joins the party
-> fallible,400
-> reliable,600
-> dubious,800
-> The sum is 499500

Previous Contents Next