
4  Organizing concurrent computations

We first present iterators (type enum) over the elements (type elt) of a collection (type t) [4].

val start : t -> enum
val step : enum -> (elt * enum) option

The function start builds an enumerator from a structure of type t, while step returns the first value built from an enumerator and the enumerator that builds the next values. We illustrate the idea of iterators by an enumerator over an integer interval.

type elt = int
and t = int * int
and enum = int * int
let start c = c
let step (n,m) = if n > m then None else Some (n,(n+1,m))
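As a minimal sketch of how a client consumes this interface, the sequential function below calls step repeatedly until the enumerator is exhausted. The name to_list is a hypothetical helper, not part of the interface above; the interval definitions are repeated (with type annotations added) so that the block stands alone.

```ocaml
(* Interval enumerator, repeated from the text above. *)
type elt = int
and t = int * int
and enum = int * int

let start (c : t) : enum = c

let step ((n, m) : enum) : (elt * enum) option =
  if n > m then None else Some (n, (n + 1, m))

(* to_list is a hypothetical helper (an assumption, not in the original):
   it drains an enumerator sequentially, in iteration order. *)
let to_list (e : enum) : elt list =
  let rec loop acc e =
    match step e with
    | None -> List.rev acc
    | Some (x, next) -> loop (x :: acc) next
  in
  loop [] e

let () =
  List.iter (fun x -> print_int x; print_char ' ') (to_list (start (1, 3)));
  print_newline ()
```

An empty interval such as (3,1) makes step return None on the first call, so to_list returns the empty list.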

4.1  Concurrent iteration

We then define a channel par_iter that applies a function worker to all the values produced by an enumerator, function calls being performed concurrently.

def par_iter (worker, enum) = match step enum with
| None -> 0
| Some (x,next) ->
    par_iter(worker, next) & begin worker x ; 0 end
val par_iter : ((elt -> unit) * enum) Join.chan

If step enum does not return a value, then there is nothing to do (0 is the empty process). Otherwise, a recursive sending to par_iter and a call to worker are performed concurrently. The construct worker x ; 0 lifts the expression worker x (of type unit) into a process.

An example of par_iter usage is to print the integers from 1 to 3 in unspecified order:

let print x = print_char '(' ; print_int x ; print_char ')'
let _ = spawn par_iter(print, start (1,3))

A possible output is (2)(1)(3) (see footnote 2).

To share parallel computations between several “agents”, we introduce the notion of a pool of functions.

type 'a pool = {
  register: ('a -> unit) Join.chan;
  compute: 'a -> unit;
}
let create_pool () =
  def compute(x) & agent(worker) =
    worker x ; agent(worker) & reply () to compute in
  { register=agent; compute=compute; }

A pool exports two channels: register and compute. Viewed from the caller's side, register is for offering computational power, whereas compute is for exploiting it. The pool implementation matches computing agents with exploiting agents by means of a straightforward join-pattern.

Internally, available computing agents are messages pending on channel agent. Notice that several instances of a given worker agent cannot execute concurrently. Indeed, a computation can start only when an agent is available, and an agent engaged in a computation returns to available status only when the computation (worker x) is over.

To illustrate the use of the pool, we introduce a second function to print integers:

let print_bis x = print_char '<' ; print_int x ; print_char '>'

Then, we create a pool and register the print and print_bis functions.

let pool = create_pool ()
let () = spawn (pool.register(print) & pool.register(print_bis))

Finally, by combining par_iter and the compute component of the pool, we have the integer interval printed by two agents:

let () = spawn par_iter(pool.compute, start (1,3))

A possible output is (2)<1><3>. Due to concurrency of distinct agents, another possible output is (<1>2<)3>.

4.2  Collecting the results of concurrent iteration

In the previous example, par_iter is an asynchronous channel. This clearly expresses that it cannot be known when par_iter has finished its work. We now wish not to perform side effects, as print does, but instead to collect returned values.

More precisely, we aim to build a new kind of pool, with a “fold” function that behaves like par_iter with respect to concurrency, but additionally returns a combination of the results.

type ('a,'b) pool = {
  register: (elt -> 'a) Join.chan;
  fold: t -> ('a -> 'b -> 'b) -> 'b -> 'b;
}

The interface for exploiting the pool is the fold function, which, compared with the previous par_iter, takes the combination function and an initial value for the result as extra arguments, and returns a combined result of type 'b. The interface for offering computational power is register, as before, but registered functions now return a result of type 'a.

The new pool is controlled by the following monitor, whose primary job is to collect results.

type ('a,'b) monitor = {
  enter: unit -> unit;
  leave: 'a Join.chan;
  wait: unit -> 'b;
  finished: unit Join.chan;
}
let create_monitor combine init =
  def state(n,r) & enter() = state(n+1,r) & reply () to enter
   or state(n,r) & leave(v) = state(n-1,combine v r)
   or state(0,r) & wait() & finished() = reply r to wait in
  spawn state(0,init) ;
  { enter=enter ; leave=leave ; wait=wait; finished=finished ; }
val create_monitor : ('a -> 'b -> 'b) -> 'b -> ('a, 'b) monitor

A monitor provides four channels: enter, leave, finished and wait. Channel enter (resp. leave) is used by the monitored pool (see below) to signal that a new task starts (resp. ends). The monitored pool will send a message on finished when iteration has come to an end. Finally, wait is a function that returns the combination of all the results of the monitored tasks.

A monitor has an internal state whose first component n counts the number of tasks being computed. This counter is updated through the channels enter and leave. Additionally, the message on leave carries a task result, which is combined with the second component r. The last join-pattern (state(0,r) & wait() & finished()) states that the call to wait can be answered once there are no more tasks, either active (state(0,_)) or still to be allocated (finished()).
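The counting discipline can be checked with a purely sequential model of the monitor state. The sketch below is an illustration under stated assumptions, not the JoCaml implementation: the event type, the model record and the functions initial, apply_event and can_answer_wait are hypothetical names, and a message pending on finished is modeled as a boolean.

```ocaml
(* Sequential model of the monitor state (n, r). All names here are
   hypothetical; the real monitor is the join-definition given above. *)
type 'a event =
  | Enter            (* a task starts: the counter is incremented *)
  | Leave of 'a      (* a task ends with a result: the counter is decremented *)
  | Finished         (* the iteration has been exhausted *)

type 'b model = { active : int; result : 'b; finished : bool }

let initial init = { active = 0; result = init; finished = false }

let apply_event combine m = function
  | Enter -> { m with active = m.active + 1 }
  | Leave v -> { m with active = m.active - 1; result = combine v m.result }
  | Finished -> { m with finished = true }

(* wait can be answered only when no task is active and finished was sent. *)
let can_answer_wait m = m.active = 0 && m.finished

let () =
  (* One possible interleaving for three tasks whose results are 2, 4 and 6. *)
  let events = [ Enter; Enter; Leave 2; Enter; Finished; Leave 6; Leave 4 ] in
  let final = List.fold_left (apply_event ( + )) (initial 0) events in
  assert (can_answer_wait final);
  Printf.printf "combined result: %d\n" final.result
```

In this model, wait cannot be answered right after Finished, because two tasks are still active at that point; it becomes answerable only after the last Leave.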

We now present the pool implementation:

let create_pool () =
  def loop(monitor,enum) & agent(worker) = match step enum with
  | Some(x, next) ->
      monitor.enter() ;
      loop(monitor,next) & call_worker(monitor, x, worker)
  | None -> monitor.finished() & agent(worker)
  and call_worker(monitor,x,worker) =
    let v = worker(x) in
    monitor.leave(v) & agent(worker) in
  let fold x combine init =
    let monitor = create_monitor combine init in
    spawn loop(monitor, start x) ;
    monitor.wait () in
  { fold=fold ; register=agent ; }

Let us examine the definition of fold: it first creates a monitor, then starts the iteration, and finally calls the wait function of the monitor.

The loop/agent definition is essentially a combination of the previous pool implementation and of par_iter, with worker calls set aside for clarity (channel call_worker). The combination has the effect that the various instances of a given worker agent now execute in sequence, following iteration order. Additionally, calls to the monitor are inserted at appropriate places. A remarkable point is that all calls to monitor.enter are guaranteed to have been performed before the message on monitor.finished is sent. This follows from the fact that the recursive sending on loop is performed only once the call monitor.enter() has returned, by virtue of the sequencing operator “;”. Moreover, the internal counter of the monitor indeed counts active tasks: as “&” binds more tightly than “;”, the process call_worker(monitor,x,worker) executes once monitor.enter() has returned, and thus once the monitor counter has been incremented. Similarly, the counter is decremented (by monitor.leave(v)) only once the worker has returned.

The new pool is quite powerful, since it can serve as a meeting place between several agents that offer computational power and several agents that exploit it. Let us define the former agents and register them.

let double x = print x; 2*x
and double_bis x = print_bis x; x+x
let pool = create_pool ()
let () = spawn (pool.register(double) & pool.register(double_bis))

Exploiting agents are two functions, with different combination behavior.

let sum x = pool.fold x (+) 0
and prod x = pool.fold x ( * ) 1

Finally all agents meet through the pool.

def echo(c,x) = print_char c ; print_int x ; print_char c ; 0
let () = spawn echo('+',sum(1,3)) & echo('*',prod(4,5))

A possible output is <1>(2)<4>(5)*80*<3>+12+.
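The numbers 12 and 80 in this output can be cross-checked against a sequential reference. The sketch below assumes the interval enumerator from the start of this section and ignores both printing and concurrency; fold_seq is a hypothetical single-worker stand-in for pool.fold, not the pool itself. It agrees with the concurrent pool here because (+) and ( * ) are commutative and associative, whereas the monitor combines results in whatever order the workers finish.

```ocaml
(* Interval enumerator, repeated so the block stands alone. *)
type elt = int
and t = int * int
and enum = int * int

let start (c : t) : enum = c

let step ((n, m) : enum) : (elt * enum) option =
  if n > m then None else Some (n, (n + 1, m))

(* fold_seq is a hypothetical sequential stand-in for pool.fold:
   it applies a single worker to each element, in iteration order. *)
let fold_seq worker x combine init =
  let rec loop r e =
    match step e with
    | None -> r
    | Some (v, next) -> loop (combine (worker v) r) next
  in
  loop init (start x)

let double x = 2 * x
let sum x = fold_seq double x ( + ) 0
let prod x = fold_seq double x ( * ) 1

let () =
  (* 2+4+6 = 12 and 8*10 = 80 *)
  Printf.printf "+%d+ *%d*\n" (sum (1, 3)) (prod (4, 5))
```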

4.3  Distributed computations

In JoCaml, concurrent and distributed computations are based on the same model: different programs (abstracted as sites) may communicate by means of channels. More precisely, site A may send messages on a channel whose definition resides on another site B. In that situation, guarded processes execute on site B. One practical problem in distributed applications is for the communicating partners first to know one another, and then to have at least a few channels in common. To solve this issue, JoCaml provides library calls to connect sites, and a name service, which is basically a repository for values (more specifically channel names) indexed by plain strings.

As an example, this first program runs on machine A.

let pool = create_pool()
let () = Join.Ns.register "reg" (pool.register: (int -> int) Join.chan)
let () = Join.Site.listen (ADDR_INET (Join.Site.get_local_addr(), 12345))
let () = print_int (pool.fold (1,3) (+) 0)

This program creates a pool. Then, by calling Join.Ns.register, it stores the channel pool.register under the name "reg" in the local name service. Next, Join.Site.listen starts to listen for connections on the default Internet address of the local site (Join.Site.get_local_addr()), on port 12345. Finally, the program calls pool.fold. This call blocks, since no computing agent has entered the pool yet.

To become such a computing agent, machine B runs the following program.

(* master_addr is the Internet address of A *)
let a_site = Join.Site.there (ADDR_INET(master_addr,12345))
let ns = Join.Ns.of_site a_site
let register = (Join.Ns.lookup ns "reg": (int -> int) Join.chan)
def double(x) =
  print_char '(' ; print_int x ; print_char ')' ; reply x+x to double
let () = spawn register(double)

Here, B first gets the site identity of the program running on A, with the function Join.Site.there, and its name service with Join.Ns.of_site. Then, it retrieves the (asynchronous) channel associated with the key "reg". The name service is not type safe (see footnote 3). For instance, the type of Join.Ns.lookup is Join.Ns.t -> string -> 'a. As a minimal precaution, we insert explicit type constraints. Finally, B defines and registers the synchronous channel double. The effect of A calling the registered double is that of a remote function call. Hence, console output is 12 on A and (1)(2)(3) on B.

Another issue deserves mention. The program of B above is not complete: as spawn register(double) returns immediately, execution goes on. To prevent the program from terminating by reaching its end, we deadlock it on purpose.

let () =
  def dead() & lock() = reply () to dead in
  dead()

But B is now blocked forever, whereas a desirable behavior is for B to be released when A does not need B anymore, or at least when the program running on A terminates.

The function Join.Site.at_fail provides a convenient solution. It takes a site A and a channel (of type unit Join.chan) as arguments, and returns (). When it is detected that A has failed, then a message is sent on the channel. Thus, we replace the code above by:

let () =
  def wait() & release() = reply () to wait in
  Join.Site.at_fail a_site release ;
  wait()

2. Due to abundant concurrency, another possible output is ((2)1)(3).
3. A weakness of JoCaml, we agree.
