Skip to content

Commit

Permalink
wip on faster makesync
Browse files Browse the repository at this point in the history
  • Loading branch information
bdon committed Jan 3, 2024
1 parent 87ba271 commit 2ed75c2
Showing 1 changed file with 69 additions and 70 deletions.
139 changes: 69 additions & 70 deletions pmtiles/makesync.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"bufio"
"bytes"
"context"
"crypto/md5"
"fmt"
"github.com/dustin/go-humanize"
"github.com/schollz/progressbar/v3"
Expand All @@ -14,8 +13,10 @@ import (
"log"
"os"
"runtime"
"sort"
"strconv"
"strings"
"sync"
"time"
)

Expand All @@ -24,13 +25,20 @@ type Block struct {
Start uint64 // the start tileID
Offset uint64 // the offset in the file, in bytes
Length uint64 // the length, in bytes
Data []byte
}

// Result pairs a Block with the hash computed over its contents,
// produced by a hashing worker and consumed by the output writer.
type Result struct {
Block Block // the block whose byte range was hashed
Hash uint64 // FNV-1a digest of the block's contents (the syncfile declares hash=fnv1a)
}

// Syncline is one entry of the generated syncfile: where a block lives
// in the archive and the hash of its contents.
type Syncline struct {
Offset uint64 // byte offset of the block (mirrors Block.Offset)
Length uint64 // length of the block, in bytes (mirrors Block.Length)
Hash uint64 // FNV-1a hash of the block's data
}

func Makesync(logger *log.Logger, cli_version string, file string, block_size_kb int, checksum string) error {
ctx := context.Background()
start := time.Now()
Expand Down Expand Up @@ -96,21 +104,23 @@ func Makesync(logger *log.Logger, cli_version string, file string, block_size_kb
defer output.Close()
output.Write([]byte(fmt.Sprintf("version=%s\n", cli_version)))

if checksum == "md5" {
localfile, err := os.Open(file)
if err != nil {
panic(err)
}
defer localfile.Close()
reader := bufio.NewReaderSize(localfile, 64*1024*1024)
md5hasher := md5.New()
if _, err := io.Copy(md5hasher, reader); err != nil {
panic(err)
}
md5checksum := md5hasher.Sum(nil)
fmt.Printf("Completed md5 in %v.\n", time.Since(start))
output.Write([]byte(fmt.Sprintf("md5=%x\n", md5checksum)))
// open a 2nd copy as a plain file
localfile, err := os.Open(file)
if err != nil {
panic(err)
}
defer localfile.Close()
reader := bufio.NewReaderSize(localfile, 64*1024*1024)

// if checksum == "md5" {
// md5hasher := md5.New()
// if _, err := io.Copy(md5hasher, reader); err != nil {
// panic(err)
// }
// md5checksum := md5hasher.Sum(nil)
// fmt.Printf("Completed md5 in %v.\n", time.Since(start))
// output.Write([]byte(fmt.Sprintf("md5=%x\n", md5checksum)))
// }

output.Write([]byte("hash=fnv1a\n"))
output.Write([]byte(fmt.Sprintf("blocksize=%d\n", block_size_bytes)))
Expand All @@ -122,64 +132,34 @@ func Makesync(logger *log.Logger, cli_version string, file string, block_size_kb

var current Block

GetHash := func(offset uint64, length uint64) uint64 {
hasher := fnv.New64a()
r, err := bucket.NewRangeReader(ctx, key, int64(header.TileDataOffset+offset), int64(length))
if err != nil {
log.Fatal(err)
}

if _, err := io.Copy(hasher, r); err != nil {
log.Fatal(err)
}
r.Close()
return hasher.Sum64()
}

tasks := make(chan Block, 10000)
intermediate := make(chan Result, 10000)

var wg sync.WaitGroup
var mu sync.Mutex
items := make(map[uint64]Syncline)

errs, _ := errgroup.WithContext(ctx)

// workers
for i := 0; i < runtime.GOMAXPROCS(0); i++ {
wg.Add(1)
hasher := fnv.New64a()
errs.Go(func() error {
for block := range tasks {
intermediate <- Result{block, GetHash(block.Offset, block.Length)}
if _, err := io.Copy(hasher, bytes.NewReader(block.Data)); err != nil {
return err
}
mu.Lock()
items[block.Start] = Syncline{block.Offset, block.Length, hasher.Sum64()}
mu.Unlock()

hasher.Reset()
}
wg.Done()
return nil
})
}

done := make(chan struct{})

go func() {
buffer := make(map[uint64]Result)
nextIndex := uint64(0)

for i := range intermediate {
buffer[i.Block.Index] = i

for {
if next, ok := buffer[nextIndex]; ok {

output.Write([]byte(fmt.Sprintf("%d\t%d\t%d\t%x\n", next.Block.Start, next.Block.Offset, next.Block.Length, next.Hash)))

delete(buffer, nextIndex)
nextIndex++

if next.Block.Offset+next.Block.Length == header.TileDataLength {
close(intermediate)
}

} else {
break
}
}
}

done <- struct{}{}
}()

current_index := uint64(0)

blocks := 0
Expand All @@ -196,7 +176,13 @@ func Makesync(logger *log.Logger, cli_version string, file string, block_size_kb
panic("Invalid clustering of archive detected - check with verify")
} else {
if current.Length+uint64(e.Length) > block_size_bytes {
tasks <- Block{current.Index, current.Start, current.Offset, current.Length}
data := make([]byte, current.Length)
_, err := io.ReadFull(reader, data)
if err != nil {
panic(err)
}

tasks <- Block{current.Index, current.Start, current.Offset, current.Length, data}
blocks += 1

current_index += 1
Expand All @@ -210,22 +196,35 @@ func Makesync(logger *log.Logger, cli_version string, file string, block_size_kb
}
})

tasks <- Block{current.Index, current.Start, current.Offset, current.Length}
data := make([]byte, current.Length)
_, err = io.ReadFull(reader, data)
if err != nil {
return err
}
tasks <- Block{current.Index, current.Start, current.Offset, current.Length, data}
blocks += 1
close(tasks)

<-done
wg.Wait()

// serialize the hash

var keys []uint64
for k := range items {
keys = append(keys, k)
}
sort.Slice(keys, func(i, j int) bool { return keys[i] < keys[j] })

for _, k := range keys {
syncline := items[k]
output.Write([]byte(fmt.Sprintf("%d\t%d\t%d\t%x\n", k, syncline.Offset, syncline.Length, syncline.Hash)))
}

fmt.Printf("Created syncfile with %d blocks.\n", blocks)
fmt.Printf("Completed makesync in %v.\n", time.Since(start))
return nil
}

// Syncline is one entry of the generated syncfile: where a block lives
// in the archive and the hash of its contents.
type Syncline struct {
Offset uint64 // byte offset of the block (mirrors Block.Offset)
Length uint64 // length of the block, in bytes (mirrors Block.Length)
Hash uint64 // FNV-1a hash of the block's data
}

func Sync(logger *log.Logger, file string, syncfile string) error {
start := time.Now()
total_remote_bytes := uint64(0)
Expand Down

0 comments on commit 2ed75c2

Please sign in to comment.