From 9f7e2bd6e2fc99a7d6041205b01cee6c736ce69e Mon Sep 17 00:00:00 2001 From: Sud Date: Fri, 11 Aug 2017 01:42:27 -0700 Subject: [PATCH] Changed sampling algo. Added logger --- logger/.logger.go.swp | Bin 0 -> 12288 bytes logger/logger.go | 111 +++++++++++ syncCluster.go | 448 ++++++++++++++++++++++++------------------ 3 files changed, 367 insertions(+), 192 deletions(-) create mode 100644 logger/.logger.go.swp create mode 100644 logger/logger.go diff --git a/logger/.logger.go.swp b/logger/.logger.go.swp new file mode 100644 index 0000000000000000000000000000000000000000..927872cc1e39642f83db68961bf6c19acd6850d4 GIT binary patch literal 12288 zcmeI2PmB{)9LFC9#fk{gi^0o}f~ITQ>Fz?rT_mCGA8lM(-R`p4crerH>vY)Zy!z(N zmWF^D4bclqP~u6C#+c~Un0PSJ7{f&`DnyNmH%~^?=*{n&nZj&b0wKsjUbCNeI{nS> z{oeQYd%tP3uQ__SdWiN~dl(+u7(4gr_T~4V-OkSKVl0eW`s`I7Q8NlubX7BU0uIZr z=t*&;97+d=Vb6AWa_p&2)U+JYE!$k;18;?2J}NoB#{*SzMcEgw%Vqjvx#H@>8VjR< zQ6N@e3)?+DHmVPI=6BG2_bwV;i~>dhqkvJsC}0#Y3K#{90!D#bNCDN`z@Eo^Z%gMq z{5`!PeLsA?g(NXkH3}F7i~>dhqkvJsC}0#Y3K#{90!9I&!2eDG8~@^8+{jqz4onUI z{~!MTKfjf+Z@?K)2fM%yaPc0-eg*G=7PudLy@jz4!3W?S@HTi8oC2?a7r+T{983WV zl)!^v6W9R$x*NX1_uv9J4?YH`!E4|Y7yt(}K^^P^6W~#>75urGu|L2C@FjQ)yb4Z& zm%&S557-EPyNj{!z_;KF@HzMlya62WG}sFs0XxACK;V}gV`srhFaS+(7#sj&-~q4= z{Cy|l0l$Nb;1lp7=zv);1uEbG*bKhf#MnpREI0#BgV(_`fWYnGXXNN7@FVyEybD&q z0vHEnpz~M+=3x{t3K#`$r~=1X4u3h{mDXIm4a<5;dVy*e+Cp}1RistRvPM|WYm>I| z`6~D6vHdhNKTpS4I|>{s-~)O%*~B$MGg1nfUQMBbGZ#(>Uf`C?Hg<;{J3fMI%jLwD zaAim~b$J-tE+;*ba48wd_I=S0NeOBR3WbF`&UFU7RGF_;YmcuV^pY)uYm8Z&-lUML zwb}am@m7PjxW;(3=}n4vdgkEb6$GB<&B(pRh-2I94>?||Q#nQUha z(gGP8WhBlR>68pDvUAFq#45lh?0d@Tq;S%Cl1 z61u#W_a_lDmTU1gm+>m^Mv5P`76V^6D+LIst&?+e$(eQ5eS=?VD(PY$8c@hpsMG7B zH6(*uG|k&~rtTywuTadydeR`-U>dsJmAuE67QM#lF{M-Rfoq-Z5V=R z9^sCneuqo0qaXIGkiw3L{1!DiW)>*)s!W&C$(76 z+H5{wj8D&nsx|A|Za9JqBv%o-%|oz<3TX`myr1|fj10Rjngm<5B_-#fR>ns6+4MZb z)w4pLW33afh)yFw{KNj;rmD46TS#01NsXjm>?!7Qg&Da$48Xsxj) zluacXV~)VIslbvkvql&()$GzBVUIgr+vACTKC*GCboDZgRZX@t$1&!$etDM5i%>tq zKF_>vPsn&c?2lg2_c`r`OMJyIlkassWh)OcQ53WwpU%^q=kOrpmbPeco;`PI)HE`O zd^>O>?ba2mTn0M69+zD&3}df$(9&*}stZrl7Z(Uh%~xs*)tLs>=V_{5o31WY>ovTa zrAlp?o~+iUi^Q>k8?o@CJ&AiWVWOA?c5S!fkXoL;yEjoIt$zot7jo7>1Y`%4uQaII z$kV|}quMCyfY4*KYssIBJPAk50ogr_;n+BzO>L}HlESLNJzGXw=4!4AK%kZQ%49j!XshL`1rZjGirWY)Wh`RQGL{r)DYm*~- zc-LN|vmMvMAT#B!zC%xYDM;~eI> vNQb%9*&@V91ziF8^rHO8zBK45#3((tqiB3|Y modbefore.") } @@ -228,22 +272,25 @@ func readFlags() { func initSyncLogFile() { + Logger.Info("Init sync log file: %s", recLogFile) if recLogFile == "" { + Logger.Debug("No path for sync file. Returning without initialization. ") return } if _, err := os.Stat(recLogFile); !os.IsNotExist(err) && !syncOnly { - if !overWriteF { - fmt.Println("Record log file already exist. Please remove it: " + recLogFile) - os.Exit(0) - } else { + if removeFiles { + Logger.Info("Remove old Record log file: %s", recLogFile) os.Remove(recLogFile) + } else { + panicOnError(errors.New("Record log file already exist. Use -r or Please remove it: " + recLogFile)) } } // Create file and write header if not synconly if !syncOnly { // create and write header in file + Logger.Info("Create new Record log file: %s", recLogFile) file, err := os.OpenFile(recLogFile, os.O_CREATE|os.O_WRONLY, 0600) if err != nil { panicOnError(err) @@ -266,22 +313,21 @@ func getLogFileHeader(version string, modAfter int64, modBefore int64, ns string hd := headerDel return "version:" + version + hd + "mod_after:" + modAfterString + hd + - "mod_before:" + modBeforeString + hd + "ns:" + ns + hd + "bins:" + binString + + "mod_before:" + modBeforeString + hd + "namespace:" + ns + hd + "bins:" + binString + "\n" + "Action" + hd + "Digest" + hd + "Set" + hd + "Gen" + "\n" } // Init scan, clinet policies func initPolicies() { - scanPolicy = as.NewScanPolicy() - scanPolicy.ConcurrentNodes = true - scanPolicy.Priority = priority - + Logger.Info("Init all client, query policies.") readPolicy = as.NewPolicy() // Get only checksum for record from server + /* if useCksm { readPolicy.ChecksumOnly = true; } + */ srcClientPolicy = as.NewClientPolicy() srcClientPolicy.User = srcUser @@ -290,14 +336,17 @@ func initPolicies() { dstClientPolicy = as.NewClientPolicy() dstClientPolicy.User = dstUser dstClientPolicy.Password = dstPass + + queryPolicy = as.NewQueryPolicy() + queryPolicy.Priority = priority } func getClient(policy *as.ClientPolicy, host string) (*as.Client, error) { + Logger.Info("Connect to host: %s", host) hostInfo := strings.Split(host, ":") if len(hostInfo) < 2 { - err = fmt.Errorf("Wrong host format. it should be (x.x.x.x:yyyy).") - panicOnError(err) + panicOnError(errors.New("Wrong host format. it should be (x.x.x.x:yyyy).")) } ip := hostInfo[0] @@ -312,10 +361,13 @@ func getClient(policy *as.ClientPolicy, host string) (*as.Client, error) { //---------------------------------------------------------------------- // Main func to found records not in sync func findRecordsNotInSync() { - findSyncThread := 2 + Logger.Info("Find records not in sync") wg := new(sync.WaitGroup) + var dstRecordset *as.Recordset = nil - recordInfoChan := make(chan string, 50*findSyncThread) + + // Channel to store unsync record's info, max 100 record at a time + recordInfoChan := make(chan string, 100000) // Open record log file to write found unsync records var file *os.File = nil @@ -327,38 +379,47 @@ func findRecordsNotInSync() { defer file.Close() } - allSetMap := getSetMap(srcClient, namespace) - for setname, v := range allSetMap { + // Get replication factor + replFact := getReplicationFact(srcClient, namespace) + if replFact == 0 { + panicOnError(errors.New("Coundn't get replication factor for NS: " + namespace + ". check Config.")) + } + + // Parsed sets Stats fetched from source aesospike server + allSetStatsMap := getSetMap(srcClient, namespace) + + // Scan records from source and validate them by running + // multiple validation threads + for setname, statsMap := range allSetStatsMap { + if set != "" && set != setname { continue } - nObj := getObjectCount(v, namespace) + + // It gives all objects so divide by repl factor + nObj := getObjectCount(statsMap, namespace) + nObj = nObj / replFact if nObj == 0 { continue } - srcRecordsetArr := getRecordset(srcClient, namespace, setname, nObj, binList, modAfter, modBefore) - + // Update set stats setStats[setname] = &TStats{} - (setStats[setname]).nObj = nObj - (setStats[setname]).nSampleObj = nObj * samplePer / 100 + setStats[setname].nObj = nObj + setStats[setname].nSampleObj = nObj * samplePer / 100 + + srcRecordset := getRecordset(srcClient, namespace, setname, binList, modAfter, modBefore) - // Allocate different record channels to different threads - // Each thread will validate and find few set of records if they are in - // sync or not. + // Run multiple thread to fetch records from queryRecordQueue + // and validate those records to see if they are in sync or not for w := 0; w < findSyncThread; w++ { wg.Add(1) - bucket := (len(srcRecordsetArr) / findSyncThread) + 1 - go func(mul int, setname string) { + go func(setname string) { defer wg.Done() - for i := (bucket * mul); i < (bucket * (mul + 1)); i++ { - if i >= len(srcRecordsetArr) { - break - } - validateAndFind(srcRecordsetArr[i], dstRecordset, recordInfoChan, setname) - } - }(w, setname) + + validateAndFind(srcRecordset, dstRecordset, recordInfoChan, setname) + }(setname) } } @@ -368,9 +429,8 @@ func findRecordsNotInSync() { close(recordInfoChan) }() - // Continue looping if file doesn't exist. + // Continue looping if sync log file doesn't exist. // This has to wait for closing recordInfoChan - for r := range recordInfoChan { if recLogFile == "" { continue @@ -387,6 +447,8 @@ func findRecordsNotInSync() { // Deleted: Records which are deleted in source but not in destination // Note: Its not possible to find deleted unsynced records in A-A topology func validateAndFind(srcRecordset *as.Recordset, dstRecordset *as.Recordset, recordInfoChan chan string, setname string) { + Logger.Info("Thread to fetch and match src and dst records. SET: %s", setname) + Logger.Info("Find Updated, Inserted record if not in sync.") sStat := setStats[setname] L1: for { @@ -394,21 +456,21 @@ L1: select { case srcRec := <-srcRecordset.Records: - // If scan bucket is giving more then sampled object - sStat.recNotInSyncTotal = sStat.recNotInSyncInserted + - sStat.recNotInSyncUpdated + sStat.recNotInSyncDeleted - - if srcRec == nil || (sStat.recNotInSyncTotal >= sStat.nSampleObj) { - break L1 + // Break If scan bucket is giving more then sampled object + if srcRec == nil || (sStat.scanReq >= sStat.nSampleObj) { + Logger.Info("Src, Sample limit reached or No record left to match. SET: %s", setname) + Logger.Info("Src, Scanned records: %s, Sample Size: %s. SET: %s", + strconv.Itoa(sStat.scanReq), + strconv.Itoa(sStat.nSampleObj), setname) + break L1 } + sStat.scanReq++ // TODO: Add LUT check for record. LUT. skip if srcRecord.LUT > Timestamp - dstRec,_ := dstClient.Get(readPolicy, srcRec.Key, binList...) - //panicOnError(err) - // Check if rec exist in dst, - // If not, New record inserted. log it. Add gen = 0 for new rec + //panicOnError(err) + // If rec doesn't exist in dst, it's new insert in src. log it. Add gen = 0 for new rec if dstRec == nil || len(dstRec.Bins) == 0 { var gen uint32 = 0 if dstRec != nil { @@ -416,135 +478,87 @@ L1: } recordInfoChan <- getRecordLogInfoLine(insertedOp, srcRec.Key, gen) - gStat.recNotInSyncInserted++ sStat.recNotInSyncInserted++ + gStat.recNotInSyncInserted++ + Logger.Debug("Record op Insert. gStat_RecNotInSync: %s, setStat_RecNotInSync: %s. SET: %s", + strconv.Itoa(gStat.recNotInSyncInserted), + strconv.Itoa(sStat.recNotInSyncInserted), setname) continue } - // Check if src and dst record match or not. - // If not,record Updated. log this record to copy this into dst + // src and dst record doesn't match. Record Updated. log it. if !reflect.DeepEqual(srcRec.Bins, dstRec.Bins) { - fmt.Println(srcRec) - fmt.Println(dstRec) + fmt.Println("src") + fmt.Println("dst") fmt.Println() recordInfoChan <- getRecordLogInfoLine(updatedOp, srcRec.Key, dstRec.Generation) - gStat.recNotInSyncUpdated++ sStat.recNotInSyncUpdated++ + gStat.recNotInSyncUpdated++ + Logger.Debug("Record op Update. gStat_RecNotInSync: %s, setStat_RecNotInSync: %s. SET: %s", + strconv.Itoa(gStat.recNotInSyncUpdated), + strconv.Itoa(sStat.recNotInSyncUpdated), setname) } case err := <-srcRecordset.Errors: - // if there was an error, stop - // TODO: fix rand to not pick same bucket by two threads. Fixed but - // iterate over a dict - panicOnError(err) + Logger.Debug("Record read error: %s. SET: %s", err.Error(), setname) + sStat.reqErr++ + //fmt.Println(err) continue } } + // Don't go for checking deletes if syncDelete == false { return } + Logger.Info("Find Deleted record if not in sync. SET: %s", setname) L2: for { select { case dstRec := <-dstRecordset.Records: - if dstRec == nil { + // Break If scan bucket is giving more then sampled object + if (dstRec == nil) { + Logger.Info("Dst: No record left to match. SET: %s", setname) break L2 } + sStat.scanReq++ - srcRec, err := srcClient.Get(readPolicy, dstRec.Key, binList...) - panicOnError(err) + srcRec, _ := srcClient.Get(readPolicy, dstRec.Key, binList...) + //panicOnError(err) if srcRec == nil || len(srcRec.Bins) == 0 { - recordInfoChan <- getRecordLogInfoLine(deletedOp, dstRec.Key, dstRec.Generation) - gStat.recNotInSyncDeleted++ sStat.recNotInSyncDeleted++ - fmt.Println(dstRec.Key.SetName()) + gStat.recNotInSyncDeleted++ + Logger.Debug("Record op Delete. gStat_RecNotInSync: %s, setStat_RecNotInSync: %s. SET: %s", + strconv.Itoa(gStat.recNotInSyncDeleted), + strconv.Itoa(sStat.recNotInSyncDeleted), setname) continue } case err := <-srcRecordset.Errors: - // if there was an error, stop - panicOnError(err) + Logger.Debug("Record read error: %s. SET: %s", err.Error(), setname) + sStat.reqErr++ + //fmt.Println(err) + continue } } } -// Scan will Put only RecordQueueSize num of record in channel. This can be -// increased by using scanPolicy.RecordQueueSize. - -// totalObj: total master object for given namespace -// bucketSize: random number (t/20, t/10), t: totalObj -// nSampleBucket: (sampleSize / bucketSize + 1) -// nTotalBucket: (totalObj / bucketSize + 1) -func getRecordset(client *as.Client, ns string, set string, totalObj int, binList []string, modAfter int64, modBefore int64) []*as.Recordset { - - randSeed := time.Now().UnixNano() - sampleSize := totalObj * samplePer / 100 - - s := rand.NewSource(randSeed) - r1 := rand.New(s) - - ranNumMax := sampleSize / 10 - if ranNumMax == 0 { - ranNumMax = sampleSize - } - - // Minimum 1/20th of sample size - // Get fully divisible - bucketSize := r1.Intn(ranNumMax / 2 + 1) + ranNumMax / 2 - - if bucketSize == 0 { - bucketSize = ranNumMax - } - - var nSampleBucket int32 = int32(sampleSize / (bucketSize)) - var nTotalBucket int32 = int32(totalObj / (bucketSize)) - - /* - fmt.Println("SS, BS, nS, nT") - fmt.Println(sampleSize) - fmt.Println(bucketSize) - fmt.Println(nSampleBucket) - fmt.Println(nTotalBucket) - fmt.Println("**********") - */ - - recordsetArr := make([]*as.Recordset, nSampleBucket) - randIdArr := make([]bool, nTotalBucket) - - r2 := rand.New(s) - var i int32 - for i = 0 ; i < nSampleBucket; i++ { +// Scan all records in given timerange +func getRecordset(client *as.Client, ns string, set string, binList []string, modAfter int64, modBefore int64) *as.Recordset { + Logger.Info("Send query and create RecordSet. NS: %s, SET: %s, BINLIST: %s", ns, set, binList) + stm := as.NewStatement(ns, set, binList...) - stm := as.NewStatement(ns, set, binList...) + createTimeRangeStm(stm, modAfter, modBefore) - createTimeRangeStm(stm, modAfter, modBefore) + recordset, err := client.Query(queryPolicy, stm) - // Never generate same id so prevent collide - var id int64 - id = int64(r2.Int31n(nTotalBucket)) - for randIdArr[id] == true { - id = int64(r2.Int31n(nTotalBucket)) - // keep running - } - - randIdArr[id] = true - - stm.SetPredExp( - as.NewPredExpRecDigestModulo(nTotalBucket), - as.NewPredExpIntegerValue(id), - as.NewPredExpIntegerEqual(), - ) - recordset, err := client.Query(nil, stm) - panicOnError(err) - recordsetArr[i] = recordset - } - return recordsetArr + panicOnError(err) + return recordset } @@ -570,14 +584,60 @@ func createTimeRangeStm(stm *as.Statement, modAfter int64, modBefore int64) { } +// Get replication factor, return 0 if stat not present +func getReplicationFact(client *as.Client, ns string) int { + for _, node := range client.GetNodes() { + info, err := requestNodeNamespace(node, ns) + panicOnError(err) + if replFact, ok := info["repl-factor"]; ok { + r, err := strconv.Atoi(replFact) + panicOnError(err) + return r + } + } + return 0 +} + + +// RequestNodeStats returns statistics for the specified node as a map +func requestNodeNamespace(node *as.Node, ns string) (map[string]string, error) { + infoMap, err := as.RequestNodeInfo(node, "namespace/" + ns) + if err != nil { + return nil, err + } + + res := map[string]string{} + + v, exists := infoMap["namespace/" + ns] + if !exists { + return res, nil + } + + values := strings.Split(v, ";") + for i := range values { + kv := strings.Split(values[i], "=") + if len(kv) > 1 { + res[kv[0]] = kv[1] + } + } + + return res, nil +} + + // Request set statistics from AS server and parse it to create a map for sets. // {set: {node1: {setstats...}, node2: {setstats..}}} func getSetMap(client *as.Client, ns string) map[string]map[string]map[string]string { + allSets := map[string]map[string]map[string]string{} + for _, node := range client.GetNodes() { + // map[sets/:ns=test:set=testset:disable-eviction=false; // ns=test:set=bar:objects=10000;] - setInfo, _ := as.RequestNodeInfo(node, "sets/") + + setInfo, err := as.RequestNodeInfo(node, "sets/") + panicOnError(err) setList := strings.Split(setInfo["sets/"], ";") for _, setItem := range(setList) { @@ -601,8 +661,8 @@ func getSetMap(client *as.Client, ns string) map[string]map[string]map[string]st return allSets } -// Compute number of objects for given set -// TODO: check if these are master object + +// Compute number of objects for given set, its total object (including replica) func getObjectCount(setmap map[string]map[string]string, ns string) int { nObj := 0 for _, m := range setmap { @@ -619,13 +679,19 @@ func getObjectCount(setmap map[string]map[string]string, ns string) int { //---------------------------------------------------------------------- // Do Sync and helpers //---------------------------------------------------------------------- + // Main func to sync all records from sync.log file func doSync() { - rdThread := 4 - failedRecChan := make(chan string, 10) - rdChannel := make(chan string, 50) + // Number of threads to read from sunc.log file + rdThread := 100 + // Channel to store record info, which failed to sync + failedRecChan := make(chan string, 100) + // Store rocods fetched from sync.log file + rdChannel := make(chan string, 100) + wg := new(sync.WaitGroup) + // data sync will need sync log file. file, err := os.OpenFile(recLogFile, os.O_APPEND|os.O_RDWR, 0600) if err != nil { panicOnError(err) @@ -645,7 +711,7 @@ func doSync() { } // Track total records synced at staring of each second. - // And upcoming next second time. + // And upcoming next second time. will be used to throttle tps go func() { for { timeWEnd = time.Now().Add(time.Second) @@ -655,7 +721,7 @@ func doSync() { } }() - // Read all record info line in channel buffer. + // Read all record infoLine in channel buffer. go func() { for scanner.Scan() { rdChannel <- scanner.Text() @@ -669,11 +735,13 @@ func doSync() { go validateAndSync(rdChannel, failedRecChan, wg) } + // Wait for all the threads to finish sync go func() { wg.Wait() close(failedRecChan) }() + // Append failed to sync records again in sync log file for r := range failedRecChan { if _, err = file.WriteString(r); err != nil { panicOnError(err) @@ -693,6 +761,7 @@ func validateAndSync(rdChannel <-chan string, failedRecChan chan string, wg *syn // Throttle if TPS exceed limit, sleep for remaining time (1sec - time) calcTotalRecSynced() for tps > 0 && ((gStat.recSyncedTotal - recSyncedTotalOld) > tps) { + fmt.Println("Sleeping... ") time.Sleep(timeWEnd.Sub(time.Now())) } syncPassed := true @@ -724,6 +793,11 @@ func validateAndSync(rdChannel <-chan string, failedRecChan chan string, wg *syn srcRec, err := srcClient.Get(readPolicy, recKey, binList...) //panicOnError(err) + // In sync only case initialize stats + if _, ok := setStats[recKey.SetName()]; !ok { + setStats[recKey.SetName()] = &TStats{} + } + // Delete record, skip if its not recorded in sync.log file if srcRec == nil || len(srcRec.Bins) == 0 { syncPassed = syncDeletedRecord(recKey, op, writePolicy) @@ -859,6 +933,9 @@ func syncInsertedUpdatedRecord(srcRec *as.Record, op string, writePolicy *as.Wri return true } +//---------------------------------------------------------------------------- +// Other helper functions +//---------------------------------------------------------------------------- // Parse rec info line from sync.log file and get as.Key func getKeyFromString(keyString string) (*as.Key, error) { @@ -879,11 +956,11 @@ func calcTotalRecSynced() { // Main stats line printer func printLine(setStatsMeta []string, unsyncStr string, syncStr string) { for _, m := range setStatsMeta { - fmt.Printf("%20s", m) + fmt.Printf("%30s", m) } if !syncOnly { - fmt.Printf("%48s", unsyncStr) + fmt.Printf("%60s", unsyncStr) } if !findOnly { @@ -926,6 +1003,8 @@ func printStat(ns string, set string, stat *TStats) { strconv.Itoa(stat.genErr) + ")" } printLine(setStatsMeta, unsyncStr, syncStr) + + fmt.Println(stat.reqErr) } @@ -933,8 +1012,13 @@ func printStat(ns string, set string, stat *TStats) { func printAllStats() { nObj := 0 nSampleObj := 0 - fmt.Println("\n****** set stats *********") + fmt.Println("\n****** Data Sync Output***") + // Print metainfo + fmt.Println("Modified after: " + modAfterString) + fmt.Println("Modified before: " + modBeforeString) + + fmt.Println("\n****** set stats *********") // Print header metaList := []string{"Namespace", "Set", "Total_Records", "Sampled_Records"} unsyncStr := "Unsync(Total, Updated, Inserted, Deleted)" @@ -992,7 +1076,7 @@ func parseHeaderLine(headerLine string) { modBeforeString = getValidateArg(headerOps[2], "mod_before") modBefore = timeStringToTimestamp(modBeforeString) - namespace = getValidateArg(headerOps[3], "ns") + namespace = getValidateArg(headerOps[3], "namespace") binString = getValidateArg(headerOps[4], "bins") if binString != "" { @@ -1013,8 +1097,7 @@ func getValidateArg(str string, validStr string) string { // Create a line containing op,digest,set,gen info. func getRecordLogInfoLine(op string, key *as.Key, gen uint32) string { - // insertedOp + ":" + srcRec.Key.String() + "\n" - // op:digest:dstRecordGen + // op:digest:setname:dstRecordGen if op == "" { fmt.Println("null op") } @@ -1039,28 +1122,9 @@ func getKeyDegestString(key *as.Key) string { func panicOnError(err error) { if err != nil { + Logger.Error(err.Error()) panic(err) } } -// Not in use currently -func getGlobalStats(gS *TStats, setS map[string]*TStats) { - - for _, statsObj := range setS { - gS.nObj += statsObj.nObj - gS.nSampleObj += statsObj.nSampleObj - - gS.recNotInSyncTotal += statsObj.recNotInSyncTotal - gS.recNotInSyncUpdated += statsObj.recNotInSyncUpdated - gS.recNotInSyncInserted += statsObj.recNotInSyncInserted - gS.recNotInSyncDeleted += statsObj.recNotInSyncDeleted - - gS.recSyncedTotal += statsObj.recSyncedTotal - gS.recSyncedUpdated += statsObj.recSyncedUpdated - gS.recSyncedInserted += statsObj.recSyncedInserted - gS.recSyncedDeleted += statsObj.recSyncedDeleted - - gS.genErr += statsObj.genErr - } -}