diff --git a/pmtiles/makesync.go b/pmtiles/makesync.go
index b0753df..cf90d1f 100644
--- a/pmtiles/makesync.go
+++ b/pmtiles/makesync.go
@@ -4,7 +4,6 @@ import (
 	"bufio"
 	"bytes"
 	"context"
-	"crypto/md5"
 	"fmt"
 	"github.com/dustin/go-humanize"
 	"github.com/schollz/progressbar/v3"
@@ -14,8 +13,10 @@ import (
 	"log"
 	"os"
 	"runtime"
+	"sort"
 	"strconv"
 	"strings"
+	"sync"
 	"time"
 )
 
@@ -24,6 +25,7 @@ type Block struct {
 	Start  uint64 // the start tileID
 	Offset uint64 // the offset in the file, in bytes
 	Length uint64 // the length, in bytes
+	Data   []byte
 }
 
 type Result struct {
@@ -31,6 +33,12 @@ type Result struct {
 	Hash  uint64
 }
 
+type Syncline struct {
+	Offset uint64
+	Length uint64
+	Hash   uint64
+}
+
 func Makesync(logger *log.Logger, cli_version string, file string, block_size_kb int, checksum string) error {
 	ctx := context.Background()
 	start := time.Now()
@@ -96,21 +104,23 @@ func Makesync(logger *log.Logger, cli_version string, file string, block_size_kb
 	defer output.Close()
 
 	output.Write([]byte(fmt.Sprintf("version=%s\n", cli_version)))
-	if checksum == "md5" {
-		localfile, err := os.Open(file)
-		if err != nil {
-			panic(err)
-		}
-		defer localfile.Close()
-		reader := bufio.NewReaderSize(localfile, 64*1024*1024)
-		md5hasher := md5.New()
-		if _, err := io.Copy(md5hasher, reader); err != nil {
-			panic(err)
-		}
-		md5checksum := md5hasher.Sum(nil)
-		fmt.Printf("Completed md5 in %v.\n", time.Since(start))
-		output.Write([]byte(fmt.Sprintf("md5=%x\n", md5checksum)))
+	// open a 2nd copy as a plain file
+	localfile, err := os.Open(file)
+	if err != nil {
+		panic(err)
 	}
+	defer localfile.Close()
+	reader := bufio.NewReaderSize(localfile, 64*1024*1024)
+
+	// if checksum == "md5" {
+	// 	md5hasher := md5.New()
+	// 	if _, err := io.Copy(md5hasher, reader); err != nil {
+	// 		panic(err)
+	// 	}
+	// 	md5checksum := md5hasher.Sum(nil)
+	// 	fmt.Printf("Completed md5 in %v.\n", time.Since(start))
+	// 	output.Write([]byte(fmt.Sprintf("md5=%x\n", md5checksum)))
+	// }
 
 	output.Write([]byte("hash=fnv1a\n"))
 	output.Write([]byte(fmt.Sprintf("blocksize=%d\n", block_size_bytes)))
@@ -122,64 +132,34 @@ func Makesync(logger *log.Logger, cli_version string, file string, block_size_kb
 
 	var current Block
 
-	GetHash := func(offset uint64, length uint64) uint64 {
-		hasher := fnv.New64a()
-		r, err := bucket.NewRangeReader(ctx, key, int64(header.TileDataOffset+offset), int64(length))
-		if err != nil {
-			log.Fatal(err)
-		}
-
-		if _, err := io.Copy(hasher, r); err != nil {
-			log.Fatal(err)
-		}
-		r.Close()
-		return hasher.Sum64()
-	}
-
 	tasks := make(chan Block, 10000)
-	intermediate := make(chan Result, 10000)
+
+	var wg sync.WaitGroup
+	var mu sync.Mutex
+	items := make(map[uint64]Syncline)
 
 	errs, _ := errgroup.WithContext(ctx)
 
+	// workers
 	for i := 0; i < runtime.GOMAXPROCS(0); i++ {
+		wg.Add(1)
+		hasher := fnv.New64a()
 		errs.Go(func() error {
 			for block := range tasks {
-				intermediate <- Result{block, GetHash(block.Offset, block.Length)}
+				if _, err := io.Copy(hasher, bytes.NewReader(block.Data)); err != nil {
+					return err
+				}
+				mu.Lock()
+				items[block.Start] = Syncline{block.Offset, block.Length, hasher.Sum64()}
+				mu.Unlock()
+
+				hasher.Reset()
 			}
+			wg.Done()
 			return nil
 		})
 	}
 
-	done := make(chan struct{})
-
-	go func() {
-		buffer := make(map[uint64]Result)
-		nextIndex := uint64(0)
-
-		for i := range intermediate {
-			buffer[i.Block.Index] = i
-
-			for {
-				if next, ok := buffer[nextIndex]; ok {
-
-					output.Write([]byte(fmt.Sprintf("%d\t%d\t%d\t%x\n", next.Block.Start, next.Block.Offset, next.Block.Length, next.Hash)))
-
-					delete(buffer, nextIndex)
-					nextIndex++
-
-					if next.Block.Offset+next.Block.Length == header.TileDataLength {
-						close(intermediate)
-					}
-
-				} else {
-					break
-				}
-			}
-		}
-
-		done <- struct{}{}
-	}()
-
 	current_index := uint64(0)
 	blocks := 0
 
@@ -196,7 +176,13 @@ func Makesync(logger *log.Logger, cli_version string, file string, block_size_kb
 			panic("Invalid clustering of archive detected - check with verify")
 		} else {
 			if current.Length+uint64(e.Length) > block_size_bytes {
-				tasks <- Block{current.Index, current.Start, current.Offset, current.Length}
+				data := make([]byte, current.Length)
+				_, err := io.ReadFull(reader, data)
+				if err != nil {
+					panic(err)
+				}
+
+				tasks <- Block{current.Index, current.Start, current.Offset, current.Length, data}
 				blocks += 1
 				current_index += 1
 
@@ -210,22 +196,35 @@ func Makesync(logger *log.Logger, cli_version string, file string, block_size_kb
 		}
 	})
 
-	tasks <- Block{current.Index, current.Start, current.Offset, current.Length}
+	data := make([]byte, current.Length)
+	_, err = io.ReadFull(reader, data)
+	if err != nil {
+		return err
+	}
+	tasks <- Block{current.Index, current.Start, current.Offset, current.Length, data}
 	blocks += 1
 	close(tasks)
 
-	<-done
+	wg.Wait()
+
+	// serialize the hash
+
+	var keys []uint64
+	for k := range items {
+		keys = append(keys, k)
+	}
+	sort.Slice(keys, func(i, j int) bool { return keys[i] < keys[j] })
+
+	for _, k := range keys {
+		syncline := items[k]
+		output.Write([]byte(fmt.Sprintf("%d\t%d\t%d\t%x\n", k, syncline.Offset, syncline.Length, syncline.Hash)))
+	}
+
 	fmt.Printf("Created syncfile with %d blocks.\n", blocks)
 	fmt.Printf("Completed makesync in %v.\n", time.Since(start))
 	return nil
 }
 
-type Syncline struct {
-	Offset uint64
-	Length uint64
-	Hash   uint64
-}
-
 func Sync(logger *log.Logger, file string, syncfile string) error {
 	start := time.Now()
 	total_remote_bytes := uint64(0)
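For readers skimming the patch: it drops the md5 step (commented out for now), stops hashing blocks through ranged reads against the bucket, and instead reads the archive sequentially, handing each block's bytes to a pool of FNV-1a hashing workers; results land in a mutex-guarded map and are serialized in sorted key order, which replaces the removed in-order writer goroutine. Below is a minimal, self-contained sketch of that read/hash/sort pattern with illustrative names only; it is not code from this repository.

```go
package main

import (
	"bytes"
	"fmt"
	"hash/fnv"
	"io"
	"runtime"
	"sort"
	"sync"
)

// block stands in for the patch's Block: a key plus the bytes
// that were read sequentially from the file.
type block struct {
	start uint64
	data  []byte
}

func main() {
	tasks := make(chan block, 16)
	var mu sync.Mutex
	var wg sync.WaitGroup
	hashes := make(map[uint64]uint64)

	// workers: one FNV-1a hasher per goroutine, reused across blocks
	for i := 0; i < runtime.GOMAXPROCS(0); i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			hasher := fnv.New64a()
			for b := range tasks {
				if _, err := io.Copy(hasher, bytes.NewReader(b.data)); err != nil {
					panic(err)
				}
				mu.Lock()
				hashes[b.start] = hasher.Sum64()
				mu.Unlock()
				hasher.Reset()
			}
		}()
	}

	// producer: in the patch this is a bufio.Reader over the archive;
	// here three in-memory blocks stand in for file contents.
	for i, payload := range []string{"alpha", "beta", "gamma"} {
		tasks <- block{start: uint64(i), data: []byte(payload)}
	}
	close(tasks)
	wg.Wait()

	// serialize in key order, since workers finish out of order
	keys := make([]uint64, 0, len(hashes))
	for k := range hashes {
		keys = append(keys, k)
	}
	sort.Slice(keys, func(i, j int) bool { return keys[i] < keys[j] })
	for _, k := range keys {
		fmt.Printf("%d\t%x\n", k, hashes[k])
	}
}
```

Reusing one hasher per worker and calling Reset between blocks avoids a per-block allocation, and a single sort after wg.Wait() is simpler than the buffered reordering the removed goroutine performed.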