We first present iterators (type enum) over the elements (type elt) of a collection (type t) [4]. The function start builds an enumerator from a structure of type t, while step returns the first value built from an enumerator and the enumerator that builds the next values. We illustrate the idea of iterators by an enumerator over an integer interval.
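Such an enumerator could be sketched as follows in plain OCaml; the names elt, t, enum, start and step follow the text, but the representation of intervals is our assumption:

```ocaml
(* A sketch of an iterator over an integer interval; the concrete
   representations of t and enum are illustrative assumptions. *)
module Interval = struct
  type elt = int
  type t = { lo : int; hi : int }   (* the interval [lo .. hi] *)
  type enum = int * int             (* next value to produce, upper bound *)

  (* start builds an enumerator from a structure of type t *)
  let start { lo; hi } = (lo, hi)

  (* step returns the first value and the enumerator that builds the
     remaining values, or None when the interval is exhausted *)
  let step (x, hi) =
    if x > hi then None else Some (x, (x + 1, hi))
end
```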
We then define a channel par_iter that applies a function worker to all the values produced by an enumerator, function calls being performed concurrently. If step enum does not return a value, then there is nothing to do (0 is the empty process). Otherwise, a recursive sending to par_iter and a call to worker are performed concurrently. The construct worker x ; 0 lifts the expression worker x (of type unit) into a process. An example of par_iter usage is to print the integers from 1 to 3 in unspecified order:
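The definition and its use could be sketched as follows in JoCaml (step and start are the iterator functions from the text; interval is a hypothetical constructor for an integer interval):

```ocaml
(* JoCaml sketch of par_iter: 0 is the empty process, & runs two
   processes in parallel, and worker x; 0 lifts the unit expression
   worker x into a process. *)
def par_iter(worker, enum) =
  match step enum with
  | None -> 0
  | Some (x, next) -> par_iter(worker, next) & (worker x; 0)

(* print the integers from 1 to 3 in unspecified order *)
let print x = Printf.printf "(%d)" x
let () = spawn par_iter(print, start (interval 1 3))
```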
A possible output is: (2)(1)(3)
To share parallel computations between several “agents”, we introduce the notion of a pool of functions.
A pool exports two channels: register and compute. Viewed from the caller's side, register is for offering computational power, whereas compute is for exploiting computational power. The pool implementation matches computing agents and exploiting agents with a straightforward join-pattern. Internally, available computing agents are messages pending on channel agent. Notice that several instances of a given worker agent cannot execute concurrently. Namely, a computation can start only when an agent is available, and an agent engaged in a computation returns to available status only when the computation (worker x) is over.
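The pool just described might be sketched as follows (a sketch only; the paper's actual definition may differ):

```ocaml
(* JoCaml sketch of the pool: an available computing agent is a message
   pending on channel agent; the join-pattern matches it with a request
   on compute. *)
def agent(worker) & compute(x) =
    let v = worker x in    (* run the computation on this agent *)
    agent(worker) &        (* the agent becomes available again *)
    reply v to compute
 or register(worker) =
    agent(worker) & reply () to register
```

Because agent(worker) is re-emitted only after worker x returns, a given agent never runs two computations at the same time, as stated above.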
To illustrate the use of the pool, we introduce a second function to print integers:
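A plausible definition, consistent with the sample outputs below (the bracketing with < > is our assumption, chosen so that the two agents' outputs can be told apart):

```ocaml
(* A second printer: it brackets integers with < > instead of ( ). *)
let print_bis x = Printf.printf "<%d>" x
```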
Then, we create a pool and register the print and print_bis functions.
Finally, by combining par_iter and the compute component of the pool, we have the integer interval printed by two agents:
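Under the assumptions of the sketches so far, this combination might read (create_pool and interval are assumed names, and the pool is assumed to be a record exporting register and compute):

```ocaml
(* Sketch: register two printing agents, then iterate through the
   pool's compute channel. *)
let () =
  let pool = create_pool () in
  pool.register print;
  pool.register print_bis;
  spawn par_iter(pool.compute, start (interval 1 3))
```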
A possible output is (2)<1><3>.
Due to concurrency of distinct agents, another possible output is (<1>2<)3>.
In the previous example, par_iter is an asynchronous channel. This clearly expresses that it cannot be known when par_iter has finished its work. We now wish not to perform side effects, as print does, but instead to collect returned values. More precisely, we aim at building a new kind of pool, with a “fold” function that behaves as par_iter as regards concurrency, but additionally returns a combination of results. The interface for exploiting the pool is the fold function that, with respect to the previous par_iter, takes the combination function and an initial value for the result as extra arguments, and returns a combined result of type 'b. The interface for offering computational power is register as before, but now registered functions return a result of type 'a.
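A plausible shape for this interface, expressed as a record type (elt and enum are the iterator types from the text; the record presentation is our assumption):

```ocaml
(* 'a: result type of the registered workers;
   'b: type of the combined result returned by fold. *)
type ('a, 'b) pool = {
  register : (elt -> 'a) -> unit;
  fold : enum -> ('b -> 'a -> 'b) -> 'b -> 'b;
}
```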
The new pool is controlled by the following monitor, whose primary job is to collect results. A monitor provides four channels: enter, leave, finished and wait. Channel enter (resp. leave) is used by the monitored pool (see below) to signal that a new task starts (resp. ends). The monitored pool will send a message on finished when iteration has come to an end. Finally, wait is a function that returns the combination of all the results of the monitored tasks. A monitor has an internal state whose first component n counts the number of tasks being computed. This counter is updated with the channels enter and leave. Additionally, the message on leave is a task result to be combined with the second component r. The last join-pattern (state(0,r) & finished() & wait()) states that when there are no more tasks, either active (state(0,_)) or to be allocated (finished()), then the call to wait can be answered.
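The monitor described above might be sketched as follows (create_monitor is an assumed constructor name; combine and init are the combination function and initial result):

```ocaml
(* Channels of the monitor: enter and wait are synchronous,
   leave and finished are asynchronous. *)
type ('a, 'b) monitor = {
  enter : unit -> unit;       (* a task starts *)
  leave : 'a Join.chan;       (* a task ends, carrying its result *)
  finished : unit Join.chan;  (* iteration is over *)
  wait : unit -> 'b;          (* get the combined result *)
}

let create_monitor combine init =
  def state(n, r) & enter() = state(n + 1, r) & reply () to enter
   or state(n, r) & leave(v) = state(n - 1, combine r v)
   or state(0, r) & finished() & wait() = reply r to wait in
  spawn state(0, init);
  { enter = enter; leave = leave; finished = finished; wait = wait }
```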
We now present the pool implementation:
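It might look as follows (a sketch; step is the iterator function and create_monitor the monitor constructor described above, and names follow the text):

```ocaml
(* JoCaml sketch of the new pool implementation. *)
def call_worker(monitor, x, worker) =
    let v = worker x in          (* the worker call proper *)
    monitor.leave(v) &           (* report the result, decrement counter *)
    agent(worker)                (* this agent is available again *)
 or agent(worker) & loop(monitor, enum) =
    match step enum with
    | None -> monitor.finished() & agent(worker)
    | Some (x, next) ->
        monitor.enter();         (* increment the task counter, then... *)
        loop(monitor, next) & call_worker(monitor, x, worker)
 or register(worker) = agent(worker) & reply () to register

let fold enum combine init =
  let monitor = create_monitor combine init in
  spawn loop(monitor, enum);
  monitor.wait ()
```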
Let us examine the definition of fold: it first creates a monitor, then starts the iteration, and finally calls the wait function of the monitor. The loop/agent definition is essentially a combination of the previous pool implementation and of par_iter, with worker calls being put aside for clarity (channel call_worker). The combination has the effect that the various instances of a given worker agent now execute in sequence, following iteration order. Additionally, calls to the monitor are inserted at appropriate places. A remarkable point is that we can be sure that all calls to monitor.enter have been performed before the message on monitor.finished is sent. This is almost obvious by considering that the recursive sending on loop is performed only once the call monitor.enter() has returned, by virtue of the sequencing operator “;”. Moreover, the internal counter of the monitor indeed counts active tasks: as “&” binds more tightly than “;”, the process call_worker(monitor,x,worker) executes once monitor.enter() has returned, and thus once the monitor counter has been incremented. Similarly, the counter is decremented (by monitor.leave(v)) only once the worker has returned.
The new pool is quite powerful, since it can serve as a meeting place between several agents that offer computational power and several agents that exploit it. Let us define the former agents and register them.
Exploiting agents are two functions, with different combination behavior.
Finally all agents meet through the pool.
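One possible way to put everything together, consistent with the sample output below (the doubling workers, the printers, the intervals and the combination functions are all illustrative guesses on our part):

```ocaml
(* Sketch: two computing agents that double their argument, printing it
   in (..) or <..>; two exploiting agents folding with + and *. *)
let () =
  let pool = create_pool () in
  pool.register (fun x -> Printf.printf "(%d)" x; 2 * x);
  pool.register (fun x -> Printf.printf "<%d>" x; 2 * x);
  def sum_agent() =
      Printf.printf "+%d+" (pool.fold (start (interval 1 3)) (+) 0); 0
   or mul_agent() =
      Printf.printf "*%d*" (pool.fold (start (interval 4 5)) ( * ) 1); 0 in
  spawn (sum_agent() & mul_agent())
```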
A possible output is <1>(2)<4>(5)*80*<3>+12+.
In JoCaml, concurrent and distributed computations are based on the same model: different programs (abstracted as sites) may communicate by means of channels. More precisely, site A may send messages on a channel whose definition resides on another site B. In that situation, guarded processes execute on site B. One practical problem in distributed applications is for the communicating partners, first to know one another, and then to have at least a few channels in common. To solve the issue, JoCaml provides library calls to connect sites and a name service, which basically is a repository for values (more specifically channel names) indexed by plain strings.
As an example, this first program runs on machine A.
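Its shape could be the following (the Join.Ns and Join.Site calls are the library functions named in the text; the pool creation and the fold arguments are our guesses):

```ocaml
(* Sketch of the program on machine A. *)
let () =
  let pool = create_pool () in
  (* make pool.register available under the name "reg";
     the explicit type constraint guards against the untyped name service *)
  Join.Ns.register Join.Ns.here "reg" (pool.register : (int -> int) -> unit);
  (* accept connections on the local default address, port 12345 *)
  Join.Site.listen (Unix.ADDR_INET (Join.Site.get_local_addr (), 12345));
  (* blocks until some computing agent has entered the pool *)
  Printf.printf "%d\n" (pool.fold (start (interval 1 3)) (+) 0)
```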
This program creates a pool. Then, by calling Join.Ns.register, it stores the channel pool.register, associated with the name "reg", in the local name service (Join.Ns.here). Next, Join.Site.listen starts listening for connections on the default Internet address of the local site (Join.Site.get_local_addr()), on port 12345. Finally, the program calls pool.fold. This call blocks, since no computing agent has entered the pool yet.
To become such a computing agent, machine B runs the following program.
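Its shape could be (the address 192.0.2.1 stands for A's address and is a hypothetical value; the definition of double is our guess, consistent with the outputs below):

```ocaml
(* Sketch of the program on machine B. *)
let () =
  (* identity of the site running on A, and its name service *)
  let a_site =
    Join.Site.there
      (Unix.ADDR_INET (Unix.inet_addr_of_string "192.0.2.1", 12345)) in
  let ns = Join.Ns.of_site a_site in
  (* the name service is not type safe: constrain the type explicitly *)
  let register = (Join.Ns.lookup ns "reg" : (int -> int) -> unit) in
  (* a synchronous channel that prints its argument and doubles it *)
  def double(x) = Printf.printf "(%d)" x; reply (2 * x) to double in
  spawn register(double)
```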
Here, B first gets the site identity of the program running on A, with the function Join.Site.there, and its name service with Join.Ns.of_site. Then, it retrieves the (synchronous) channel associated with the key "reg". The name service is not type safe. For instance, the type of Join.Ns.lookup is Join.Ns.t -> string -> 'a. As a minimal precaution, we insert explicit type constraints.
Finally, B defines and registers the synchronous channel double. The effect of A calling the registered double is that of a remote function call. Hence, console output is 12 on A and (1)(2)(3) on B.
Another issue deserves mention. The program of B above is not complete: as spawn register(double) returns immediately, execution goes on. For the program not to terminate by reaching its end, we deadlock it purposely.
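The purposeful deadlock could be written, for instance, as follows (an illustrative idiom; the channel names are ours):

```ocaml
(* A synchronous call that can never be answered: the join-pattern
   requires a message on release, and nothing ever sends one, so the
   program blocks here instead of terminating. *)
let () =
  def forever() & release() = reply () to forever in
  forever ()
```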
But B is now blocked forever, whereas a desirable behavior is for B to be released when A no longer needs B, or at least when the program running on A terminates.
The function Join.Site.at_fail provides a convenient solution. It takes a site A and a channel (of type unit Join.chan) as arguments, and returns (). When it is detected that A has failed, a message is sent on the channel. Thus, we replace the code above by:
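One way to write the replacement (a sketch; a_site is the site of A obtained earlier on B, and the channel names are ours):

```ocaml
(* B still blocks on forever (), but Join.Site.at_fail arranges for a
   message on release when A's site fails, releasing B. *)
let () =
  def forever() & release() = reply () to forever in
  Join.Site.at_fail a_site release;
  forever ()
```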
A possible output on B is then ((2)1)(3).