## 4  Organizing concurrent computations

We first present iterators (type `enum`) over the elements (type `elt`) of a collection (type `t`), with the following signatures.

```ocaml
val start : t -> enum
val step : enum -> (elt * enum) option
```

The function `start` builds an enumerator from a collection of type `t`. The function `step` returns either `None`, when the enumerator is exhausted, or `Some (x, next)`, where `x` is the first value produced by the enumerator and `next` is the enumerator that produces the remaining values. We illustrate iterators with an enumerator over an integer interval.

```ocaml
type elt = int and t = int * int and enum = int * int

let start c = c
let step (n,m) = if n > m then None else Some (n,(n+1,m))
```
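The iterator interface can also be stated as an ordinary OCaml module type. The following is a minimal sketch in plain OCaml (no JoCaml needed). The names `ITERATOR`, `Interval`, `ListIter`, `Drain`, `DI` and `DL` are hypothetical, and the list enumerator is an extra instance added only to show that the interface is not tied to intervals. `Drain.to_list` consumes an enumerator sequentially, in enumeration order.

```ocaml
(* The iterator interface: enumerators of type enum over elements of type elt. *)
module type ITERATOR = sig
  type elt
  type t
  type enum
  val start : t -> enum
  val step : enum -> (elt * enum) option
end

(* The integer-interval enumerator of the text. *)
module Interval : ITERATOR with type elt = int and type t = int * int = struct
  type elt = int
  type t = int * int
  type enum = int * int
  let start c = c
  let step (n, m) = if n > m then None else Some (n, (n + 1, m))
end

(* Hypothetical second instance: an enumerator over an int list. *)
module ListIter : ITERATOR with type elt = int and type t = int list = struct
  type elt = int
  type t = int list
  type enum = int list
  let start l = l
  let step = function [] -> None | x :: rest -> Some (x, rest)
end

(* Collect every element produced by an enumerator, in enumeration order. *)
module Drain (I : ITERATOR) = struct
  let to_list (c : I.t) : I.elt list =
    let rec go e acc =
      match I.step e with
      | None -> List.rev acc
      | Some (x, next) -> go next (x :: acc)
    in
    go (I.start c) []
end

module DI = Drain (Interval)
module DL = Drain (ListIter)
```

With these definitions, `DI.to_list (1,3)` evaluates to `[1; 2; 3]`, and an empty interval such as `(3,1)` yields `[]`.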

### 4.1  Concurrent iteration

We then define a channel `par_iter` that applies a function `worker` to all the values produced by an enumerator, performing the function calls concurrently.

```ocaml
def par_iter (worker, enum) =
  match step enum with
  | None -> 0
  | Some (x,next) -> par_iter(worker, next) & begin worker x ; 0 end

val par_iter : ((elt -> unit) * enum) Join.chan
```

If `step enum` returns `None`, there is nothing to do (`0` is the empty process). Otherwise, a recursive message on `par_iter` and a call to `worker` are performed concurrently. The construct `worker x ; 0` lifts the expression `worker x` (of type `unit`) into a process.
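Plain OCaml has no processes or join-patterns, but the unfolding of `par_iter` can be simulated. In the sketch below, which is a model rather than JoCaml semantics, a process is a thunk wrapped in the hypothetical constructor `Proc` that returns the processes it spawns (the empty list plays the role of `0`, list concatenation that of `&`). The hypothetical scheduler `run` executes one pending process at a time, chosen pseudo-randomly from `seed`. Each step of `par_iter` spawns the rest of the iteration and one worker call, and nothing fixes their relative order.

```ocaml
(* A process, when run, performs its effects and returns the processes it spawns. *)
type proc = Proc of (unit -> proc list)

(* Hypothetical scheduler: repeatedly run one pending process, picked at random. *)
let run ?(seed = 0) (init : proc list) =
  Random.init seed;
  let pending = ref init in
  while (match !pending with [] -> false | _ :: _ -> true) do
    let k = Random.int (List.length !pending) in
    let (Proc body) = List.nth !pending k in
    let rest = List.filteri (fun i _ -> i <> k) !pending in
    pending := rest @ body ()
  done

(* Interval enumerator from the text. *)
let start c = c
let step (n, m) = if n > m then None else Some (n, (n + 1, m))

(* Model of par_iter: [] stands for 0, a two-element list for P & Q. *)
let rec par_iter (worker : int -> unit) enum =
  Proc (fun () ->
    match step enum with
    | None -> []
    | Some (x, next) -> [ par_iter worker next; Proc (fun () -> worker x; []) ])
```

Running `run ~seed:1 [par_iter (fun x -> Printf.printf "(%d)" x) (start (1,3))]` prints the three integers in an order that depends on the seed; only the set of printed values is fixed.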

For example, `par_iter` can print the integers from 1 to 3 in an unspecified order:

```ocaml
let print x = print_char '(' ; print_int x ; print_char ')'

let _ = spawn par_iter(print, start (1,3))
```

A possible output is `(2)(1)(3)`.²

To share parallel computations between several “agents”, we introduce the notion of a pool of functions.

```ocaml
type 'a pool = {
  register: ('a -> unit) Join.chan;
  compute: 'a -> unit;
}

let create_pool () =
  def compute(x) & agent(worker) =
    worker x ; agent(worker) & reply () to compute
  in
  { register=agent; compute=compute; }
```

A `pool` exports two channels: `register` and `compute`. From the caller's point of view, `register` offers computational power, whereas `compute` exploits it. The pool implementation matches computing agents with exploiting agents by means of a single, straightforward join-pattern.

Internally, available computing agents are messages pending on channel `agent`. Notice that several instances of a given `worker` agent cannot execute concurrently: a computation can start only when an agent is available, and an agent engaged in a computation becomes available again only when the computation (`worker x`) is over.
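The pairing performed by the join-pattern `compute(x) & agent(worker)` can be mimicked sequentially with two queues of pending messages, as in the following plain-OCaml sketch. The names `requests`, `agents` and `react` are hypothetical, and one simplification matters: the real `compute` is synchronous and blocks until an agent is free, whereas here an unmatched request simply stays queued until an agent registers. The FIFO choice of agent is also an artifact of the model; JoCaml does not specify which pending agent is chosen.

```ocaml
(* Sequential model of the join-pattern  compute(x) & agent(worker).
   Hypothetical names; this is not JoCaml's implementation. *)
type 'a pool = {
  requests : 'a Queue.t;            (* pending compute(x) messages *)
  agents : ('a -> unit) Queue.t;    (* pending agent(worker) messages *)
}

let create_pool () = { requests = Queue.create (); agents = Queue.create () }

(* Fire the join-pattern as long as both kinds of message are pending. *)
let rec react p =
  if not (Queue.is_empty p.requests) && not (Queue.is_empty p.agents) then begin
    let x = Queue.pop p.requests in
    let worker = Queue.pop p.agents in   (* the agent is now busy *)
    worker x;
    Queue.push worker p.agents;          (* available again once worker x is over *)
    react p
  end

let register p worker = Queue.push worker p.agents; react p
let compute p x = Queue.push x p.requests; react p
```

Because an agent is removed from `agents` for the duration of its call, the model shares the property stated above: a given worker never runs two computations at once.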

To illustrate the use of the pool, we introduce a second function to print integers:

```ocaml
let print_bis x = print_char '<' ; print_int x ; print_char '>'
```

Then, we create a pool and register the `print` and `print_bis` functions.

```ocaml
let pool = create_pool ()
let () = spawn (pool.register(print) & pool.register(print_bis))
```

Finally, by combining `par_iter` and the `compute` component of the pool, the integer interval is printed by two agents:

```ocaml
let () = spawn par_iter(pool.compute, start (1,3))
```

A possible output is `(2)<1><3>`. Because distinct agents run concurrently, another possible output is `(<1>2<)3>`.

### 4.2  Collecting the results of concurrent iteration

In the previous example, `par_iter` is an asynchronous channel, which reflects the fact that its caller cannot know when `par_iter` has finished its work. We now wish to collect the values returned by the workers, instead of relying on side effects as `print` does.

More precisely, we aim to build a new kind of pool, with a “fold” function that behaves like `par_iter` with respect to concurrency but additionally returns a combination of the results.

```ocaml
type ('a,'b) pool = {
  register: (elt -> 'a) Join.chan;
  fold: t -> ('a -> 'b -> 'b) -> 'b -> 'b;
}
```

The interface for exploiting the pool is the `fold` function which, compared with `par_iter`, takes two extra arguments, a combination function and an initial value for the result, and returns a combined result of type `'b`. The interface for offering computational power is `register`, as before, except that registered functions now return a result of type `'a`.
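What `fold` should return can be pinned down by a sequential reference. The plain-OCaml sketch below uses the hypothetical name `seq_fold`: it applies the worker to every element of the interval and combines the results in enumeration order. Since the pool may combine results in any order, the concurrent `fold` agrees with this reference only when `combine` is insensitive to order, as `(+)` and `( * )` are; a non-commutative combination such as list consing can observe the completion order.

```ocaml
(* Interval enumerator from the text. *)
let start c = c
let step (n, m) = if n > m then None else Some (n, (n + 1, m))

(* Hypothetical sequential reference for pool.fold: same arguments,
   but results are combined in enumeration order. *)
let seq_fold (worker : int -> 'a) (x : int * int)
    (combine : 'a -> 'b -> 'b) (init : 'b) : 'b =
  let rec go enum acc =
    match step enum with
    | None -> acc
    | Some (y, next) -> go next (combine (worker y) acc)
  in
  go (start x) init

let double x = 2 * x
```

For instance, `seq_fold double (1,3) (+) 0` is `12` and `seq_fold double (4,5) ( * ) 1` is `80`, whereas consing with `(fun v l -> v :: l)` gives `[6; 4; 2]` here but may give another permutation under the concurrent pool.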

The new pool is controlled by the following monitor, whose primary job is to collect results.

```ocaml
type ('a,'b) monitor = {
  enter: unit -> unit;
  leave: 'a Join.chan;
  wait: unit -> 'b;
  finished: unit Join.chan;
}

let create_monitor combine init =
  def state(n,r) & enter() = state(n+1,r) & reply () to enter
  or  state(n,r) & leave(v) = state(n-1,combine v r)
  or  state(0,r) & wait() & finished() = reply r to wait
  in
  spawn state(0,init) ;
  { enter=enter ; leave=leave ; wait=wait ; finished=finished ; }

val create_monitor : ('a -> 'b -> 'b) -> 'b -> ('a, 'b) monitor
```

A monitor provides four channels: `enter`, `leave`, `finished` and `wait`. Channel `enter` (resp. `leave`) is used by the monitored pool (see below) to signal that a new task starts (resp. ends). The monitored pool sends a message on `finished` when the iteration has come to an end. Finally, `wait` is a function that blocks until all monitored tasks are over, and then returns the combination of their results.

A monitor has an internal state whose first component `n` counts the tasks being computed. This counter is updated through the channels `enter` and `leave`. Additionally, the message on `leave` carries a task result, which is combined with the second component `r`. The last join-pattern (`state(0,r) & wait() & finished()`) states that when there are no more tasks, either active (`state(0,_)`) or to be allocated (`finished()`), the call to `wait` can be answered.
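The monitor's bookkeeping can be traced with a sequential state machine in plain OCaml. In this sketch all names are hypothetical: the mutable fields `n` and `r` play the roles of the two components of `state(n,r)`, the flag `ended` records that `finished()` has been sent, and `try_wait` stands for the synchronous `wait`, returning `None` where the JoCaml call would block.

```ocaml
type ('a, 'b) monitor = {
  mutable n : int;            (* tasks currently being computed *)
  mutable r : 'b;             (* combination of the results received so far *)
  mutable ended : bool;       (* has a finished() message been sent? *)
  combine : 'a -> 'b -> 'b;
}

let create_monitor combine init = { n = 0; r = init; ended = false; combine }

(* state(n,r) & enter()  ->  state(n+1,r) *)
let enter m = m.n <- m.n + 1

(* state(n,r) & leave(v)  ->  state(n-1, combine v r) *)
let leave m v =
  m.n <- m.n - 1;
  m.r <- m.combine v m.r

(* finished() *)
let finished m = m.ended <- true

(* state(0,r) & wait() & finished(): answerable only when both conditions hold. *)
let try_wait m = if m.n = 0 && m.ended then Some m.r else None
```

A result is available only once the counter is back to zero and the end of the iteration has been signalled; either condition alone is not enough.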

We now present the pool implementation:

```ocaml
let create_pool () =
  def loop(monitor,enum) & agent(worker) =
    match step enum with
    | Some(x, next) ->
        monitor.enter() ;
        loop(monitor,next) & call_worker(monitor, x, worker)
    | None -> monitor.finished() & agent(worker)
  and call_worker(monitor,x,worker) =
    let v = worker(x) in
    monitor.leave(v) & agent(worker)
  in
  let fold x combine init =
    let monitor = create_monitor combine init in
    spawn loop(monitor, start x) ;
    monitor.wait ()
  in
  { fold=fold ; register=agent ; }
```

Let us examine the definition of `fold`: it first creates a monitor, then starts the iteration, and finally calls the `wait` function of the monitor.

The `loop`/`agent` definition is essentially a combination of the previous pool implementation and of `par_iter`, with worker calls set apart for clarity (channel `call_worker`). As a consequence, the successive calls to a given `worker` agent now execute in sequence, following iteration order. Additionally, calls to the monitor are inserted at the appropriate places.

A remarkable point is that all calls to `monitor.enter` are guaranteed to have been performed before the message on `monitor.finished` is sent. This follows from the fact that the recursive message on `loop` is sent only once the call `monitor.enter()` has returned, by virtue of the sequencing operator “`;`”. Moreover, the internal counter of the monitor really counts active tasks: as “`&`” binds more tightly than “`;`”, the process `call_worker(monitor,x,worker)` executes once `monitor.enter()` has returned, and thus once the monitor counter has been incremented. Similarly, the counter is decremented (by `monitor.leave(v)`) only once the worker has returned.
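The ordering argument can be exercised with a small simulation in plain OCaml. The sketch below (hypothetical names throughout, not JoCaml code) models processes as thunks run in a pseudo-random order, inlines the monitor as three references, and checks after every step that the guard of `wait` (counter at zero and iteration finished) never holds while a task is still outstanding. Agents are left out, as if enough of them were registered; this drops the sequencing of a given agent's calls, which the argument does not rely on.

```ocaml
(* A process, when run, performs its effects and returns the processes it spawns. *)
type proc = Proc of (unit -> proc list)

let fold_model ~seed (worker : int -> 'a) ((lo, hi) : int * int)
    (combine : 'a -> 'b -> 'b) (init : 'b) : 'b =
  Random.init seed;
  (* Inlined monitor state: active-task counter, accumulator, finished flag. *)
  let n = ref 0 and r = ref init and ended = ref false in
  let completed = ref 0 in
  let rec loop i =
    Proc (fun () ->
      if i > hi then (ended := true; [])        (* monitor.finished() *)
      else begin
        incr n;                                  (* monitor.enter() returns first, *)
        [ loop (i + 1); call_worker i ]          (* then both processes are spawned *)
      end)
  and call_worker x =
    Proc (fun () ->
      let v = worker x in
      decr n;                                    (* monitor.leave(v) *)
      r := combine v !r;
      incr completed;
      [])
  in
  let pending = ref [ loop lo ] in
  while (match !pending with [] -> false | _ :: _ -> true) do
    let k = Random.int (List.length !pending) in
    let (Proc body) = List.nth !pending k in
    let rest = List.filteri (fun i _ -> i <> k) !pending in
    pending := rest @ body ();
    (* If state(0,r) & finished() could fire now, every task must be done. *)
    assert (not (!n = 0 && !ended) || !completed = max 0 (hi - lo + 1))
  done;
  !r
```

Whatever the interleaving, `fold_model ~seed (fun x -> 2 * x) (1,3) (+) 0` returns `12`; moving `incr n` after the spawn would break the counting argument given above.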

The new pool is quite powerful, since it can serve as a meeting place between several agents that offer computational power and several agents that exploit it. Let us define the former agents and register them.

```ocaml
let double x = print x; 2*x
and double_bis x = print_bis x; x+x

let pool = create_pool ()
let () = spawn (pool.register(double) & pool.register(double_bis))
```

The exploiting agents are two functions with different combination behaviors.

```ocaml
let sum x = pool.fold x (+) 0
and prod x = pool.fold x ( * ) 1
```

Finally all agents meet through the pool.

```ocaml
def echo(c,x) = print_char c ; print_int x ; print_char c ; 0

let () = spawn (echo('+',sum(1,3)) & echo('*',prod(4,5)))
```

A possible output is `<1>(2)<4>(5)*80*<3>+12+`.

### 4.3  Distributed computations

In JoCaml, concurrent and distributed computations are based on the same model: different programs (abstracted as sites) may communicate by means of channels. More precisely, site A may send messages on a channel whose definition resides on another site B. In that situation, the guarded processes execute on site B. One practical problem in distributed applications is for the communicating partners first to know one another, and then to share at least a few channels. To solve this issue, JoCaml provides library calls to connect sites, and a name service, which is basically a repository for values (more specifically, channel names) indexed by plain strings.

As an example, the following program runs on machine A.

```ocaml
let pool = create_pool ()
let () =
  Join.Ns.register Join.Ns.here "reg" (pool.register: (int -> int) Join.chan)
let () =
  Join.Site.listen (Unix.ADDR_INET (Join.Site.get_local_addr (), 12345))
let () = print_int (pool.fold (1,3) (+) 0)
```

This program creates a pool. Then, by calling `Join.Ns.register`, it stores the channel `pool.register` under the name `"reg"` in the local name service (`Join.Ns.here`). Next, `Join.Site.listen` starts listening for connections on the default Internet address of the local site (`Join.Site.get_local_addr()`), on port `12345`. Finally, the program calls `pool.fold`. This call blocks, since no computing agent has entered the pool yet.

To become such a computing agent, machine B runs the following program.

```ocaml
(* master_addr is the Internet address of A *)
let a_site = Join.Site.there (Unix.ADDR_INET (master_addr, 12345))
let ns = Join.Ns.of_site a_site
let register = (Join.Ns.lookup ns "reg": (int -> int) Join.chan)

def double(x) =
  print_char '(' ; print_int x ; print_char ')' ;
  reply x+x to double

let () = spawn register(double)
```

Here, B first gets the site identity of the program running on A with the function `Join.Site.there`, and then the name service of that site with `Join.Ns.of_site`. Next, it retrieves the (asynchronous) channel associated with the key `"reg"`, whose messages are functions of type `int -> int`. The name service is not type-safe.³ For instance, the type of `Join.Ns.lookup` is `Join.Ns.t -> string -> 'a`. As a minimal precaution, we insert explicit type constraints. Finally, B defines and registers the synchronous channel `double`. When A calls the registered `double`, the effect is that of a remote function call. Hence, the console output is `12` on A and `(1)(2)(3)` on B.
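To see what a checked lookup looks like, compare with a common plain-OCaml technique for string-indexed repositories: typed keys over an extensible variant. The sketch below is not part of JoCaml and its names (`univ`, `key`, `new_key`, `register`, `lookup`) are hypothetical. It only works within a single program, because a key is a value created at run time that two sites cannot share by name, which is precisely the situation `Join.Ns` has to handle. It merely illustrates that a lookup with a key of the wrong type yields `None` rather than an unchecked value of type `'a`.

```ocaml
(* Universal type: each key adds its own fresh constructor to it. *)
type univ = ..

type 'a key = { name : string; inj : 'a -> univ; prj : univ -> 'a option }

(* Create a key; the local constructor V is distinct at every call. *)
let new_key (type a) (name : string) : a key =
  let module M = struct type univ += V of a end in
  { name;
    inj = (fun x -> M.V x);
    prj = (function M.V x -> Some x | _ -> None) }

(* A string-indexed repository standing in for a name service. *)
let table : (string, univ) Hashtbl.t = Hashtbl.create 16

let register (k : 'a key) (v : 'a) = Hashtbl.replace table k.name (k.inj v)

(* The stored value is checked against the key instead of being cast blindly. *)
let lookup (k : 'a key) : 'a option =
  match Hashtbl.find_opt table k.name with
  | None -> None
  | Some u -> k.prj u
```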

Another issue deserves mention. The program of B above is not complete: since `spawn register(double)` returns immediately, execution goes on, and the program would terminate on reaching its end. To prevent this, we deliberately deadlock it.

```ocaml
let () =
  def dead() & lock() = reply () to dead in
  dead()
```

But B is now blocked forever, whereas a desirable behavior is for B to be released when A no longer needs it, or at least when the program running on A terminates.

The function `Join.Site.at_fail` provides a convenient solution. It takes a site and a channel (of type `unit Join.chan`) as arguments, and returns `()`. When the failure of the site is detected, a message is sent on the channel. Thus, we replace the code above by:

```ocaml
let () =
  def wait() & release() = reply () to wait in
  Join.Site.at_fail a_site release ;
  wait()
```

² Due to abundant concurrency, another possible output is `((2)1)(3)`.

³ A weakness of JoCaml, we agree.