-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmr_skel.erl
111 lines (76 loc) · 1.94 KB
/
mr_skel.erl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
%%%-------------------------------------------------------------------
%%% @author Ken Friis Larsen <[email protected]>
%%% @copyright (C) 2011, Ken Friis Larsen
%%% Created : Oct 2011 by Ken Friis Larsen <[email protected]>
%%%-------------------------------------------------------------------
-module(mr).
-export([start/1, stop/1, job/5]).
%%%% Interface
%% @doc Start a map-reduce coordinator.
%%
%% `N' is handed unchanged to init/1, whose result is matched against
%% `{Reducer, Mappers}'. A new (unlinked) process is then spawned to run
%% coordinator_loop/2 with those two values.
%%
%% Returns `{ok, Pid}' where `Pid' is the coordinator process.
start(N) ->
    {Reducer, Mappers} = init(N),
    Coordinator = spawn(fun() -> coordinator_loop(Reducer, Mappers) end),
    {ok, Coordinator}.
%% @doc Stop the coordinator `Pid' (as returned inside `{ok, Pid}' by start/1).
%%
%% Issues a synchronous `stop' request through rpc/2. The coordinator's
%% `{From, stop}' clause forwards `stop' to every mapper and to the reducer
%% and then answers through reply_ok/1, so this call returns `ok' once the
%% coordinator has handled the request.
stop(Pid) ->
    rpc(Pid, stop).
%% @doc NOTE(review): not implemented -- the body below is a `....' placeholder
%% and must be filled in before this module compiles.
%% Presumably submits a map-reduce job to the coordinator `CPid', applying
%% `MapFun' to the elements of `Data' and combining the results with `RedFun'
%% starting from `RedInit' -- TODO confirm once the coordinator protocol for
%% jobs exists.
job(CPid, MapFun, RedFun, RedInit, Data) -> ....
%%%% Internal implementation
%% @doc NOTE(review): not implemented -- the body below is a `....' placeholder.
%% start/1 matches the result against `{Reducer, Mappers}', and the
%% coordinator iterates over `Mappers' with lists:foreach/2, so the result
%% must be a 2-tuple whose second element is a list.
%% Presumably `N' is the number of mapper processes to create -- TODO confirm.
init(N) -> ....
%% synchronous communication
%% @doc Synchronous request/response call to the process `Pid'.
%%
%% Sends `{self(), Request}' to `Pid' and blocks until `Pid' answers with
%% `{Pid, Response}' (the message shape produced by reply/2), then returns
%% `Response'.
%%
%% The target is monitored for the duration of the call: if it is already
%% dead, or dies before answering, the caller exits with
%% `{rpc_failed, Reason}' instead of blocking forever in the receive.
rpc(Pid, Request) ->
    MonitorRef = erlang:monitor(process, Pid),
    Pid ! {self(), Request},
    receive
        {Pid, Response} ->
            %% `flush' also discards a 'DOWN' message that may already be
            %% queued when the server terminates right after replying.
            erlang:demonitor(MonitorRef, [flush]),
            Response;
        {'DOWN', MonitorRef, process, _, Reason} ->
            exit({rpc_failed, Reason})
    end.
%% @doc Answer a request from `From'.
%%
%% The payload is wrapped together with the pid of the calling process as
%% `{self(), Msg}', which is the shape rpc/2 waits for. Returns the
%% message that was sent.
reply(From, Msg) ->
    Envelope = {self(), Msg},
    erlang:send(From, Envelope).
%% @doc Acknowledge a request from `From' with the bare atom `ok'
%% (delivered through reply/2).
reply_ok(From) ->
    Ack = ok,
    reply(From, Ack).
%% @doc Acknowledge a request from `From' with a result: the payload is
%% tagged as `{ok, Msg}' and delivered through reply/2.
reply_ok(From, Msg) ->
    Tagged = {ok, Msg},
    reply(From, Tagged).
%% asynchronous communication
%% @doc Fire-and-forget send: delivers `Msg' to `Pid' as-is (no sender pid
%% is attached, so the receiver cannot reply). Returns `Msg'.
async(Pid, Msg) ->
    erlang:send(Pid, Msg).
%% @doc Asynchronously tell the process `Worker' to stop by sending it the
%% bare atom `stop' through async/2.
stop_async(Worker) ->
    async(Worker, stop).
%% @doc Asynchronously hand one data item to the process `Worker': the item
%% is sent through async/2 wrapped as `{data, Item}'.
data_async(Worker, Item) ->
    Payload = {data, Item},
    async(Worker, Payload).
%%% Coordinator
%% @doc Message loop of the coordinator process spawned by start/1.
%% `Reducer' is a single pid and `Mappers' a list of pids (both are passed
%% to stop_async/1 below).
%% NOTE(review): only the `stop' request is implemented; the `....' line is
%% a placeholder for the remaining clauses and must be replaced before the
%% module compiles.
coordinator_loop(Reducer, Mappers) ->
receive
%% Synchronous stop request; `From' is the pid to acknowledge.
{From, stop} ->
io:format("~p stopping~n", [self()]),
%% Tell every worker to stop before acknowledging the caller.
lists:foreach(fun stop_async/1, Mappers),
stop_async(Reducer),
%% No recursive call follows, so the loop (and the process) ends here.
reply_ok(From);
....
end.
%% @doc Distribute the elements of the list `Items' over the processes in
%% `Workers'.
%%
%% Delegates to send_loop/3, which hands out one element per worker in list
%% order and starts over from the first worker when the list of workers is
%% exhausted. Returns `ok' once every element has been sent.
send_data(Workers, Items) ->
    send_loop(Workers, Workers, Items).
%% @doc Round-robin helper behind send_data/2.
%%
%% `Mappers' is the full list of mapper pids, the second argument is the
%% part of that list not yet used in the current round, and the third
%% argument is the data still to be sent. Each element is sent to the next
%% mapper with data_async/2. Returns `ok' when no data is left.
%%
%% The refill clause only matches a non-empty `Mappers' list: with no
%% mappers and data left to send there is nobody to send to, so the call
%% fails with `function_clause' instead of recursing forever on the same
%% arguments.
send_loop(Mappers, [Mid | Queue], [D | Data]) ->
    data_async(Mid, D),
    send_loop(Mappers, Queue, Data);
send_loop(_, _, []) ->
    ok;
send_loop([_ | _] = Mappers, [], Data) ->
    %% Current round exhausted: start over with the complete mapper list.
    send_loop(Mappers, Mappers, Data).
%%% Reducer
%% @doc Message loop of the reducer process.
%% NOTE(review): only the `stop' message is implemented; the `....' line is
%% a placeholder for the remaining clauses and must be replaced before the
%% module compiles.
reducer_loop() ->
receive
%% Bare `stop' atom, as sent by stop_async/1: log and leave the loop.
stop ->
io:format("Reducer ~p stopping~n", [self()]),
ok;
....
end.
%% @doc NOTE(review): not implemented -- the receive body is a `...'
%% placeholder and must be replaced before the module compiles.
%% Presumably folds results arriving from the mappers into `Acc' using `Fun'
%% until `Missing' indicates nothing is outstanding -- TODO confirm the
%% meaning of `Missing' and the message format.
gather_data_from_mappers(Fun, Acc, Missing) ->
receive
...
end.
%%% Mapper
%% @doc Message loop of a mapper process. `Reducer' and `Fun' are carried
%% unchanged through the loop by the clauses visible here.
%% NOTE(review): the `....' line is a placeholder for the clauses that do
%% the actual mapping work and must be replaced before the module compiles.
mapper_loop(Reducer, Fun) ->
receive
%% Bare `stop' atom, as sent by stop_async/1: log and leave the loop.
stop ->
io:format("Mapper ~p stopping~n", [self()]),
ok;
....
%% Catch-all, kept last: log any unrecognised message and keep looping so
%% it does not stay in the mailbox.
Unknown ->
io:format("unknown message: ~p~n",[Unknown]),
mapper_loop(Reducer, Fun)
end.