-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmpi.ml
58 lines (41 loc) · 1.62 KB
/
mpi.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
open Marshal
open Unix
open System
(*******************************************************************************
* Simple message passing interface using channels
* Note: the type parameters of the channel type are phantom types used to
* track the types of data sent and received through the channel
*******************************************************************************)
(* Interface of a typed, bidirectional message channel between two parties.
   The [Mpi] module in this file is the implementation sealed with it. *)
module type S = sig
(* One end of a channel: values of type ['s] are sent through it and values
   of type ['r] are received from it. The type is abstract. *)
type ('s, 'r) channel
(* [spawn f x] runs [f c x] on the other side of a fresh channel and returns
   the caller's end. [c] is the opposite end, so its send/receive type
   parameters are swapped with respect to the returned channel.
   In [Mpi], [f] runs in a forked child process. *)
val spawn : (('r, 's) channel -> 'a -> unit) -> 'a -> ('s, 'r) channel
(* [send c msg] sends [msg] to the other end of [c]. *)
val send : ('s, 'r) channel -> 's -> unit
(* [receive c] returns the next value sent by the other end of [c]. *)
val receive : ('s, 'r) channel -> 'r
(* [wait_die c] shuts the channel down and waits for the other side to
   finish. In [Mpi] it raises [Failure] unless the awaited process exits
   with status 0. *)
val wait_die : ('s, 'r) channel -> unit
end
(* Fork-based implementation of [S]: a channel is a pair of Unix pipes
   connecting a parent process to one child process it forked. *)
module Mpi : S = struct
  (* ['s] and ['r] are phantom parameters: they do not occur in the record
     and only constrain what [send] accepts and what [receive] returns.
     [send] / [rcv] are the buffered channels wrapped around this side's pipe
     ends. [pid] is the process at the other end: the child's pid in the
     handle returned to the parent, the parent's pid in the handle given to
     the child. *)
  type ('s, 'r) channel = {send: out_channel; rcv: in_channel; pid: int}

  (* [spawn f x] forks a child process that runs [f c x] and then exits, and
     returns the parent's end of the channel connected to [c].

     The child exits with status 0 when [f] returns normally. If [f] raises,
     the exception is reported on stderr and the child exits with status 2;
     it must never escape, because the child holds a copy of the parent's
     stack and would otherwise carry on executing the parent's code.

     Raises [Unix.Unix_error] if a pipe or the fork cannot be created; any
     descriptor already opened is closed first. *)
  let spawn f x =
    (* Child -> parent direction: the child writes [cout], the parent reads
       [pin]. *)
    let (pin, cout) = pipe () in
    (* Parent -> child direction: the parent writes [pout], the child reads
       [cin]. *)
    let (cin, pout) =
      try pipe ()
      with e -> close pin; close cout; raise e
    in
    let ppid = Unix.getpid () in
    let forked =
      try fork ()
      with e -> close pin; close cout; close cin; close pout; raise e
    in
    match forked with
    | 0 ->
      (* Child: drop the parent's pipe ends, keep our own. *)
      close pin; close pout;
      let chan =
        {send = out_channel_of_descr cout;
         rcv = in_channel_of_descr cin;
         pid = ppid}
      in
      let status =
        try f chan x; 0
        with e ->
          prerr_endline ("Mpi.spawn: child raised " ^ Printexc.to_string e);
          2
      in
      Stdlib.exit status
    | cid ->
      (* Parent: drop the child's pipe ends, keep our own. *)
      close cin; close cout;
      {send = out_channel_of_descr pout;
       rcv = in_channel_of_descr pin;
       pid = cid}

  (* [send a msg] marshals [msg] onto the outgoing pipe and flushes it so the
     peer can read it immediately. [Marshal.Closures] allows functional
     values inside [msg]; both ends run the same program image because the
     peer was created by [fork]. *)
  let send (a: ('s, 'r) channel) (msg: 's) : unit =
    Marshal.to_channel a.send msg [Marshal.Closures];
    flush a.send

  (* [receive a] blocks until a complete marshalled value is available on the
     incoming pipe and returns it. The result type is not checked at run
     time: it is only as trustworthy as the phantom types of the channel.
     Raises [End_of_file] if the peer has closed its end. *)
  let receive (a: ('s, 'r) channel) : 'r =
    ((Marshal.from_channel a.rcv) : 'r)

  (* [waitpid] that restarts when the call is interrupted by a signal. *)
  let rec reap pid =
    try waitpid [] pid
    with Unix_error (EINTR, _, _) -> reap pid

  (* [wait_die a] closes both directions of the channel and waits for the
     process [a.pid] to terminate. Meant to be called by the parent on the
     handle returned by [spawn].
     Raises [Failure] if that process did not exit with status 0, and
     [Unix.Unix_error] if it cannot be waited for. *)
  let wait_die (a: ('s, 'r) channel) : unit =
    close_out a.send;
    close_in a.rcv;
    match reap a.pid with
    | (_, WEXITED 0) -> ()
    | (_, (WEXITED _ | WSIGNALED _ | WSTOPPED _)) ->
      failwith "process failed to terminate in wait_die"
end