Skip to content

Commit

Permalink
Fix data race in snapshots caught in stress tests (#85)
Browse files Browse the repository at this point in the history
can_add_event and snapshotting both look at key fulfillments

We have a race because we were using the slice from the original
solicitation without making a copy, then mutating it when fulfillments
come in.
The fix is to clone the solicitation before saving it to the snapshot,
so that all future mutations happen on a copy.

We had another bug - sort.SortedSlice is just a constructor for a type
and does nothing. whoops. Move the logic to the rules, use slice.sort,
and force sorting on the client. The solicitations were sorted, but not
the fulfillments looks like, so just fix that and move on.

```
WARNING: DATA RACE
Read at 0x00c005898880 by goroutine 87477:
  github.com/river-build/river/core/node/events.removeCommon()
      /Users/austinellis/hnt/river/core/node/events/snapshot.go:481 +0x1e0
  github.com/river-build/river/core/node/events.applyKeyFulfillment()
      /Users/austinellis/hnt/river/core/node/events/snapshot.go:712 +0x108
  github.com/river-build/river/core/node/events.(*streamViewImpl).GetKeySolicitations.func1()
      /Users/austinellis/hnt/river/core/node/events/stream_viewstate_joinable.go:124 +0x4fc
  github.com/river-build/river/core/node/events.(*minipoolInstance).forEachEvent()
      /Users/austinellis/hnt/river/core/node/events/minipool.go:33 +0xa4
  github.com/river-build/river/core/node/events.(*streamViewImpl).forEachEvent()
      /Users/austinellis/hnt/river/core/node/events/stream_view.go:432 +0x1b0
  github.com/river-build/river/core/node/events.(*streamViewImpl).GetKeySolicitations()
      /Users/austinellis/hnt/river/core/node/events/stream_viewstate_joinable.go:133 +0x138
  github.com/river-build/river/core/node/rules.(*aeKeyFulfillmentRules).validKeyFulfillment()
      /Users/austinellis/hnt/river/core/node/rules/can_add_event.go:1107 +0x16c
  github.com/river-build/river/core/node/rules.(*aeKeyFulfillmentRules).validKeyFulfillment-fm()
      <autogenerated>:1 +0x34
  github.com/river-build/river/core/node/rules.runChecksAE()
      /Users/austinellis/hnt/river/core/node/rules/rule_builder_ae.go:75 +0x118
  github.com/river-build/river/core/node/rules.(*ruleBuilderAEImpl).run()
      /Users/austinellis/hnt/river/core/node/rules/rule_builder_ae.go:101 +0x64
  github.com/river-build/river/core/node/rules.CanAddEvent()
      /Users/austinellis/hnt/river/core/node/rules/can_add_event.go:145 +0x3fc
  github.com/river-build/river/core/node/rpc.(*Service).addParsedEvent()
      /Users/austinellis/hnt/river/core/node/rpc/add_event.go:75 +0x13c
  github.com/river-build/river/core/node/rpc.(*Service).localAddEvent()
      /Users/austinellis/hnt/river/core/node/rpc/add_event.go:42 +0x468
  github.com/river-build/river/core/node/rpc.(*Service).addEventImpl()
      /Users/austinellis/hnt/river/core/node/rpc/forwarder.go:452 +0x38c
  github.com/river-build/river/core/node/rpc.(*Service).AddEvent()
      /Users/austinellis/hnt/river/core/node/rpc/forwarder.go:424 +0xfc
  github.com/river-build/river/core/node/protocol/protocolconnect.StreamServiceHandler.AddEvent-fm()
      <autogenerated>:1 +0x60
  connectrpc.com/connect.NewUnaryHandler[go.shape.40e197435c9ac50507e53ee22aa4c74a454c17ed4e26d5c0330b9b0f17660b02,go.shape.struct { github.com/river-build/river/core/node/protocol.state google.golang.org/protobuf/internal/impl.MessageState; github.com/river-build/river/core/node/protocol.sizeCache int32; github.com/river-build/river/core/node/protocol.unknownFields []uint8; Error *github.com/river-build/river/core/node/protocol.AddEventResponse_Error "protobuf:\"bytes,1,opt,name=error,proto3\" json:\"error,omitempty\"" }].func1()
      /Users/austinellis/go/pkg/mod/connectrpc.com/[email protected]/handler.go:52 +0x1d8
  github.com/river-build/river/core/node/rpc.(*Service).initHandlers.NewTimeoutInterceptor.func2.1()
      /Users/austinellis/hnt/river/core/node/rpc/timeout_interceptor.go:24 +0xc4
  github.com/river-build/river/core/node/rpc.(*Service).initHandlers.NewMetricsInterceptor.func1.1()
      /Users/austinellis/hnt/river/core/node/rpc/metrics_interceptor.go:32 +0x210
  connectrpc.com/connect.NewUnaryHandler[go.shape.40e197435c9ac50507e53ee22aa4c74a454c17ed4e26d5c0330b9b0f17660b02,go.shape.struct { github.com/river-build/river/core/node/protocol.state google.golang.org/protobuf/internal/impl.MessageState; github.com/river-build/river/core/node/protocol.sizeCache int32; github.com/river-build/river/core/node/protocol.unknownFields []uint8; Error *github.com/river-build/river/core/node/protocol.AddEventResponse_Error "protobuf:\"bytes,1,opt,name=error,proto3\" json:\"error,omitempty\"" }].func2()
      /Users/austinellis/go/pkg/mod/connectrpc.com/[email protected]/handler.go:84 +0x4e8
  connectrpc.com/connect.(*Handler).ServeHTTP()
      /Users/austinellis/go/pkg/mod/connectrpc.com/[email protected]/handler.go:265 +0xaa0
  github.com/river-build/river/core/node/protocol/protocolconnect.NewStreamServiceHandler.func1()
      /Users/austinellis/hnt/river/core/node/protocol/protocolconnect/protocol.connect.go:371 +0x134
  net/http.HandlerFunc.ServeHTTP()
      /opt/homebrew/Cellar/go/1.22.3/libexec/src/net/http/server.go:2166 +0x48
  github.com/river-build/river/core/node/rpc.(*httpHandler).ServeHTTP()
      /Users/austinellis/hnt/river/core/node/rpc/http_handler.go:61 +0x8d0
  net/http.(*ServeMux).ServeHTTP()
      /opt/homebrew/Cellar/go/1.22.3/libexec/src/net/http/server.go:2683 +0x17c
  gopkg.in/DataDog/dd-trace-go.v1/contrib/net/http.TraceAndServe()
      /Users/austinellis/go/pkg/mod/gopkg.in/!data!dog/[email protected]/contrib/net/http/trace.go:75 +0x6bc
  gopkg.in/DataDog/dd-trace-go.v1/contrib/net/http.(*ServeMux).ServeHTTP()
      /Users/austinellis/go/pkg/mod/gopkg.in/!data!dog/[email protected]/contrib/net/http/http.go:60 +0x388
  github.com/river-build/river/core/node/rpc.(*Service).runHttpServer.(*Cors).Handler.func3()
      /Users/austinellis/go/pkg/mod/github.com/rs/[email protected]/cors.go:236 +0x2d8
  net/http.HandlerFunc.ServeHTTP()
      /opt/homebrew/Cellar/go/1.22.3/libexec/src/net/http/server.go:2166 +0x48
  net/http.serverHandler.ServeHTTP()
      /opt/homebrew/Cellar/go/1.22.3/libexec/src/net/http/server.go:3137 +0x2a8
  net/http.(*conn).serve()
      /opt/homebrew/Cellar/go/1.22.3/libexec/src/net/http/server.go:2039 +0xf28
  net/http.(*Server).Serve.gowrap3()
      /opt/homebrew/Cellar/go/1.22.3/libexec/src/net/http/server.go:3285 +0x4c

Previous write at 0x00c005898880 by goroutine 87224:
  runtime.slicecopy()
      /opt/homebrew/Cellar/go/1.22.3/libexec/src/runtime/slice.go:325 +0x0
  github.com/river-build/river/core/node/events.removeCommon()
      /Users/austinellis/hnt/river/core/node/events/snapshot.go:494 +0x46c
  github.com/river-build/river/core/node/events.applyKeyFulfillment()
      /Users/austinellis/hnt/river/core/node/events/snapshot.go:712 +0x108
  github.com/river-build/river/core/node/events.(*streamViewImpl).GetKeySolicitations.func1()
      /Users/austinellis/hnt/river/core/node/events/stream_viewstate_joinable.go:124 +0x4fc
  github.com/river-build/river/core/node/events.(*minipoolInstance).forEachEvent()
      /Users/austinellis/hnt/river/core/node/events/minipool.go:33 +0xa4
  github.com/river-build/river/core/node/events.(*streamViewImpl).forEachEvent()
      /Users/austinellis/hnt/river/core/node/events/stream_view.go:432 +0x1b0
  github.com/river-build/river/core/node/events.(*streamViewImpl).GetKeySolicitations()
      /Users/austinellis/hnt/river/core/node/events/stream_viewstate_joinable.go:133 +0x138
  github.com/river-build/river/core/node/rules.(*aeKeyFulfillmentRules).validKeyFulfillment()
      /Users/austinellis/hnt/river/core/node/rules/can_add_event.go:1107 +0x16c
  github.com/river-build/river/core/node/rules.(*aeKeyFulfillmentRules).validKeyFulfillment-fm()
      <autogenerated>:1 +0x34
  github.com/river-build/river/core/node/rules.runChecksAE()
      /Users/austinellis/hnt/river/core/node/rules/rule_builder_ae.go:75 +0x118
  github.com/river-build/river/core/node/rules.(*ruleBuilderAEImpl).run()
      /Users/austinellis/hnt/river/core/node/rules/rule_builder_ae.go:101 +0x64
  github.com/river-build/river/core/node/rules.CanAddEvent()
      /Users/austinellis/hnt/river/core/node/rules/can_add_event.go:145 +0x3fc
  github.com/river-build/river/core/node/rpc.(*Service).addParsedEvent()
      /Users/austinellis/hnt/river/core/node/rpc/add_event.go:75 +0x13c
  github.com/river-build/river/core/node/rpc.(*Service).localAddEvent()
      /Users/austinellis/hnt/river/core/node/rpc/add_event.go:42 +0x468
  github.com/river-build/river/core/node/rpc.(*Service).addEventImpl()
      /Users/austinellis/hnt/river/core/node/rpc/forwarder.go:452 +0x38c
  github.com/river-build/river/core/node/rpc.(*Service).AddEvent()
      /Users/austinellis/hnt/river/core/node/rpc/forwarder.go:424 +0xfc
  github.com/river-build/river/core/node/protocol/protocolconnect.StreamServiceHandler.AddEvent-fm()
      <autogenerated>:1 +0x60
  connectrpc.com/connect.NewUnaryHandler[go.shape.40e197435c9ac50507e53ee22aa4c74a454c17ed4e26d5c0330b9b0f17660b02,go.shape.struct { github.com/river-build/river/core/node/protocol.state google.golang.org/protobuf/internal/impl.MessageState; github.com/river-build/river/core/node/protocol.sizeCache int32; github.com/river-build/river/core/node/protocol.unknownFields []uint8; Error *github.com/river-build/river/core/node/protocol.AddEventResponse_Error "protobuf:\"bytes,1,opt,name=error,proto3\" json:\"error,omitempty\"" }].func1()
      /Users/austinellis/go/pkg/mod/connectrpc.com/[email protected]/handler.go:52 +0x1d8
  github.com/river-build/river/core/node/rpc.(*Service).initHandlers.NewTimeoutInterceptor.func2.1()
      /Users/austinellis/hnt/river/core/node/rpc/timeout_interceptor.go:24 +0xc4
  github.com/river-build/river/core/node/rpc.(*Service).initHandlers.NewMetricsInterceptor.func1.1()
      /Users/austinellis/hnt/river/core/node/rpc/metrics_interceptor.go:32 +0x210
  connectrpc.com/connect.NewUnaryHandler[go.shape.40e197435c9ac50507e53ee22aa4c74a454c17ed4e26d5c0330b9b0f17660b02,go.shape.struct { github.com/river-build/river/core/node/protocol.state google.golang.org/protobuf/internal/impl.MessageState; github.com/river-build/river/core/node/protocol.sizeCache int32; github.com/river-build/river/core/node/protocol.unknownFields []uint8; Error *github.com/river-build/river/core/node/protocol.AddEventResponse_Error "protobuf:\"bytes,1,opt,name=error,proto3\" json:\"error,omitempty\"" }].func2()
      /Users/austinellis/go/pkg/mod/connectrpc.com/[email protected]/handler.go:84 +0x4e8
  connectrpc.com/connect.(*Handler).ServeHTTP()
      /Users/austinellis/go/pkg/mod/connectrpc.com/[email protected]/handler.go:265 +0xaa0
  github.com/river-build/river/core/node/protocol/protocolconnect.NewStreamServiceHandler.func1()
      /Users/austinellis/hnt/river/core/node/protocol/protocolconnect/protocol.connect.go:371 +0x134
  net/http.HandlerFunc.ServeHTTP()
      /opt/homebrew/Cellar/go/1.22.3/libexec/src/net/http/server.go:2166 +0x48
  github.com/river-build/river/core/node/rpc.(*httpHandler).ServeHTTP()
      /Users/austinellis/hnt/river/core/node/rpc/http_handler.go:61 +0x8d0
  net/http.(*ServeMux).ServeHTTP()
      /opt/homebrew/Cellar/go/1.22.3/libexec/src/net/http/server.go:2683 +0x17c
  gopkg.in/DataDog/dd-trace-go.v1/contrib/net/http.TraceAndServe()
      /Users/austinellis/go/pkg/mod/gopkg.in/!data!dog/[email protected]/contrib/net/http/trace.go:75 +0x6bc
  gopkg.in/DataDog/dd-trace-go.v1/contrib/net/http.(*ServeMux).ServeHTTP()
      /Users/austinellis/go/pkg/mod/gopkg.in/!data!dog/[email protected]/contrib/net/http/http.go:60 +0x388
  github.com/river-build/river/core/node/rpc.(*Service).runHttpServer.(*Cors).Handler.func3()
      /Users/austinellis/go/pkg/mod/github.com/rs/[email protected]/cors.go:236 +0x2d8
  net/http.HandlerFunc.ServeHTTP()
      /opt/homebrew/Cellar/go/1.22.3/libexec/src/net/http/server.go:2166 +0x48
  net/http.serverHandler.ServeHTTP()
      /opt/homebrew/Cellar/go/1.22.3/libexec/src/net/http/server.go:3137 +0x2a8
  net/http.(*conn).serve()
      /opt/homebrew/Cellar/go/1.22.3/libexec/src/net/http/server.go:2039 +0xf28
  net/http.(*Server).Serve.gowrap3()
      /opt/homebrew/Cellar/go/1.22.3/libexec/src/net/http/server.go:3285 +0x4c

Goroutine 87477 (running) created at:
  net/http.(*Server).Serve()
      /opt/homebrew/Cellar/go/1.22.3/libexec/src/net/http/server.go:3285 +0x674
  net/http.(*Server).ServeTLS()
      /opt/homebrew/Cellar/go/1.22.3/libexec/src/net/http/server.go:3325 +0x4fc
  github.com/river-build/river/core/node/rpc.(*Service).serveTLS()
      /Users/austinellis/hnt/river/core/node/rpc/server.go:410 +0x74
  github.com/river-build/river/core/node/rpc.(*Service).runHttpServer.gowrap1()
      /Users/austinellis/hnt/river/core/node/rpc/server.go:392 +0x34

Goroutine 87224 (running) created at:
  net/http.(*Server).Serve()
      /opt/homebrew/Cellar/go/1.22.3/libexec/src/net/http/server.go:3285 +0x674
  net/http.(*Server).ServeTLS()
      /opt/homebrew/Cellar/go/1.22.3/libexec/src/net/http/server.go:3325 +0x4fc
  github.com/river-build/river/core/node/rpc.(*Service).serveTLS()
      /Users/austinellis/hnt/river/core/node/rpc/server.go:410 +0x74
  github.com/river-build/river/core/node/rpc.(*Service).runHttpServer.gowrap1()
      /Users/austinellis/hnt/river/core/node/rpc/server.go:392 +0x34
```

fix
  • Loading branch information
texuf authored May 29, 2024
1 parent f10e0ef commit e161dac
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 11 deletions.
7 changes: 6 additions & 1 deletion core/encryption/src/decryptionExtensions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -717,7 +717,9 @@ export abstract class BaseDecryptionExtensions {
streamId,
userAddress: item.fromUserAddress,
deviceKey: item.solicitation.deviceKey,
sessionIds: item.solicitation.isNewDevice ? [] : sessions.map((x) => x.sessionId),
sessionIds: item.solicitation.isNewDevice
? []
: sessions.map((x) => x.sessionId).sort(),
})

