Chapter 1   Concurrent programming

This part of the manual is a tutorial introduction to JoCaml. This chapter presents small, local examples. Chapter 2 deals with the distributed features. It is assumed that the reader has some previous knowledge of OCaml.

1.1   Conventions

Examples are given as JoCaml source, followed by the output of the top-level (or of the compiler when prompted to print types). The JoCaml top-level provides an interactive environment, much like the OCaml top-level.

In order to try the examples, you can either type them in a top-level, launched by the command joctop, or concatenate the source chunks in some file a.ml, then compile a.ml by the command joc -i a.ml, and finally run the produced code by the command ./a.out. (Option -i enables the output of inferred types.)

1.2   Basics

JoCaml programs are made of processes and expressions. Roughly, processes are executed asynchronously and produce no result, whereas expressions are evaluated synchronously and their evaluation produces values. For instance, OCaml expressions are JoCaml expressions. Processes communicate by sending messages on channels (a.k.a. port names). Messages carried by channels are made of zero or more values, and channels are values themselves. In contrast with other process calculi (such as the pi-calculus and its derived programming language Pict), channels and the processes that listen on them are defined in a single language construct. This allows considering (and implementing) channels as functions when they have the same usage.

JoCaml programs are organized as a list of top-level statements. A top-level statement is a declaration (such as an OCaml binding let x = 1 or a channel binding) or an expression. Top-level statements are terminated by an optional ;; that triggers evaluation in interactive mode.

1.2.1   Simple channel declarations

Channels, or port names, are the main new primitive values of JoCaml. There are two important categories of port names: asynchronous and synchronous port names. Synchronous names return values, whereas asynchronous channels do not.

Users can create new channels with a new kind of let def binding, which should not be confused with the ordinary value let binding. The right-hand side of the definition of a channel a is a process that will be spawned whenever some message is sent on a. Additionally, the contents of messages received on a are bound to formal parameters.

For instance, here is the definition of an asynchronous echo channel:
# let def echo! x = print_int x;
# ;;
val echo : <<int>>
The new channel echo has type <<int>>, which is the type of asynchronous channels carrying values of type int. The presence of ! in the definition of the channel name indicates that this channel is asynchronous. This indication is present only in the channel definition, not when the channel is used. Sending an integer i on echo fires an instance of the guarded process print_int i; which prints the integer on the console. Since the OCaml expression print_int i returns the value (), it is necessary to append a ; that discards this value. echo is asynchronous and it is not possible to know when the actual printing takes place.

The definition of a synchronous print is as follows:
# let def print x = print_int x; reply
# ;;
val print : int -> unit
The type of print is the functional type int -> unit that takes one integer as argument and returns an empty result. However, print is not a function because it is introduced by a let def binding. Since there is no ! at the end of the defined name, print is synchronous, thus it must return a value. The mechanism to return values for synchronous channels is different from the one for functions: it uses a reply construct whose semantics is to send back some (here zero) values as result. This is the first difference with plain OCaml functions, which implicitly return the value of the guarded expression, instead of using the explicit reply. Message sending on print is synchronous, in the sense that one knows that console output has occurred when print returns the answer ().

1.2.2   Processes

As we just saw, synchronous names return values, whereas asynchronous channels do not. Therefore, message sending on synchronous channels occurs inside expressions, as if they were functions, whereas message sending on asynchronous channels occurs inside processes.

Processes are the new core syntactic class of JoCaml. The most basic process sends a message on an asynchronous channel, such as the channel echo just introduced. Since only declarations and expressions are allowed at top-level, processes are turned into expressions by ``spawning'' them: they are introduced by the keyword ``spawn'' followed by a process in curly brackets ``{'' ``}''.
# spawn {echo 1}
# ;;
# 
# spawn {echo 2}
# ;;

-> 12
Processes introduced by ``spawn'' are executed concurrently. The program above may either echo 1 then 2, or echo 2 then 1. Thus, the output above may be 12 or 21, depending on the implementation. Concurrent execution also occurs inside processes, using the parallel composition operator ``|''. This provides a more concise, semantically equivalent, alternative to the previous example:
# spawn {echo 1 | echo 2}
# ;;

-> 21
Composite processes also include conditionals (if's), matching (match's) and local binding (let...in's and let def...in's). Process grouping is done by using curly braces ``{'' and ``}''.
# spawn {
#   let x = 1 in
#   {let y = x+1 in echo y | echo (y+1)} | echo x }
# ;;

-> 132
Once again, the program above may echo the integers 1, 2 and 3 in any order. Grouping is necessary around the process let y = ...in to restrict the scope of y such that its evaluation occurs independently of the process echo x.

1.2.3   Expressions

The other important syntactic class of JoCaml is the class of expressions. By contrast with processes, expressions evaluate to some results.

Expressions can occur at top-level. Expressions also occur in the right-hand side of value bindings or as arguments to message sending. Apart from OCaml expressions, the most basic expression sends some values on a synchronous channel, which behaves like an OCaml function. Synchronous channels, such as the channel print introduced above, return an answer, which is made of zero or more results.
# let x = 1
# ;;
# print x
# ;;
# print (x+1)
# ;;
val x : int
-> 12
In the program above 1, x and x+1 are expressions whose evaluation returns a single value (here an integer). The expressions print x and print (x+1) return empty results, the value (). This makes sense with respect to synchronization: as top-level phrases are evaluated by following textual ordering, the program above first outputs 1 and then outputs 2.

Synchronous channels can be considered as functions, and used as such, for instance in a sequence:
# let x = 1 in
# print x; print (x+1)
# ;;

-> 12
Sequences may also occur inside processes. The general form of a sequence inside a process is expression ;   process , where the result of expression will be discarded. Since expression can itself be a sequence, one may write:
# spawn
#   { print 1 ; print 2 ; echo 3 }
# ;;

-> 123
A sequence may be terminated by an empty process that does nothing and is denoted by ``'', the empty sequence of characters. Thus, an alternative to the previous example is as follows:
# spawn
#   { print 1 ; print 2 ; print 3 ; }
# ;;

-> 123
This is why print_int x; in the definition of the echo channel is considered as a process.

The concrete syntaxes for processes and expressions are purposely similar, when not the same. A notable exception is grouping, which is expressed by curly braces ``{'' and ``}'' in the case of processes and by ordinary parentheses ``('' and ``)'' in the case of expressions. Since grouping is necessary when there is a sequence or a parallel composition in a branch of an if instruction, the grouping is either if expression then {   process 1 } else {   process 2 } or if expression then (   expression 1 ) else (   expression 2 ), depending on whether the whole if is a process or an expression. The same rule is applied when considering matching.
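
As a small sketch of this rule (the names sign and abs_print are hypothetical and the top-level output is omitted), the first definition below uses if as a process, so its branches are grouped with braces; the second uses if as an expression on the right-hand side of a let, so its branches are grouped with parentheses:
# let def sign! x =
#   if x < 0 then {print_string "negative" ; print_newline () ;}
#   else {print_string "non-negative" ; print_newline () ;}
# ;;
# 
# let def abs_print x =
#   let y = if x < 0 then (print_string "-" ; 0 - x) else (print_string "+" ; x) in
#   print_int y ; reply
# ;;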

1.2.4   More on channels

The guarded process in a channel definition can spawn several messages, as in a stuttering echo channel:

# let def echo_twice! x = echo x | echo x
# ;;
val echo_twice : <<int>>
It is also possible to define directly such a channel, without referring to the channel echo, but by using the OCaml function print_int. In this case, it is necessary to enclose each use of print_int in ``{'' and ``}'', as in this new definition of echo_twice:
# let def echo_twice! x = {print_int x;} | {print_int x;}
# ;;
val echo_twice : <<int>>
This ``grouping'' is necessary because | binds more tightly than ;, as in:
# let def echo3! x = print_int x; echo x | echo x
# ;;
val echo3 : <<int>>
Now mixing synchronous and asynchronous calls:
# spawn {echo3 1}
# ;;
# 
# print 2; print 2
# ;;
# 
# spawn {echo_twice 3}
# ;;

-> 2213113
Observe that, since processes execute concurrently, 1221331 would also be a perfectly valid output. Printing both 2's before any 3 is the only constraint here, by the synchronous character of print.

Since synchronous and asynchronous channels have different types, the type-checker flags an error whenever a channel is used in the wrong context.
# spawn {print 1}
# ;;
File "ex14.ml", line 9, characters 7-14:
Expecting an asynchronous channel, but receive int -> unit
Channels are polyadic with respect to both arguments and results (when they exist). For instance, print accepts one argument and returns an empty result. The following channel f has arity two, both for argument and result, as shown by its type.
# let def f (x,y) = reply x+y,y-x
# ;;
val f : int * int -> int * int
As in OCaml, polyadic results are exploited by using polyadic value bindings. For instance the following program should print 3 on the console:
# let x,y = f (1,2)
# ;;
# 
# print_int x
# ;;
val x : int
val y : int
-> 3
Since they have the same type and behave like functions, synchronous names can be used to support a functional programming style. A traditional example is the Fibonacci function.
# let def fib n =
#   if n <= 1 then reply 1
#   else reply fib (n-1) + fib (n-2)
# ;;
# 
# print_int (fib 10)
# ;;
val fib : int -> int
-> 89
In contrast with value bindings, channel definitions are always potentially recursive.

Port names are first-class values in JoCaml. They can be sent as messages on other port names and returned as results. As a result, higher order ``ports'' can be written, such as
# let def twice f =
#   let def r x = reply f (f x) in
#   reply r
# ;;
val twice : ('a -> 'a) -> 'a -> 'a
The type for twice is polymorphic: it includes a type variable 'a that can be replaced by any type. Thus twice is a synchronous channel that takes a synchronous channel (or a function) of type 'a -> 'a as argument and returns one result of the same type.

For instance, 'a can be the type of integers or the type of strings (^ is OCaml string concatenation):
# let def succ x = reply x+1
# ;;
# 
# let def double s = reply s^s
# ;;
# 
# let f = twice succ in
# let g = twice double in
# print_int (f 0) ; print_string (g "X")
# ;;
val succ : int -> int
val double : string -> string
-> 2XXXX

1.2.5   Threads

Threads are not part of the JoCaml language, but they are part of the implementation. Threads are execution units at the implementation level. Threads are created, may suspend then resume, and finally die. Threads match the intuition of a ``direct'' causality. For instance, all the printing actions in a sequence are to be executed one after another and clearly belong to a unique thread. By contrast, the parallel composition operator ``|'' separates two threads that execute independently.

More precisely, the following program creates three threads:
# spawn {
#   {print_int 1 ; print_int 2 ; print_int 3 ;} |
#   { {print_int 4;}
#   | {print_int 5 ; print_int 6 ;}} }
# ;;

-> 123456
The output of the program is a mix of the output of its three threads: 123, 4 and 56. An output such as 12, then 4, then 356 would reveal that the thread printing 1, 2 and 3 has been preempted before printing 3.

Sending messages on channels ultimately fires a new process and should thus create a new thread. However, a new thread is not always created. For instance, compare the following definitions, which create infinitely many threads and a single, never-dying thread, respectively:
# let def rabbit! () = print_string "+" ; rabbit () | rabbit ()
# ;;
# 
# let def forever () = print_string "-" ; forever() ; forever() ; reply
# ;;
val rabbit : <<unit>>
val forever : unit -> 'a
Sending a message on the asynchronous rabbit fires a small thread that terminates by spawning two new instances of rabbit, thus creating two other threads. By contrast, sending a message on synchronous forever blocks the sender thread until the thread fired by the message completes. The implementation takes advantage of this: it does not fire a new thread and executes the process guarded by forever () on the sender thread. Here, message sending results in an ordinary function call.

This implementation behavior is exposed by the following program:
# spawn {
#   rabbit () |
#   {print_newline () ; exit 0;} |
#   {forever () ;} }
# ;;

-> 
-> +----------------------------
The program concurrently starts three threads: first, an exit 0 thread that flushes pending output and kills the system; then, two crazy threads, rabbit () and forever ().

Considering processes only, the program may terminate after an unspecified number of -'s and +'s has been printed. Considering threads, more -'s than +'s should appear on the console, since all -'s are printed by the same thread, whereas +'s are printed by different threads. All these printing threads compete for scheduling with the fatal thread and the forever() thread has to be preempted to stop outputting.

As a conclusion, informal reasoning about threads helps predict program output, taking the implementation into account. What is predicted is the likely output, not the only correct output.

1.3   Modules

The current implementation of JoCaml relies on the same module system as OCaml. Users can create their own modules, compile them separately and link them together into an executable program. For instance, users may write a module stutter, that exports two channels echo and print. Ideally, users first specify the names exported by module stutter and their types, by writing an interface file stutter.mli.
# val echo : <<int>>
# val print : int -> unit
The interface stutter.mli is compiled by issuing the command joc stutter.mli. This produces an object interface file stutter.cmi. Then, the implementation file stutter.ml contains the actual definitions for Stutter.echo and Stutter.print.
# let def echo! x = {print_int x ;} | {print_int x ;}
# 
# let def print x = print_int x ; print_int x ; reply
The implementation file stutter.ml is compiled by issuing the command joc -c stutter.ml (-c is the compile-only, do-not-link option). This produces an object implementation file stutter.cmo.

Now that the module stutter is properly compiled, some other implementation file user.ml can use it.
# Stutter.print 1
# ;;
# 
# spawn {Stutter.echo 2 | Stutter.echo 3}
# ;;
The implementation file user.ml can be compiled into user.cmo by issuing the command joc -c user.ml. This compilation uses the compiled interface stutter.cmi. An executable a.out is produced by the command joc stutter.cmo user.cmo that links the modules stutter and user together. Alternatively, a.out can be produced in one step by the command joc stutter.cmo user.ml.

Running a.out may produce the following output:
-> 113232

1.4   Join-patterns

Join patterns significantly extend port name definitions. A join-pattern defines several ports simultaneously and specifies a synchronization pattern between these co-defined ports. For instance, the following source fragment defines two synchronizing port names fruit and cake:
# let def fruit! f | cake! c =
#   print_string (f^" "^c) ; print_newline () ;
# ;;
val cake : <<string>>
val fruit : <<string>>
To trigger the guarded process print_string (f^" "^c) ; print_newline () ;, messages must be sent on both fruit and cake.
# spawn {fruit "apple" | cake "pie"}
# ;;

-> apple pie
The parallel composition operator ``|'' appears both in join-patterns and in processes. This highlights the kind of synchronization that the pattern matches. Join-definitions such as the one for fruit and cake provide a simple means of expressing non-determinism.
# spawn {fruit "apple" | fruit "raspberry" | cake "pie" | cake "crumble"}
# ;;

-> raspberry pie
-> apple crumble
Two cake names must appear on the console, but both combinations of fruits and cakes are correct.

Composite join-definitions can specify several synchronization patterns.
# let def apple! () | pie! () = print_string "apple pie" ;
# or  raspberry! () | pie! () = print_string "raspberry pie" ;
val pie : <<unit>>
val apple : <<unit>>
val raspberry : <<unit>>
Observe that the name pie is defined only once. Thus, pie potentially takes part in two synchronizations. This co-definition is expressed by the keyword or.

Again, internal choice is performed when only one invocation of pie is present:
# spawn {apple () | raspberry () | pie ()}
# ;;

-> raspberry pie
Join-patterns are the programming paradigm for concurrency in JoCaml. They allow the encoding of many concurrent data structures. For instance, the following code defines a counter:
# let def count! n | inc () = count (n+1) | reply to inc
#      or count! n | get () = count n | reply n to get
# ;;
# 
# spawn {count 0}
# ;;
val inc : unit -> unit
val count : <<int>>
val get : unit -> int
This definition calls for two remarks. First, join-patterns may mix synchronous and asynchronous messages, but when there are several synchronous messages, each reply construct must specify the name to which it replies, using the new reply ... to name construct. In the case where there is a single synchronous name in the pattern, the to construct is optional. For instance, it was not necessary in the previous example.

Second, the usage of the name count above is a typical way of ensuring mutual exclusion. For the moment, assume that there is at most one active invocation on count. When one invocation is active, count holds the counter value as a message and the counter is ready to be incremented or examined. Otherwise, some operation is being performed on the counter and pending operations are postponed until the operation being performed has left the counter in a consistent state. As a consequence, the counter may be used consistently by several threads.
# spawn {{inc () ; inc () ;} | {inc() ;}}
# ;;
# 
# let def wait! () =
#   let x = get () in
#   if x < 3 then wait () else {
#     print_string "three is enough !!!" ; print_newline () ;
#   }
# ;;
# 
# spawn {wait ()}
# ;;
val wait : <<unit>>
-> three is enough !!!
Ensuring the correct counter behavior in the example above requires some programming discipline: exactly one initial invocation on count has to be made. If there is more than one simultaneous invocation on count, then mutual exclusion is lost. If there is no initial invocation on count, then the counter will not work at all. This can be avoided by making the count, inc and get names local to a create_counter definition and then by exporting inc and get while hiding count, taking advantage of lexical scoping rules.
# let def create_counter () =
#   let def count! n | inc0 () = count (n+1) | reply
#        or count! n | get0 () = count n | reply n in
#   count 0 | reply inc0, get0
# ;;
# 
# let inc,get = create_counter ()
# ;;
val create_counter : unit -> (unit -> unit) * (unit -> int)
val inc : unit -> unit
val get : unit -> int
This programming style is reminiscent of ``object-oriented'' programming: a counter is a thing called an object, it has some internal state (count and its argument), and it exports some methods to the external world (here, inc and get). The constructor create_counter creates a new object, initializes its internal state, and returns the exported methods. As a consequence, several counters may be allocated and used independently.
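
As a brief sketch of this independence (the names inc1, get1, inc2 and get2 are chosen here for illustration, and the inferred types are omitted), two counters can be allocated with create_counter and updated separately. Since every call below is synchronous and issued in sequence, the first counter should report 2 and the second 1:
# let inc1,get1 = create_counter ()
# ;;
# 
# let inc2,get2 = create_counter ()
# ;;
# 
# inc1 () ; inc1 () ; inc2 ()
# ;;
# 
# print_int (get1 ()) ; print_int (get2 ())
# ;;
-> 21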

1.5   Control structures

Join-pattern synchronization can express many common programming paradigms, either concurrent or sequential.

1.5.1   Control structures for concurrency

Locks

Join-pattern synchronization can be used to emulate simple locks:
# let def new_lock () =
#   let def free! () | lock () = reply
#   and unlock () = free () | reply in
#   free () | reply lock, unlock
# ;;
val new_lock : unit -> (unit -> unit) * (unit -> unit)
Threads try to acquire the lock by performing a synchronous call on channel lock. Due to the definition of lock (), this consumes the pending message on free, so only one thread can get a response at a time. Another thread that attempts to acquire the lock is blocked until the thread that has the lock releases it by the synchronous call unlock, which fires another invocation of free. As in OCaml, it is possible to introduce several bindings with the and keyword. These bindings are recursive.

To give an example of lock usage, we introduce channels that output their string arguments several times:
# 
# let def double p =
#   let def r s = p s ; p s ; reply in
#   reply r
# ;;
# 
# let def print_port s = print_string s ; Thread.delay 0.001; reply
# ;;
# 
# let print16 = double(double(double(double(print_port))))
# ;;
val double : ('a -> 'b) -> 'a -> 'b
val print_port : string -> unit
val print16 : string -> unit
The Thread.delay calls prevent the same thread from running long enough to print all its strings. Now consider two threads, one printing -'s, the other printing +'s.
# spawn {{print16 "-" ;} | {print16 "+" ;}}
# ;;

-> -+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
As threads execute concurrently, their outputs may mix, depending upon scheduling.

However, one can use a lock to delimit a critical section and prevent the interleaving of -'s and +'s.
# let lock, unlock = new_lock ()
# ;;
# 
# spawn {
#   {lock () ; print16 "-" ; unlock () ;} |
#   {lock () ; print16 "+" ; unlock () ;}  }
# ;;
# 
val lock : unit -> unit
val unlock : unit -> unit
-> ----------------++++++++++++++++
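The acquire/release discipline can also be packaged once and for all. The following sketch assumes the lock, unlock and print16 names defined above; locked is a hypothetical helper that runs a synchronous channel p on argument s inside the critical section. Output is omitted, since either thread may acquire the lock first:
# let def locked (p,s) = lock () ; p s ; unlock () ; reply
# ;;
# 
# spawn {{locked (print16,"-") ;} | {locked (print16,"+") ;}}
# ;;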

Barriers

A barrier is another common synchronization mechanism. Basically, barriers define synchronization points in the execution of parallel tasks. Here is a simple barrier that synchronizes two threads:
# let def join1 () | join2 () = reply to join1 | reply to join2
# ;;
val join2 : unit -> unit
val join1 : unit -> unit
The definition above includes two reply constructs, which makes the mention of a port mandatory.

The following two threads print ba or ab between matching parentheses:
# spawn {
#   {print_string "(" ; join1 () ; print_string "a" ; join1() ;
#    print_string ")" ;} 
#   |
#   {join2 () ; print_string "b" ; join2 () ;} }
# ;;

-> (ab)
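The same idea should extend to more participants. The sketch below assumes that a join-pattern may synchronize three names (the text above only shows two) and uses hypothetical names sync1, sync2 and sync3. If so, all three letters are printed before any digit, in an order that depends on scheduling, so no output is shown:
# let def sync1 () | sync2 () | sync3 () =
#   reply to sync1 | reply to sync2 | reply to sync3
# ;;
# 
# spawn {
#   {print_string "a" ; sync1 () ; print_string "1" ;} |
#   {print_string "b" ; sync2 () ; print_string "2" ;} |
#   {print_string "c" ; sync3 () ; print_string "3" ;} }
# ;;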

Bi-directional channels

Bi-directional channels appear in most process calculi. In the asynchronous pi-calculus, for instance, and for a given channel c, a value v can be sent asynchronously on c (written c![v]) or received from c and bound to some variable x in some guarded process P (written c?x.P). Any process can send and receive on the channels it knows. In contrast, a JoCaml process that knows a channel can only send messages on it, whereas a unique channel definition receives all messages. Finally, the scope of a pi-calculus channel name c is defined by the new c in P operator. Such an operator does not exist in JoCaml, since join-definitions are binding constructs.

Nonetheless, bi-directional channels can be defined in JoCaml as follows:
# let def new_pi_channel () =
#   let def send! x | receive () = reply x in
#   reply send, receive
# ;;
val new_pi_channel : unit -> <<'a>> * (unit -> 'a)
A pi-calculus channel is implemented by a join definition with two port names. The port name send is asynchronous and is used to send messages on the channel. Such messages can be received by making a synchronous call to the other port name receive. Let us now ``translate'' the pi-calculus process
new c,d in c![1] | c![2] | c?x.d![x+x] | d?y.print(y)

We get:
# spawn {
#   let sc, rc = new_pi_channel ()
#   and sd, rd = new_pi_channel () in
#   sc 1 |  sc 2 |  {let x = rc () in sd (x+x)} |
#   {let y = rd () in print_int y ;} }
# ;;

-> 2
Synchronous pi-calculus channels are encoded just as easily as asynchronous ones: it suffices to make send synchronous:
# let def new_pi_sync_channel () =
#   let def send x | receive () = reply x to receive | reply to send in
#   reply send, receive
# ;;
val new_pi_sync_channel : unit -> ('a -> unit) * (unit -> 'a)
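As a usage sketch (output omitted, because the two replies are issued in parallel and the two printing threads may interleave), the sender below stays blocked in send 1 until a matching receive () is present. The top-level names send and receive are chosen here for illustration:
# let send, receive = new_pi_sync_channel ()
# ;;
# 
# spawn {
#   {send 1 ; print_string "sent" ; print_newline () ;} |
#   {let x = receive () in print_int x ; print_newline () ;} }
# ;;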

1.5.2   Loops

Join-patterns are also useful for expressing various programming control structures. Our first examples deal with iterations on an integer interval.

Simple loops

Asynchronous loops can be used when the execution order for the iterated actions is irrelevant, e.g., when these actions are asynchronous.
# let def loop! (a,x) = if x > 0 then {a () | loop (a,x-1)}
# ;;
# 
# let def echo_star! () = print_string "*" ;
# ;;
# 
# spawn {loop (echo_star,5)}
# ;;
val loop : <<(<<unit>> * int)>>
val echo_star : <<unit>>
-> *****
When execution order matters, a sequential loop is preferable:
# let def loop (a,x) = if x > 0 then {a x ; loop (a,x-1) ; reply} else reply
# ;;
# 
# let def print x = print_int(x) ; print_string " " ; reply
# ;;
# 
# loop (print,5)
# ;;
val loop : (int -> 'a) * int -> unit
val print : int -> unit
-> 5 4 3 2 1 
When the loop produces a result, this result can be computed inside reply constructs and accumulated. The following example computes the sum of the squares of the integers between 1 and 32:
# let def sum (i0,f) =
#   let def iter i =
#     if i > 0 then reply (f i) + iter (i-1) else reply 0 in
#   reply iter i0
# ;;
# 
# let def square x = reply x*x
# ;;
val sum : int * (int -> int) -> int
val square : int -> int
# print_int (sum (32,square))
# ;;

-> 11440
Port name definitions such as the one for iter above belong to a functional programming style. In particular, the various iterations of the loop body (computing f i here) never execute concurrently, since iterations are performed one after the other.

However, since integer addition is associative and commutative, the summing order of the various f i does not matter. Thus, asynchronous iteration can be used here, leading to a program with many more opportunities for concurrent execution:
# let def sum (i0,f) = 
# 
#   let def add! dr | total (r,i) =
#     let r' = r+dr in
#     if i > 1 then reply total (r',i-1)
#     else  reply r' in
# 
#   let def loop! i = if i > 0 then {add (f i) | loop (i-1)} in
#   loop i0 | reply total (0,i0)
# ;;
# 
# print_int( sum (32,square))
# ;;
val sum : int * (int -> int) -> int
-> 11440
Observe how the loop result is accumulated using the synchronous name total. The argument in the reply to sum consists of one call to total. This trick enables the synchronous sum to return its result when the asynchronous loop is over. In fact, the current JoCaml language places strong restrictions on the positioning of reply constructs in definitions, and a direct reply to sum from within the sub-definition for add! dr | total (r,i) is rejected by the compiler:
# let def sum (i0,f) = 
# 
#   let def add! dr | total! (r,i) =
#     let r' = r+dr in
#     if i > 1 then total (r',i-1)
#     else  reply r' to sum in
# 
#   let def loop! i = if i > 0 then {add(f(i)) | loop(i-1)} in
#   loop(i0)
# ;;
File "ex45.ml", line 6, characters 10-25:
Reply to channel external from def sum

Distributed loops

Sharing a loop between several ``agents'' requires more work. Let us informally define an agent as some computing unit. In this section, an agent is represented by a synchronous channel. In a more realistic setting, different agents would reside on different computers. The agent paradigm then serves to allocate computing resources (see section 2.3.3 in the next chapter).

For instance, here are two agents square1 and square2. The agent square1 models a fast machine, whereas square2 models a slow machine by computing squares in an inefficient way. Additionally, square1 and square2 differ marginally by their console output: square1 outputs a +, while square2 outputs a * when it starts and a - just before answering.
# let def square1 i = print_string "+" ; reply i*i
# ;;
# 
# let def square2 i =
#    print_string "*" ;
#    let def total! r | wait () = reply r in
#    let def mult! (r,j) =
#      if j <= 0 then total r else mult (r+i,j-1) in
#    mult (0,i) |
#    let r =  wait () in
#    print_string "-" ; reply r
# ;;
val square1 : int -> int
val square2 : int -> int
Sharing a loop between several agents amounts to allocating the iterations to be performed among them. The following channel make_sum returns a register channel and a wait channel. An agent registers by sending its computing channel on register. The final loop result is returned on wait.
# let def make_sum i0 = 
# 
#   let def add! dr | total i =
#     if i > 1 then reply dr+total(i-1)
#     else reply dr
#   and  wait () = reply total i0 in
# 
#   let def loop! i | register! f =
#     if i > 0 then {add (f i) | register f | loop (i-1) } in
# 
#   loop i0 | reply register, wait
# ;;
val make_sum : int -> <<(int -> int)>> * (unit -> int)
The only difference with the asynchronous sum loop from the previous section resides in the replacement of the definition let def loop! i = ... by the join-pattern definition let def loop! i | register! f = .... As a consequence, the agents square1 and square2 may now compete for loop iterations, provided two invocations register square1 and register square2 are active.
# let register, wait = make_sum 32
# ;;
# 
# spawn {register square1 | register square2}
# ;;
# 
# print_int (wait ())
# ;;
val register : <<(int -> int)>>
val wait : unit -> int
-> *+*+*+*+*+*+*+*+*+*+*+*+*+*+*+*+----------------11440
The distributed loop above is not satisfactory, since it does not take the relative computing speed of square1 and square2 into account while allocating iterations. The add(f i) jobs are spawned asynchronously, so that the different iterations performed by a given agent are executed concurrently. As a result, the iteration space is partitioned evenly between square1 and square2, as illustrated by the output above. This leads to a poor load balance, since the fast square1 stands idle at the end of the loop, while the slow square2 is overwhelmed.

A better solution is for an agent to execute its share of work in sequence rather than concurrently. This is achieved by the slightly modified definition for loop i | register f below:
# let def make_sum i0 =
# 
#   let def add! dr | total i =
#     if i > 1 then reply dr+total(i-1)
#     else reply dr in
# 
#   let def wait () = reply total i0 in
# 
#   let def loop! i | register! f =
#     if i > 0 then
#       {loop(i-1) |
#       let r = f i in
#       add r | register f} in
# 
#   loop i0 | reply register, wait
# ;;
val make_sum : int -> <<(int -> int)>> * (unit -> int)
In the new definitions, register f is launched again only once f i is computed. By contrast, loop (i-1) is launched immediately, for another agent to grab the next iteration as soon as possible.
# let register,wait = make_sum 32
# ;;
# 
# spawn {register square1 | register square2}
# ;;
# 
# print_int (wait ())
# ;;
val register : <<(int -> int)>>
val wait : unit -> int
-> *+++++++++++++++++++++++++++++++-11440

1.6   Data structures

To explore the expressive power of message-passing in JoCaml, we now consider the encoding of some data structures. In practice however, one would use the state-of-the-art built-in data structures inherited from OCaml, rather than their JoCaml internal encodings.

1.6.1   Pairs

Polymorphic pairs can be encoded quite easily in JoCaml by taking advantage of port name arity.
# let def create (x,y) =
#   let def get () = reply x,y in
#   reply get
# ;;
val create : 'a * 'b -> unit -> 'a * 'b
As exposed by the type above, a pair is a synchronous port name that takes zero arguments and returns two values. The synchronous name create returns such a name when given the values to store in the pair.

The content of a pair can be retrieved by sending a message of arity zero on it:
# let def fst p = let x,y = p () in reply x
# and snd p = let x,y = p () in reply y
# ;;
val fst : (unit -> 'a * 'b) -> 'a
val snd : (unit -> 'c * 'd) -> 'd
Pairs can now be used in an "abstract" fashion, by considering only the constructor create and the two destructors, fst and snd:
# let p = create (1,"four")
# ;;
# print_int (fst p + String.length (snd p))
# ;;
val p : unit -> int * string
-> 5
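For comparison, the same encoding can be sketched in plain OCaml, with closures standing in for synchronous port names. The names pair_fst and pair_snd are chosen here only to avoid shadowing the standard fst and snd; they do not appear in the JoCaml example.

```ocaml
(* A pair is a function of no argument that returns both components. *)
let create (x, y) = fun () -> (x, y)

(* Destructors: call the pair, then keep one component. *)
let pair_fst p = let x, _ = p () in x
let pair_snd p = let _, y = p () in y

let () =
  let p = create (1, "four") in
  (* prints 5, as in the JoCaml session *)
  print_int (pair_fst p + String.length (pair_snd p))
```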

1.6.2   Encoding data structures in object-oriented style

A convenient programming style for expressing encodings of data structures in JoCaml is the so-called object-oriented style. An object has an internal, hidden state and exports some methods (here, some synchronous names) that operate on its internal state. Consider for instance the following encoding for lists:
# let def cons (h,t) =
#   let def head () = reply h
#   and tail () = reply t
#   and self () = reply false, head, tail in
#   reply self
# and nil () =
#   let def head () = reply failwith "head of nil"
#   and tail () = reply failwith "tail of nil"
#   and self () = reply true, head, tail in
#   reply self
# ;;
val cons : 'a * 'b -> unit -> bool * (unit -> 'a) * (unit -> 'b)
val nil : unit -> unit -> bool * (unit -> 'c) * (unit -> 'd)
The internal state of a list cell is an emptiness status (true or false) plus, when appropriate, two subcomponents (h and t). The emptiness status is exposed directly to the external world. The two methods head and tail give access to the subcomponents of a non-empty list cell or fail. Observe how the name self is introduced to pack methods together in the reply to cons and nil. Finally, the types for the results of nil and cons are the same. Hence, both kinds of list cells are values of the same type.

Lists can now be used by directly retrieving methods:
# let def list_concat l =
#   let n,h,t = l () in
#   if n then reply ""
#   else reply h()^list_concat(t ())
# ;;
val list_concat :
  (unit -> bool * (unit -> string) * (unit -> 'a) as 'a) -> string
The above type is recursive. This reflects the fact that lists are recursive data structures. More precisely, the type of the cons-cell creator cons was not recursive. However, the recursive type for lists of strings appears naturally when writing a function that traverses lists.

Finally, the following source fragment shows how to create and use string lists:
# let def replicate (elem,n) =
#   if n <= 0 then reply nil ()
#   else reply cons (elem, replicate (elem,n-1))
# ;;
# 
# print_string (list_concat (replicate ("X",16)))
# ;;
val replicate :
  'a * int -> (unit -> bool * (unit -> 'a) * (unit -> 'b) as 'b)
-> XXXXXXXXXXXXXXXX
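The same object-oriented encoding can be sketched in plain OCaml. Since OCaml rejects recursive types such as the one inferred for list_concat unless the -rectypes option is given, this sketch names the type explicitly as a record; the type cell and its field names are assumptions introduced for the illustration.

```ocaml
(* A list cell exposes its emptiness status and two methods. *)
type 'a cell = {
  is_nil : bool;
  head : unit -> 'a;
  tail : unit -> 'a cell;
}

let nil () =
  { is_nil = true;
    head = (fun () -> failwith "head of nil");
    tail = (fun () -> failwith "tail of nil") }

let cons (h, t) =
  { is_nil = false; head = (fun () -> h); tail = (fun () -> t) }

(* Traverse the list by calling the methods of each cell. *)
let rec list_concat l =
  if l.is_nil then "" else l.head () ^ list_concat (l.tail ())

let rec replicate (elem, n) =
  if n <= 0 then nil () else cons (elem, replicate (elem, n - 1))

let () = print_string (list_concat (replicate ("X", 16)))
```

As in the JoCaml version, both kinds of cells have the same type, and the methods of an empty cell fail when called.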

1.6.3   Mutable data structures

Object states, represented as join-patterns, can be altered by invoking the appropriate methods. Here is a definition for a reference cell. One method (get) examines the content of the cell, while another (put) alters it.
# let def create_ref y0 =
#   let def state! y | get () = state y | reply y
#   or state! y | put new_y = state new_y | reply in
#   state y0 | reply get, put
# ;;
val create_ref : 'a -> (unit -> 'a) * ('a -> unit)
Here, the internal state of a cell is its content, which is stored as a message y on the channel state. Lexical scoping is used to keep the state internal to a given cell.

# let gi, pi = create_ref 0
# and gs, ps = create_ref ""
# ;;
val gi : unit -> int
val pi : int -> unit
val gs : unit -> string
val ps : string -> unit
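A sequential analogue in plain OCaml hides an ordinary ref behind two closures. This is only a sketch: unlike the join-pattern version, it does nothing to serialize accesses from concurrent threads, and the usage at the end is an illustration that is not part of the JoCaml session above.

```ocaml
(* The mutable state is local to each call of create_ref; only the two
   closures get and put can reach it. *)
let create_ref y0 =
  let state = ref y0 in
  let get () = !state
  and put new_y = state := new_y in
  (get, put)

let () =
  let gi, pi = create_ref 0 in
  let gs, ps = create_ref "" in
  pi (gi () + 42);          (* read the cell, then overwrite it *)
  ps "content: ";
  print_string (gs ());
  print_int (gi ())         (* prints: content: 42 *)
```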

1.6.4   A concurrent FIFO

We are now ready for a more sophisticated example of data structure encoding in JoCaml. First we define a new kind of list cell. Such a cell always holds an element (x below). When created, it stands in the first position of a list, and a name (see first below) reflecting that status is activated. Then, when another element is cons-ed in front of the list, the cell additionally holds a pointer to the cell of this new element (see the pattern first! () | set_prev prev below). In the end, a cell can be destroyed (see kill below). It then returns its content, a pointer to the previous cell (when it exists), and a boolean that is set to true when the destroyed cell is the only one in the list.
# let def new_cell x =
#   let def first! () | set_prev prev = inside prev | reply
#   or  inside! prev  | kill () = reply x, prev, false
#   or  first! ()     | kill () = reply x, self, true
#   or  self () = reply set_prev, kill in
#   first () | reply self
# ;;      
val new_cell : 'a -> (unit -> ('b -> unit) * (unit -> 'a * 'b * bool) as 'b)
A fifo is a data structure that provides two methods, put and get. The method put stores its argument in the fifo, while the method get retrieves one element from the fifo. The internal state of the fifo is either empty (and then the name empty is activated), or it contains some elements (internally, non-empty fifos have the name state activated). The name state holds two arguments: fst is a pointer to the first cell of the element list, while lst is a pointer to the last cell of the element list. Thus, elements stored in the fifo are cons-ed in front of fst (see put below), while retrieved elements are taken from the end of the element list (see get below). There is no empty! () | get () pattern below. As a consequence, an attempt to retrieve an element from an empty fifo is not an error: the answer to get is simply postponed until the fifo receives an element.

# let def fifo () =
#   let def empty! () | put x =
#     let fst = new_cell x in
#     state (fst,fst) | reply
#   or state! (fst,lst) | put x =
#      let new_fst  = new_cell x in
#      let set, rem = fst () in
#      set new_fst ;
#      state (new_fst,lst) | reply
#   or state! (fst,lst) | get () =
#      let set, rem = lst () in
#      let x, prev, last_cell = rem () in
#      if last_cell then empty () else state (fst,prev) |
#      reply x in
#   empty () | reply put, get
# ;;   
val fifo : unit -> ('a -> unit) * (unit -> 'a)
From the fifo point of view, elements are retrieved in the order they are stored. In a concurrent setting this means that when a thread performs several get in a row, the retrieved elements come in an order that is compatible with the order in which another thread feeds the fifo.
# spawn {
#   let put, get = fifo () in
#   {print_int (get ()) ; print_int (get ()) ; print_newline () ;} |
#   {let x = get () and y = get () in 0;} |
#   {put(1) ; put(2) ; put(3) ; put(4);} }
# ;;

-> 24
Therefore, the program above prints two integers from the set {1, 2, 3, 4} in increasing order.
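The postponed get can be imitated in plain, single-threaded OCaml by passing a continuation. The sketch below is an assumption-laden analogue, not the cell-based encoding above: it relies on the standard Queue module, and get takes a function k that is called with the element as soon as one is available.

```ocaml
(* Sequential sketch of a fifo whose get never fails: when the fifo is
   empty, the continuation is queued and served by a later put. *)
let fifo () =
  let items = Queue.create () in     (* stored elements, oldest first *)
  let waiting = Queue.create () in   (* continuations of postponed gets *)
  let put x =
    if Queue.is_empty waiting then Queue.add x items
    else (Queue.take waiting) x
  and get k =
    if Queue.is_empty items then Queue.add k waiting
    else k (Queue.take items)
  in
  (put, get)

let () =
  let put, get = fifo () in
  let seen = ref [] in
  get (fun x -> seen := x :: !seen);   (* fifo is empty: postponed *)
  put 1; put 2; put 3;                 (* 1 answers the pending get *)
  get (fun x -> seen := x :: !seen);   (* receives 2 at once *)
  List.iter print_int (List.rev !seen) (* prints 12 *)
```

Elements reach the continuations in the order in which they were stored, which is the property stated above for successive get calls.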

1.7   A word on typing

The JoCaml type system is derived from the ML type system and it should be no surprise to ML programmers. The key point in typing à la ML is parametric polymorphism. For instance, here is a polymorphic identity function:
# let def id x = reply x
# ;;
val id : 'a -> 'a
The type for id contains a type variable 'a that can be instantiated to any type each time id is actually used. Such a type variable is a generalized type variable. For instance, in the following program, variable 'a is instantiated successively to int and string:
# let i = id 1 and s = id "coucou"
# ;;
# 
# print_int i ; print_string s
# ;;
val i : int
val s : string
-> 1coucou
In other words, the first occurrence of id above has type int -> int, while the second has type string -> string. Experienced ML programmers may wonder how the JoCaml type system manages to mix parametric polymorphism and mutable data structures. There is no miracle here. Consider, again, the JoCaml encoding of a reference cell:
# let def state! x | get () = state x | reply x
# or state! x | set new_x = state new_x | reply
# ;;
val get : unit -> '_a
val state : <<'_a>>
val set : '_a -> unit
The type variable '_a that appears inside the types for state, get and set is prefixed by an underscore. Such type variables are non-generalized type variables that are instantiated only once. That is, all the occurrences of state must have the same type. Moreover, once '_a is instantiated with some type, this type replaces '_a in all the types where '_a appears (here, the types for get and set). This wide-scope instantiation guarantees that the various port names whose type contains '_a (state, get and set here) are used consistently.

More specifically, if '_a is instantiated to the type int by sending the message 0 on state, then the type for get is unit -> int in the rest of the program, as shown by the type for x below. As a consequence, the following program does not type-check, and a runtime type error (printing an integer while believing it is a string) is avoided:
# let def state! x | get () = state x | reply x
# or state! x | set new_x = state new_x | reply
# ;;
# 
# spawn {state 0}
# ;;
# 
# let x = get ()
# ;;
# 
# print_string x
# ;;
File "ex65.ml", line 13, characters 13-14:
This expression has type int but is here used with type string
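Plain OCaml treats its own mutable cells in the same way, which may help ML programmers relate the two. In the sketch below the name cell is an assumption; depending on the compiler version, the top-level prints its non-generalized variable as '_a or '_weak1.

```ocaml
let id x = x                        (* 'a -> 'a: generalized *)
let i = id 1 and s = id "coucou"   (* two independent instantiations *)

let cell = ref None                 (* non-generalized option ref *)
let () = cell := Some 0             (* fixes the type to int option ref *)
(* From here on, cell := Some "coucou" is rejected at compile time,
   just as print_string x is rejected above. *)

let () =
  print_int i;
  print_string s;
  match !cell with
  | Some n -> print_int n           (* overall output: 1coucou0 *)
  | None -> ()
```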
Non-generalized type variables appear when the types of several co-defined port names share a type variable. Such a type variable is not generalized.
# let def port! p | arg! x = p x
# ;;
val arg : <<'_a>>
val port : <<<<'_a>>>>
A workaround is to encapsulate the faulty names into another port name definition that defines only one name. This restores polymorphism.
# let def make_it () =
#   let def port! p | arg! x = p x in reply port,arg
# ;;
val make_it : unit -> <<<<'a>>>> * <<'a>>
Non-generalized type variables also appear in the types of the identifiers defined by a value binding.
# let p1, a1 = make_it ()
# ;;
# 
# let p2, a2 = make_it ()
# ;;
# 
# let def echo! x = print_int x;
# and echo_string! x = print_string x;
# ;;
# 
# spawn {p1 echo | p2 echo_string  | a1 1 | a2 "coucou"}
# ;;
val p1 : <<<<int>>>>
val a1 : <<int>>
val p2 : <<<<string>>>>
val a2 : <<string>>
val echo : <<int>>
val echo_string : <<string>>
-> coucou1
It is interesting to notice that invoking make_it () twice produces two different sets of port and arg port names, whose types contain different type variables. Thereby, programmers make explicit the different type instantiations that are performed silently by the compiler in the case of generalized type variables.
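The same workaround applies to plain OCaml references, which gives a compact way to see why it works. In this sketch (make_cell, c1 and c2 are illustrative names), the creator function is polymorphic because it is a function, while each cell it returns gets its own non-generalized type variable.

```ocaml
(* make_cell : unit -> 'a option ref is fully polymorphic. *)
let make_cell () = ref None

(* Each call yields a fresh cell with a fresh type variable... *)
let c1 = make_cell ()
let c2 = make_cell ()

(* ...so the two cells can be instantiated at different types. *)
let () =
  c1 := Some 1;
  c2 := Some "coucou"

let () =
  match !c1, !c2 with
  | Some n, Some str -> print_string str; print_int n  (* prints coucou1 *)
  | _ -> ()
```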

1.8   Exceptions

Since processes are mapped to several threads at run-time, it is important to specify their behaviours in the presence of exceptions.

Exceptions behave as in OCaml for OCaml expressions. If the exception is not caught in the expression, the behaviour depends on the synchrony of the process.

If the process is asynchronous, the exception is printed on the standard output and the asynchronous process terminates. No other process is affected.
# spawn { 
#     {failwith "Bye bye";} 
#   | {for i = 1 to 10 do print_string "-" done;} 
# }
# ;;

-> Uncaught exception: Failure("Bye bye")
-> ----------
However, if the process was synchronous, the process waiting for the result of this process will receive the exception, which will be propagated as in an OCaml function.
# let def die () = failwith "die"; reply
# ;;
# 
# try
#   die ()
# with _ -> print_string "dead\n"
# ;;
val die : unit -> 'a
-> dead
Several processes may be waiting for a result when an exception is raised. This is the case, for instance, when their reply constructs are syntactically guarded by a shared expression that raises the exception. In such cases, the exception is duplicated and thrown at all threads, reversing joins into forks.

# let def a () | b () = failwith "die"; reply to a | reply to b
# ;;
#  
# spawn {
#     { (try a () with _ -> print_string "hello a\n"); }
#   | { (try b () with _ -> print_string "hello b\n"); }
# } ;;
val b : unit -> unit
val a : unit -> unit
-> hello a
-> hello b
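The duplication of an exception can be modelled sequentially in plain OCaml. The sketch below is only an analogy under stated assumptions: each waiting caller is represented by a handler that receives a result value, and the helper names join_and_reply, waiter and log are hypothetical. The guarded body runs once, and its outcome, normal or exceptional, is delivered to every handler.

```ocaml
(* Run the shared body once, then deliver its outcome to every waiter. *)
let join_and_reply body waiters =
  let outcome = try Ok (body ()) with e -> Error e in
  List.iter (fun k -> k outcome) waiters

let log = ref []

(* A waiter records what it observed: a reply or the exception. *)
let waiter name = function
  | Ok () -> log := (name ^ " got a reply") :: !log
  | Error _ -> log := ("hello " ^ name) :: !log

let () =
  join_and_reply (fun () -> failwith "die") [waiter "a"; waiter "b"];
  (* prints "hello a" then "hello b", one per line *)
  List.iter print_endline (List.rev !log)
```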


