Skip to content

Commit

Permalink
use mutex
Browse files Browse the repository at this point in the history
  • Loading branch information
bdon committed Jan 6, 2024
1 parent fdab3ef commit 1e2f3bc
Showing 1 changed file with 23 additions and 39 deletions.
62 changes: 23 additions & 39 deletions pmtiles/makesync.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@ import (
"log"
"os"
"runtime"
"sort"
"strconv"
"strings"
"sync"
"time"
)

Expand Down Expand Up @@ -129,7 +131,11 @@ func Makesync(logger *log.Logger, cli_version string, file string, block_size_kb
var current Block

tasks := make(chan Block, 10000)
intermediate := make(chan Result, 10000)

var wg sync.WaitGroup
var mu sync.Mutex

synclines := make(map[uint64]Syncline)

errs, _ := errgroup.WithContext(ctx)
// workers
Expand All @@ -147,47 +153,18 @@ func Makesync(logger *log.Logger, cli_version string, file string, block_size_kb
}
r.Close()

intermediate <- Result{block, hasher.Sum64()}
sum64 := hasher.Sum64()
mu.Lock()
synclines[block.Start] = Syncline{block.Offset, block.Length, sum64}
mu.Unlock()

hasher.Reset()
}
wg.Done()
return nil
})
}

done := make(chan struct{})

synclines := make([][]uint64, 0)

go func() {
buffer := make(map[uint64]Result)
nextIndex := uint64(0)

for i := range intermediate {
buffer[i.Block.Index] = i

for {
if next, ok := buffer[nextIndex]; ok {

// output.Write([]byte(fmt.Sprintf("%d\t%d\t%d\t%x\n", next.Block.Start, next.Block.Offset, next.Block.Length, next.Hash)))

synclines = append(synclines, []uint64{next.Block.Start, next.Block.Offset, next.Block.Length, next.Hash})

delete(buffer, nextIndex)
nextIndex++

if next.Block.Offset+next.Block.Length == header.TileDataLength {
close(intermediate)
}

} else {
break
}
}
}

done <- struct{}{}
}()

current_index := uint64(0)

blocks := 0
Expand Down Expand Up @@ -222,10 +199,17 @@ func Makesync(logger *log.Logger, cli_version string, file string, block_size_kb
blocks += 1
close(tasks)

<-done
wg.Wait()

var keys []uint64
for k := range synclines {
keys = append(keys, k)
}
sort.Slice(keys, func(i, j int) bool { return keys[i] < keys[j] })

for _, s := range synclines {
output.Write([]byte(fmt.Sprintf("%d\t%d\t%d\t%x\n", s[0], s[1], s[2], s[3])))
for _, k := range keys {
syncline := synclines[k]
output.Write([]byte(fmt.Sprintf("%d\t%d\t%d\t%x\n", k, syncline.Offset, syncline.Length, syncline.Hash)))
}

fmt.Printf("Created syncfile with %d blocks.\n", blocks)
Expand Down

0 comments on commit 1e2f3bc

Please sign in to comment.