Chapter 2 Distributed programming
This chapter presents the distributed and mobile features of
JoCaml.
JoCaml is specifically designed to provide a simple and well-defined
model of distributed programming. Since the language entirely relies
on asynchronous message-passing, programs can either be used on a
single machine (as described in the previous chapter), or they can be
executed in a distributed manner on several machines.
In this chapter, we describe support for execution on several machines
and new primitives that control locality, migration, and failure. To
this end, we interleave a description of the model with a series of
examples that illustrate the use of these primitives.
2.1 The Distributed Model
The execution of JoCaml programs can be distributed among numerous
machines, possibly running different systems; new machines may join or
quit the computation.
At any time, every process or expression is running on a given
machine. However, they may migrate from one machine to another, under
the control of the language.
In this implementation, the runtime support consists of several system-level
processes that communicate using TCP/IP over the network.
In JoCaml, the execution of a process (or an expression)
does not usually depend on its localization. Indeed, it is equivalent
to run processes P and Q on two different machines, or to run
the compound process { P | Q } on a single machine.
In particular, the scope for defined names and values does not depend
on their localization: whenever a port name appears in a process, it
can be used to form messages (using the name as the address, or as the
message contents) without knowing whether this port name is locally-
or remotely-defined. So far, locality is transparent, and programs can
be written independently of their run-time distribution.
Of course, locality matters in some circumstances: side-effects such
as printing values on the local terminal depend on the current
machine; besides, efficiency can be affected because message-sending
over the network takes much longer than local calls; finally, the
termination of some underlying runtime will affect all its local
processes.
For all these reasons, locality is explicitly controlled within
JoCaml; it can be adjusted using migration. In contrast,
resources such as definitions and processes are never silently
relocated by the system.
An important issue when passing messages in a distributed system is
whether the message contents are replicated or passed by reference.
This is the essential difference between functions and synchronous
channels.
When a function is sent to a remote machine, its code and the values
for its local variables are also sent there, and any invocation will
be executed locally on the remote machine.
When a synchronous port name is sent to a remote machine, only the
name is sent, and invocations on this name are forwarded to the
machine where the name is defined, much as in a remote procedure
call.
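To make this difference concrete, here is a sketch that registers both kinds of values with the name server introduced below (the keys ``square_fun'' and ``square_chan'' are arbitrary). A remote client obtains a value of type int -> int in both cases, but calls to the first one run on the client's machine, whereas calls to the second one are forwarded back to this machine:
# (* an ordinary function: its code and environment travel with it *)
# let square_fun x = x * x;;
# Ns.register "square_fun" square_fun vartype;;
#
# (* a synchronous channel: only the name travels; invocations
#    are forwarded back to the defining machine *)
# spawn{ let def square_chan x = reply x*x in
#        Ns.register "square_chan" square_chan vartype; }
# ;;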
The name-server
Since JoCaml has lexical scoping, programs being executed on different
machines do not initially share any port name; therefore, they would
normally not be able to interact with one another. To bootstrap a
distributed computation, it is necessary to exchange a few names, and
this is achieved using a built-in library called the name server. Once
this is done, these first names can be used to communicate some more
names and to build more complex communication patterns.
The interface of the name server mostly consists of two functions to
register and look up arbitrary values in a ``global table'' indexed by
plain strings (see 3.2.3 for reference). To
use the name server, do not forget to first launch it, using the
command jocns in a shell.
For instance, the following program contains two processes running in
parallel. One of them locally defines some resource (a function f
that squares integers) and registers it under the string ``square''.
The other process is not within the scope of f; it looks up the
value registered under the same string, locally binds it to sqr,
then uses it to print something.
# spawn{ let def f x = reply x*x in Ns.register "square" f vartype; }
# ;;
# spawn{ let sqr = Ns.lookup "square" vartype in print_int (sqr 2); exit 0;}
# ;;
File "ex1.ml", line 3, characters 36-43:
Warning: VARTYPE replaced by type
( int -> int) metatype
File "ex1.ml", line 1, characters 57-64:
Warning: VARTYPE replaced by type
( int -> int) metatype
-> 4
The vartype keyword stands for the type of the value that is being
registered or looked up, and it is supplied by the compiler. When a
value is registered, its type is explicitly stored with it. When a
value is looked up, the stored type is compared with the inferred type
in the receiving context; if these types do not match, an exception
TypeMismatch is raised. This limited form of dynamic typing is
necessary to ensure type safety; to prevent run-time TypeMismatch
exceptions, the compiler also provides the inferred vartype at both
ends of the name server, here int -> int.
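For instance, looking up the ``square'' function under an incompatible type fails dynamically instead of producing an ill-typed value (a sketch):
# (* the stored type int -> int does not match the expected type
#    string -> string, so this lookup raises TypeMismatch *)
# let bad : string -> string = Ns.lookup "square" vartype
# ;;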
Of course, using the name server makes sense only when the two
processes are running as part of stand-alone programs on different
machines, and when these processes use the same conventional strings
to access the name server. To avoid name clashes when using the same
name server for unrelated computations, a local identifier Ns.user
is appended to the index string; by default, Ns.user contains the
local user name.
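For instance, two programs that must interact can explicitly agree on a common identifier before accessing the name server (the string ``my_experiment'' below is arbitrary):
# (* both programs override the default user name with the same
#    identifier, before any register or lookup *)
# Ns.user := "my_experiment"
# ;;
# spawn{ let def f x = reply x*x in Ns.register "square" f vartype; }
# ;;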
Running several programs in concert
The runtimes that participate in a distributed computation are
launched as independent executables, e.g. bytecode executables
generated by the compiler and linked to the distributed runtime. Each
runtime is given its own JoCaml program.
When lookup or register is first used, the runtime attempts to
communicate with its name server, by default using the IP address and
port number provided in two environment variables JNSNAME and
JNSPORT. If no name server is running, the lookup or register
call fails. The IP address for the server (or its port number) can
also be supplied as command-line parameters when the runtime is
started.
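For instance, with bash syntax (the port number and program name below are hypothetical):
there> export JNSNAME=here.inria.fr
there> export JNSPORT=12345
there> ./prog.out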
The following example illustrates this with two machines. Let us
assume that we have a single machine ``here.inria.fr'' that is
particularly good at computing squares of integers; on this machine,
we define a square function that also prints something when it is
called (so that we can keep track of what is happening), and we
register this function with key ``square'':
# let def f x =
# print_string ("["^string_of_int(x)^"] "); flush stdout;
# reply x*x in
# Ns.register "square" f vartype
# ;;
#
# Join.server ()
The ``Join.server'' primitive tells the program to keep running
after the completion of all local statements, so that it can serve
remote calls.
On machine here.inria.fr, we compile the previous program (p.ml)
and we execute it:
here> joc p.ml -o p.out
here> ./p.out
We also write a program that relies on the previous machine to compute
squares; this program first looks up the name registered by
here.inria.fr, then performs some computations and reports their
results.
# let sqr = Ns.lookup "square" vartype
# ;;
#
# let def log (s,x) =
# print_string ("q: "^s^"= "^string_of_int(x)^"\n"); flush stdout; reply
# ;;
#
# let def sum (s,n) = reply (if n = 0 then s else sum (s+sqr(n),n-1))
# ;;
#
# log ("sqr 3",sqr 3);
# log ("sum 5",sum (0,5));
# exit 0
On another machine there.inria.fr, we compile and run our second
program (q.ml), after setting the address of our name server (using
bash syntax to set the environment variable):
there> joc q.ml -o q.out
there> export JNSNAME=here.inria.fr
there> ./q.out
What is the outcome of this computation? Whenever a process defines new
port names, this is done locally, that is, their guarded processes
will be executed at the same place as the defining process. Here,
every call to square in sqr 3 and within sum 5 will be evaluated
as a remote function call to here.inria.fr. The actual localization
of processes is revealed by the print_string statements: f (aliased to
sqr on there.inria.fr) always prints on machine here, and log
always prints on machine there, no matter where the messages are
posted.
The result on machine here.inria.fr is:
-> [3] [5] [4] [3] [2] [1]
while the result on machine there.inria.fr is:
-> q: sqr 3= 9
-> q: sum 5= 55
Polymorphism and the name server
Typing the previous programs in the toplevel joctop gives an
interesting error. Indeed, everything goes well until the following
line:
# let sqr = Ns.lookup "square" vartype
# ;;
which results in the error:
-> Failure: Failure("vartype: no polymorphism is allowed")
The current implementation of the name server only deals with
monomorphic types. Since there is no indication of the type of sqr
when it is looked up, the lookup fails. A solution would be to give
the type of sqr, as in:
# let sqr : int -> int = Ns.lookup "square" vartype
# ;;
This error does not occur when compiling the file q.ml because later
uses of sqr make its type precise enough.
2.2 Locations and Mobility
So far, the localization of processes and expressions is entirely
static. In some cases, however, a finer control is called for. To
compute the sum of squares in the previous example, each call to sqr
within the loop resulted in two messages on the network (one for the
request, and another one for the answer). It would be better to run
the whole loop on the machine that actually computes squares. Yet, we
would prefer not to modify the program running on the server every
time we need to run a different kind of loop that involves numerous
squares.
To this end, we introduce a unit of locality called ``location''.
A location contains a bunch of definitions and running processes ``at
the same place''. Every location is given a name, and these location
names are first-class values. They can be communicated as contents of
messages, registered with the name server, and so on, much like port names.
These location names can also be used as arguments to primitives that
dynamically control the relations between locations.
2.2.1 Basic examples
Locations can be declared either locally or as a top-level statement.
For instance, we create a new location named this_location:
# let loc this_location
# def square x = reply x*x
# and cubic x = reply (square x)*x
# do {
# print_int (square 2);
# }
# ;;
# print_int (cubic 2)
# ;;
val this_location : Join.location
val cubic : int -> int
val square : int -> int
-> 48
The let loc declaration binds a location name this_location, and
two port names square and cubic whose scope extends to the
location and to the following statements. Here, the declaration also
has an init part that starts a process {print_int (square 2);}.
This process runs within the location, in parallel with the remaining part
of the program. As a result, we can obtain either 84 or 48.
Distributed computations are organized as trees of nested locations;
every definition and every process is permanently attached to the
location where it appears in the source program; a process can create
new sublocations with an initial content (bindings and processes) and
a fresh location name. Once a location has been created, there is no
way to place new bindings or processes in it from the outside.
For instance, the following program defines three locations such that
the locations named kitchen and living_room are sublocations of
house. As regards the scopes of names, the locations kitchen,
living_room and the ports cook, switch, on, off all have the
same scope, which extends to the whole house location (between the
first do { and the last }); the location name house has a larger
scope that would include whatever follows in the source file.
# let loc house
# do {
# let loc kitchen
# def cook () = print_string "cooking...\n"; reply
# do {}
# and living_room
# def switch () | off! () = print_string "music on\n"; on () | reply
# or switch () | on! () = print_string "music off\n"; off() | reply
# do { off () }
# in
# switch (); cook (); switch ();
# }
# ;;
val house : Join.location
-> music on
-> cooking...
-> music off
2.2.2 Mobile Agents
While processes and definitions are statically attached to their
location, locations can move from one enclosing location to another.
Such migrations are triggered by a process inside of the moving
location. As a result of the migration, the moving location becomes a
sublocation of its destination location.
Notice that locations can be used for several purposes: as
destination addresses, as mobile agents, or as a combination of the
two.
Our next example is an agent-based variant of the square example
above. On the squaring side, we create a new empty location
``here'', and we register it on the name-server; its name will be
used as the destination address for our mobile agent.
# let loc here do {}
# ;;
#
# Ns.register "here" here vartype;
# Join.server ()
On the client side, we create another location ``mobile'' that wraps
the loop computation that should be executed on the square side; the
process within mobile first gets the name here, then migrates its
location inside of ``here''. Once this is done, it performs the
actual computation.
# let loc mobile
# do {
# let here = Ns.lookup "here" vartype in
# go here;
# let sqr = Ns.lookup "square" vartype in
# let def sum (s,n) =
# reply (if n = 0 then s else sum (s+sqr n, n-1)) in
# let result = sum (0,5) in
# print_string ("q: sum 5= "^string_of_int result^"\n"); flush stdout;
# }
The go here primitive migrates the mobile location with its
current contents to the machine here.inria.fr, as a sub-location of
location here, then it returns. Afterwards, the whole computation
(calls to the name server, to sqr and to sum) is local to
here.inria.fr. There are only three messages exchanged between the
machines: one for the lookup request for here, one for its answer,
and one for the migration itself.
Let us consider a variant of mobile that combines migration and
remote communication:
# let sqr = Ns.lookup "square" vartype
# let here = Ns.lookup "here" vartype
#
# let def done1! () | done2! () = exit 0;
# ;;
#
# let def log (s,x) =
# let r = string_of_int x in
# print_string ("agent: "^s^" is "^r^"\n"); flush stdout;
# reply
# ;;
# let loc mobile
# def quadric x = reply sqr(sqr x)
# and sum (s,n,f) = reply (if n = 0 then s else sum(s+f n, n-1, f))
# do {
# go here;
# log ("sum ( i^2 , i= 1..10 )",sum (0,10,sqr));
# done1 () }
# ;;
# spawn {log ("sum ( i^4 , i= 1..10 )", sum (0,10,quadric)); done2 ()}
As before, the mobile agent contains a process that first controls the
migration, then performs some computation. Here, the location mobile
is also used as a container for the definitions of quadric and
sum. The scoping rules are never affected by migrations, so
quadric and sum can be used in the following expressions that
remain on machine there.
Once the agent arrives on the square machine here, the calls to
sum and quadric become remote calls, as if both functions had been
defined and registered on machine here. Conversely, messages sent to
log from the body of the mobile agent arrive on machine
there where the result is printed. As we run this program on machine
there, we thus obtain the local output:
-> agent: sum ( i^4 , i= 1..10 ) is 25333
-> agent: sum ( i^2 , i= 1..10 ) is 385
As regards locality, every repeated use of sqr is now performed on
machine here. In the example, the computation of the two sums is
entirely local once the agent arrives on machine here (one network
datagram), which is much more efficient than the equivalent RPC-based
program, which would send over sixty network datagrams. (The done1
and done2 messages are signals used to ensure the termination of the
program in the script that generates the tutorial.)
Remember that localization and scopes are independent in
JoCaml: an agent can perform exactly the same actions no matter
where it is actually positioned in the location tree. If we forget
about the difference between what is local and what is remote, our
program produces the same result as a plain program where the
location boundaries and the migration have been removed:
# let sqr = Ns.lookup "square" vartype
#
# let def done1! () | done2! () = exit 0;
# and log (s,x) =
# let r = string_of_int x in
# print_string ("agent: "^s^" is "^r^"\n"); flush stdout;
# reply
# and quadric x = reply sqr(sqr x)
# and sum (s,n,f) = reply (if n = 0 then s else sum(s+f n, n-1, f)) ;;
#
# spawn { log ("sum ( i^2 , i= 1..10 )", sum (0,10,sqr)); done1 () } ;;
# spawn { log ("sum ( i^4 , i= 1..10 )", sum (0,10,quadric)); done2 () } ;;
Apart from performance, both styles are equivalent. In particular, we
can first write and test programs, then refine them to fine-tune
their locality.
Applets
The next example shows how to define ``applets''. An applet is a
program that is downloaded from a remote server, then used locally. As
compared to the previous examples, this migration operates the other
way round. Here, the applet defines a reference cell with destructive
reading:
# let def cell there =
# let def log s = print_string ("cell "^s^"\n"); flush stdout; reply in
#
# let loc applet
# def get () | some! x = log ("is empty"); none () | reply x
# and put x | none! () = log ("contains "^x); some x | reply
# do { go there; none () } in
#
# reply get, put
# ;;
#
# Ns.register "cell" cell vartype;
# Join.server ()
Our applet has two states: either none () or some s where s is a
string, and two methods get and put. Each time cell is called,
it creates a new applet in its own location. Thus, numerous
independent cells can be created and shipped to callers.
The name cell takes as argument the location (there) where the new
cell should reside. The relocation is controlled by the process
go there; none () that first performs the migration, then sends an
internal message to activate the cell. Besides, cell defines a log
function outside of the applet; log therefore remains on the server
and, when called from within the applet on the client machine, keeps
track of the usage of its cell. This is in contrast with applets à la
Java: the location migrates with its code, but its communication
capabilities are unaffected.
We complement our example with a simplistic user that allocates and
uses a local cell:
# let cell = Ns.lookup "cell" vartype
#
# let loc user
# do {
# let get, (put : string -> unit) = cell user in
# put "world";
# put ("hello, "^get ());
# print_string (get ());
# exit 0;
# }
-> hello, world
On the server side, we get the trace:
-> cell contains world
-> cell is empty
-> cell contains hello, world
-> cell is empty
On the client side, there are no more go primitives in the applet
after its arrival, and this instance of the location name applet
does not appear anywhere. As a result, the contents of the applet can
be considered part of the host location, as if they had been defined
locally from the beginning. (Some other host location may still
move, but then it would carry the cell applet as a sublocation.)
Data-driven Migration
In the following examples, we consider ``large'' data structures
distributed between several machines.
We are interested in defining a general iterator that takes a
distributed data structure and applies a function to each of its basic
components.
Because of their relative sizes, it is better to have agents move from
site to site as they use the data, rather than move the data or, even
worse, access the data one piece at a time.
In practice, we use arrays as the building blocks of our data
structure; the basic functions to allocate arrays and fill them
(make), and to create a general function that applies a function
over every value in the array (iter) could be defined by the
following module table.ml:
# let def make (n,f) =
# let a = Array.create n 0 in
# for m = 0 to n - 1 do Array.set a m (f m) done;
# reply a
# ;;
# let def iter a =
# let def i f = Array.iter f a; reply in
# reply i
# ;;
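Locally, these definitions could be used as follows (a sketch, assuming the module is compiled as table.ml):
# (* builds the array [|1; 2; 3; 4; 5|], then prints every element *)
# let it = Table.iter (Table.make (5, fun m -> m + 1))
# ;;
# it print_int; print_newline ()
# ;;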
We now need to ``glue'' together several arrays. More precisely, we
define an iterator that is consistent with locality: for each array,
we move an agent that contains the function to apply inside of the
array location, then we apply it, then we move to the next
array, ...
Now, each part of the data structure is a pair (host location,
iterator), and the mobility protocol consists in (1) migrating the
function to apply inside of the host location, then (2) calling the
iterator with this function as argument.
For instance, the module statistics.ml collects data using this
protocol and keeps its partial results as an internal message:
# let def collect! (loc_data, iter_data) =
#
# let loc agent
# def state! (n,s,s2) | f x = state (n+1,s+x,s2+x*x) | reply
# or state! (n,s,s2) | finished! () | result () = reply n,s,s2
# do {
# go loc_data;
# { state (0,0,0) | iter_data f; finished () }
# } in
#
# let n,s,s2 = result () in
# print_string "the size is "; print_int n;
# print_string ", the average is "; print_int (s/n);
# print_string ", the variance is "; print_int ((n*s2-s*s)/(n*n));
# print_newline ();
# exit 0;
Here is the definition of a basic data structure that consists of one array:
# let loc here ;;
#
# let iter =
# let def f x = reply 2*x+5 in
# Table.iter (Table.make (100,f)) ;;
#
# Ns.register "loc_a" here vartype;
# Ns.register "iter_a" (iter : (int -> unit) -> unit) vartype;
# Join.server ()
In order to build our structure in a compositional way, we use a
merge function. This function takes two (location, iterator) pairs
and returns a new such pair standing for the compound data structure
(module merge.ml):
# let def merge (loc1,iter1,loc2,iter2) =
# let loc mobile
# def iter f = go loc1; iter1 f; go loc2 ; iter2 f; reply in
# reply mobile,iter
Thereby, we can assemble data by repeatedly calling the merge
function on miscellaneous chunks; this defines a structural tree
whose leaves are basic arrays. At the same time, merge sets up the
locations that are needed to traverse it locally.
In our example, the locations of arrays are stationary, but the
locations of compound structures are not: these locations
move to each of the subcomponents in turn before applying their
iterator to them.
For instance, if we consider the data structure built from three
arrays in the following program,
# let (itr_a : (int -> unit) -> unit) = Ns.lookup "iter_a" vartype
# let loc_a = Ns.lookup "loc_a" vartype
# let (itr_b : (int -> unit) -> unit) = Ns.lookup "iter_b" vartype
# let loc_b = Ns.lookup "loc_b" vartype
# let (itr_c : (int -> unit) -> unit) = Ns.lookup "iter_c" vartype
# let loc_c = Ns.lookup "loc_c" vartype
#
# open Merge
# let loc_ab, itr_ab = merge (loc_a ,itr_a ,loc_b, itr_b)
# let loc_ab_c,itr_ab_c = merge (loc_ab,itr_ab,loc_c, itr_c)
# ;;
#
# spawn {Statistics.collect (loc_ab_c, itr_ab_c)}
We obtain the results on the machine that runs the program:
-> the size is 230, the average is 77, the variance is 3109
The successive nestings of locations during the computation are:
  iter on a:   a[ ab[ ab_c[ agent ]]]   b                        c
  iter on b:   a                        b[ ab[ ab_c[ agent ]]]   c
  iter on c:   a                        b[ ab ]                  c[ ab_c[ agent ]]

(after iter on a, location ab migrates from a to b; after iter on b,
location ab_c migrates from within ab to c)
Notice that migrations are delegated at each level of nesting: as we
apply a function f, we put it in its own location (agent)
and migrate it inside the data structure location (ab_c), which is a
leaf of the location tree. Then, we repeatedly apply the function and
migrate some location whose branch contains the location of f. At
any stage, the branch of the location tree that contains the leaf
agent is the inverse of a branch in the structural tree, and the
subpart locations are the successive superlocations of the compound
location.
2.2.3 Mobile objects
In JoCaml, all objects are also locations. Thus they can migrate to a
location or be the target of a migration. Object methods are also
channels, so a method call runs on the machine where the object is,
which is not necessarily the machine where the call is made.
An example of a mobile object
For instance, we can create an object that has methods to make it
migrate, and print some information to see where the object is:
# let home = Join.here
# ;;
#
# class migrant () =
# method hello () =
# print_string "hello\n"
# method go_home () =
# Join.go home;
# print_string "I'm home\n"
# method go_there l =
# Join.go l;
# print_string "I'm not home\n"
# end
# ;;
#
# let obj = new migrant () in
# let def finished! () = { exit 0; } in
# Ns.user := "mobile_object";
# Ns.register "migrant" obj vartype;
# Ns.register "finished" finished vartype;
# Join.server ()
To use this object, we get it from the name server (it could also be
sent in a message). In both cases, we only get a reference to the
object, not the object itself.
# let here = Join.here
# ;;
#
# Ns.user := "mobile_object"
# ;;
#
# let (obj : < hello : unit -> unit;
# go_home : unit -> unit;
# go_there : Join.location -> unit > ) =
# Ns.lookup "migrant" vartype
# ;;
#
# let (finished : << unit >> ) = Ns.lookup "finished" vartype
# ;;
#
# obj#hello ();
# obj#go_there here;
# obj#go_home ();
# spawn{finished ()};
# exit 0
The output on the first machine is:
-> hello
-> I'm home
Even though the ``hello'' call was made from the second machine, the
message is printed on the machine where the object is.
The output on the second machine is:
-> I'm not home
It is clear here that the object has migrated to the second machine
before printing that it is there.
Objects as migration targets
Since objects can be considered as locations, it is legitimate to have
an object as a migration target. However, the ``Join.go'' primitive
expects a location:
# let f a = Join.go a
# ;;
val f : Join.location -> unit
This is why there is another primitive to migrate to an object, called
``Join.goo'':
# let g a = Join.goo a
# ;;
val g : < > -> unit
There is however a small issue because of explicit subtyping and
objects. Consider for instance the following object:
# class foo () =
# method bar () = print_string "hello\n"
# end
# ;;
#
# let obj = new foo()
# ;;
class foo (unit) = method bar : unit -> unit end
val obj : foo
Let us now create a location that migrates to this object:
# let loc test do {
# Join.goo obj;
# }
# ;;
File "ex7.ml", line 4, characters 11-14:
This expression has type Ex6.foo = < bar : unit -> unit >
but is here used with type < >
There is a typing problem, because subtyping between objects is
explicit in Objective Caml. Thus we need to constrain the type of the
object the location is migrating to:
# let loc test do {
# Join.goo (obj :> < >);
# }
# ;;
val test : Join.location
Mobility and object creation
For the time being, a missing feature in JoCaml is the migration of
the capability to create objects. For instance, consider the following
code:
# let home = Join.here
# ;;
#
# class migrant () =
# method hello () =
# print_string "hello\n"
# method go_home () =
# Join.go home;
# print_string "I'm home\n"
# method go_there l =
# Join.go l;
# print_string "I'm not home\n"
# end
# ;;
#
# let f () = new migrant ()
# ;;
#
# Ns.register "f" f vartype
# ;;
val home : Join.location
class migrant (unit) =
method go_home : unit -> unit
method go_there : Join.location -> unit
method hello : unit -> unit
end
val f : unit -> migrant
File "ex9.ml", line 19, characters 18-25:
Warning: VARTYPE replaced by type
( unit -> <go_home:( unit -> unit) ; go_there:( Join.location -> unit) ; hello:( unit -> unit) ; >) metatype
If another runtime looks up the function ``f'' and uses it (or gets
it by other means, as in a message), the object returned may not
work. Similarly, if a location or an object containing a function that
creates some object migrates to another runtime, the object returned
by the call to such a function may not be correct. This is a
limitation of the current implementation.
2.3 Termination, Failures and Failure Recovery
As a matter of fact, some parts of a distributed computation may fail
(e.g., because a machine is abruptly switched off). The simplest
solution would be to abort the whole computation whenever this is
detected, but this is not realistic when numerous machines are
involved. Rather, we would like our programs to detect such failures
and take adequate measures, such as cleanly reporting the problem,
aborting related parts of the computation, or making another attempt on a
different machine. To this end, JoCaml provides an abstract model of
failure and failure detection expressed in terms of locations:
- a location can run a primitive process halt () that, when
executed, atomically halts every process inside of this
location (and recursively every sublocation);
- a location can detect that another location with name there
has halted, using the primitive call fail there; P. When the
process P is triggered, it is guaranteed that location there is
halted.
In the current implementation, halting is detected only when the
halt () primitive is issued in the same runtime, or when the runtime
containing the location actually stops. Thus, simply issuing a
halt () will not trigger a matching fail in another runtime; that
fail will only be triggered when the runtime hosting the watched
location terminates or becomes unreachable.
The semantics of JoCaml guarantees that this is the only reason why
parts of the computation may stop. Now, an executable running the
program P on a fallible machine can be thought of as a system-defined
location let loc machine do { P | {crash (); exit 0;} } where
crash may return at any time, with exit 0 terminating the runtime.
In the model, a machine can in particular remotely detect that another
machine has stopped, once it knows the name of a location there. In
practice, it is difficult to provide reliable failure detection, as
this requires further assumptions on the network.
In the prototype implementation, error detection is only partially
implemented, hence there is no guarantee that, when a runtime terminates
abnormally, the failure of its locations is detected. (Still, detected
failures provide the expected negative guarantees: the failed location
is not visible anymore to any part of the computation.)
Since locations fail only as a whole, the programmer can define
suitable units of failure, and even use the halt/fail primitives
to control the computation. In the current implementation, the control
is not as fine-grained when it spans several runtimes, and requires
using the exit 0/fail primitives. Notice that no silent
recovery mechanism is provided; the programmer must figure out what to
do in case of problems.
2.3.1 Basic examples
To begin with, we use simple examples that attempt to call a port name
say, defined inside of a fallible location, to get messages printed.
Because these calls may never return once the location has stopped, we spawn
them instead of waiting for their completion.
In this first example, location agent can stop at any time. After
the failure has occurred, we print a report message. We know for sure
that the first say can only print something before the failure
report, and that the second say cannot print anything.
# let loc agent
# def say s = print_string s; reply
# do { halt (); } ;;
#
# spawn { say "it may work before.\n"; }
# ;;
# spawn { fail agent;
# print_string "the location stopped\n";
# say "it never works after\n"; }
# ;;
val agent : Join.location
val say : string -> unit
-> the location stopped
The following example is more tricky. First, the agent does not halt
itself; however, it migrates within a location that stops and this is
a deadly move. Second, the halt () process can be triggered only from
the outside by a normal message kill (). Thus we know that the first
say always prints its message. Finally, as there is no halt in
location agent, it can only stop because location fallible halted,
so that fail agent; implies that fallible also stopped.
# let loc fallible
# def kill! () = halt ();
# ;;
#
# let loc agent
# def say s = print_string s; reply
# do { go fallible; }
# ;;
#
# spawn { say "it always works.\n"; kill () }
# ;;
# spawn { say "it may work before.\n"; }
# ;;
# spawn { fail agent;
# print_string "both locations stopped.\n";
# say "it never works after.\n"; }
# ;;
val fallible : Join.location
val kill : <<unit>>
val agent : Join.location
val say : string -> unit
-> it always works.
-> it may work before.
-> both locations stopped.
2.3.2 Watching for Failures
We now move on to some more realistic use of failure-detection; we
first consider a function that encapsulates a session with mobility
and potential failures.
There is usually no need to explicitly halt locations that have
completed their task (the garbage-collector should take care of them).
However, in some cases we would like to be sure that no immigrant location is
still running locally.
Let us assume that job is a remote function within location there
that may create mobile sublocations and migrate them to the caller's
site. To this end, the caller should supply a host location, as in
the previous examples. How can we make sure that job is not using
this location to run other agents after the call completes? This is
handled by using a new temporary location box for each call, and
halting it once the function call has completed.
# let def safe! (job,arg,success,failure) =
#
# let loc box
# def kill! () = halt();
# and start () = reply job (box,arg) in
#
# let def finished! x | running! () = finished x | kill()
# or finished! x | failed! () = success x
# or running! () | failed! () = failure () in
#
# finished (start ()) | running () | fail box; failed ()
# ;;
val safe : <<((Join.location * 'a -> 'b) * 'a * <<'b>> * <<unit>>)>>
Our supervising protocol either returns a result on success, or a
signal on failure. In either case, the message guarantees that no
alien computation may take place afterward.
Initially, there is a message running (), and the control definition
waits for either some result on finished x or some failure detection on
failed (). Whatever its definition is, the job process can create
and move locations inside of the box, and eventually return some value
to the start call within the box. Once this occurs, the result is
forwarded on finished to the control process, and the first join-pattern
is triggered. In this case, the running () message is consumed and
eventually replaced by a failed () message (once the kill () message
is handled, the box gets closed, and the fail guard in the control
process is triggered, releasing a message on failed).
At this stage, we know for sure that no binding or computation
introduced by job remains on the caller's machine, and we can return
the value as if a plain RPC had occurred.
This ``wrapper'' is quite general. Once a location-passing convention
is chosen, the safe function does not depend on the actual
computation performed by job (its arguments, its results, and even
the way it uses locations are parametric here).
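For instance, a caller could wrap a trivial job as follows (a sketch: this particular job ignores its box location and simply squares its argument; the channels success and failure just report the outcome):
# let def job (box,x) = reply x*x
# ;;
# let def success! r =
#   print_string ("safe result: "^string_of_int r^"\n"); exit 0;
# and failure! () =
#   print_string "the job failed\n"; exit 0;
# ;;
# spawn { safe (job,6,success,failure) }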
We could refine this example further to transform unduly long calls to
job into failure (by sending a failed () message after an external
timeout), to give some more control to the caller (adding an abort
message),...
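For instance, a crude countdown in the style of the ``time bomb'' of section 2.3.3 could inject the failed () message after a while (a sketch only: this busy countdown burns CPU, and a real timer would be preferable):
# (* counts down n steps, then signals failure; it could be armed by
#    composing timeout (100000,failed) in parallel with the control
#    process inside safe *)
# let def timeout! (n,failed) =
#   if n = 0 then failed () else timeout (n-1,failed)
# ;;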
2.3.3 Recovering from partial failures
We finally come back to distributed loops. We give an example of a
program that uses the CPU of whatever machine is available to compute
the sum 1 + 2 + ... + 999. Basically, we only assume that the
iteration can be computed in any order: we cut the loop into small
chunks, and we distribute a chunk to every available machine. The
program takes care of potential failures: when a machine fails, any
chunk that was distributed to that machine is taken back and given
to another machine.
# let size = 1000
# let chunk = 200
#
# let def join! (name,there) =
# let loc mobile
# do {
# let def start! (i,finished) =
# let def loop! (u,s) = if u<(i+1)*chunk then loop (u+1,s+u) else finished s in
# loop (i*chunk,0) in
# go there;
# worker (name,mobile,start) } in
# print_string (name^" joins the party\n"); flush stdout;
#
# and job! i | worker! (name,there,start) =
# print_string (name^","^string_of_int(i*chunk)^"\n"); flush stdout;
# let def once! () | finished! s = add s | worker (name,there,start)
# or once! () | failed! () = print_string (name^" went down\n"); job i in
# once () | start (i,finished) | fail there;failed()
#
# and result! (n,s) | add! ds =
# let s' = s + ds in
# if n > 0 then result (n-1,s')
# else {print_string("The sum is "^string_of_int s'^"\n"); exit 0;} ;;
#
# spawn { result (size/chunk-1,0)
# | let def jobs! n = job n | if n>0 then jobs (n-1) in jobs (size/chunk-1) };;
#
# Ns.register "join" join vartype ; Join.server ()
The actual work is performed in the mobile locations, once they
reach the locations there provided by joining machines. Messages
job i partition the work to be done. Each remote computation
concludes either with a finished s or a failed () message; in the latter
case, the aborted job is re-issued. The resulting sum is accumulated
as a message on result.
The client is not specific to our computation at all; indeed, its only
contribution is a location where others may place their sublocations.
# let loc worker do {
# let join = Ns.lookup "join" vartype in
# join ("reliable",worker) }
#
In the following, we explicitly model an unreliable task force, as a
variant of the previous program that also has a ``time bomb'' which
eventually stops the joining location:
# let delay = int_of_string(Sys.getenv "DELAY")
# let name = Sys.getenv "NAME"
#
# let loc unreliable_worker do {
# let join = Ns.lookup "join" vartype in
# let def tictac! n = if n = 0 then {exit 0;} else tictac (n-1) in
# join (name,unreliable_worker) | tictac delay }
We start three of them with various parameters, for instance with the
command lines:
DELAY=1000  NAME=fallible ./work.out &
DELAY=2000  NAME=dubious  ./work.out &
DELAY=30000 NAME=reliable ./work.out &
./distributed_loop.out
and we observe the output of the last command (the main loop program):
-> dubious joins the party
-> dubious,0
-> fallible joins the party
-> dubious,200
-> reliable joins the party
-> fallible,400
-> reliable,600
-> dubious,800
-> The sum is 499500