-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathMasterSupportFunctions.go
383 lines (353 loc) · 9.44 KB
/
MasterSupportFunctions.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
/*
******************************************************
Anything master-related that does not deal with the
actual MapReduce master algorithm goes in this file.
******************************************************
*/
package mapreduce
import (
"database/sql"
"fmt"
_ "github.com/mattn/go-sqlite3"
"log"
//"time"
"errors"
"net"
"net/http"
"net/rpc"
"os"
"runtime"
"strconv"
"strings"
)
/*
Signatures of the user-supplied map and reduce functions.
Results are sent as Pair values on output; a non-nil error reports failure.
Merge calls a ReduceFunction once per distinct key and closes values once
that key's values have all been delivered.
*/
type (
	MapFunction    func(key, value string, output chan<- Pair) error
	ReduceFunction func(key string, values <-chan string, output chan<- Pair) error
)
/*
Log levels accepted by LogF, from most to least verbose.
A message is printed when its level is >= Global_Chat_Level;
SPECIAL_CASE messages are printed regardless of the global level.
*/
const (
	FULL_DEBUG   = iota // 0: shows everything in loops
	VARS_DEBUG          // 1: shows all variables
	ERRO_DEBUG          // 2: only shows errors
	MESSAGES            // 3: messages like "task assigned" "task complete" etc.
	SPECIAL_CASE        // 4: rare cases that need to be shown no matter what
)
/*
LogF formats message with args (printf-style) and writes it through the
standard logger when level is at or above Global_Chat_Level. SPECIAL_CASE
messages are always written. Pass one of the log level constants as level.
*/
func LogF(level int, message string, args ...interface{}) {
	suppressed := level < Global_Chat_Level && level != SPECIAL_CASE
	if suppressed {
		return
	}
	text := fmt.Sprintf(message, args...)
	log.Println(text)
}
/*
PrintError logs err when the global verbosity is VARS_DEBUG or more verbose,
that is, when Global_Chat_Level <= VARS_DEBUG. Callers can use it after
`if err != nil` instead of writing LogF(VARS_DEBUG, "%v", err) by hand.
*/
func PrintError(err error) {
	if Global_Chat_Level > VARS_DEBUG {
		return
	}
	log.Println(err)
}
/*
FormatError builds an error whose text names the function, file and line of
a caller, followed by message formatted with args (printf-style).

skip selects the reported frame, counted from the function that calls
FormatError. For the chain MasterServer.listen() -> SetServerAddress() ->
FormatError(), skip 0 reports SetServerAddress() and skip 1 reports
MasterServer.listen().

If the stack cannot be inspected, a SPECIAL_CASE message is logged. An error
carrying just the formatted message is still returned; it is never nil, so
callers do not mistake the failure for success.
*/
func FormatError(skip int, message string, args ...interface{}) error {
	message = fmt.Sprintf(message, args...)
	// +1 skips FormatError's own frame, so skip counts from our caller.
	pc, file, line, ok := runtime.Caller(skip + 1)
	if !ok {
		LogF(SPECIAL_CASE, "Could not trace stack in an error report")
		return errors.New(message)
	}
	functionName := runtime.FuncForPC(pc).Name() // Name is nil-safe
	return fmt.Errorf("\n Error ---> %s \n %s\n %s : %d ", functionName, message, file, line)
}
/*
NewMasterServer builds a MasterServer from Settings and Tasks and starts its
RPC listener. It is called from MapReduce.StartMaster().

Side effects: sets the package-level Global_Chat_Level to Settings.LogLevel.
It then calls listen, which registers the server with net/rpc and serves it
on the first free port at or above StartingIP. Errors from choosing the
initial address or from listen are logged via PrintError.

A nil Tasks pointer leaves the task list empty.

NOTE(review): listen registers a pointer to this function's local struct
with net/rpc, but the struct is returned by value. RPC handlers and the
caller therefore work on separate copies, which share only reference-typed
fields such as DoneChannel and the Tasks backing array. Returning
*MasterServer would avoid this but changes the signature.
*/
func NewMasterServer(Settings Config, Tasks *[]Task) MasterServer {
	Global_Chat_Level = Settings.LogLevel
	var self MasterServer
	self.NumMapTasks = Settings.NumMapTasks
	self.NumReduceTasks = Settings.NumReduceTasks
	// Upper bound on how many ports listen() tries before giving up:
	// two per map task plus two per reduce task.
	self.MaxServers = Settings.NumMapTasks*2 + Settings.NumReduceTasks*2
	// First port tried. Despite the field name, this is a port number.
	self.StartingIP = 3410
	self.IsListening = false
	if err := self.SetServerAddress(self.StartingIP); err != nil {
		PrintError(err)
	}
	if Tasks != nil {
		self.Tasks = *Tasks
	}
	self.ReduceCount = 0
	self.DoneChannel = make(chan int)
	self.Table = Settings.TableName
	self.Output = Settings.OutputFolderName
	self.LogLevel = Settings.LogLevel
	if err := self.listen(); err != nil {
		PrintError(err)
	}
	return self
}
/*
Extend appends element to array and returns the resulting slice, like the
built-in append. Callers must use the returned slice.

If array has no spare capacity, a new backing array is allocated with
capacity sizeMultiplier*len(array)+1, raised to at least len(array)+1 so a
multiplier below 1 still makes room. The existing elements are copied into
it. Otherwise the spare capacity of array is reused in place.
*/
func Extend(array []interface{}, element interface{}, sizeMultiplier int) []interface{} {
	length := len(array)
	if length == cap(array) {
		// Slice is full; must grow. The +1 guarantees growth from zero length.
		newCap := sizeMultiplier*length + 1
		if newCap < length+1 {
			newCap = length + 1
		}
		grown := make([]interface{}, length, newCap)
		copy(grown, array)
		array = grown
	}
	// Expose the slot being filled. Without this reslice, array[length] is
	// always out of range because length == len(array).
	array = array[:length+1]
	array[length] = element
	return array
}
/*
PortIntToAddressString converts a port number into an address string for
net.Listen / RPC registration. It builds ":<port>" and lets
CheckAddressValidity fill in the host. If that yields the empty string,
the empty string is returned together with an error.
*/
func PortIntToAddressString(intPort int) (string, error) {
	if address := CheckAddressValidity(":" + strconv.Itoa(intPort)); address != "" {
		return address, nil
	}
	return "", errors.New("Failed to convert port to string")
}
/*
CheckAddressValidity normalises s into a "host:port" address string.

  - s without ':' is rejected: "" is returned.
  - s containing ':' but not starting with it is returned unchanged
    (no further validation is performed).
  - s of the form ":port" gets an IP address of this machine as its host.

The host IP comes from net.LookupIP(os.Hostname()). For compatibility with
the earlier hard-coded choice, the third address is preferred when at least
three are returned. Otherwise the first non-loopback address is used, then
any address. "localhost" is used if the lookup fails or returns nothing.
The host and port are joined with net.JoinHostPort, so IPv6 hosts are
bracketed.
*/
func CheckAddressValidity(s string) string {
	if !strings.Contains(s, ":") {
		return ""
	}
	if !strings.HasPrefix(s, ":") {
		return s
	}
	port := s[1:]

	host := "localhost"
	if name, err := os.Hostname(); err == nil {
		if ips, err := net.LookupIP(name); err == nil && len(ips) > 0 {
			if len(ips) > 2 {
				host = ips[2].String()
			} else {
				host = ips[0].String()
				for _, ip := range ips {
					if !ip.IsLoopback() {
						host = ip.String()
						break
					}
				}
			}
		}
	}
	return net.JoinHostPort(host, port)
}
/*
GetServerAddress reports the address currently stored in elt.Address.
It does not check whether anything is actually listening there.
*/
func (elt *MasterServer) GetServerAddress() string {
	address := elt.Address
	return address
}
/*
SetServerAddress stores the address for port newAddressInt in elt.Address.
It has to be called before the server starts listening (see listen): once
elt.IsListening is set, the address is left alone and an error is returned.
An error is also returned if the port cannot be turned into an address.
*/
func (elt *MasterServer) SetServerAddress(newAddressInt int) error {
	if elt.IsListening {
		return FormatError(1, "MasterServer already listening on: [%s]", elt.GetServerAddress())
	}
	address, convErr := PortIntToAddressString(newAddressInt)
	if convErr != nil {
		return FormatError(1, "Not a valid port number [%d] \n Error: [%v]", newAddressInt, convErr)
	}
	elt.Address = address
	return nil
}
/*
listen registers the master with net/rpc and binds a TCP listener. It then
serves RPC over HTTP on that listener in a background goroutine.

It tries elt.Address first, then StartingIP+1, StartingIP+2, and so on. The
process exits via log.Fatal once MaxServers attempts have failed. On success
elt.Address holds the bound address and IsListening is set, so later
SetServerAddress calls are rejected.

Registration and serve errors are logged via PrintError. The returned error
is currently always nil.

NOTE(review): rpc.HandleHTTP registers fixed paths on http.DefaultServeMux.
A second call in the same process presumably panics on duplicate
registration, so confirm before calling listen more than once.
*/
func (elt *MasterServer) listen() error {
	if err := rpc.Register(elt); err != nil {
		PrintError(err)
	}
	rpc.HandleHTTP()

	var l net.Listener
	for attempt := 1; ; attempt++ {
		address := elt.GetServerAddress()
		LogF(VARS_DEBUG, "Trying address: [%s]", address)
		listener, err := net.Listen("tcp", address)
		if err == nil {
			l = listener
			break
		}
		if attempt >= elt.MaxServers {
			log.Fatal("Map Reduce is full")
		}
		LogF(ERRO_DEBUG, "%v", err)
		// Build the next candidate address.
		if err := elt.SetServerAddress(elt.StartingIP + attempt); err != nil {
			PrintError(err)
		}
	}
	elt.IsListening = true
	LogF(MESSAGES, "Address is: [%s]", elt.GetServerAddress())
	go func() {
		if err := http.Serve(l, nil); err != nil {
			PrintError(err)
		}
	}()
	return nil
}
/*
FindOpenIP returns the first address, trying ports StartingIP,
StartingIP+1, ..., on which a TCP listener can currently be opened.
Despite the names, StartingIP is a port number and the result is a
"host:port" string built by PortIntToAddressString. The probe listener is
closed before returning, so another process may still grab the port.

Returns "" if no port up to 65535 is available. The search is bounded at
65535; before, it looped forever in that case.
*/
func FindOpenIP(StartingIP int) string {
	const maxPort = 65535
	for port := StartingIP; port <= maxPort; port++ {
		address, err := PortIntToAddressString(port)
		if err != nil {
			PrintError(err)
			continue
		}
		LogF(VARS_DEBUG, "Trying address: [%s]", address)
		listener, err := net.Listen("tcp", address)
		if err != nil {
			continue
		}
		listener.Close()
		return address
	}
	LogF(ERRO_DEBUG, "No open port found at or above %d", StartingIP)
	return ""
}
/*
Merge performs the final, master-side reduce. It gathers the rows written by
the ReduceTasks reducers, runs reduceFunction once per distinct key over the
combined data, and stores every emitted pair in final/output.sql.

Reducer i is read from <output>/reduce_out_<i>.sql, from its table
data(key, value). A reducer database that cannot be opened or queried is
logged and skipped, and so is an individual row that cannot be copied. The
combined rows are staged in <output>/end.sql, which is recreated on every
call. Errors setting up the staging or final databases, querying the
staged rows, or writing the final output are returned.

Keys and values are passed to SQLite as bound parameters, so quotes in the
data cannot break the statements.

NOTE(review): final/output.sql is not cleared first, so repeated runs append
to its existing table. Confirm whether that is intended.
*/
func Merge(ReduceTasks int, reduceFunction ReduceFunction, output string) error {
	finalPathname := "final/"
	outputPath := fmt.Sprintf("%s/", output)
	if runtime.GOOS == "windows" {
		finalPathname = "final\\"
		outputPath = fmt.Sprintf("%s\\", output)
	}
	// Best effort: the folder normally already exists after a previous run.
	os.Mkdir(finalPathname, 0777)

	// end.sql is scratch space for the combined reducer rows. Rows left over
	// from an earlier run would be reduced a second time, so start empty.
	endPath := outputPath + "end.sql"
	if err := os.Remove(endPath); err != nil && !os.IsNotExist(err) {
		return fmt.Errorf("removing stale %s: %w", endPath, err)
	}
	enddb, err := sql.Open("sqlite3", endPath)
	if err != nil {
		return fmt.Errorf("opening %s: %w", endPath, err)
	}
	defer enddb.Close()
	if err := mergeCreateDataTable(enddb); err != nil {
		return err
	}
	copyInsert, err := enddb.Prepare("insert into data values (?, ?);")
	if err != nil {
		LogF(ERRO_DEBUG, "%q: preparing insert into %s\n", err, endPath)
		return err
	}
	defer copyInsert.Close()
	for i := 0; i < ReduceTasks; i++ {
		LogF(VARS_DEBUG, "Aggregating Reducer Output Files")
		mergeCopyRows(fmt.Sprintf("%sreduce_out_%d.sql", outputPath, i), copyInsert)
	}

	outputPairs, err := mergeReduceSorted(enddb, reduceFunction)
	if err != nil {
		return err
	}

	dbfin, err := sql.Open("sqlite3", fmt.Sprintf("%soutput.sql", finalPathname))
	if err != nil {
		PrintError(FormatError(0, "Failed in opening final output:\n%v", err))
		return err
	}
	defer dbfin.Close()
	if err := mergeCreateDataTable(dbfin); err != nil {
		return err
	}
	finalInsert, err := dbfin.Prepare("insert into data values (?, ?);")
	if err != nil {
		LogF(ERRO_DEBUG, "%q: preparing final insert\n", err)
		return err
	}
	defer finalInsert.Close()
	// Write the data locally.
	for _, pair := range outputPairs {
		if _, err := finalInsert.Exec(pair.Key, pair.Value); err != nil {
			LogF(ERRO_DEBUG, "%q: insert (%v, %v)\n", err, pair.Key, pair.Value)
			return err
		}
	}
	return nil
}

/*
mergeCreateDataTable creates the data(key, value) table and its index in db
when they are missing. It then applies the synchronous/journal pragmas Merge
has always used. The first failing statement is logged and its error returned.
*/
func mergeCreateDataTable(db *sql.DB) error {
	statements := []string{
		"create table if not exists data (key text not null, value text not null)",
		"create index if not exists data_key on data (key asc, value asc);",
		"pragma synchronous = off;",
		"pragma journal_mode = off;",
	}
	for _, stmt := range statements {
		if _, err := db.Exec(stmt); err != nil {
			LogF(ERRO_DEBUG, "%q: %s\n", err, stmt)
			return err
		}
	}
	return nil
}

/*
mergeCopyRows copies every (key, value) row of the reducer database at
srcPath through insert. Failures are logged and the file or row is skipped,
keeping Merge best-effort about reducer output. The source database is closed
before returning.
*/
func mergeCopyRows(srcPath string, insert *sql.Stmt) {
	src, err := sql.Open("sqlite3", srcPath)
	if err != nil {
		log.Println(err)
		return
	}
	defer src.Close()
	rows, err := src.Query("select key, value from data;")
	if err != nil {
		log.Println(err)
		return
	}
	defer rows.Close()
	for rows.Next() {
		var key, value string
		if err := rows.Scan(&key, &value); err != nil {
			PrintError(err)
			continue
		}
		if _, err := insert.Exec(key, value); err != nil {
			LogF(ERRO_DEBUG, "%q: insert (%q, %q) from %s\n", err, key, value, srcPath)
		}
	}
	if err := rows.Err(); err != nil {
		PrintError(err)
	}
}

/*
mergeReduceSorted streams db's data rows ordered by key. It runs
reduceFunction once per run of equal keys and returns every Pair emitted.
An empty table produces no reducer calls.
*/
func mergeReduceSorted(db *sql.DB, reduceFunction ReduceFunction) ([]Pair, error) {
	rows, err := db.Query("select key, value from data order by key asc;")
	if err != nil {
		LogF(ERRO_DEBUG, "sql.Query3\n%v", err)
		return nil, err
	}
	defer rows.Close()

	var (
		outputPairs []Pair
		current     string
		send        func(string)
		finish      func() []Pair // nil until the first row starts a group
	)
	for rows.Next() {
		var key, value string
		if err := rows.Scan(&key, &value); err != nil {
			PrintError(err)
			continue
		}
		if finish == nil || key != current {
			if finish != nil {
				outputPairs = append(outputPairs, finish()...)
			}
			send, finish = mergeStartReducer(reduceFunction, key)
			current = key
		}
		send(value)
	}
	// Flush the last group (nothing to do when the table was empty).
	if finish != nil {
		outputPairs = append(outputPairs, finish()...)
	}
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return outputPairs, nil
}

/*
mergeStartReducer runs reduceFunction for key in its own goroutine.

send delivers one value to the reducer. It does not block once the reducer
has returned; such values are dropped. finish closes the value channel,
waits for the reducer to return, and yields every Pair it emitted (zero, one
or several). Output is drained concurrently, so a reducer that emits before
consuming all of its input cannot deadlock. The output channel is never
closed here, so a reducer that closes it itself does not cause a
double-close panic.
*/
func mergeStartReducer(reduceFunction ReduceFunction, key string) (send func(string), finish func() []Pair) {
	values := make(chan string)
	out := make(chan Pair)
	returned := make(chan struct{})
	collected := make(chan []Pair, 1)

	go func() {
		defer close(returned)
		// Each reducer goroutine has its own err; no shared variable is written.
		if err := reduceFunction(key, values, out); err != nil {
			PrintError(err)
		}
	}()
	go func() {
		var pairs []Pair
		for {
			select {
			case p, ok := <-out:
				if !ok {
					// The reducer closed output itself; wait for it to return.
					<-returned
					collected <- pairs
					return
				}
				pairs = append(pairs, p)
			case <-returned:
				// out is unbuffered, so every send completed before the return.
				collected <- pairs
				return
			}
		}
	}()

	send = func(value string) {
		select {
		case values <- value:
		case <-returned:
		}
	}
	finish = func() []Pair {
		close(values)
		return <-collected
	}
	return send, finish
}