Skip to content

Commit

Permalink
Add updatedAtEventNum to the channel metadata (#70)
Browse files Browse the repository at this point in the history
I created a cache key in the client that depended on the updatedAtHash
in the channel metadata, but that has was updating every time we had a
snapshot, causing a flicker in the ui when we refreshed all channels.
This will allow us to have proper cache invalidation.

I’ve split the protobuf into two new messages, one for the event and one
for the snapshot. They share the same base properties and property
numbers so i’m expecting this to smothly go in without any conflicts.
  • Loading branch information
texuf authored May 31, 2024
1 parent 2e088a9 commit a7012c0
Show file tree
Hide file tree
Showing 19 changed files with 1,254 additions and 1,061 deletions.
18 changes: 9 additions & 9 deletions core/node/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,9 +171,9 @@ func Make_ChannelPayload_Inception(
ChannelPayload: &ChannelPayload{
Content: &ChannelPayload_Inception_{
Inception: &ChannelPayload_Inception{
StreamId: streamId[:],
SpaceId: spaceId[:],
Settings: settings,
StreamId: streamId[:],
SpaceId: spaceId[:],
Settings: settings,
},
},
},
Expand Down Expand Up @@ -280,18 +280,18 @@ func Make_SpacePayload_Membership(op MembershipOp, userId string, initiatorId st
return Make_MemberPayload_Membership(op, userAddress, initiatorAddress, nil)
}

func Make_SpacePayload_Channel(
func Make_SpacePayload_ChannelUpdate(
op ChannelOp,
channelId StreamId,
originEvent *EventRef,
) *StreamEvent_SpacePayload {
return &StreamEvent_SpacePayload{
SpacePayload: &SpacePayload{
Content: &SpacePayload_Channel_{
Channel: &SpacePayload_Channel{
Op: op,
ChannelId: channelId[:],
OriginEvent: originEvent,
Content: &SpacePayload_Channel{
Channel: &SpacePayload_ChannelUpdate{
Op: op,
ChannelId: channelId[:],
OriginEvent: originEvent,
},
},
},
Expand Down
2 changes: 1 addition & 1 deletion core/node/events/migrations/snapshot_migration_0001.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func snapshot_migration_0001(iSnapshot *Snapshot) *Snapshot {
if snapshot.SpaceContent != nil {
snapshot.SpaceContent.Channels = slices.CompactFunc(
snapshot.SpaceContent.Channels,
func(i, j *SpacePayload_Channel) bool {
func(i, j *SpacePayload_ChannelMetadata) bool {
return bytes.Equal(i.ChannelId, j.ChannelId)
},
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func TestSnapshotMigration0001(t *testing.T) {
badSpaceChannel := &Snapshot{
Content: &Snapshot_SpaceContent{
SpaceContent: &SpacePayload_Snapshot{
Channels: []*SpacePayload_Channel{
Channels: []*SpacePayload_ChannelMetadata{
{
ChannelId: channelId[:],
},
Expand Down
26 changes: 15 additions & 11 deletions core/node/events/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,11 +157,11 @@ func Update_Snapshot(iSnapshot *Snapshot, event *ParsedEvent, miniblockNum int64
iSnapshot = migrations.MigrateSnapshot(iSnapshot)
switch payload := event.Event.Payload.(type) {
case *StreamEvent_SpacePayload:
return update_Snapshot_Space(iSnapshot, payload.SpacePayload, eventNum, event.Hash.Bytes())
return update_Snapshot_Space(iSnapshot, payload.SpacePayload, eventNum)
case *StreamEvent_ChannelPayload:
return update_Snapshot_Channel(iSnapshot, payload.ChannelPayload)
case *StreamEvent_DmChannelPayload:
return update_Snapshot_DmChannel(iSnapshot, payload.DmChannelPayload, miniblockNum, event.Hash.Bytes())
return update_Snapshot_DmChannel(iSnapshot, payload.DmChannelPayload)
case *StreamEvent_GdmChannelPayload:
return update_Snapshot_GdmChannel(iSnapshot, payload.GdmChannelPayload, miniblockNum, event.Hash.Bytes())
case *StreamEvent_UserPayload:
Expand All @@ -181,16 +181,22 @@ func Update_Snapshot(iSnapshot *Snapshot, event *ParsedEvent, miniblockNum int64
}
}

func update_Snapshot_Space(iSnapshot *Snapshot, spacePayload *SpacePayload, eventNum int64, eventHash []byte) error {
func update_Snapshot_Space(iSnapshot *Snapshot, spacePayload *SpacePayload, eventNum int64) error {
snapshot := iSnapshot.Content.(*Snapshot_SpaceContent)
if snapshot == nil {
return RiverError(Err_INVALID_ARGUMENT, "blockheader snapshot is not a space snapshot")
}
switch content := spacePayload.Content.(type) {
case *SpacePayload_Inception_:
return RiverError(Err_INVALID_ARGUMENT, "cannot update blockheader with inception event")
case *SpacePayload_Channel_:
snapshot.SpaceContent.Channels = insertChannel(snapshot.SpaceContent.Channels, content.Channel)
case *SpacePayload_Channel:
channel := &SpacePayload_ChannelMetadata{
ChannelId: content.Channel.ChannelId,
Op: content.Channel.Op,
OriginEvent: content.Channel.OriginEvent,
UpdatedAtEventNum: eventNum,
}
snapshot.SpaceContent.Channels = insertChannel(snapshot.SpaceContent.Channels, channel)
return nil
default:
return RiverError(Err_INVALID_ARGUMENT, "unknown space payload type %T", spacePayload.Content)
Expand All @@ -216,8 +222,6 @@ func update_Snapshot_Channel(iSnapshot *Snapshot, channelPayload *ChannelPayload
func update_Snapshot_DmChannel(
iSnapshot *Snapshot,
dmChannelPayload *DmChannelPayload,
eventNum int64,
eventHash []byte,
) error {
snapshot := iSnapshot.Content.(*Snapshot_DmChannelContent)
if snapshot == nil {
Expand Down Expand Up @@ -533,24 +537,24 @@ func removeSorted[T any, K any](elements []*T, key K, cmp func(K, K) int, keyFn
return elements
}

func findChannel(channels []*SpacePayload_Channel, channelId []byte) (*SpacePayload_Channel, error) {
func findChannel(channels []*SpacePayload_ChannelMetadata, channelId []byte) (*SpacePayload_ChannelMetadata, error) {
return findSorted(
channels,
channelId,
bytes.Compare,
func(channel *SpacePayload_Channel) []byte {
func(channel *SpacePayload_ChannelMetadata) []byte {
return channel.ChannelId
},
)
}

func insertChannel(channels []*SpacePayload_Channel, newChannels ...*SpacePayload_Channel) []*SpacePayload_Channel {
func insertChannel(channels []*SpacePayload_ChannelMetadata, newChannels ...*SpacePayload_ChannelMetadata) []*SpacePayload_ChannelMetadata {
for _, channel := range newChannels {
channels = insertSorted(
channels,
channel,
bytes.Compare,
func(channel *SpacePayload_Channel) []byte {
func(channel *SpacePayload_ChannelMetadata) []byte {
return channel.ChannelId
},
)
Expand Down
13 changes: 9 additions & 4 deletions core/node/events/stream_viewstate_space.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ type SpaceStreamView interface {
JoinableStreamView
GetSpaceInception() (*SpacePayload_Inception, error)
GetSpaceSnapshotContent() (*SpacePayload_Snapshot, error)
GetChannelInfo(channelId shared.StreamId) (*SpacePayload_Channel, error)
GetChannelInfo(channelId shared.StreamId) (*SpacePayload_ChannelMetadata, error)
}

var _ SpaceStreamView = (*streamViewImpl)(nil)
Expand All @@ -35,7 +35,7 @@ func (r *streamViewImpl) GetSpaceSnapshotContent() (*SpacePayload_Snapshot, erro
}
}

func (r *streamViewImpl) GetChannelInfo(channelId shared.StreamId) (*SpacePayload_Channel, error) {
func (r *streamViewImpl) GetChannelInfo(channelId shared.StreamId) (*SpacePayload_ChannelMetadata, error) {
snap, err := r.GetSpaceSnapshotContent()
if err != nil {
return nil, err
Expand All @@ -46,9 +46,14 @@ func (r *streamViewImpl) GetChannelInfo(channelId shared.StreamId) (*SpacePayloa
switch payload := e.Event.Payload.(type) {
case *StreamEvent_SpacePayload:
switch spacePayload := payload.SpacePayload.Content.(type) {
case *SpacePayload_Channel_:
case *SpacePayload_Channel:
if channelId.EqualsBytes(spacePayload.Channel.ChannelId) {
channel = spacePayload.Channel
channel = &SpacePayload_ChannelMetadata{
ChannelId: spacePayload.Channel.ChannelId,
Op: spacePayload.Channel.Op,
OriginEvent: spacePayload.Channel.OriginEvent,
UpdatedAtEventNum: eventNum,
}
}
default:
break
Expand Down
Loading

0 comments on commit a7012c0

Please sign in to comment.