diff --git a/proto/lavanet/lava/pairing/relay.proto b/proto/lavanet/lava/pairing/relay.proto index a74bd761ee..09b0f0a6d0 100644 --- a/proto/lavanet/lava/pairing/relay.proto +++ b/proto/lavanet/lava/pairing/relay.proto @@ -95,16 +95,23 @@ message RelayReply { } message QualityOfServiceReport{ + // Latency of provider answers in milliseconds, range 0-inf, lower is better string latency = 1 [ (gogoproto.moretags) = "yaml:\"Latency\"", (gogoproto.customtype) = "github.com/cosmos/cosmos-sdk/types.Dec", (gogoproto.nullable) = false ]; + + // Percentage of times the provider returned a non-error response, range 0-1, higher is better string availability = 2 [ (gogoproto.moretags) = "yaml:\"availability\"", (gogoproto.customtype) = "github.com/cosmos/cosmos-sdk/types.Dec", (gogoproto.nullable) = false ]; + + // Amount of time the provider is not synced (have the latest block) in milliseconds, range 0-inf, lower is better. + // Example: in ETH we have 15sec block time. So sync = 15000 means that the provider is one block + // behind the actual latest block. 
string sync = 3 [ (gogoproto.moretags) = "yaml:\"sync\"", (gogoproto.customtype) = "github.com/cosmos/cosmos-sdk/types.Dec", diff --git a/testutil/common/tester.go b/testutil/common/tester.go index 55bcf924ea..4e7bd2445e 100644 --- a/testutil/common/tester.go +++ b/testutil/common/tester.go @@ -1091,6 +1091,15 @@ func (ts *Tester) GetNextMonth(from time.Time) int64 { return utils.NextMonth(from).UTC().Unix() } +func (ts *Tester) BlockTimeDefault() time.Duration { + return ts.Keepers.Downtime.GetParams(ts.Ctx).DowntimeDuration +} + +func (ts *Tester) EpochTimeDefault() time.Duration { + epochBlocks := ts.Keepers.Epochstorage.GetParams(ts.Ctx).EpochBlocks + return ts.BlockTimeDefault() * time.Duration(epochBlocks) +} + func (ts *Tester) AdvanceToBlock(block uint64) { if block < ts.BlockHeight() { panic("AdvanceToBlock: block in the past: " + diff --git a/utils/convert.go b/utils/convert.go index 0a39cc7093..3a1808bda6 100644 --- a/utils/convert.go +++ b/utils/convert.go @@ -1,5 +1,13 @@ package utils +import "math" + +func SafeUint64ToInt64Convert(val uint64) int64 { + if val > math.MaxInt64 { + val = math.MaxInt64 + } + return int64(val) +} func Btof(b bool) float64 { if b { return 1 diff --git a/x/pairing/keeper/msg_server_relay_payment.go b/x/pairing/keeper/msg_server_relay_payment.go index a1ec9c0f3f..b06acfba22 100644 --- a/x/pairing/keeper/msg_server_relay_payment.go +++ b/x/pairing/keeper/msg_server_relay_payment.go @@ -47,6 +47,7 @@ func (k msgServer) RelayPayment(goCtx context.Context, msg *types.MsgRelayPaymen return nil, err } addressEpochBadgeMap := map[string]BadgeData{} + sessionRelaysAmount := map[uint64]int{} for _, relay := range msg.Relays { if relay.Badge != nil { mapKey := types.CreateAddressEpochBadgeMapKey(relay.Badge.Address, relay.Badge.Epoch) @@ -67,10 +68,15 @@ func (k msgServer) RelayPayment(goCtx context.Context, msg *types.MsgRelayPaymen addressEpochBadgeMap[mapKey] = badgeData } } + if _, ok := sessionRelaysAmount[relay.SessionId]; !ok { 
+ sessionRelaysAmount[relay.SessionId] = 1 + } else { + sessionRelaysAmount[relay.SessionId]++ + } } var rejectedCu uint64 // aggregated rejected CU (due to badge CU overuse or provider double spending) - rejected_relays_num := len(msg.Relays) + rejectedRelaysNum := len(msg.Relays) for relayIdx, relay := range msg.Relays { rejectedCu += relay.CuSum providerAddr, err := sdk.AccAddressFromBech32(relay.Provider) @@ -170,6 +176,22 @@ func (k msgServer) RelayPayment(goCtx context.Context, msg *types.MsgRelayPaymen k.handleBadgeCu(ctx, badgeData, relay.Provider, relay.CuSum, newBadgeTimerExpiry) } + // update the reputation's epoch QoS score + // the excellece QoS report can be nil when the provider and consumer geolocations are not equal + if relay.QosExcellenceReport != nil { + err = k.aggregateReputationEpochQosScore(ctx, project.Subscription, relay, sessionRelaysAmount[relay.SessionId]) + if err != nil { + return nil, utils.LavaFormatWarning("RelayPayment: could not update reputation epoch QoS score", err, + utils.LogAttr("consumer", project.Subscription), + utils.LogAttr("project", project.Index), + utils.LogAttr("chain", relay.SpecId), + utils.LogAttr("provider", relay.Provider), + utils.LogAttr("qos_excellence_report", relay.QosExcellenceReport.String()), + utils.LogAttr("sync_factor", k.ReputationLatencyOverSyncFactor(ctx).String()), + ) + } + } + // TODO: add support for spec changes spec, found := k.specKeeper.GetSpec(ctx, relay.SpecId) if !found || !spec.Enabled { @@ -290,11 +312,11 @@ func (k msgServer) RelayPayment(goCtx context.Context, msg *types.MsgRelayPaymen ) } rejectedCu -= relay.CuSum - rejected_relays_num-- + rejectedRelaysNum-- } // if all relays failed, fail the TX - if rejected_relays_num != 0 { + if rejectedRelaysNum != 0 { return nil, utils.LavaFormatWarning("relay payment failed", fmt.Errorf("all relays rejected"), utils.Attribute{Key: "provider", Value: msg.Creator}, utils.Attribute{Key: "description", Value: msg.DescriptionString}, @@ -473,3 
+495,40 @@ func (k Keeper) handleBadgeCu(ctx sdk.Context, badgeData BadgeData, provider str badgeUsedCuMapEntry.UsedCu += relayCuSum k.SetBadgeUsedCu(ctx, badgeUsedCuMapEntry) } + +func (k Keeper) aggregateReputationEpochQosScore(ctx sdk.Context, subscription string, relay *types.RelaySession, relaysAmount int) error { + sub, found := k.subscriptionKeeper.GetSubscription(ctx, subscription) + if !found { + return utils.LavaFormatError("RelayPayment: could not get cluster for reputation score update", fmt.Errorf("relay consumer's subscription not found"), + utils.LogAttr("subscription", subscription), + utils.LogAttr("chain", relay.SpecId), + utils.LogAttr("provider", relay.Provider), + ) + } + + syncFactor := k.ReputationLatencyOverSyncFactor(ctx) + score, err := relay.QosExcellenceReport.ComputeQosExcellenceForReputation(syncFactor) + if err != nil { + return utils.LavaFormatWarning("RelayPayment: could not compute qos excellence score", err, + utils.LogAttr("consumer", subscription), + utils.LogAttr("chain", relay.SpecId), + utils.LogAttr("provider", relay.Provider), + utils.LogAttr("qos_excellence_report", relay.QosExcellenceReport.String()), + utils.LogAttr("sync_factor", syncFactor.String()), + ) + } + + stakeEntry, found := k.epochStorageKeeper.GetStakeEntryCurrent(ctx, relay.SpecId, relay.Provider) + if !found { + return utils.LavaFormatWarning("RelayPayment: could not get stake entry for reputation", fmt.Errorf("stake entry not found"), + utils.LogAttr("consumer", subscription), + utils.LogAttr("chain", relay.SpecId), + utils.LogAttr("provider", relay.Provider), + ) + } + effectiveStake := sdk.NewCoin(stakeEntry.Stake.Denom, stakeEntry.TotalStake()) + + // note the current weight used is by relay num. 
In the future, it might change + k.UpdateReputationEpochQosScore(ctx, relay.SpecId, sub.Cluster, relay.Provider, score, utils.SafeUint64ToInt64Convert(uint64(relaysAmount)), effectiveStake) + return nil +} diff --git a/x/pairing/keeper/msg_server_relay_payment_test.go b/x/pairing/keeper/msg_server_relay_payment_test.go index 2103c22bcb..02b3ea75d4 100644 --- a/x/pairing/keeper/msg_server_relay_payment_test.go +++ b/x/pairing/keeper/msg_server_relay_payment_test.go @@ -1063,3 +1063,201 @@ func TestPairingCaching(t *testing.T) { require.Equal(t, totalCU*3, sub.Sub.MonthCuTotal-sub.Sub.MonthCuLeft) } } + +// TestUpdateReputationEpochQosScore tests the update of the reputation's epoch qos score +// Scenarios: +// 1. provider1 sends relay -> its reputation is updated (epoch score and time last updated), +// also, provider2 reputation is not updated +func TestUpdateReputationEpochQosScore(t *testing.T) { + ts := newTester(t) + ts.setupForPayments(2, 1, 0) // 2 providers, 1 client, default providers-to-pair + + consumerAcc, consumer := ts.GetAccount(common.CONSUMER, 0) + _, provider1 := ts.GetAccount(common.PROVIDER, 0) + _, provider2 := ts.GetAccount(common.PROVIDER, 1) + qos := &types.QualityOfServiceReport{ + Latency: sdk.OneDec(), + Availability: sdk.NewDecWithPrec(1, 1), + Sync: sdk.OneDec(), + } + + res, err := ts.QuerySubscriptionCurrent(consumer) + require.NoError(t, err) + cluster := res.Sub.Cluster + + // set default reputations for both providers. 
Advance epoch to change the current block time + ts.Keepers.Pairing.SetReputation(ts.Ctx, ts.spec.Index, cluster, provider1, types.NewReputation(ts.Ctx)) + ts.Keepers.Pairing.SetReputation(ts.Ctx, ts.spec.Index, cluster, provider2, types.NewReputation(ts.Ctx)) + ts.AdvanceEpoch() + + // send relay payment msg from provider1 + relaySession := ts.newRelaySession(provider1, 0, 100, ts.BlockHeight(), 1) + relaySession.QosExcellenceReport = qos + sig, err := sigs.Sign(consumerAcc.SK, *relaySession) + require.NoError(t, err) + relaySession.Sig = sig + + payment := types.MsgRelayPayment{ + Creator: provider1, + Relays: []*types.RelaySession{relaySession}, + } + ts.relayPaymentWithoutPay(payment, true) + + // get both providers reputation: provider1 should have its epoch score and time last updated changed, + // provider2 should have nothing change from the default + r1, found := ts.Keepers.Pairing.GetReputation(ts.Ctx, ts.spec.Index, cluster, provider1) + require.True(t, found) + r2, found := ts.Keepers.Pairing.GetReputation(ts.Ctx, ts.spec.Index, cluster, provider2) + require.True(t, found) + + require.Greater(t, r1.TimeLastUpdated, r2.TimeLastUpdated) + epochScore1, err := r1.EpochScore.Score.Resolve() + require.NoError(t, err) + epochScore2, err := r2.EpochScore.Score.Resolve() + require.NoError(t, err) + variance1, err := r1.EpochScore.Variance.Resolve() + require.NoError(t, err) + variance2, err := r2.EpochScore.Variance.Resolve() + require.NoError(t, err) + require.True(t, epochScore1.GT(epochScore2)) // score is higher because QoS is bad + require.True(t, variance1.GT(variance2)) // variance is higher because the QoS is significantly differnet from DefaultQos + + entry, found := ts.Keepers.Epochstorage.GetStakeEntryCurrent(ts.Ctx, ts.spec.Index, provider1) + require.True(t, found) + require.True(t, entry.Stake.IsEqual(r1.Stake)) +} + +// TestUpdateReputationEpochQosScoreTruncation tests the following scenarios: +// 1. 
stabilization period has not passed -> no truncation +// 2. stabilization period passed -> with truncation (score update is smaller than the first one) +// note, this test works since we use a bad QoS report (compared to default) so we know that the score should +// increase (which is considered worse) +func TestUpdateReputationEpochQosScoreTruncation(t *testing.T) { + // these will be used to compare the score change with/without truncation + scoreUpdates := []sdk.Dec{} + + // we set the stabilization period to 2 epochs time. Advancing one epoch means we won't truncate, + // advancing 3 means we will truncate. + epochsToAdvance := []uint64{1, 3} + + for i := range epochsToAdvance { + ts := newTester(t) + ts.setupForPayments(1, 1, 0) // 1 provider, 1 client, default providers-to-pair + + consumerAcc, consumer := ts.GetAccount(common.CONSUMER, 0) + _, provider1 := ts.GetAccount(common.PROVIDER, 0) + qos := &types.QualityOfServiceReport{ + Latency: sdk.NewDec(1000), + Availability: sdk.OneDec(), + Sync: sdk.NewDec(1000), + } + + resQCurrent, err := ts.QuerySubscriptionCurrent(consumer) + require.NoError(t, err) + cluster := resQCurrent.Sub.Cluster + + // set stabilization period to be 2*epoch time + resQParams, err := ts.Keepers.Pairing.Params(ts.GoCtx, &types.QueryParamsRequest{}) + require.NoError(t, err) + resQParams.Params.ReputationVarianceStabilizationPeriod = int64(ts.EpochTimeDefault().Seconds()) + ts.Keepers.Pairing.SetParams(ts.Ctx, resQParams.Params) + + // set default reputation + ts.Keepers.Pairing.SetReputation(ts.Ctx, ts.spec.Index, cluster, provider1, types.NewReputation(ts.Ctx)) + + // advance epochs + ts.AdvanceEpochs(epochsToAdvance[i]) + + // send relay payment msg from provider1 + relaySession := ts.newRelaySession(provider1, 0, 100, ts.BlockHeight(), 1) + relaySession.QosExcellenceReport = qos + sig, err := sigs.Sign(consumerAcc.SK, *relaySession) + require.NoError(t, err) + relaySession.Sig = sig + + payment := types.MsgRelayPayment{ + Creator: 
provider1, + Relays: []*types.RelaySession{relaySession}, + } + ts.relayPaymentWithoutPay(payment, true) + + // get update of epoch score + r, found := ts.Keepers.Pairing.GetReputation(ts.Ctx, ts.spec.Index, cluster, provider1) + require.True(t, found) + epochScoreNoTruncation, err := r.EpochScore.Score.Resolve() + require.NoError(t, err) + defaultEpochScore, err := types.ZeroQosScore.Score.Resolve() + require.NoError(t, err) + scoreUpdates = append(scoreUpdates, epochScoreNoTruncation.Sub(defaultEpochScore)) + } + + // require that the score update that was not truncated is larger than the one that was truncated + require.True(t, scoreUpdates[0].GT(scoreUpdates[1])) +} + +// TestUpdateReputationEpochQosScoreRelayNumWeight tests the following scenario: +// 1. relay num is the reputation update weight. More relays = bigger update +func TestUpdateReputationEpochQosScoreRelayNumWeight(t *testing.T) { + // these will be used to compare the score change with high/low relay numbers + scoreUpdates := []sdk.Dec{} + + // we set the number of clients to be 2 and 1 to test different relay amount in + // a session + clientsAmount := []int{2, 1} + + for i := range clientsAmount { + ts := newTester(t) + ts.setupForPayments(1, clientsAmount[i], 0) // 1 provider, clientsAmount[i] clients, default providers-to-pair + _, provider1 := ts.GetAccount(common.PROVIDER, 0) + relays := []*types.RelaySession{} + cluster := "" + for j := 0; j < clientsAmount[i]; j++ { + consumerAcc, consumer := ts.GetAccount(common.CONSUMER, j) + + qos := &types.QualityOfServiceReport{ + Latency: sdk.NewDec(1000), + Availability: sdk.OneDec(), + Sync: sdk.NewDec(1000), + } + + resQCurrent, err := ts.QuerySubscriptionCurrent(consumer) + require.NoError(t, err) + cluster = resQCurrent.Sub.Cluster + + // set stabilization period to be 2*epoch time to avoid truncation + resQParams, err := ts.Keepers.Pairing.Params(ts.GoCtx, &types.QueryParamsRequest{}) + require.NoError(t, err) +
resQParams.Params.ReputationVarianceStabilizationPeriod = int64(ts.EpochTimeDefault().Seconds()) + ts.Keepers.Pairing.SetParams(ts.Ctx, resQParams.Params) + + // set default reputation + ts.Keepers.Pairing.SetReputation(ts.Ctx, ts.spec.Index, cluster, provider1, types.NewReputation(ts.Ctx)) + ts.AdvanceEpoch() + + // send relay payment msg from provider1 + relaySession := ts.newRelaySession(provider1, 0, 100, ts.BlockHeight(), uint64(j)) + relaySession.QosExcellenceReport = qos + sig, err := sigs.Sign(consumerAcc.SK, *relaySession) + require.NoError(t, err) + relaySession.Sig = sig + relays = append(relays, relaySession) + } + payment := types.MsgRelayPayment{ + Creator: provider1, + Relays: relays, + } + ts.relayPaymentWithoutPay(payment, true) + + // get update of epoch score + r, found := ts.Keepers.Pairing.GetReputation(ts.Ctx, ts.spec.Index, cluster, provider1) + require.True(t, found) + epochScoreNoTruncation, err := r.EpochScore.Score.Resolve() + require.NoError(t, err) + defaultEpochScore, err := types.ZeroQosScore.Score.Resolve() + require.NoError(t, err) + scoreUpdates = append(scoreUpdates, epochScoreNoTruncation.Sub(defaultEpochScore)) + } + + // require that the score update that was with 1000 relay num is larger than the one with one relay num + require.True(t, scoreUpdates[0].GT(scoreUpdates[1])) +} diff --git a/x/pairing/keeper/reputation.go b/x/pairing/keeper/reputation.go index 8109c2a222..f303813fc2 100644 --- a/x/pairing/keeper/reputation.go +++ b/x/pairing/keeper/reputation.go @@ -97,6 +97,31 @@ func (k Keeper) GetAllReputation(ctx sdk.Context) []types.ReputationGenesis { return entries } +// UpdateReputationEpochQosScore updates the epoch QoS score of the provider's reputation using the score from the relay +// payment's QoS excellence report +func (k Keeper) UpdateReputationEpochQosScore(ctx sdk.Context, chainID string, cluster string, provider string, score math.LegacyDec, weight int64, stake sdk.Coin) { + // get current reputation and get 
parameters for the epoch score update + r, found := k.GetReputation(ctx, chainID, cluster, provider) + truncate := false + if found { + stabilizationPeriod := k.ReputationVarianceStabilizationPeriod(ctx) + if r.ShouldTruncate(stabilizationPeriod, ctx.BlockTime().UTC().Unix()) { + truncate = true + } + } else { + // new reputation score is not truncated and its decay factor is equal to 1 + r = types.NewReputation(ctx) + } + + // calculate the updated QoS epoch score + r.EpochScore.Update(score, truncate, weight) + + // update the reputation and set + r.TimeLastUpdated = ctx.BlockTime().UTC().Unix() + r.Stake = stake + k.SetReputation(ctx, chainID, cluster, provider, r) +} + // GetReputationScore returns the current reputation pairing score func (k Keeper) GetReputationScore(ctx sdk.Context, chainID string, cluster string, provider string) (val math.LegacyDec, found bool) { block := uint64(ctx.BlockHeight()) diff --git a/x/pairing/types/QualityOfServiceReport.go b/x/pairing/types/QualityOfServiceReport.go index d5bb0b4836..063a8f8836 100644 --- a/x/pairing/types/QualityOfServiceReport.go +++ b/x/pairing/types/QualityOfServiceReport.go @@ -3,7 +3,9 @@ package types import ( "fmt" + "cosmossdk.io/math" sdk "github.com/cosmos/cosmos-sdk/types" + "github.com/lavanet/lava/v4/utils" ) func (qos *QualityOfServiceReport) ComputeQoS() (sdk.Dec, error) { @@ -24,3 +26,27 @@ func (qos *QualityOfServiceReport) ComputeQoSExcellence() (sdk.Dec, error) { } return qos.Availability.Quo(qos.Sync).Quo(qos.Latency).ApproxRoot(3) } + +// ComputeQosExcellenceForReputation computes the score from the QoS excellence report to update the provider's reputation +// report score = latency + sync*syncFactor + ((1/availability) - 1) * FailureCost (note: larger value is worse) +func (qos QualityOfServiceReport) ComputeQosExcellenceForReputation(syncFactor math.LegacyDec) (math.LegacyDec, error) { + if qos.Availability.LT(sdk.ZeroDec()) || + qos.Latency.LT(sdk.ZeroDec()) || + 
qos.Sync.LT(sdk.ZeroDec()) || syncFactor.LT(sdk.ZeroDec()) { + return sdk.ZeroDec(), utils.LavaFormatWarning("ComputeQosExcellenceForReputation: compute failed", fmt.Errorf("QoS excellence scores is below 0"), + utils.LogAttr("availability", qos.Availability.String()), + utils.LogAttr("sync", qos.Sync.String()), + utils.LogAttr("latency", qos.Latency.String()), + ) + } + + latency := qos.Latency + sync := qos.Sync.Mul(syncFactor) + availability := math.LegacyNewDec(FailureCost) + if !qos.Availability.IsZero() { + availability = availability.Mul((math.LegacyOneDec().Quo(qos.Availability)).Sub(math.LegacyOneDec())) + } else { + availability = math.LegacyMaxSortableDec.QuoInt64(2) // on qs.Availability = 0 we take the largest score possible + } + return latency.Add(sync).Add(availability), nil +} diff --git a/x/pairing/types/QualityOfServiceReport_test.go b/x/pairing/types/QualityOfServiceReport_test.go index 83a15ce6d6..9ed38e134d 100644 --- a/x/pairing/types/QualityOfServiceReport_test.go +++ b/x/pairing/types/QualityOfServiceReport_test.go @@ -3,11 +3,12 @@ package types import ( "testing" + "cosmossdk.io/math" sdk "github.com/cosmos/cosmos-sdk/types" "github.com/stretchr/testify/require" ) -func TestQosReport(t *testing.T) { +func createTestQosReportScores(forReputation bool) ([]math.LegacyDec, error) { qos1 := &QualityOfServiceReport{ Latency: sdk.MustNewDecFromStr("1.5"), Availability: sdk.MustNewDecFromStr("1"), @@ -29,20 +30,71 @@ func TestQosReport(t *testing.T) { Sync: sdk.MustNewDecFromStr("0.5"), } - qos1Res, errQos1 := qos1.ComputeQoSExcellence() - qos2Res, errQos2 := qos2.ComputeQoSExcellence() - qos3Res, errQos3 := qos3.ComputeQoSExcellence() - qos4Res, errQos4 := qos4.ComputeQoSExcellence() - require.NoError(t, errQos1) - require.NoError(t, errQos2) - require.NoError(t, errQos3) - require.NoError(t, errQos4) - require.True(t, qos1Res.LT(qos2Res)) - require.True(t, qos1Res.LT(qos3Res)) - require.True(t, qos1Res.LT(qos4Res)) - - require.True(t, 
qos2Res.GT(qos3Res)) - require.True(t, qos2Res.GT(qos4Res)) - - require.True(t, qos4Res.LT(qos3Res)) + res := []math.LegacyDec{} + if forReputation { + syncFactor := sdk.MustNewDecFromStr("0.5") + qos1Res, errQos1 := qos1.ComputeQosExcellenceForReputation(syncFactor) + if errQos1 != nil { + return nil, errQos1 + } + qos2Res, errQos2 := qos2.ComputeQosExcellenceForReputation(syncFactor) + if errQos2 != nil { + return nil, errQos2 + } + qos3Res, errQos3 := qos3.ComputeQosExcellenceForReputation(syncFactor) + if errQos3 != nil { + return nil, errQos3 + } + qos4Res, errQos4 := qos4.ComputeQosExcellenceForReputation(syncFactor) + if errQos4 != nil { + return nil, errQos4 + } + res = append(res, qos1Res, qos2Res, qos3Res, qos4Res) + } else { + qos1Res, errQos1 := qos1.ComputeQoSExcellence() + if errQos1 != nil { + return nil, errQos1 + } + qos2Res, errQos2 := qos2.ComputeQoSExcellence() + if errQos2 != nil { + return nil, errQos2 + } + qos3Res, errQos3 := qos3.ComputeQoSExcellence() + if errQos3 != nil { + return nil, errQos3 + } + qos4Res, errQos4 := qos4.ComputeQoSExcellence() + if errQos4 != nil { + return nil, errQos4 + } + res = append(res, qos1Res, qos2Res, qos3Res, qos4Res) + } + + return res, nil +} + +func TestQosReport(t *testing.T) { + res, err := createTestQosReportScores(false) + require.NoError(t, err) + require.True(t, res[0].LT(res[1])) + require.True(t, res[0].LT(res[2])) + require.True(t, res[0].LT(res[3])) + + require.True(t, res[1].GT(res[2])) + require.True(t, res[1].GT(res[3])) + + require.True(t, res[3].LT(res[2])) +} + +func TestQosReportForReputation(t *testing.T) { + res, err := createTestQosReportScores(true) + require.NoError(t, err) + require.True(t, res[0].GT(res[1])) + require.True(t, res[0].GT(res[2])) + require.True(t, res[0].LT(res[3])) + + require.True(t, res[1].LT(res[2])) + require.True(t, res[1].LT(res[3])) + + require.True(t, res[3].GT(res[2])) } diff --git a/x/pairing/types/qos_score.go b/x/pairing/types/qos_score.go index 
eac0175d0a..443e1eba6a 100644 --- a/x/pairing/types/qos_score.go +++ b/x/pairing/types/qos_score.go @@ -7,11 +7,16 @@ import ( "github.com/lavanet/lava/v4/utils" ) -// zero QoS score is: score = 0, var = 0 -var ZeroQosScore = QosScore{ - Score: Frac{Num: math.LegacyZeroDec(), Denom: math.LegacySmallestDec()}, - Variance: Frac{Num: math.LegacyZeroDec(), Denom: math.LegacySmallestDec()}, -} +var ( + FailureCost int64 = 3000 // failed relay cost for QoS excellence report computation in milliseconds + TruncateStdMargin int64 = 3 // number of standard deviations that determine the truncation limit + + // zero QoS score is: score = 0, var = 0 + ZeroQosScore = QosScore{ + Score: Frac{Num: math.LegacyZeroDec(), Denom: math.LegacySmallestDec()}, + Variance: Frac{Num: math.LegacyZeroDec(), Denom: math.LegacySmallestDec()}, + } +) func NewFrac(num math.LegacyDec, denom math.LegacyDec) (Frac, error) { if denom.Equal(math.LegacyZeroDec()) { @@ -39,3 +44,45 @@ func NewQosScore(score Frac, variance Frac) QosScore { func (qs QosScore) Equal(other QosScore) bool { return qs.Score.Equal(other.Score) && qs.Variance.Equal(other.Variance) } + +// Validate validates that both nums are non-negative and both denoms are strictly positive (non-zero) +func (qs QosScore) Validate() bool { + if qs.Score.Num.IsNegative() || !qs.Score.Denom.IsPositive() || qs.Variance.Num.IsNegative() || + !qs.Variance.Denom.IsPositive() { + return false + } + return true +} + +// Update updates a QosScore with a new score from the QoS excellence report. The new score is truncated by the +// current variance. 
Then, it's updated using the weight (which is currently the relay num) +func (qs *QosScore) Update(score math.LegacyDec, truncate bool, weight int64) { + if truncate { + score = qs.truncate(score) + } + + // updated_variance_num = qos_variance_num + (qos_score_num - score)^2 * weight + // updated_variance_denom = qos_variance_denom + weight + qs.Variance.Num = qs.Variance.Num.Add((qs.Score.Num.Sub(score)).Power(2).MulInt64(weight)) + qs.Variance.Denom = qs.Variance.Denom.Add(math.LegacyNewDec(weight)) + + // updated_score_num = qos_score_num + score * weight + // updated_score_denom = qos_score_denom + weight + qs.Score.Num = qs.Score.Num.Add(score.MulInt64(weight)) + qs.Score.Denom = qs.Score.Denom.Add(math.LegacyNewDec(weight)) +} + +// truncate truncates the QoS excellence report score by the current QoS score variance +func (qs QosScore) truncate(score math.LegacyDec) math.LegacyDec { + std, err := qs.Variance.Num.ApproxSqrt() + if err != nil { + utils.LavaFormatError("QosScore truncate: truncate failed, could not calculate qos variance sqrt", err, + utils.LogAttr("qos_score_variance", qs.Variance.String()), + ) + return score + } + + // truncated score = max(min(score, qos_score_num + 3*std), qos_score_num - 3*std) + return math.LegacyMaxDec(math.LegacyMinDec(score, qs.Score.Num.Add(std.MulInt64(TruncateStdMargin))), + qs.Score.Num.Sub(std.MulInt64(TruncateStdMargin))) +} diff --git a/x/pairing/types/relay.pb.go b/x/pairing/types/relay.pb.go index 49f66b8769..e68dd39c6c 100644 --- a/x/pairing/types/relay.pb.go +++ b/x/pairing/types/relay.pb.go @@ -765,9 +765,14 @@ func (m *RelayReply) GetMetadata() []Metadata { } type QualityOfServiceReport struct { - Latency github_com_cosmos_cosmos_sdk_types.Dec `protobuf:"bytes,1,opt,name=latency,proto3,customtype=github.com/cosmos/cosmos-sdk/types.Dec" json:"latency" yaml:"Latency"` + // Latency of provider answers in milliseconds, range 0-inf, lower is better + Latency github_com_cosmos_cosmos_sdk_types.Dec
`protobuf:"bytes,1,opt,name=latency,proto3,customtype=github.com/cosmos/cosmos-sdk/types.Dec" json:"latency" yaml:"Latency"` + // Percentage of times the provider returned a non-error response, range 0-1, higher is better Availability github_com_cosmos_cosmos_sdk_types.Dec `protobuf:"bytes,2,opt,name=availability,proto3,customtype=github.com/cosmos/cosmos-sdk/types.Dec" json:"availability" yaml:"availability"` - Sync github_com_cosmos_cosmos_sdk_types.Dec `protobuf:"bytes,3,opt,name=sync,proto3,customtype=github.com/cosmos/cosmos-sdk/types.Dec" json:"sync" yaml:"sync"` + // Amount of time the provider is not synced (have the latest block) in milliseconds, range 0-inf, lower is better. + // Example: in ETH we have 15sec block time. So sync = 15000 means that the provider is one block + // behind the actual latest block. + Sync github_com_cosmos_cosmos_sdk_types.Dec `protobuf:"bytes,3,opt,name=sync,proto3,customtype=github.com/cosmos/cosmos-sdk/types.Dec" json:"sync" yaml:"sync"` } func (m *QualityOfServiceReport) Reset() { *m = QualityOfServiceReport{} } diff --git a/x/pairing/types/reputation.go b/x/pairing/types/reputation.go index 5043b7ef56..787f90b0ea 100644 --- a/x/pairing/types/reputation.go +++ b/x/pairing/types/reputation.go @@ -30,6 +30,13 @@ func (r Reputation) Equal(other Reputation) bool { r.Stake.IsEqual(other.Stake) } +// ShouldTruncate checks if the ReputationVarianceStabilizationPeriod has passed since +// the reputation's creation. If so, QoS score reports should be truncated before they're added to the +// reputation's epoch QoS score. 
+func (r Reputation) ShouldTruncate(stabilizationPeriod int64, currentTime int64) bool { + return r.CreationTime+stabilizationPeriod < currentTime +} + // ReputationScoreKey returns a key for the reputations fixation store (reputationsFS) func ReputationScoreKey(chainID string, cluster string, provider string) string { return chainID + " " + cluster + " " + provider diff --git a/x/pairing/types/reputation_test.go b/x/pairing/types/reputation_test.go new file mode 100644 index 0000000000..944b633874 --- /dev/null +++ b/x/pairing/types/reputation_test.go @@ -0,0 +1,39 @@ +package types + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +// TestShouldTruncate verifies that ShouldTruncate reports true only when creation time plus the stabilization period is strictly before the current time +func TestShouldTruncate(t *testing.T) { + tests := []struct { + name string + creationTime int64 + stabilizationPeriod int64 + currentTime int64 + truncate bool + }{ + { + name: "stabilization time passed", + creationTime: 1, + stabilizationPeriod: 1, + currentTime: 3, + truncate: true, + }, + { + name: "stabilization time not passed", + creationTime: 3, + stabilizationPeriod: 1, + currentTime: 3, + truncate: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + reputation := Reputation{CreationTime: tt.creationTime} + require.Equal(t, tt.truncate, reputation.ShouldTruncate(tt.stabilizationPeriod, tt.currentTime)) + }) + } +}