We first
present iterators (type enum) over the elements (type elt)
of a collection (type t) [4].
The function start builds an enumerator from a structure of type
t, while step takes an enumerator and returns its first value
together with the enumerator that produces the next values. We
illustrate the idea of iterators with an enumerator over
an integer interval.
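The following is a minimal OCaml sketch of this interface. It assumes that step signals exhaustion by returning None. The names ITER, Interval and interval_to_list are ours, not the paper's.

```ocaml
(* Iterator signature as described in the text: [start] builds an
   enumerator from a collection; [step] returns the first value and the
   enumerator for the remaining values, or None when there are none
   left (assumed encoding). *)
module type ITER = sig
  type t     (* collections *)
  type elt   (* elements *)
  type enum  (* enumerators *)
  val start : t -> enum
  val step : enum -> (elt * enum) option
end

(* Enumerator over the integer interval [lo, hi], given as (lo, hi). *)
module Interval : ITER with type t = int * int and type elt = int = struct
  type t = int * int
  type elt = int
  type enum = int * int  (* next value to produce, upper bound *)
  let start (lo, hi) = (lo, hi)
  let step (next, hi) =
    if next > hi then None else Some (next, (next + 1, hi))
end

(* Collect all values of an interval, in enumeration order. *)
let interval_to_list (lo, hi) =
  let rec go e =
    match Interval.step e with
    | None -> []
    | Some (x, e') -> x :: go e'
  in
  go (Interval.start (lo, hi))
```

The enumerator type stays abstract outside Interval, so client code can only use start and step.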
We then define a channel par_iter that applies
a function worker to all the values produced by an enumerator,
with the function calls performed concurrently.
If step enum does not return a value,
then there is nothing to do (0 is the empty process).
Otherwise, a recursive message on
par_iter and a call to worker are performed concurrently.
The construct worker x ; 0 lifts
the expression worker x (of type unit) into a process.
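The following plain OCaml sketch simulates the behavior of par_iter. It is a sequential model, not JoCaml's scheduler. A process is represented as a thunk that returns the processes it spawns in parallel, and the empty list stands for 0. A hypothetical run function picks the next process at random to mimic the unspecified interleaving. The names process, Proc, run and interval_step are assumptions made for the illustration.

```ocaml
(* A process, when run, performs its effects and returns the processes
   it leaves running in parallel; [] plays the role of 0. *)
type process = Proc of (unit -> process list)

(* Run a "soup" of parallel processes, picking the next one at random
   to mimic the unspecified interleaving of parallel composition. *)
let rec run = function
  | [] -> ()
  | soup ->
      let i = Random.int (List.length soup) in
      let (Proc p) = List.nth soup i in
      let rest = List.filteri (fun j _ -> j <> i) soup in
      run (p () @ rest)

(* Enumerator over an integer interval, represented as (next, last). *)
let interval_step (next, last) =
  if next > last then None else Some (next, (next + 1, last))

(* par_iter: if step yields no value, become 0; otherwise run the
   recursive par_iter and "worker x; 0" in parallel. *)
let rec par_iter step worker e =
  Proc (fun () ->
    match step e with
    | None -> []
    | Some (x, e') ->
        [ par_iter step worker e';
          Proc (fun () -> worker x; []) ])
```

For instance, run [par_iter interval_step (Printf.printf "(%d)") (1, 3)] prints the integers 1 to 3 in an order that depends on the random choices. Unlike in JoCaml, each call to worker runs to completion in this model, so the outputs of two calls never interleave.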
As an example, par_iter can print the integers
from 1 to 3 in an unspecified order:
A possible output is (2)(1)(3). Since the calls to print run concurrently, their outputs may even interleave, as in ((2)1)(3).
To share parallel computations between several “agents”, we introduce the notion of a pool of functions.
A pool exports two channels: register and
compute.
From the caller's side,
register offers computational
power, whereas compute exploits it.
The pool implementation matches
computing agents and exploiting agents with
a straightforward join-pattern.
Internally, available computing agents are messages pending on
channel agent.
Notice that several instances of a given worker
agent cannot execute concurrently.
Indeed, a computation can start only when
an agent is available, and an agent engaged in a computation
returns to available status only when the computation (worker x)
is over.
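The following is a sequential OCaml model of the pool that makes the matching concrete. It assumes the join-pattern agent(worker) & compute(x) with body worker x; agent(worker), which is our reading of the prose. Pending messages are kept in FIFO queues, and a match fires as soon as both queues are non-empty. JoCaml itself chooses matches nondeterministically and runs them concurrently. The names pool, make_pool and fire are hypothetical.

```ocaml
(* Pending messages on the two channels of the join-pattern. *)
type 'a pool = {
  agents : ('a -> unit) Queue.t;  (* messages pending on agent *)
  tasks : 'a Queue.t;             (* messages pending on compute *)
}

let make_pool () = { agents = Queue.create (); tasks = Queue.create () }

(* Fire agent(worker) & compute(x) while both channels have messages. *)
let rec fire pool =
  if not (Queue.is_empty pool.agents) && not (Queue.is_empty pool.tasks)
  then begin
    let worker = Queue.pop pool.agents in
    let x = Queue.pop pool.tasks in
    (* While [worker x] runs, [worker] is absent from [agents], so the
       same agent cannot be matched with another task. *)
    worker x;
    Queue.push worker pool.agents;  (* agent(worker): available again *)
    fire pool
  end

let register pool worker = Queue.push worker pool.agents; fire pool
let compute pool x = Queue.push x pool.tasks; fire pool
```

Suppose two printing agents are registered in turn and then 1, 2 and 3 are computed. In this model the schedule is deterministic: the first agent handles 1, the second handles 2, and the first handles 3. This happens because an agent rejoins the back of the queue only once its computation is over.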
To illustrate the use of the pool, we introduce a second function to print integers:
Then, we create a pool and register the print and print_bis
functions.
Finally, by combining par_iter and the compute component
of the pool, we have the integer interval printed by two agents:
A possible output is (2)<1><3>.
Because distinct agents run concurrently,
another possible output is (<1>2<)3>.
In the previous example, par_iter is an asynchronous
channel. As a consequence, there is no way to know
when par_iter has finished its work.
We now wish to collect returned values instead of
performing side effects as print does.
More precisely, we aim at building a new kind of
pool, with a “fold” function that
behaves like par_iter with respect to concurrency, but additionally
returns a combination of the results.
The interface for exploiting the pool is the fold function, which,
compared with par_iter,
takes a combination function and an initial value for the result as extra
arguments, and returns a combined result of type 'b.
The interface for
offering computational power is register, as before,
but registered functions now return a result of type 'a.
The new pool is controlled by the following monitor, whose primary job is to collect results.
A monitor provides four channels: enter, leave,
finished and wait.
Channel enter (resp. leave)
is used by the monitored pool (see below) to
signal that a new task starts (resp. ends).
The monitored pool sends a message on finished when
the iteration has come to an end.
Finally, wait is a function that returns
the combination of all the results of the monitored tasks.
A monitor has an internal state whose
first component n counts the number of tasks being computed.
This counter is updated with the channels enter and leave.
Additionally, the message on leave carries a task result, which is
combined with the second component r.
The last join-pattern (state(0,r) & finished() & wait()) states
that the call to wait can be answered once there are no more tasks,
neither active (state(0,_)) nor still to be allocated (finished()).
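The monitor's state machine can be modelled sequentially in OCaml, as sketched below. The sketch makes two assumptions. First, combine has type 'a -> 'b -> 'b, taking the result first and the accumulator second. Second, since nothing can block here, wait returns None where the JoCaml wait() would still be waiting. Apart from enter, leave, finished and wait, the names are hypothetical.

```ocaml
(* Internal state of the monitor, kept as a mutable record. *)
type ('a, 'b) monitor = {
  mutable n : int;           (* number of tasks being computed *)
  mutable r : 'b;            (* combination of the results so far *)
  mutable ended : bool;      (* has finished() been received? *)
  combine : 'a -> 'b -> 'b;  (* assumed: result, then accumulator *)
}

let create_monitor combine y0 = { n = 0; r = y0; ended = false; combine }

(* enter(): a new task starts. *)
let enter m = m.n <- m.n + 1

(* leave(v): a task ends with result v, which is combined into r. *)
let leave m v =
  m.n <- m.n - 1;
  m.r <- m.combine v m.r

(* finished(): no more tasks will be allocated. *)
let finished m = m.ended <- true

(* wait(): answered by state(0, r) & finished(); None means that the
   JoCaml call would still be blocked. *)
let wait m = if m.n = 0 && m.ended then Some m.r else None
```

Both conditions are needed. A zero counter alone does not answer wait, because more tasks may still be allocated. The finished message alone does not answer it either, because tasks may still be running.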
We now present the pool implementation:
Let us examine the definition of fold:
it first creates a monitor, then starts the iteration,
and finally calls the wait function of the monitor.
The loop/agent
definition is essentially a combination of the previous pool
implementation and of par_iter, with worker calls moved
to a separate channel, call_worker, for clarity.
The combination has the effect that
various instances of a given worker agent
now execute in sequence, following iteration order.
Additionally, calls to
the monitor are inserted at appropriate places.
Notably, all
calls to monitor.enter are guaranteed to have been performed before
the message on monitor.finished is sent.
This follows from the fact that the recursive message
on loop is sent only after the call
monitor.enter() has returned,
by virtue of the sequencing operator “;”.
Moreover, the internal counter of the monitor
indeed counts active tasks:
as “&” binds more tightly than
“;”, the process call_worker(monitor,x,worker)
executes only after monitor.enter() has
returned, hence only after the monitor counter has been incremented.
Similarly, the counter is decremented (by monitor.leave(v)) only
once the worker has returned.
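The following OCaml simulation puts the pool and the monitor together, so that the two properties argued above can be checked on small runs. The reactions listed in the comment are our reading of the prose, not the paper's code. The function sim_fold and its internal references are hypothetical, and the monitor is inlined to keep the block self-contained. Enabled reactions are fired in random order to mimic nondeterminism.

```ocaml
(* Monitor state: active tasks, partial result, end of iteration. *)
type ('a, 'b) monitor = {
  mutable n : int;
  mutable r : 'b;
  mutable ended : bool;
  combine : 'a -> 'b -> 'b;
}

(* Simulated fold over the interval [lo, hi] with the given agents.
   Modelled reactions (pending messages live in the refs below):
     loop(e) & agent(w) ->
       match step e with
       | None -> finished(); agent(w)
       | Some (x, e') -> enter(); (loop(e') & call_worker(x, w))
     call_worker(x, w) -> leave(w x); agent(w)
   Returns Some r when wait() could be answered, None if it blocks. *)
let sim_fold (agents : (int -> 'a) list) (combine : 'a -> 'b -> 'b)
    (y0 : 'b) ((lo, hi) : int * int) : 'b option =
  let m = { n = 0; r = y0; ended = false; combine } in
  let available = ref agents in    (* messages on agent *)
  let loop_msg = ref (Some lo) in  (* message on loop: next value *)
  let calls = ref [] in            (* messages on call_worker *)
  let rec run () =
    let loop_ready =
      match !loop_msg, !available with Some _, _ :: _ -> true | _ -> false
    in
    let choices = (if loop_ready then 1 else 0) + List.length !calls in
    if choices > 0 then begin
      let k = Random.int choices in
      if loop_ready && k = 0 then begin
        match !loop_msg, !available with
        | Some next, w :: ws ->
            loop_msg := None;
            if next > hi then begin
              m.ended <- true;               (* finished() *)
              available := ws @ [w]          (* agent(w) *)
            end else begin
              m.n <- m.n + 1;                (* enter() completes first, *)
              loop_msg := Some (next + 1);   (* then loop(e') is sent *)
              calls := (next, w) :: !calls;  (* & call_worker(x, w) *)
              available := ws
            end
        | _ -> assert false
      end else begin
        let i = if loop_ready then k - 1 else k in
        let (x, w) = List.nth !calls i in
        calls := List.filteri (fun j _ -> j <> i) !calls;
        let v = w x in
        m.n <- m.n - 1;                      (* leave(v) *)
        m.r <- m.combine v m.r;
        available := !available @ [w]        (* agent(w) *)
      end;
      run ()
    end
  in
  run ();
  if m.n = 0 && m.ended then Some m.r else None  (* wait() *)
```

An agent is removed from available until its call_worker reaction has completed. As a result, a given agent handles one task at a time, and its tasks follow iteration order. The last test below models a fold on a pool with no registered agent, which blocks.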
The new pool is quite powerful, since it can serve as a meeting place between several agents that offer computational power and several agents that exploit it. Let us define the former agents and register them.
The exploiting agents are two functions with different combination behaviors.
Finally all agents meet through the pool.
A possible output is <1>(2)<4>(5)*80*<3>+12+.
In JoCaml, concurrent and distributed computations are based on the same model: different programs (abstracted as sites) may communicate by means of channels. More precisely, site A may send messages on a channel whose definition resides on another site B. In that situation, the guarded processes execute on site B. One practical problem in distributed applications is for the communicating partners first to know one another, and then to share at least a few channels. To solve this issue, JoCaml provides library calls to connect sites, and a name service, which basically is a repository for values (more specifically channel names) indexed by plain strings.
As an example, this first program runs on machine A.
This program creates a pool. Then, by calling
Join.Ns.register, it stores
the channel pool.register under the
name "reg" in the local name service
(Join.Ns.here).
Then, Join.Site.listen starts to listen for
connections on the default Internet address of the local site
(Join.Site.get_local_addr()) on port 12345.
Finally, the program calls pool.fold.
This call blocks, since no computing agent has entered the pool yet.
To become such a computing agent, machine B runs the following program.
Here, B first
gets the site identity of the program running on A,
with the function Join.Site.there, and its name service with
Join.Ns.of_site.
Then, it retrieves the (synchronous) channel associated
with the key "reg".
The name service is not type safe.
For instance, the type of Join.Ns.lookup is
Join.Ns.t -> string -> 'a. As a minimal precaution,
we insert explicit type constraints.
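The danger of a lookup function of type Join.Ns.t -> string -> 'a can be reproduced locally with a toy, hypothetical name service built on Obj. This sketch is not JoCaml's implementation. The result type is chosen by the caller's constraint, and nothing checks it against the stored value.

```ocaml
(* Toy name service: values of arbitrary types indexed by strings. *)
type ns = (string, Obj.t) Hashtbl.t

let create () : ns = Hashtbl.create 16

(* Store a value of any type under a key. *)
let register (ns : ns) (key : string) (v : 'a) : unit =
  Hashtbl.replace ns key (Obj.repr v)

(* Unsafe: like Join.Ns.lookup, the result type 'a is whatever the
   caller's context demands; it is trusted, not verified. *)
let lookup (ns : ns) (key : string) : 'a =
  Obj.obj (Hashtbl.find ns key)
```

With a correct constraint such as (double : int -> int), the lookup behaves as expected. A constraint such as (f : string -> string) on the same lookup would compile just as well, and calling f would be undefined behavior. Explicit constraints document the expected type but do not enforce it.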
Finally, B defines and registers the synchronous
channel double.
When A calls the registered double, the effect is
that of a remote function call.
Hence, the console output is 12 on A
and (1)(2)(3) on B.
Another issue deserves mention.
The program of B above is not complete:
as spawn register(double) returns immediately,
execution goes on.
To prevent the program from terminating when it reaches its end,
we deliberately deadlock it.
But B is now blocked forever, whereas a desirable behavior is for B to be released when A no longer needs B, or at least when the program running on A terminates.
The function Join.Site.at_fail provides a convenient solution.
It takes a site A and a channel
(of type unit Join.chan) as arguments,
and returns ().
When A is detected to have failed, a message
is sent on the channel. Thus, we replace the code above by: