diff --git a/logger/.logger.go.swp b/logger/.logger.go.swp new file mode 100644 index 0000000..927872c Binary files /dev/null and b/logger/.logger.go.swp differ diff --git a/logger/logger.go b/logger/logger.go new file mode 100644 index 0000000..383bce1 --- /dev/null +++ b/logger/logger.go @@ -0,0 +1,111 @@ +// Copyright 2013-2017 Aerospike, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package logger + +import ( + "log" + "os" + "sync" +) + +// LogPriority specifies the logging level for the client +type LogPriority int + +const ( + DEBUG LogPriority = iota - 1 + INFO + WARNING + ERR + OFF LogPriority = 999 +) + +type genericLogger interface { + Printf(format string, v ...interface{}) +} + +type logger struct { + Logger genericLogger + + level LogPriority + mutex sync.RWMutex +} + +// Logger is the default logger instance +var Logger = newLogger() + +func newLogger() *logger { + return &logger{ + Logger: log.New(os.Stdout, "", log.LstdFlags), + level: OFF, + } +} + +// SetLogger sets the *log.Logger object where log messages should be sent to. +func (lgr *logger) SetLogger(l genericLogger) { + lgr.mutex.Lock() + defer lgr.mutex.Unlock() + + lgr.Logger = l +} + +// SetLevel sets logging level. Default is ERR. +func (lgr *logger) SetLevel(level LogPriority) { + lgr.mutex.Lock() + defer lgr.mutex.Unlock() + + lgr.level = level +} + +// Error logs a message if log level allows to do so. +func (lgr *logger) LogAtLevel(level LogPriority, format string, v ...interface{}) { + switch level { + case DEBUG: + lgr.Debug(format, v) + case INFO: + lgr.Info(format, v) + case WARNING: + lgr.Warn(format, v) + case ERR: + lgr.Error(format, v) + } +} + +// Debug logs a message if log level allows to do so. +func (lgr *logger) Debug(format string, v ...interface{}) { + if lgr.level <= DEBUG { + lgr.Logger.Printf("DEBUG: " + format, v...) + } +} + +// Info logs a message if log level allows to do so. +func (lgr *logger) Info(format string, v ...interface{}) { + if lgr.level <= INFO { + lgr.Logger.Printf("INFO: " + format, v...) + } +} + +// Warn logs a message if log level allows to do so. +func (lgr *logger) Warn(format string, v ...interface{}) { + if lgr.level <= WARNING { + lgr.Logger.Printf("WARNING: " + format, v...) + } +} + +// Error logs a message if log level allows to do so. +func (lgr *logger) Error(format string, v ...interface{}) { + if lgr.level <= ERR { + lgr.Logger.Printf("ERROR: " + format, v...) + } +} diff --git a/syncCluster.go b/syncCluster.go index 1ab9457..4900c1f 100644 --- a/syncCluster.go +++ b/syncCluster.go @@ -8,26 +8,30 @@ package main import ( as "github.com/aerospike/aerospike-client-go" - "strings" - "strconv" + . "github.com/sud82/aerospike-data-sync/logger" + "bufio" + "bytes" + "encoding/hex" + "errors" "flag" "fmt" + "log" "os" - "time" - "bufio" - "errors" - "sync" - "encoding/hex" "reflect" - "math/rand" + "strings" + "sync" + "strconv" + "time" ) type TStats struct { // Total checked objects nObj int - // Number of Sampled objects + // Num of Sampled objects nSampleObj int + // Track total scanned records + scanReq int // Stats for records found not in sync recNotInSyncTotal int @@ -40,8 +44,12 @@ type TStats struct { recSyncedUpdated int recSyncedInserted int recSyncedDeleted int + // Generation error at (check and write) operation genErr int + // Error returned by server while scaning records + reqErr int + } @@ -89,7 +97,7 @@ var ( syncOnly bool = false useXdr bool = false useCksm bool = false - overWriteF bool = false + removeFiles bool = false // (After-before) timerange options modBefore int64 = time.Now().In(time.UTC).UnixNano() @@ -100,60 +108,91 @@ var ( timeLayout string = "Jan 2, 2006 at 3:04pm (MST)" // AS client related - scanPolicy *as.ScanPolicy = nil - readPolicy *as.BasePolicy = nil - srcClientPolicy *as.ClientPolicy = nil dstClientPolicy *as.ClientPolicy = nil srcClient *as.Client = nil dstClient *as.Client = nil + readPolicy *as.BasePolicy = nil + + queryPolicy *as.QueryPolicy = nil + + findSyncThread = 1 // Global stats to track synced, unsynced records gStat TStats // Track stats for all sets within namespace setStats = map[string]*TStats{} - genErr int = 0 // For threshold, Time window (track time after 1 sec) timeWEnd = time.Now() recSyncedTotalOld int = 0 // Extra data stats, variable - totalObj int = 0 samplePer int = 10 - tps = 0 + tps int = 0 + + logger *log.Logger + ) +func initLogger() { + var buf bytes.Buffer + logger = log.New(&buf, "", log.LstdFlags|log.Lshortfile) + logger.SetOutput(os.Stdout) + logger.Print("Init logger.") + + // Init log file to direct logs to the file + os.MkdirAll("log", os.ModePerm) + logfile, err := os.OpenFile("log/sync.log",os.O_CREATE|os.O_WRONLY|os.O_APPEND,os.ModePerm) + if err == nil { + logger.SetOutput(logfile) + } else { + logger.Print("Failed to log to file, using default stderr") + } + + // Set customzed logger + Logger.SetLogger(logger) + Logger.SetLevel(DEBUG) + +} func main() { + initLogger() + + Logger.Info("****** Starting Aerospike data synchronizer ******") - flag.StringVar(&srcHost, "srcHost", srcHost, "Source host, eg:x.x.x.x:3000\n") - flag.StringVar(&srcUser, "srcUser", srcUser, "Source host User name.\n") - flag.StringVar(&srcPass, "srcPass", srcPass, "Source host Password.\n") - flag.StringVar(&dstHost, "dstHost", dstHost, "Destination host,eg:x.x.x.x:3000\n") - flag.StringVar(&srcUser, "dstUser", srcUser, "Source host User name.\n") - flag.StringVar(&srcPass, "dstPass", srcPass, "Source host Password.\n") + flag.StringVar(&srcHost, "sh", srcHost, "Source host, eg:x.x.x.x:3000\n") + flag.StringVar(&srcUser, "su", srcUser, "Source host User name.\n") + flag.StringVar(&srcPass, "sp", srcPass, "Source host Password.\n") + flag.StringVar(&dstHost, "dh", dstHost, "Destination host,eg:x.x.x.x:3000\n") + flag.StringVar(&srcUser, "du", srcUser, "Destination host User name.\n") + flag.StringVar(&srcPass, "dp", srcPass, "Destination host Password.\n") flag.StringVar(&namespace, "n", namespace, "Aerospike namespace.\n") - flag.StringVar(&set, "s", set, "Aerospike set name.\n") + flag.StringVar(&set, "s", set, "Aerospike set name. Default: All sets in ns.\n") flag.StringVar(&binString, "B", binString, "Bin list: bin1,bin2,bin3...\n") flag.StringVar(&modBeforeString, "b", modBeforeString, "Time before which records modified. eg: Jan 2, 2006 at 3:04pm (MST)\n") flag.StringVar(&modAfterString, "a", modAfterString, "Time after which records modified. eg: Jan 2, 2006 at 3:04pm (MST)\n") flag.StringVar(&recLogFile, "o", recLogFile, "Output File to log records to be synced.\n") - flag.IntVar(&tps, "t", tps, "Throttling limit. will throttle server writes if tps exceed given limit.\n") + //flag.IntVar(&tps, "t", tps, "Throttling limit. will throttle server writes if tps exceed given limit.\n") flag.IntVar(&priorityInt, "f", priorityInt, "The scan priority. 0 (auto), 1(low), 2 (medium), 3 (high). Default: 0.\n") - flag.IntVar(&samplePer, "sp", samplePer, "Sample percentage. Default: 10\n") - flag.BoolVar(&overWriteF, "of", overWriteF, "Overwrite sync log file.") + flag.IntVar(&samplePer, "p", samplePer, "Sample percentage. Default: 10.\n") + flag.IntVar(&findSyncThread, "st", findSyncThread, "Find sync thread. Default: 10.\n") + flag.BoolVar(&removeFiles, "r", removeFiles, "Remove existing sync log file.") flag.BoolVar(&syncDelete, "sd", syncDelete, "Delete synced data also. Warning (Don't use this in active-active topology.)\n") flag.BoolVar(&findOnly, "fo", findOnly, "Tool will just find unsynced data. By default: (find and sync)\n") flag.BoolVar(&syncOnly, "so", syncOnly, "Tool will just sync records using record log file.\n") - flag.BoolVar(&useXdr, "xdr", useXdr, "Use XDR to ship unsynced records.\n") - flag.BoolVar(&useCksm, "c", useCksm, "Compare record checksum.\n") + //flag.BoolVar(&useXdr, "xdr", useXdr, "Use XDR to ship unsynced records.\n") + //flag.BoolVar(&useCksm, "c", useCksm, "Compare record checksum.\n") flag.BoolVar(&verbose, "v", verbose, "Verbose mode\n") flag.BoolVar(&showUsage, "u", showUsage, "Show usage information.\n") readFlags() + Logger.Info("Src: %s, Dst: %s, Namespace: %s, Set: %s, Binlist: %s, ModAfter: %s, ModBefore: %s, RecLogFile: %s, Priority: %s, SamplePer:%s", + srcHost, dstHost, namespace, set, binString, modAfterString, + modBeforeString, recLogFile, strconv.Itoa(priorityInt), strconv.Itoa(samplePer)) + initSyncLogFile() initPolicies() @@ -166,6 +205,7 @@ func main() { if !syncOnly { findRecordsNotInSync() } + // TODO: Currently sync disable if !findOnly { doSync() @@ -176,7 +216,9 @@ func main() { func readFlags() { + Logger.Info("Parsing input arguments.") flag.Parse() + if showUsage { fmt.Println("********** Usage **********") flag.Usage() @@ -206,9 +248,11 @@ func readFlags() { if modAfterString != "" { modAfter = timeStringToTimestamp(modAfterString) } + if modBeforeString != "" { modBefore = timeStringToTimestamp(modBeforeString) } + if modBefore < modAfter { err = errors.New("Timerange incorrect. modafter > modbefore.") } @@ -228,22 +272,25 @@ func readFlags() { func initSyncLogFile() { + Logger.Info("Init sync log file: %s", recLogFile) if recLogFile == "" { + Logger.Debug("No path for sync file. Returning without initialization. ") return } if _, err := os.Stat(recLogFile); !os.IsNotExist(err) && !syncOnly { - if !overWriteF { - fmt.Println("Record log file already exist. Please remove it: " + recLogFile) - os.Exit(0) - } else { + if removeFiles { + Logger.Info("Remove old Record log file: %s", recLogFile) os.Remove(recLogFile) + } else { + panicOnError(errors.New("Record log file already exist. Use -r or Please remove it: " + recLogFile)) } } // Create file and write header if not synconly if !syncOnly { // create and write header in file + Logger.Info("Create new Record log file: %s", recLogFile) file, err := os.OpenFile(recLogFile, os.O_CREATE|os.O_WRONLY, 0600) if err != nil { panicOnError(err) @@ -266,22 +313,21 @@ func getLogFileHeader(version string, modAfter int64, modBefore int64, ns string hd := headerDel return "version:" + version + hd + "mod_after:" + modAfterString + hd + - "mod_before:" + modBeforeString + hd + "ns:" + ns + hd + "bins:" + binString + + "mod_before:" + modBeforeString + hd + "namespace:" + ns + hd + "bins:" + binString + "\n" + "Action" + hd + "Digest" + hd + "Set" + hd + "Gen" + "\n" } // Init scan, clinet policies func initPolicies() { - scanPolicy = as.NewScanPolicy() - scanPolicy.ConcurrentNodes = true - scanPolicy.Priority = priority - + Logger.Info("Init all client, query policies.") readPolicy = as.NewPolicy() // Get only checksum for record from server + /* if useCksm { readPolicy.ChecksumOnly = true; } + */ srcClientPolicy = as.NewClientPolicy() srcClientPolicy.User = srcUser @@ -290,14 +336,17 @@ func initPolicies() { dstClientPolicy = as.NewClientPolicy() dstClientPolicy.User = dstUser dstClientPolicy.Password = dstPass + + queryPolicy = as.NewQueryPolicy() + queryPolicy.Priority = priority } func getClient(policy *as.ClientPolicy, host string) (*as.Client, error) { + Logger.Info("Connect to host: %s", host) hostInfo := strings.Split(host, ":") if len(hostInfo) < 2 { - err = fmt.Errorf("Wrong host format. it should be (x.x.x.x:yyyy).") - panicOnError(err) + panicOnError(errors.New("Wrong host format. it should be (x.x.x.x:yyyy).")) } ip := hostInfo[0] @@ -312,10 +361,13 @@ func getClient(policy *as.ClientPolicy, host string) (*as.Client, error) { //---------------------------------------------------------------------- // Main func to found records not in sync func findRecordsNotInSync() { - findSyncThread := 2 + Logger.Info("Find records not in sync") wg := new(sync.WaitGroup) + var dstRecordset *as.Recordset = nil - recordInfoChan := make(chan string, 50*findSyncThread) + + // Channel to store unsync record's info, max 100 record at a time + recordInfoChan := make(chan string, 100000) // Open record log file to write found unsync records var file *os.File = nil @@ -327,38 +379,47 @@ func findRecordsNotInSync() { defer file.Close() } - allSetMap := getSetMap(srcClient, namespace) - for setname, v := range allSetMap { + // Get replication factor + replFact := getReplicationFact(srcClient, namespace) + if replFact == 0 { + panicOnError(errors.New("Coundn't get replication factor for NS: " + namespace + ". check Config.")) + } + + // Parsed sets Stats fetched from source aesospike server + allSetStatsMap := getSetMap(srcClient, namespace) + + // Scan records from source and validate them by running + // multiple validation threads + for setname, statsMap := range allSetStatsMap { + if set != "" && set != setname { continue } - nObj := getObjectCount(v, namespace) + + // It gives all objects so divide by repl factor + nObj := getObjectCount(statsMap, namespace) + nObj = nObj / replFact if nObj == 0 { continue } - srcRecordsetArr := getRecordset(srcClient, namespace, setname, nObj, binList, modAfter, modBefore) - + // Update set stats setStats[setname] = &TStats{} - (setStats[setname]).nObj = nObj - (setStats[setname]).nSampleObj = nObj * samplePer / 100 + setStats[setname].nObj = nObj + setStats[setname].nSampleObj = nObj * samplePer / 100 + + srcRecordset := getRecordset(srcClient, namespace, setname, binList, modAfter, modBefore) - // Allocate different record channels to different threads - // Each thread will validate and find few set of records if they are in - // sync or not. + // Run multiple thread to fetch records from queryRecordQueue + // and validate those records to see if they are in sync or not for w := 0; w < findSyncThread; w++ { wg.Add(1) - bucket := (len(srcRecordsetArr) / findSyncThread) + 1 - go func(mul int, setname string) { + go func(setname string) { defer wg.Done() - for i := (bucket * mul); i < (bucket * (mul + 1)); i++ { - if i >= len(srcRecordsetArr) { - break - } - validateAndFind(srcRecordsetArr[i], dstRecordset, recordInfoChan, setname) - } - }(w, setname) + + validateAndFind(srcRecordset, dstRecordset, recordInfoChan, setname) + }(setname) } } @@ -368,9 +429,8 @@ func findRecordsNotInSync() { close(recordInfoChan) }() - // Continue looping if file doesn't exist. + // Continue looping if sync log file doesn't exist. // This has to wait for closing recordInfoChan - for r := range recordInfoChan { if recLogFile == "" { continue @@ -387,6 +447,8 @@ func findRecordsNotInSync() { // Deleted: Records which are deleted in source but not in destination // Note: Its not possible to find deleted unsynced records in A-A topology func validateAndFind(srcRecordset *as.Recordset, dstRecordset *as.Recordset, recordInfoChan chan string, setname string) { + Logger.Info("Thread to fetch and match src and dst records. SET: %s", setname) + Logger.Info("Find Updated, Inserted record if not in sync.") sStat := setStats[setname] L1: for { @@ -394,21 +456,21 @@ L1: select { case srcRec := <-srcRecordset.Records: - // If scan bucket is giving more then sampled object - sStat.recNotInSyncTotal = sStat.recNotInSyncInserted + - sStat.recNotInSyncUpdated + sStat.recNotInSyncDeleted - - if srcRec == nil || (sStat.recNotInSyncTotal >= sStat.nSampleObj) { - break L1 + // Break If scan bucket is giving more then sampled object + if srcRec == nil || (sStat.scanReq >= sStat.nSampleObj) { + Logger.Info("Src, Sample limit reached or No record left to match. SET: %s", setname) + Logger.Info("Src, Scanned records: %s, Sample Size: %s. SET: %s", + strconv.Itoa(sStat.scanReq), + strconv.Itoa(sStat.nSampleObj), setname) + break L1 } + sStat.scanReq++ // TODO: Add LUT check for record. LUT. skip if srcRecord.LUT > Timestamp - dstRec,_ := dstClient.Get(readPolicy, srcRec.Key, binList...) - //panicOnError(err) - // Check if rec exist in dst, - // If not, New record inserted. log it. Add gen = 0 for new rec + //panicOnError(err) + // If rec doesn't exist in dst, it's new insert in src. log it. Add gen = 0 for new rec if dstRec == nil || len(dstRec.Bins) == 0 { var gen uint32 = 0 if dstRec != nil { @@ -416,135 +478,87 @@ L1: } recordInfoChan <- getRecordLogInfoLine(insertedOp, srcRec.Key, gen) - gStat.recNotInSyncInserted++ sStat.recNotInSyncInserted++ + gStat.recNotInSyncInserted++ + Logger.Debug("Record op Insert. gStat_RecNotInSync: %s, setStat_RecNotInSync: %s. SET: %s", + strconv.Itoa(gStat.recNotInSyncInserted), + strconv.Itoa(sStat.recNotInSyncInserted), setname) continue } - // Check if src and dst record match or not. - // If not,record Updated. log this record to copy this into dst + // src and dst record doesn't match. Record Updated. log it. if !reflect.DeepEqual(srcRec.Bins, dstRec.Bins) { - fmt.Println(srcRec) - fmt.Println(dstRec) + fmt.Println("src") + fmt.Println("dst") fmt.Println() recordInfoChan <- getRecordLogInfoLine(updatedOp, srcRec.Key, dstRec.Generation) - gStat.recNotInSyncUpdated++ sStat.recNotInSyncUpdated++ + gStat.recNotInSyncUpdated++ + Logger.Debug("Record op Update. gStat_RecNotInSync: %s, setStat_RecNotInSync: %s. SET: %s", + strconv.Itoa(gStat.recNotInSyncUpdated), + strconv.Itoa(sStat.recNotInSyncUpdated), setname) } case err := <-srcRecordset.Errors: - // if there was an error, stop - // TODO: fix rand to not pick same bucket by two threads. Fixed but - // iterate over a dict - panicOnError(err) + Logger.Debug("Record read error: %s. SET: %s", err.Error(), setname) + sStat.reqErr++ + //fmt.Println(err) continue } } + // Don't go for checking deletes if syncDelete == false { return } + Logger.Info("Find Deleted record if not in sync. SET: %s", setname) L2: for { select { case dstRec := <-dstRecordset.Records: - if dstRec == nil { + // Break If scan bucket is giving more then sampled object + if (dstRec == nil) { + Logger.Info("Dst: No record left to match. SET: %s", setname) break L2 } + sStat.scanReq++ - srcRec, err := srcClient.Get(readPolicy, dstRec.Key, binList...) - panicOnError(err) + srcRec, _ := srcClient.Get(readPolicy, dstRec.Key, binList...) + //panicOnError(err) if srcRec == nil || len(srcRec.Bins) == 0 { - recordInfoChan <- getRecordLogInfoLine(deletedOp, dstRec.Key, dstRec.Generation) - gStat.recNotInSyncDeleted++ sStat.recNotInSyncDeleted++ - fmt.Println(dstRec.Key.SetName()) + gStat.recNotInSyncDeleted++ + Logger.Debug("Record op Delete. gStat_RecNotInSync: %s, setStat_RecNotInSync: %s. SET: %s", + strconv.Itoa(gStat.recNotInSyncDeleted), + strconv.Itoa(sStat.recNotInSyncDeleted), setname) continue } case err := <-srcRecordset.Errors: - // if there was an error, stop - panicOnError(err) + Logger.Debug("Record read error: %s. SET: %s", err.Error(), setname) + sStat.reqErr++ + //fmt.Println(err) + continue } } } -// Scan will Put only RecordQueueSize num of record in channel. This can be -// increased by using scanPolicy.RecordQueueSize. - -// totalObj: total master object for given namespace -// bucketSize: random number (t/20, t/10), t: totalObj -// nSampleBucket: (sampleSize / bucketSize + 1) -// nTotalBucket: (totalObj / bucketSize + 1) -func getRecordset(client *as.Client, ns string, set string, totalObj int, binList []string, modAfter int64, modBefore int64) []*as.Recordset { - - randSeed := time.Now().UnixNano() - sampleSize := totalObj * samplePer / 100 - - s := rand.NewSource(randSeed) - r1 := rand.New(s) - - ranNumMax := sampleSize / 10 - if ranNumMax == 0 { - ranNumMax = sampleSize - } - - // Minimum 1/20th of sample size - // Get fully divisible - bucketSize := r1.Intn(ranNumMax / 2 + 1) + ranNumMax / 2 - - if bucketSize == 0 { - bucketSize = ranNumMax - } - - var nSampleBucket int32 = int32(sampleSize / (bucketSize)) - var nTotalBucket int32 = int32(totalObj / (bucketSize)) - - /* - fmt.Println("SS, BS, nS, nT") - fmt.Println(sampleSize) - fmt.Println(bucketSize) - fmt.Println(nSampleBucket) - fmt.Println(nTotalBucket) - fmt.Println("**********") - */ - - recordsetArr := make([]*as.Recordset, nSampleBucket) - randIdArr := make([]bool, nTotalBucket) - - r2 := rand.New(s) - var i int32 - for i = 0 ; i < nSampleBucket; i++ { +// Scan all records in given timerange +func getRecordset(client *as.Client, ns string, set string, binList []string, modAfter int64, modBefore int64) *as.Recordset { + Logger.Info("Send query and create RecordSet. NS: %s, SET: %s, BINLIST: %s", ns, set, binList) + stm := as.NewStatement(ns, set, binList...) - stm := as.NewStatement(ns, set, binList...) + createTimeRangeStm(stm, modAfter, modBefore) - createTimeRangeStm(stm, modAfter, modBefore) + recordset, err := client.Query(queryPolicy, stm) - // Never generate same id so prevent collide - var id int64 - id = int64(r2.Int31n(nTotalBucket)) - for randIdArr[id] == true { - id = int64(r2.Int31n(nTotalBucket)) - // keep running - } - - randIdArr[id] = true - - stm.SetPredExp( - as.NewPredExpRecDigestModulo(nTotalBucket), - as.NewPredExpIntegerValue(id), - as.NewPredExpIntegerEqual(), - ) - recordset, err := client.Query(nil, stm) - panicOnError(err) - recordsetArr[i] = recordset - } - return recordsetArr + panicOnError(err) + return recordset } @@ -570,14 +584,60 @@ func createTimeRangeStm(stm *as.Statement, modAfter int64, modBefore int64) { } +// Get replication factor, return 0 if stat not present +func getReplicationFact(client *as.Client, ns string) int { + for _, node := range client.GetNodes() { + info, err := requestNodeNamespace(node, ns) + panicOnError(err) + if replFact, ok := info["repl-factor"]; ok { + r, err := strconv.Atoi(replFact) + panicOnError(err) + return r + } + } + return 0 +} + + +// RequestNodeStats returns statistics for the specified node as a map +func requestNodeNamespace(node *as.Node, ns string) (map[string]string, error) { + infoMap, err := as.RequestNodeInfo(node, "namespace/" + ns) + if err != nil { + return nil, err + } + + res := map[string]string{} + + v, exists := infoMap["namespace/" + ns] + if !exists { + return res, nil + } + + values := strings.Split(v, ";") + for i := range values { + kv := strings.Split(values[i], "=") + if len(kv) > 1 { + res[kv[0]] = kv[1] + } + } + + return res, nil +} + + // Request set statistics from AS server and parse it to create a map for sets. // {set: {node1: {setstats...}, node2: {setstats..}}} func getSetMap(client *as.Client, ns string) map[string]map[string]map[string]string { + allSets := map[string]map[string]map[string]string{} + for _, node := range client.GetNodes() { + // map[sets/:ns=test:set=testset:disable-eviction=false; // ns=test:set=bar:objects=10000;] - setInfo, _ := as.RequestNodeInfo(node, "sets/") + + setInfo, err := as.RequestNodeInfo(node, "sets/") + panicOnError(err) setList := strings.Split(setInfo["sets/"], ";") for _, setItem := range(setList) { @@ -601,8 +661,8 @@ func getSetMap(client *as.Client, ns string) map[string]map[string]map[string]st return allSets } -// Compute number of objects for given set -// TODO: check if these are master object + +// Compute number of objects for given set, its total object (including replica) func getObjectCount(setmap map[string]map[string]string, ns string) int { nObj := 0 for _, m := range setmap { @@ -619,13 +679,19 @@ func getObjectCount(setmap map[string]map[string]string, ns string) int { //---------------------------------------------------------------------- // Do Sync and helpers //---------------------------------------------------------------------- + // Main func to sync all records from sync.log file func doSync() { - rdThread := 4 - failedRecChan := make(chan string, 10) - rdChannel := make(chan string, 50) + // Number of threads to read from sunc.log file + rdThread := 100 + // Channel to store record info, which failed to sync + failedRecChan := make(chan string, 100) + // Store rocods fetched from sync.log file + rdChannel := make(chan string, 100) + wg := new(sync.WaitGroup) + // data sync will need sync log file. file, err := os.OpenFile(recLogFile, os.O_APPEND|os.O_RDWR, 0600) if err != nil { panicOnError(err) @@ -645,7 +711,7 @@ func doSync() { } // Track total records synced at staring of each second. - // And upcoming next second time. + // And upcoming next second time. will be used to throttle tps go func() { for { timeWEnd = time.Now().Add(time.Second) @@ -655,7 +721,7 @@ func doSync() { } }() - // Read all record info line in channel buffer. + // Read all record infoLine in channel buffer. go func() { for scanner.Scan() { rdChannel <- scanner.Text() @@ -669,11 +735,13 @@ func doSync() { go validateAndSync(rdChannel, failedRecChan, wg) } + // Wait for all the threads to finish sync go func() { wg.Wait() close(failedRecChan) }() + // Append failed to sync records again in sync log file for r := range failedRecChan { if _, err = file.WriteString(r); err != nil { panicOnError(err) @@ -693,6 +761,7 @@ func validateAndSync(rdChannel <-chan string, failedRecChan chan string, wg *syn // Throttle if TPS exceed limit, sleep for remaining time (1sec - time) calcTotalRecSynced() for tps > 0 && ((gStat.recSyncedTotal - recSyncedTotalOld) > tps) { + fmt.Println("Sleeping... ") time.Sleep(timeWEnd.Sub(time.Now())) } syncPassed := true @@ -724,6 +793,11 @@ func validateAndSync(rdChannel <-chan string, failedRecChan chan string, wg *syn srcRec, err := srcClient.Get(readPolicy, recKey, binList...) //panicOnError(err) + // In sync only case initialize stats + if _, ok := setStats[recKey.SetName()]; !ok { + setStats[recKey.SetName()] = &TStats{} + } + // Delete record, skip if its not recorded in sync.log file if srcRec == nil || len(srcRec.Bins) == 0 { syncPassed = syncDeletedRecord(recKey, op, writePolicy) @@ -859,6 +933,9 @@ func syncInsertedUpdatedRecord(srcRec *as.Record, op string, writePolicy *as.Wri return true } +//---------------------------------------------------------------------------- +// Other helper functions +//---------------------------------------------------------------------------- // Parse rec info line from sync.log file and get as.Key func getKeyFromString(keyString string) (*as.Key, error) { @@ -879,11 +956,11 @@ func calcTotalRecSynced() { // Main stats line printer func printLine(setStatsMeta []string, unsyncStr string, syncStr string) { for _, m := range setStatsMeta { - fmt.Printf("%20s", m) + fmt.Printf("%30s", m) } if !syncOnly { - fmt.Printf("%48s", unsyncStr) + fmt.Printf("%60s", unsyncStr) } if !findOnly { @@ -926,6 +1003,8 @@ func printStat(ns string, set string, stat *TStats) { strconv.Itoa(stat.genErr) + ")" } printLine(setStatsMeta, unsyncStr, syncStr) + + fmt.Println(stat.reqErr) } @@ -933,8 +1012,13 @@ func printStat(ns string, set string, stat *TStats) { func printAllStats() { nObj := 0 nSampleObj := 0 - fmt.Println("\n****** set stats *********") + fmt.Println("\n****** Data Sync Output***") + // Print metainfo + fmt.Println("Modified after: " + modAfterString) + fmt.Println("Modified before: " + modBeforeString) + + fmt.Println("\n****** set stats *********") // Print header metaList := []string{"Namespace", "Set", "Total_Records", "Sampled_Records"} unsyncStr := "Unsync(Total, Updated, Inserted, Deleted)" @@ -992,7 +1076,7 @@ func parseHeaderLine(headerLine string) { modBeforeString = getValidateArg(headerOps[2], "mod_before") modBefore = timeStringToTimestamp(modBeforeString) - namespace = getValidateArg(headerOps[3], "ns") + namespace = getValidateArg(headerOps[3], "namespace") binString = getValidateArg(headerOps[4], "bins") if binString != "" { @@ -1013,8 +1097,7 @@ func getValidateArg(str string, validStr string) string { // Create a line containing op,digest,set,gen info. func getRecordLogInfoLine(op string, key *as.Key, gen uint32) string { - // insertedOp + ":" + srcRec.Key.String() + "\n" - // op:digest:dstRecordGen + // op:digest:setname:dstRecordGen if op == "" { fmt.Println("null op") } @@ -1039,28 +1122,9 @@ func getKeyDegestString(key *as.Key) string { func panicOnError(err error) { if err != nil { + Logger.Error(err.Error()) panic(err) } } -// Not in use currently -func getGlobalStats(gS *TStats, setS map[string]*TStats) { - - for _, statsObj := range setS { - gS.nObj += statsObj.nObj - gS.nSampleObj += statsObj.nSampleObj - - gS.recNotInSyncTotal += statsObj.recNotInSyncTotal - gS.recNotInSyncUpdated += statsObj.recNotInSyncUpdated - gS.recNotInSyncInserted += statsObj.recNotInSyncInserted - gS.recNotInSyncDeleted += statsObj.recNotInSyncDeleted - - gS.recSyncedTotal += statsObj.recSyncedTotal - gS.recSyncedUpdated += statsObj.recSyncedUpdated - gS.recSyncedInserted += statsObj.recSyncedInserted - gS.recSyncedDeleted += statsObj.recSyncedDeleted - - gS.genErr += statsObj.genErr - } -}