From 447033be6ff3721a58c36572b8bfff408e2148a1 Mon Sep 17 00:00:00 2001 From: Scott Cotton Date: Thu, 16 Sep 2021 16:56:28 +0200 Subject: [PATCH] fix monotonicity of doc ids the concurrent shatter had left open the possibility of having non-monotonically increasing document ids. Fix with a sync.Cond --- cmd/dupi/extract.go | 10 ---------- dmd/adder.go | 1 + post/t.go | 6 ++++++ shatter.go | 38 ++++++++++++++++++++++++++++---------- 4 files changed, 35 insertions(+), 20 deletions(-) diff --git a/cmd/dupi/extract.go b/cmd/dupi/extract.go index 07379bf..a1e6cd8 100644 --- a/cmd/dupi/extract.go +++ b/cmd/dupi/extract.go @@ -53,16 +53,6 @@ func (x *extractCmd) Run(args []string) error { defer x.index.Close() query := x.index.StartQuery(dupi.QueryMaxBlot) shape := []dupi.Blot{ - {Blot: 0, Docs: make([]dupi.Doc, 0, 32)}, - {Blot: 0, Docs: make([]dupi.Doc, 0, 32)}, - {Blot: 0, Docs: make([]dupi.Doc, 0, 32)}, - {Blot: 0, Docs: make([]dupi.Doc, 0, 32)}, - {Blot: 0, Docs: make([]dupi.Doc, 0, 32)}, - {Blot: 0, Docs: make([]dupi.Doc, 0, 32)}, - {Blot: 0, Docs: make([]dupi.Doc, 0, 32)}, - {Blot: 0, Docs: make([]dupi.Doc, 0, 32)}, - {Blot: 0, Docs: make([]dupi.Doc, 0, 32)}, - {Blot: 0, Docs: make([]dupi.Doc, 0, 32)}, {Blot: 0, Docs: make([]dupi.Doc, 0, 32)}, {Blot: 0, Docs: make([]dupi.Doc, 0, 32)}} for { diff --git a/dmd/adder.go b/dmd/adder.go index 48be32a..96b9edc 100644 --- a/dmd/adder.go +++ b/dmd/adder.go @@ -42,6 +42,7 @@ func (t *Adder) Add(fid, start, end uint32) (uint32, error) { return 0, err } } + n = uint32(len(t.buf)) t.buf = append(t.buf, fields{fid, start, end}) return n + t.flushed, nil } diff --git a/post/t.go b/post/t.go index bf0de69..a20a0e5 100644 --- a/post/t.go +++ b/post/t.go @@ -16,6 +16,8 @@ // dupi blots with dupi internal document ids. package post +import "fmt" + // a post is a tuple of document id, blot type T uint64 @@ -23,6 +25,10 @@ func (p T) Docid() uint32 { return uint32(p >> 32) } +func (p T) String() string { + return fmt.Sprintf("<%d,%x>", p.Docid(), p.Blot()&0xffff) +} + func (p T) Blot() uint32 { return uint32(p) } diff --git a/shatter.go b/shatter.go index d1f2293..ab10cee 100644 --- a/shatter.go +++ b/shatter.go @@ -22,10 +22,6 @@ import ( "github.com/go-air/dupi/token" ) -type shardMsg struct { - posts []post.T -} - type shatterReq struct { docid uint32 offset uint32 @@ -37,12 +33,13 @@ func startShatter(ns, n, s int, tf token.TokenizerFunc, blotcfg *blotter.Config, chns []chan []post.T) (chan *shatterReq, error) { rch := make(chan *shatterReq) + mono := newMono() for i := 0; i < ns; i++ { bler, err := blotter.FromConfig(blotcfg) if err != nil { return nil, err } - sh := newShatter(n, s, tf, bler) + sh := newShatter(n, s, tf, bler, mono) copy(sh.shardChns, chns) go func(sh *shatter) { for { @@ -60,6 +57,16 @@ func startShatter(ns, n, s int, return rch, nil } +type mono struct { + docid uint32 + cond *sync.Cond +} + +func newMono() *mono { + var mu sync.Mutex + return &mono{cond: sync.NewCond(&mu)} +} + type shatter struct { tokfn token.TokenizerFunc tokb []token.T @@ -67,15 +74,17 @@ type shatter struct { seqlen int d [][]post.T shardChns []chan []post.T + mono *mono } -func newShatter(n, s int, tf token.TokenizerFunc, bler blotter.T) *shatter { +func newShatter(n, s int, tf token.TokenizerFunc, bler blotter.T, mono *mono) *shatter { res := &shatter{ tokfn: tf, bler: bler, seqlen: s, shardChns: make([]chan []post.T, n), - d: make([][]post.T, n)} + d: make([][]post.T, n), + mono: mono} for i := range res.shardChns { res.shardChns[i] = make(chan []post.T) } @@ -100,10 +109,15 @@ func (s *shatter) do(did, offset uint32, msg []byte) { default: } } - s.send() + s.send(did) } -func (s *shatter) send() { +func (s *shatter) send(did uint32) { + s.mono.cond.L.Lock() + for s.mono.docid != did-1 { + s.mono.cond.Wait() + } + var wg sync.WaitGroup for i, ps := range s.d { wg.Add(1) @@ -111,15 +125,19 @@ func (s *shatter) send() { defer wg.Done() s.shardChns[i] <- ps <-s.shardChns[i] - s.d[i] = nil //ps[:0] + s.d[i] = nil //ps[:0] (was racy) }(i, ps) } wg.Wait() + s.mono.docid = did + s.mono.cond.Broadcast() + s.mono.cond.L.Unlock() } func (s *shatter) blot(docid, b uint32) { n := uint32(len(s.d)) + i := b % n s.d[i] = append(s.d[i], post.Make(docid, b/n)) }