-
Notifications
You must be signed in to change notification settings - Fork 122
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Signed-off-by: Sreekanth <[email protected]>
- Loading branch information
Showing
24 changed files
with
751 additions
and
814 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,56 @@ | ||
package tracker | ||
|
||
import ( | ||
"sync" | ||
|
||
"github.com/numaproj/numaflow/pkg/isb" | ||
) | ||
|
||
// MessageTracker is used to store a key value pair for string and *ReadMessage | ||
// as it can be accessed by concurrent goroutines, we keep all operations | ||
// under a mutex | ||
type MessageTracker struct { | ||
lock sync.RWMutex | ||
m map[string]*isb.ReadMessage | ||
} | ||
|
||
// NewMessageTracker initializes a new instance of a Tracker | ||
func NewMessageTracker(messages []*isb.ReadMessage) *MessageTracker { | ||
m := make(map[string]*isb.ReadMessage, len(messages)) | ||
for _, msg := range messages { | ||
id := msg.ReadOffset.String() | ||
m[id] = msg | ||
} | ||
return &MessageTracker{ | ||
m: m, | ||
lock: sync.RWMutex{}, | ||
} | ||
} | ||
|
||
// Remove will remove the entry for a given id and return the stored value corresponding to this id. | ||
// A `nil` return value indicates that the id doesn't exist in the tracker. | ||
func (t *MessageTracker) Remove(id string) *isb.ReadMessage { | ||
t.lock.Lock() | ||
defer t.lock.Unlock() | ||
item, ok := t.m[id] | ||
if !ok { | ||
return nil | ||
} | ||
delete(t.m, id) | ||
return item | ||
} | ||
|
||
// IsEmpty is a helper function which checks if the Tracker map is empty | ||
// return true if empty | ||
func (t *MessageTracker) IsEmpty() bool { | ||
t.lock.RLock() | ||
defer t.lock.RUnlock() | ||
return len(t.m) == 0 | ||
} | ||
|
||
// Len returns the number of messages currently stored in the tracker | ||
func (t *MessageTracker) Len() int { | ||
t.lock.RLock() | ||
defer t.lock.RUnlock() | ||
return len(t.m) | ||
} |
30 changes: 16 additions & 14 deletions
30
pkg/udf/rpc/tracker_test.go → pkg/isb/tracker/message_tracker_test.go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,37 +1,39 @@ | ||
package rpc | ||
package tracker | ||
|
||
import ( | ||
"testing" | ||
"time" | ||
|
||
"github.com/stretchr/testify/assert" | ||
|
||
"github.com/numaproj/numaflow/pkg/isb" | ||
"github.com/numaproj/numaflow/pkg/isb/testutils" | ||
) | ||
|
||
func TestTracker_AddRequest(t *testing.T) { | ||
tr := NewTracker() | ||
readMessages := testutils.BuildTestReadMessages(3, time.Unix(1661169600, 0), nil) | ||
for _, msg := range readMessages { | ||
tr.addRequest(&msg) | ||
messages := make([]*isb.ReadMessage, len(readMessages)) | ||
for i, msg := range readMessages { | ||
messages[i] = &msg | ||
} | ||
tr := NewMessageTracker(messages) | ||
id := readMessages[0].ReadOffset.String() | ||
m, ok := tr.getRequest(id) | ||
assert.True(t, ok) | ||
m := tr.Remove(id) | ||
assert.NotNil(t, m) | ||
assert.Equal(t, readMessages[0], *m) | ||
} | ||
|
||
func TestTracker_RemoveRequest(t *testing.T) { | ||
tr := NewTracker() | ||
readMessages := testutils.BuildTestReadMessages(3, time.Unix(1661169600, 0), nil) | ||
for _, msg := range readMessages { | ||
tr.addRequest(&msg) | ||
messages := make([]*isb.ReadMessage, len(readMessages)) | ||
for i, msg := range readMessages { | ||
messages[i] = &msg | ||
} | ||
tr := NewMessageTracker(messages) | ||
id := readMessages[0].ReadOffset.String() | ||
m, ok := tr.getRequest(id) | ||
assert.True(t, ok) | ||
m := tr.Remove(id) | ||
assert.NotNil(t, m) | ||
assert.Equal(t, readMessages[0], *m) | ||
tr.removeRequest(id) | ||
_, ok = tr.getRequest(id) | ||
assert.False(t, ok) | ||
m = tr.Remove(id) | ||
assert.Nil(t, m) | ||
} |
Oops, something went wrong.