Skip to content

Commit

Permalink
Merge branch 'main' into CNS-shares-dont-affect-iprpc
Browse files Browse the repository at this point in the history
  • Loading branch information
Yaroms authored Jan 28, 2025
2 parents 1f9fcc7 + 20ec632 commit 8a42644
Show file tree
Hide file tree
Showing 32 changed files with 1,005 additions and 291 deletions.
6 changes: 3 additions & 3 deletions protocol/common/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,9 +251,9 @@ type ConflictHandlerInterface interface {
}

type ProviderInfo struct {
ProviderAddress string
ProviderQoSExcellenceSummery sdk.Dec // the number represents the average qos for this provider session
ProviderStake sdk.Coin
ProviderAddress string
ProviderReputationSummary sdk.Dec // the number represents the average qos for this provider session
ProviderStake sdk.Coin
}

type RelayResult struct {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/lavanet/lava/v4/protocol/chainlib"
"github.com/lavanet/lava/v4/protocol/lavasession"
"github.com/lavanet/lava/v4/protocol/qos"
pairingtypes "github.com/lavanet/lava/v4/x/pairing/types"
spectypes "github.com/lavanet/lava/v4/x/spec/types"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -202,9 +203,10 @@ func TestConsensusHashesInsertion(t *testing.T) {

func TestQoS(t *testing.T) {
decToSet, _ := sdk.NewDecFromStr("0.05") // test values fit 0.05 Availability requirements
lavasession.AvailabilityPercentage = decToSet
qos.AvailabilityPercentage = decToSet
rand.InitRandomSeed()
chainsToTest := []string{"APT1", "LAV1", "ETH1"}

for i := 0; i < 10; i++ {
for _, chainID := range chainsToTest {
t.Run(chainID, func(t *testing.T) {
Expand Down Expand Up @@ -282,54 +284,63 @@ func TestQoS(t *testing.T) {
currentLatency := time.Millisecond
expectedLatency := time.Millisecond
latestServicedBlock := expectedBH
singleConsumerSession.CalculateQoS(currentLatency, expectedLatency, expectedBH-latestServicedBlock, numOfProviders, 1)
require.Equal(t, uint64(1), singleConsumerSession.QoSInfo.AnsweredRelays)
require.Equal(t, uint64(1), singleConsumerSession.QoSInfo.TotalRelays)
require.Equal(t, int64(1), singleConsumerSession.QoSInfo.SyncScoreSum)
require.Equal(t, int64(1), singleConsumerSession.QoSInfo.TotalSyncScore)
require.Equal(t, sdk.OneDec(), singleConsumerSession.QoSInfo.LastQoSReport.Availability)
require.Equal(t, sdk.OneDec(), singleConsumerSession.QoSInfo.LastQoSReport.Sync)
require.Equal(t, sdk.OneDec(), singleConsumerSession.QoSInfo.LastQoSReport.Latency)
singleConsumerSession.QoSManager.CalculateQoS(epoch, singleConsumerSession.SessionId, "", currentLatency, expectedLatency, expectedBH-latestServicedBlock, numOfProviders, 1)
require.Equal(t, uint64(1), singleConsumerSession.QoSManager.GetAnsweredRelays(epoch, singleConsumerSession.SessionId))
require.Equal(t, uint64(1), singleConsumerSession.QoSManager.GetTotalRelays(epoch, singleConsumerSession.SessionId))
require.Equal(t, int64(1), singleConsumerSession.QoSManager.GetSyncScoreSum(epoch, singleConsumerSession.SessionId))
require.Equal(t, int64(1), singleConsumerSession.QoSManager.GetTotalSyncScore(epoch, singleConsumerSession.SessionId))

lastQoSReport := singleConsumerSession.QoSManager.GetLastQoSReport(epoch, singleConsumerSession.SessionId)
require.Equal(t, sdk.OneDec(), lastQoSReport.Availability)
require.Equal(t, sdk.OneDec(), lastQoSReport.Sync)
require.Equal(t, sdk.OneDec(), lastQoSReport.Latency)

latestServicedBlock = expectedBH + 1
singleConsumerSession.CalculateQoS(currentLatency, expectedLatency, expectedBH-latestServicedBlock, numOfProviders, 1)
require.Equal(t, uint64(2), singleConsumerSession.QoSInfo.AnsweredRelays)
require.Equal(t, uint64(2), singleConsumerSession.QoSInfo.TotalRelays)
require.Equal(t, int64(2), singleConsumerSession.QoSInfo.SyncScoreSum)
require.Equal(t, int64(2), singleConsumerSession.QoSInfo.TotalSyncScore)
require.Equal(t, sdk.OneDec(), singleConsumerSession.QoSInfo.LastQoSReport.Availability)
require.Equal(t, sdk.OneDec(), singleConsumerSession.QoSInfo.LastQoSReport.Sync)
require.Equal(t, sdk.OneDec(), singleConsumerSession.QoSInfo.LastQoSReport.Latency)

singleConsumerSession.QoSInfo.TotalRelays++ // this is how we add a failure
singleConsumerSession.CalculateQoS(currentLatency, expectedLatency, expectedBH-latestServicedBlock, numOfProviders, 1)
require.Equal(t, uint64(3), singleConsumerSession.QoSInfo.AnsweredRelays)
require.Equal(t, uint64(4), singleConsumerSession.QoSInfo.TotalRelays)
require.Equal(t, int64(3), singleConsumerSession.QoSInfo.SyncScoreSum)
require.Equal(t, int64(3), singleConsumerSession.QoSInfo.TotalSyncScore)

require.Equal(t, sdk.ZeroDec(), singleConsumerSession.QoSInfo.LastQoSReport.Availability) // because availability below 95% is 0
require.Equal(t, sdk.OneDec(), singleConsumerSession.QoSInfo.LastQoSReport.Sync)
require.Equal(t, sdk.OneDec(), singleConsumerSession.QoSInfo.LastQoSReport.Latency)
singleConsumerSession.QoSManager.CalculateQoS(epoch, singleConsumerSession.SessionId, "", currentLatency, expectedLatency, expectedBH-latestServicedBlock, numOfProviders, 1)
require.Equal(t, uint64(2), singleConsumerSession.QoSManager.GetAnsweredRelays(epoch, singleConsumerSession.SessionId))
require.Equal(t, uint64(2), singleConsumerSession.QoSManager.GetTotalRelays(epoch, singleConsumerSession.SessionId))
require.Equal(t, int64(2), singleConsumerSession.QoSManager.GetSyncScoreSum(epoch, singleConsumerSession.SessionId))
require.Equal(t, int64(2), singleConsumerSession.QoSManager.GetTotalSyncScore(epoch, singleConsumerSession.SessionId))

lastQoSReport = singleConsumerSession.QoSManager.GetLastQoSReport(epoch, singleConsumerSession.SessionId)
require.Equal(t, sdk.OneDec(), lastQoSReport.Availability)
require.Equal(t, sdk.OneDec(), lastQoSReport.Sync)
require.Equal(t, sdk.OneDec(), lastQoSReport.Latency)

singleConsumerSession.QoSManager.AddFailedRelay(epoch, singleConsumerSession.SessionId) // this is how we add a failure
singleConsumerSession.QoSManager.CalculateQoS(epoch, singleConsumerSession.SessionId, "", currentLatency, expectedLatency, expectedBH-latestServicedBlock, numOfProviders, 1)
require.Equal(t, uint64(3), singleConsumerSession.QoSManager.GetAnsweredRelays(epoch, singleConsumerSession.SessionId))
require.Equal(t, uint64(4), singleConsumerSession.QoSManager.GetTotalRelays(epoch, singleConsumerSession.SessionId))
require.Equal(t, int64(3), singleConsumerSession.QoSManager.GetSyncScoreSum(epoch, singleConsumerSession.SessionId))
require.Equal(t, int64(3), singleConsumerSession.QoSManager.GetTotalSyncScore(epoch, singleConsumerSession.SessionId))

lastQoSReport = singleConsumerSession.QoSManager.GetLastQoSReport(epoch, singleConsumerSession.SessionId)
require.Equal(t, sdk.ZeroDec(), lastQoSReport.Availability) // because availability below 95% is 0
require.Equal(t, sdk.OneDec(), lastQoSReport.Sync)
require.Equal(t, sdk.OneDec(), lastQoSReport.Latency)

latestServicedBlock = expectedBH - 1 // is one block below threshold
singleConsumerSession.CalculateQoS(currentLatency, expectedLatency*2, expectedBH-latestServicedBlock, numOfProviders, 1)
require.Equal(t, uint64(4), singleConsumerSession.QoSInfo.AnsweredRelays)
require.Equal(t, uint64(5), singleConsumerSession.QoSInfo.TotalRelays)
require.Equal(t, int64(3), singleConsumerSession.QoSInfo.SyncScoreSum)
require.Equal(t, int64(4), singleConsumerSession.QoSInfo.TotalSyncScore)

require.Equal(t, sdk.ZeroDec(), singleConsumerSession.QoSInfo.LastQoSReport.Availability) // because availability below 95% is 0
require.Equal(t, sdk.MustNewDecFromStr("0.75"), singleConsumerSession.QoSInfo.LastQoSReport.Sync)
require.Equal(t, sdk.OneDec(), singleConsumerSession.QoSInfo.LastQoSReport.Latency)
singleConsumerSession.QoSManager.CalculateQoS(epoch, singleConsumerSession.SessionId, "", currentLatency, expectedLatency*2, expectedBH-latestServicedBlock, numOfProviders, 1)
require.Equal(t, uint64(4), singleConsumerSession.QoSManager.GetAnsweredRelays(epoch, singleConsumerSession.SessionId))
require.Equal(t, uint64(5), singleConsumerSession.QoSManager.GetTotalRelays(epoch, singleConsumerSession.SessionId))
require.Equal(t, int64(3), singleConsumerSession.QoSManager.GetSyncScoreSum(epoch, singleConsumerSession.SessionId))
require.Equal(t, int64(4), singleConsumerSession.QoSManager.GetTotalSyncScore(epoch, singleConsumerSession.SessionId))

lastQoSReport = singleConsumerSession.QoSManager.GetLastQoSReport(epoch, singleConsumerSession.SessionId)
require.Equal(t, sdk.ZeroDec(), lastQoSReport.Availability) // because availability below 95% is 0
require.Equal(t, sdk.MustNewDecFromStr("0.75"), lastQoSReport.Sync)
require.Equal(t, sdk.OneDec(), lastQoSReport.Latency)

latestServicedBlock = expectedBH + 1
// add in a loop so availability goes above 95%
for i := 5; i < 100; i++ {
singleConsumerSession.CalculateQoS(currentLatency, expectedLatency*2, expectedBH-latestServicedBlock, numOfProviders, 1)
singleConsumerSession.QoSManager.CalculateQoS(epoch, singleConsumerSession.SessionId, "", currentLatency, expectedLatency*2, expectedBH-latestServicedBlock, numOfProviders, 1)
}
require.Equal(t, sdk.MustNewDecFromStr("0.8"), singleConsumerSession.QoSInfo.LastQoSReport.Availability) // because availability below 95% is 0
require.Equal(t, sdk.MustNewDecFromStr("0.989898989898989898"), singleConsumerSession.QoSInfo.LastQoSReport.Sync)
require.Equal(t, sdk.OneDec(), singleConsumerSession.QoSInfo.LastQoSReport.Latency)

lastQoSReport = singleConsumerSession.QoSManager.GetLastQoSReport(epoch, singleConsumerSession.SessionId)
require.Equal(t, sdk.MustNewDecFromStr("0.8"), lastQoSReport.Availability) // because availability below 95% is 0
require.Equal(t, sdk.MustNewDecFromStr("0.989898989898989898"), lastQoSReport.Sync)
require.Equal(t, sdk.OneDec(), lastQoSReport.Latency)

finalizationInsertionsSpreadBlocks := []finalizationTestInsertion{
finalizationInsertionForProviders(chainID, epoch, 200, 0, 1, true, "", blocksInFinalizationProof, blockDistanceForFinalizedData)[0],
Expand Down
6 changes: 3 additions & 3 deletions protocol/lavaprotocol/request_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ func ConstructRelaySession(lavaChainID string, relayRequestData *pairingtypes.Re
return nil
}

copiedQOS := copyQoSServiceReport(singleConsumerSession.QoSInfo.LastQoSReport)
copiedExcellenceQOS := copyQoSServiceReport(singleConsumerSession.QoSInfo.LastExcellenceQoSReport)
copiedQOS := copyQoSServiceReport(singleConsumerSession.QoSManager.GetLastQoSReport(uint64(epoch), singleConsumerSession.SessionId))
copiedReputation := copyQoSServiceReport(singleConsumerSession.QoSManager.GetLastReputationQoSReport(uint64(epoch), singleConsumerSession.SessionId)) // copy reputation report for the node

return &pairingtypes.RelaySession{
SpecId: chainID,
Expand All @@ -87,7 +87,7 @@ func ConstructRelaySession(lavaChainID string, relayRequestData *pairingtypes.Re
LavaChainId: lavaChainID,
Sig: nil,
Badge: nil,
QosExcellenceReport: copiedExcellenceQOS,
QosExcellenceReport: copiedReputation,
}
}

Expand Down
5 changes: 3 additions & 2 deletions protocol/lavaprotocol/response_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/lavanet/lava/v4/protocol/lavaprotocol/finalizationverification"
"github.com/lavanet/lava/v4/protocol/lavasession"
"github.com/lavanet/lava/v4/protocol/qos"
"github.com/lavanet/lava/v4/utils/sigs"
pairingtypes "github.com/lavanet/lava/v4/x/pairing/types"
spectypes "github.com/lavanet/lava/v4/x/spec/types"
Expand All @@ -29,7 +30,7 @@ func TestSignAndExtractResponse(t *testing.T) {
singleConsumerSession := &lavasession.SingleConsumerSession{
CuSum: 20,
LatestRelayCu: 10, // set by GetSessions cuNeededForSession
QoSInfo: lavasession.QoSReport{LastQoSReport: &pairingtypes.QualityOfServiceReport{}},
QoSManager: qos.NewQoSManager(),
SessionId: 123,
Parent: nil,
RelayNum: 1,
Expand Down Expand Up @@ -77,7 +78,7 @@ func TestSignAndExtractResponseLatest(t *testing.T) {
singleConsumerSession := &lavasession.SingleConsumerSession{
CuSum: 20,
LatestRelayCu: 10, // set by GetSessions cuNeededForSession
QoSInfo: lavasession.QoSReport{LastQoSReport: &pairingtypes.QualityOfServiceReport{}},
QoSManager: qos.NewQoSManager(),
SessionId: 123,
Parent: nil,
RelayNum: 1,
Expand Down
3 changes: 2 additions & 1 deletion protocol/lavaprotocol/reuqest_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"testing"

"github.com/lavanet/lava/v4/protocol/lavasession"
"github.com/lavanet/lava/v4/protocol/qos"
"github.com/lavanet/lava/v4/utils/sigs"
pairingtypes "github.com/lavanet/lava/v4/x/pairing/types"
"github.com/stretchr/testify/require"
Expand All @@ -18,7 +19,7 @@ func TestSignAndExtract(t *testing.T) {
singleConsumerSession := &lavasession.SingleConsumerSession{
CuSum: 20,
LatestRelayCu: 10, // set by GetSessions cuNeededForSession
QoSInfo: lavasession.QoSReport{LastQoSReport: &pairingtypes.QualityOfServiceReport{}},
QoSManager: qos.NewQoSManager(),
SessionId: 123,
Parent: nil,
RelayNum: 1,
Expand Down
12 changes: 4 additions & 8 deletions protocol/lavasession/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
sdkerrors "cosmossdk.io/errors"
"golang.org/x/exp/slices"

sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/gogo/status"
"github.com/lavanet/lava/v4/protocol/chainlib/chainproxy"
"github.com/lavanet/lava/v4/utils"
Expand Down Expand Up @@ -48,14 +47,11 @@ const (
unixPrefix = "unix:"
)

var AvailabilityPercentage sdk.Dec = sdk.NewDecWithPrec(1, 1) // TODO move to params pairing
const (
PercentileToCalculateLatency = 0.9
MinProvidersForSync = 0.6
OptimizerPerturbation = 0.10
LatencyThresholdStatic = 1 * time.Second
LatencyThresholdSlope = 1 * time.Millisecond
StaleEpochDistance = 3 // relays done 3 epochs back are ready to be rewarded
OptimizerPerturbation = 0.10
LatencyThresholdStatic = 1 * time.Second
LatencyThresholdSlope = 1 * time.Millisecond
StaleEpochDistance = 3 // relays done 3 epochs back are ready to be rewarded

)

Expand Down
28 changes: 16 additions & 12 deletions protocol/lavasession/consumer_session_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -558,12 +558,12 @@ func (csm *ConsumerSessionManager) GetSessions(ctx context.Context, cuNeededForS
ReportedProviders: reportedProviders,
}

// adding qos summery for error parsing.
// adding qos summary for error parsing.
// consumer session is locked here so its ok to read the qos report.
sessionInfo.QoSSummeryResult = consumerSession.getQosComputedResultOrZero()
sessionInfo.QoSSummaryResult = consumerSession.getQosComputedResultOrZero()
sessions[providerAddress] = sessionInfo

qosReport, _ := csm.providerOptimizer.GetExcellenceQoSReportForProvider(providerAddress)
qosReport, _ := csm.providerOptimizer.GetReputationReportForProvider(providerAddress)
if csm.rpcEndpoint.Geolocation != uint64(endpoint.endpoint.Geolocation) {
// rawQosReport is used only when building the relay payment message to be used to update
// the provider's reputation on-chain. If the consumer and provider don't share geolocation
Expand Down Expand Up @@ -936,7 +936,7 @@ func (csm *ConsumerSessionManager) OnSessionFailure(consumerSession *SingleConsu
consumerSession.BlockListed = true
}

consumerSession.QoSInfo.TotalRelays++
consumerSession.QoSManager.AddFailedRelay(consumerSession.epoch, consumerSession.SessionId)
consumerSession.ConsecutiveErrors = append(consumerSession.ConsecutiveErrors, errorReceived)
// copy consecutive errors for report.
errorsForConsumerSession := consumerSession.ConsecutiveErrors
Expand Down Expand Up @@ -1047,7 +1047,7 @@ func (csm *ConsumerSessionManager) OnSessionDone(
consumerSession.ConsecutiveErrors = []error{}
consumerSession.LatestBlock = latestServicedBlock // update latest serviced block
// calculate QoS
consumerSession.CalculateQoS(currentLatency, expectedLatency, expectedBH-latestServicedBlock, numOfProviders, int64(providersCount))
consumerSession.QoSManager.CalculateQoS(csm.atomicReadCurrentEpoch(), consumerSession.SessionId, consumerSession.Parent.PublicLavaAddress, currentLatency, expectedLatency, expectedBH-latestServicedBlock, numOfProviders, int64(providersCount))
if !isHangingApi {
// append relay data only for non hanging apis
go csm.providerOptimizer.AppendRelayData(consumerSession.Parent.PublicLavaAddress, currentLatency, specComputeUnits, uint64(latestServicedBlock))
Expand All @@ -1066,21 +1066,25 @@ func (csm *ConsumerSessionManager) updateMetricsManager(consumerSession *SingleC
info := csm.RPCEndpoint()
apiInterface := info.ApiInterface
chainId := info.ChainID

var lastQos *pairingtypes.QualityOfServiceReport
var lastQosExcellence *pairingtypes.QualityOfServiceReport
if consumerSession.QoSInfo.LastQoSReport != nil {
qos := *consumerSession.QoSInfo.LastQoSReport
lastQoSReport := consumerSession.QoSManager.GetLastQoSReport(csm.atomicReadCurrentEpoch(), consumerSession.SessionId)
if lastQoSReport != nil {
qos := *lastQoSReport
lastQos = &qos
}
if consumerSession.QoSInfo.LastExcellenceQoSReport != nil {
qosEx := *consumerSession.QoSInfo.LastExcellenceQoSReport
lastQosExcellence = &qosEx

var lastReputation *pairingtypes.QualityOfServiceReport
lastReputationReport := consumerSession.QoSManager.GetLastReputationQoSReport(csm.atomicReadCurrentEpoch(), consumerSession.SessionId)
if lastReputationReport != nil {
qosRep := *lastReputationReport
lastReputation = &qosRep
}
publicProviderAddress := consumerSession.Parent.PublicLavaAddress
publicProviderEndpoint := consumerSession.Parent.Endpoints[0].NetworkAddress

go func() {
csm.consumerMetricsManager.SetQOSMetrics(chainId, apiInterface, publicProviderAddress, publicProviderEndpoint, lastQos, lastQosExcellence, consumerSession.LatestBlock, consumerSession.RelayNum, relayLatency, sessionSuccessful)
csm.consumerMetricsManager.SetQOSMetrics(chainId, apiInterface, publicProviderAddress, publicProviderEndpoint, lastQos, lastReputation, consumerSession.LatestBlock, consumerSession.RelayNum, relayLatency, sessionSuccessful)
}()
}

Expand Down
Loading

0 comments on commit 8a42644

Please sign in to comment.