if (!error) {
Expand All @@ -726,6 +728,9 @@ export abstract class BaseDecryptionExtensions {
item,
sessions,
})
} else if (!error.msg.includes('DUPLICATE_EVENT')) {
// duplicate events are expected, we can ignore them, others are not
this.log.error('failed to send key fulfillment', error)
}
}

Expand Down
10 changes: 5 additions & 5 deletions core/node/events/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ package events
import (
"bytes"
"slices"
"sort"

"google.golang.org/protobuf/proto"
. "github.com/river-build/river/core/node/base"
"github.com/river-build/river/core/node/events/migrations"
. "github.com/river-build/river/core/node/protocol"
Expand Down Expand Up @@ -694,9 +694,9 @@ func applyKeySolicitation(member *MemberPayload_Snapshot_Member, keySolicitation
i++
}
}
// sort the event keys in the new event
event := keySolicitation
event.SessionIds = sort.StringSlice(event.SessionIds)
// clone to avoid data race
event := proto.Clone(keySolicitation).(*MemberPayload_KeySolicitation)

// append it
MAX_DEVICES := 10
startIndex := max(0, i-MAX_DEVICES)
Expand All @@ -709,7 +709,7 @@ func applyKeyFulfillment(member *MemberPayload_Snapshot_Member, keyFulfillment *
// clear out any fulfilled session ids for the device key
for _, event := range member.Solicitations {
if event.DeviceKey == keyFulfillment.DeviceKey {
event.SessionIds = removeCommon(event.SessionIds, sort.StringSlice(keyFulfillment.SessionIds))
event.SessionIds = removeCommon(event.SessionIds, keyFulfillment.SessionIds)
event.IsNewDevice = false
break
}
Expand Down
2 changes: 1 addition & 1 deletion core/node/rpc/add_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (s *Service) localAddEvent(
return connect.NewResponse(&AddEventResponse{
Error: &AddEventResponse_Error{
Code: riverError.Code,
Msg: riverError.Msg,
Msg: riverError.Error(),
Funcs: riverError.Funcs,
},
}), nil
Expand Down
14 changes: 11 additions & 3 deletions core/node/rules/can_add_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"bytes"
"context"
"log/slog"
"sort"
"slices"
"time"

"github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -1096,6 +1096,10 @@ func (ru *aeKeySolicitationRules) validKeySolicitation() (bool, error) {
return false, RiverError(Err_INVALID_ARGUMENT, "session ids are required for existing devices")
}

if !slices.IsSorted(ru.solicitation.SessionIds) {
return false, RiverError(Err_INVALID_ARGUMENT, "session ids must be sorted")
}

return true, nil
}

Expand All @@ -1109,16 +1113,20 @@ func (ru *aeKeyFulfillmentRules) validKeyFulfillment() (bool, error) {
return false, err
}

if len(ru.fulfillment.SessionIds) > 0 && !slices.IsSorted(ru.fulfillment.SessionIds) {
return false, RiverError(Err_INVALID_ARGUMENT, "session ids are required")
}

// loop over solicitations, see if the device key exists
for _, solicitation := range solicitations {
if solicitation.DeviceKey == ru.fulfillment.DeviceKey {
if solicitation.IsNewDevice {
return true, nil
}
if hasCommon(solicitation.SessionIds, sort.StringSlice(ru.fulfillment.SessionIds)) {
if hasCommon(solicitation.SessionIds, ru.fulfillment.SessionIds) {
return true, nil
}
return false, RiverError(Err_INVALID_ARGUMENT, "solicitation with common session ids not found")
return false, RiverError(Err_DUPLICATE_EVENT, "solicitation with common session ids not found")
}
}
return false, RiverError(Err_INVALID_ARGUMENT, "solicitation with matching device key not found")
Expand Down
2 changes: 1 addition & 1 deletion core/sdk/src/client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ describe('clientTest', () => {
})
await expect(
bobsClient.makeEventAndAddToStream(bobsClient.userSettingsStreamId!, payload),
).rejects.toThrow('INVALID_ARGUMENT')
).rejects.toThrow('DUPLICATE_EVENT')
})

test('bobCreatesUnamedSpaceAndStream', async () => {
Expand Down

0 comments on commit e161dac

Please sign in to comment.