From 25418eebeb06f968d2d17f4812c934a57affb6fe Mon Sep 17 00:00:00 2001 From: Bob Stasyszyn Date: Tue, 9 May 2023 16:35:24 -0400 Subject: [PATCH] feat: Limit to number of anchors to sync per run Added a configurable upper limit to the number of anchor activities to synchronize per task run. When the limit is reached then the task is run at an accelerated interval until all activities are processed. closes #1550 Signed-off-by: Bob Stasyszyn --- cmd/orb-server/startcmd/params.go | 76 +++-- cmd/orb-server/startcmd/start.go | 8 +- internal/pkg/log/fields.go | 276 ++++++++++-------- internal/pkg/log/fields_test.go | 261 +++++++++-------- .../anchorsynctask/activitysynctask.go | 230 ++++++++++----- .../anchorsynctask/activitysynctask_test.go | 32 +- pkg/activitypub/service/mocks/taskmgr.go | 13 +- pkg/taskmgr/taskmgr.go | 72 +++-- pkg/taskmgr/taskmgr_test.go | 12 +- test/bdd/fixtures/docker-compose.yml | 102 +++++-- 10 files changed, 677 insertions(+), 405 deletions(-) diff --git a/cmd/orb-server/startcmd/params.go b/cmd/orb-server/startcmd/params.go index 809aa31c6..2f8fda2c2 100644 --- a/cmd/orb-server/startcmd/params.go +++ b/cmd/orb-server/startcmd/params.go @@ -121,7 +121,9 @@ const ( defaultTaskMgrCheckInterval = 10 * time.Second defaultDataExpiryCheckInterval = time.Minute defaultAnchorSyncInterval = time.Minute - defaultAnchorSyncMinActivityAge = time.Minute + defaultAnchorSyncAcceleratedInterval = 15 * time.Second + defaultAnchorSyncMinActivityAge = 10 * time.Minute + defaultAnchorSyncMaxActivities = 500 defaultVCTProofMonitoringInterval = 10 * time.Second defaultVCTLogMonitoringInterval = 10 * time.Second defaultVCTLogMonitoringMaxTreeSize = 50000 @@ -632,10 +634,22 @@ const ( "this service is following. Defaults to 1m if not set. " + commonEnvVarUsageText + anchorSyncIntervalEnvKey + anchorSyncMaxActivitiesFlagName = "sync-max-activities" + anchorSyncMaxActivitiesEnvKey = "ANCHOR_EVENT_SYNC_MAX_ACTIVITIES" + anchorSyncMaxActivitiesFlagUsage = "The maximum number of activities to be synchronized in a single task run. Defaults to 500 if not set. " + + commonEnvVarUsageText + anchorSyncMaxActivitiesEnvKey + + anchorSyncAcceleratedIntervalFlagName = "sync-accelerated-interval" + anchorSyncAcceleratedIntervalEnvKey = "ANCHOR_EVENT_SYNC_ACCELERATED_INTERVAL" + anchorSyncNextIntervalFlagUsage = "The interval in which to run the activity sync task after the maximum number of activities " + + "(specified by sync-max-activities) have been processed for the current task run. This should be smaller than the default interval " + + "in order to accelerate processing. Defaults to 15s if not set. " + + commonEnvVarUsageText + anchorSyncAcceleratedIntervalEnvKey + anchorSyncMinActivityAgeFlagName = "sync-min-activity-age" anchorSyncMinActivityAgeEnvKey = "ANCHOR_EVENT_SYNC_MIN_ACTIVITY_AGE" anchorSyncMinActivityAgeFlagUsage = "The minimum age of an activity to be synchronized. The activity will be " + - "processed only if its age is greater than this value. Defaults to 1m if not set. " + + "processed only if its age is greater than this value. Defaults to 10m if not set. 
" + commonEnvVarUsageText + anchorSyncMinActivityAgeEnvKey activityPubClientCacheSizeFlagName = "apclient-cache-size" @@ -1584,13 +1598,15 @@ func getVCTParams(cmd *cobra.Command) (*vctParams, error) { } type activityPubParams struct { - pageSize int - anchorSyncPeriod time.Duration - anchorSyncMinActivityAge time.Duration - clientCacheSize int - clientCacheExpiration time.Duration - iriCacheSize int - iriCacheExpiration time.Duration + pageSize int + anchorSyncPeriod time.Duration + anchorSyncAcceleratedPeriod time.Duration + anchorSyncMinActivityAge time.Duration + anchorSyncMaxActivities int + clientCacheSize int + clientCacheExpiration time.Duration + iriCacheSize int + iriCacheExpiration time.Duration } func getActivityPubParams(cmd *cobra.Command) (*activityPubParams, error) { @@ -1599,7 +1615,7 @@ func getActivityPubParams(cmd *cobra.Command) (*activityPubParams, error) { return nil, fmt.Errorf("%s: %w", activityPubPageSizeFlagName, err) } - syncPeriod, minActivityAge, err := getAnchorSyncParameters(cmd) + syncPeriod, acceleratedSyncPeriod, minActivityAge, maxActivities, err := getAnchorSyncParameters(cmd) if err != nil { return nil, err } @@ -1615,13 +1631,15 @@ func getActivityPubParams(cmd *cobra.Command) (*activityPubParams, error) { } return &activityPubParams{ - pageSize: activityPubPageSize, - anchorSyncPeriod: syncPeriod, - anchorSyncMinActivityAge: minActivityAge, - clientCacheSize: apClientCacheSize, - clientCacheExpiration: apClientCacheExpiration, - iriCacheSize: apIRICacheSize, - iriCacheExpiration: apIRICacheExpiration, + pageSize: activityPubPageSize, + anchorSyncPeriod: syncPeriod, + anchorSyncAcceleratedPeriod: acceleratedSyncPeriod, + anchorSyncMinActivityAge: minActivityAge, + anchorSyncMaxActivities: maxActivities, + clientCacheSize: apClientCacheSize, + clientCacheExpiration: apClientCacheExpiration, + iriCacheSize: apIRICacheSize, + iriCacheExpiration: apIRICacheExpiration, }, nil } @@ -2182,19 +2200,33 @@ func getActivityPubCacheParameters(cmd *cobra.Command, params *cacheParams) (int return cacheSize, cacheExpiration, nil } -func getAnchorSyncParameters(cmd *cobra.Command) (syncPeriod, minActivityAge time.Duration, err error) { +func getAnchorSyncParameters(cmd *cobra.Command) (syncPeriod, acceleratedSyncPeriod, + minActivityAge time.Duration, maxActivities int, err error, +) { syncPeriod, err = cmdutil.GetDuration(cmd, anchorSyncIntervalFlagName, anchorSyncIntervalEnvKey, defaultAnchorSyncInterval) if err != nil { - return 0, 0, fmt.Errorf("%s: %w", anchorSyncIntervalFlagName, err) + return 0, 0, 0, 0, fmt.Errorf("%s: %w", anchorSyncIntervalFlagName, err) + } + + acceleratedSyncPeriod, err = cmdutil.GetDuration(cmd, anchorSyncAcceleratedIntervalFlagName, anchorSyncAcceleratedIntervalEnvKey, + defaultAnchorSyncAcceleratedInterval) + if err != nil { + return 0, 0, 0, 0, fmt.Errorf("%s: %w", anchorSyncIntervalFlagName, err) } minActivityAge, err = cmdutil.GetDuration(cmd, anchorSyncMinActivityAgeFlagName, anchorSyncMinActivityAgeEnvKey, defaultAnchorSyncMinActivityAge) if err != nil { - return 0, 0, fmt.Errorf("%s: %w", anchorSyncMinActivityAgeFlagName, err) + return 0, 0, 0, 0, fmt.Errorf("%s: %w", anchorSyncMinActivityAgeFlagName, err) + } + + maxActivities, err = cmdutil.GetInt(cmd, anchorSyncMaxActivitiesFlagName, anchorSyncMaxActivitiesEnvKey, + defaultAnchorSyncMaxActivities) + if err != nil { + return 0, 0, 0, 0, fmt.Errorf("%s: %w", anchorSyncMinActivityAgeFlagName, err) } - return syncPeriod, minActivityAge, nil + return syncPeriod, 
acceleratedSyncPeriod, minActivityAge, maxActivities, nil } func newAPServiceParams(apServiceID, externalEndpoint string, @@ -2377,6 +2409,8 @@ func createFlags(startCmd *cobra.Command) { startCmd.Flags().StringP(httpTimeoutFlagName, "", "", httpTimeoutFlagUsage) startCmd.Flags().StringP(httpDialTimeoutFlagName, "", "", httpDialTimeoutFlagUsage) startCmd.Flags().StringP(anchorSyncIntervalFlagName, anchorSyncIntervalFlagShorthand, "", anchorSyncIntervalFlagUsage) + startCmd.Flags().StringP(anchorSyncAcceleratedIntervalFlagName, "", "", anchorSyncNextIntervalFlagUsage) + startCmd.Flags().StringP(anchorSyncMaxActivitiesFlagName, "", "", anchorSyncMaxActivitiesFlagUsage) startCmd.Flags().StringP(anchorSyncMinActivityAgeFlagName, "", "", anchorSyncMinActivityAgeFlagUsage) startCmd.Flags().StringP(vctProofMonitoringIntervalFlagName, "", "", vctProofMonitoringIntervalFlagUsage) startCmd.Flags().StringP(vctProofMonitoringExpiryPeriodFlagName, "", "", vctProofMonitoringExpiryPeriodFlagUsage) diff --git a/cmd/orb-server/startcmd/start.go b/cmd/orb-server/startcmd/start.go index aeeb3338e..910640e35 100644 --- a/cmd/orb-server/startcmd/start.go +++ b/cmd/orb-server/startcmd/start.go @@ -873,9 +873,11 @@ func startOrbServices(parameters *orbParameters) error { err = anchorsynctask.Register( anchorsynctask.Config{ - ServiceIRI: parameters.apServiceParams.serviceIRI(), - Interval: parameters.activityPub.anchorSyncPeriod, - MinActivityAge: parameters.activityPub.anchorSyncMinActivityAge, + ServiceIRI: parameters.apServiceParams.serviceIRI(), + Interval: parameters.activityPub.anchorSyncPeriod, + AcceleratedInterval: parameters.activityPub.anchorSyncAcceleratedPeriod, + MinActivityAge: parameters.activityPub.anchorSyncMinActivityAge, + MaxActivitiesToSync: parameters.activityPub.anchorSyncMaxActivities, }, taskMgr, apClient, apStore, storeProviders.provider, func() apspi.InboxHandler { diff --git a/internal/pkg/log/fields.go b/internal/pkg/log/fields.go index 39e9412c5..b99c7e88d 100644 --- a/internal/pkg/log/fields.go +++ b/internal/pkg/log/fields.go @@ -19,135 +19,138 @@ import ( // Log Fields. 
const ( - FieldURI = "uri" - FieldURIs = "uris" - FieldURL = "url" - FieldSenderURL = "sender" - FieldConfig = "config" - FieldServiceName = "service" - FieldServiceIRI = "serviceIri" - FieldServiceEndpoint = "serviceEndpoint" - FieldActorID = "actorId" - FieldOriginActorID = "originActorId" - FieldActivityType = "activityType" - FieldActivityID = "activityId" - FieldMessageID = "messageId" - FieldData = "data" - FieldMetadata = "metadata" - FieldRequestURL = "requestUrl" - FieldRequestHeaders = "requestHeaders" - FieldRequestBody = "requestBody" - FieldSize = "size" - FieldMaxSize = "maxSize" - FieldCacheExpiration = "cacheExpiration" - FieldCacheRefreshInterval = "cacheRefreshInterval" - FieldCacheRefreshAttempts = "cacheRefreshAttempts" - FieldTarget = "target" - FieldTargets = "targets" - FieldHTTPMethod = "httpMethod" - FieldParameter = "parameter" - FieldParameters = "parameters" - FieldAcceptListType = "acceptListType" - FieldAdditions = "additions" - FieldDeletions = "deletions" - FieldReferenceType = "referenceType" - FieldAnchorURI = "anchorUri" - FieldAnchorURIs = "anchorURIs" - FieldAnchorHash = "anchorHash" - FieldAnchorEventURI = "anchorEventUri" - FieldObjectIRI = "objectIri" - FieldReferenceIRI = "reference" - FieldKeyID = "keyId" - FieldKeyType = "keyType" - FieldKeyOwner = "keyOwner" - FieldCurrent = "current" - FieldNext = "next" - FieldTotal = "total" - FieldMinimum = "minimum" - FieldType = "type" - FieldQuery = "query" - FieldSuffix = "suffix" - FieldSuffixes = "suffixes" - FieldVerifiableCredential = "vc" - FieldVerifiableCredentialID = "vcId" - FieldHash = "hash" - FieldHashlink = "hashlink" - FieldLocalHashlink = "localHashlink" - FieldParent = "parent" - FieldParents = "parents" - FieldProof = "proof" - FieldCreatedTime = "createdTime" - FieldWitnessURI = "witnessUri" - FieldWitnessURIs = "witnessURIs" - FieldWitnessPolicy = "witnessPolicy" - FieldAnchorOrigin = "anchorOrigin" - FieldAnchorOriginEndpoint = "anchorOriginEndpoint" - FieldOperationType = "operationType" - FieldOperation = "operation" - FieldCoreIndex = "coreIndex" - FieldKey = "key" - FieldValue = "value" - FieldCID = "cid" - FieldResolvedCID = "resolvedCid" - FieldAnchorCID = "anchorCid" - FieldCIDVersion = "cidVersion" - FieldMultihash = "multihash" - FieldCASData = "casData" - FieldDomain = "domain" - FieldLink = "link" - FieldLinks = "links" - FieldTaskMgrInstanceID = "taskMgrInstanceId" - FieldTaskID = "taskId" - FieldRetries = "retries" - FieldMaxRetries = "maxRetries" - FieldSubscriberPoolSize = "subscriberPoolSize" - FieldTaskMonitorInterval = "taskMonitorInterval" - FieldTaskExpiration = "taskExpiration" - FieldDeliveryDelay = "deliveryDelay" - FieldOperationID = "operationId" - FieldPermitHolder = "permitHolder" - FieldTimeSinceLastUpdate = "timeSinceLastUpdate" - FieldGenesisTime = "genesisTime" - FieldSidetreeProtocol = "sidetreeProtocol" - FieldSidetreeTxn = "sidetreeTxn" - FieldDID = "did" - FieldHRef = "href" - FieldID = "id" - FieldResource = "resource" - FieldResolutionResult = "resolutionResult" - FieldResolutionModel = "resolutionModel" - FieldResolutionEndpoints = "resolutionEndpoints" - FieldAuthToken = "authToken" - FieldAuthTokens = "authTokens" - FieldAddress = "address" - FieldAttributedTo = "attributedTo" - FieldAnchorLink = "anchorLink" - FieldAnchorLinkset = "anchorLinkset" - FieldVersion = "version" - FieldDeliveryAttempts = "deliveryAttempts" - FieldProperty = "property" - FieldStorageName = "storeName" - FieldIssuer = "issuer" - FieldStatus = "status" - FieldLogURL = 
"logURL" - FieldNamespace = "namespace" - FieldCanonicalRef = "canonicalRef" - FieldAnchorString = "anchorString" - FieldJRD = "jrd" - FieldBackoff = "backoff" - FieldTimeout = "timeout" - FieldMaxTime = "maxTime" - FieldLogMonitor = "logMonitor" - FieldLogMonitors = "logMonitors" - FieldIndex = "index" - FieldFromIndex = "fromIndex" - FieldToIndex = "toIndex" - FieldSource = "source" - FieldAge = "age" - FieldMinAge = "minAge" - FieldLogSpec = "logSpec" - FieldTracingProvider = "tracingProvider" - FieldMaxOperationsToRepost = "maxOperationsToRepost" + FieldURI = "uri" + FieldURIs = "uris" + FieldURL = "url" + FieldSenderURL = "sender" + FieldConfig = "config" + FieldServiceName = "service" + FieldServiceIRI = "serviceIri" + FieldServiceEndpoint = "serviceEndpoint" + FieldActorID = "actorId" + FieldOriginActorID = "originActorId" + FieldActivityType = "activityType" + FieldActivityID = "activityId" + FieldMessageID = "messageId" + FieldData = "data" + FieldMetadata = "metadata" + FieldRequestURL = "requestUrl" + FieldRequestHeaders = "requestHeaders" + FieldRequestBody = "requestBody" + FieldSize = "size" + FieldMaxSize = "maxSize" + FieldCacheExpiration = "cacheExpiration" + FieldCacheRefreshInterval = "cacheRefreshInterval" + FieldCacheRefreshAttempts = "cacheRefreshAttempts" + FieldTarget = "target" + FieldTargets = "targets" + FieldHTTPMethod = "httpMethod" + FieldParameter = "parameter" + FieldParameters = "parameters" + FieldAcceptListType = "acceptListType" + FieldAdditions = "additions" + FieldDeletions = "deletions" + FieldReferenceType = "referenceType" + FieldAnchorURI = "anchorUri" + FieldAnchorURIs = "anchorURIs" + FieldAnchorHash = "anchorHash" + FieldAnchorEventURI = "anchorEventUri" + FieldObjectIRI = "objectIri" + FieldReferenceIRI = "reference" + FieldKeyID = "keyId" + FieldKeyType = "keyType" + FieldKeyOwner = "keyOwner" + FieldCurrent = "current" + FieldNext = "next" + FieldTotal = "total" + FieldMinimum = "minimum" + FieldType = "type" + FieldQuery = "query" + FieldSuffix = "suffix" + FieldSuffixes = "suffixes" + FieldVerifiableCredential = "vc" + FieldVerifiableCredentialID = "vcId" + FieldHash = "hash" + FieldHashlink = "hashlink" + FieldLocalHashlink = "localHashlink" + FieldParent = "parent" + FieldParents = "parents" + FieldProof = "proof" + FieldCreatedTime = "createdTime" + FieldWitnessURI = "witnessUri" + FieldWitnessURIs = "witnessURIs" + FieldWitnessPolicy = "witnessPolicy" + FieldAnchorOrigin = "anchorOrigin" + FieldAnchorOriginEndpoint = "anchorOriginEndpoint" + FieldOperationType = "operationType" + FieldOperation = "operation" + FieldCoreIndex = "coreIndex" + FieldKey = "key" + FieldValue = "value" + FieldCID = "cid" + FieldResolvedCID = "resolvedCid" + FieldAnchorCID = "anchorCid" + FieldCIDVersion = "cidVersion" + FieldMultihash = "multihash" + FieldCASData = "casData" + FieldDomain = "domain" + FieldLink = "link" + FieldLinks = "links" + FieldTaskMgrInstanceID = "taskMgrInstanceId" + FieldTaskID = "taskId" + FieldRetries = "retries" + FieldMaxRetries = "maxRetries" + FieldSubscriberPoolSize = "subscriberPoolSize" + FieldTaskMonitorInterval = "taskMonitorInterval" + FieldTaskExpiration = "taskExpiration" + FieldDeliveryDelay = "deliveryDelay" + FieldOperationID = "operationId" + FieldPermitHolder = "permitHolder" + FieldTimeSinceLastUpdate = "timeSinceLastUpdate" + FieldGenesisTime = "genesisTime" + FieldSidetreeProtocol = "sidetreeProtocol" + FieldSidetreeTxn = "sidetreeTxn" + FieldDID = "did" + FieldHRef = "href" + FieldID = "id" + FieldResource = 
"resource" + FieldResolutionResult = "resolutionResult" + FieldResolutionModel = "resolutionModel" + FieldResolutionEndpoints = "resolutionEndpoints" + FieldAuthToken = "authToken" + FieldAuthTokens = "authTokens" + FieldAddress = "address" + FieldAttributedTo = "attributedTo" + FieldAnchorLink = "anchorLink" + FieldAnchorLinkset = "anchorLinkset" + FieldVersion = "version" + FieldDeliveryAttempts = "deliveryAttempts" + FieldProperty = "property" + FieldStorageName = "storeName" + FieldIssuer = "issuer" + FieldStatus = "status" + FieldLogURL = "logURL" + FieldNamespace = "namespace" + FieldCanonicalRef = "canonicalRef" + FieldAnchorString = "anchorString" + FieldJRD = "jrd" + FieldBackoff = "backoff" + FieldTimeout = "timeout" + FieldMaxTime = "maxTime" + FieldLogMonitor = "logMonitor" + FieldLogMonitors = "logMonitors" + FieldIndex = "index" + FieldFromIndex = "fromIndex" + FieldToIndex = "toIndex" + FieldSource = "source" + FieldAge = "age" + FieldMinAge = "minAge" + FieldLogSpec = "logSpec" + FieldTracingProvider = "tracingProvider" + FieldMaxOperationsToRepost = "maxOperationsToRepost" + FieldMaxActivitiesToSync = "maxActivitiesToSync" + FieldNumActivitiesSynced = "numActivitiesSynced" + FieldNextActivitySyncInterval = "nextActivitySyncInterval" ) // WithMessageID sets the message-id field. @@ -886,6 +889,21 @@ func WithMaxOperationsToRepost(value int) zap.Field { return zap.Int(FieldMaxOperationsToRepost, value) } +// WithMaxActivitiesToSync sets the maxActivitiesToSync field. +func WithMaxActivitiesToSync(value int) zap.Field { + return zap.Int(FieldMaxActivitiesToSync, value) +} + +// WithNumActivitiesSynced sets the maxActivitiesSynced field. +func WithNumActivitiesSynced(value int) zap.Field { + return zap.Int(FieldNumActivitiesSynced, value) +} + +// WithNextActivitySyncInterval sets the nextActivitySyncInterval field. 
+func WithNextActivitySyncInterval(value time.Duration) zap.Field { + return zap.Duration(FieldNextActivitySyncInterval, value) +} + type jsonMarshaller struct { key string obj interface{} diff --git a/internal/pkg/log/fields_test.go b/internal/pkg/log/fields_test.go index 5c8215fd4..1f62ec264 100644 --- a/internal/pkg/log/fields_test.go +++ b/internal/pkg/log/fields_test.go @@ -62,7 +62,8 @@ func TestStandardFields(t *testing.T) { WithProof([]byte(`{"id":"https://example.com/proof1"}`)), WithCreatedTime(now), WithWitnessURI(u1), WithWitnessURIs(u1, u2), WithWitnessPolicy("some policy"), WithAnchorOrigin(u1.String()), WithOperationType("Create"), WithCoreIndex("1234"), - WithMaxOperationsToRepost(300), + WithMaxOperationsToRepost(300), WithMaxActivitiesToSync(11), WithNextActivitySyncInterval(3*time.Second), + WithNumActivitiesSynced(123), ) t.Logf(stdOut.String()) @@ -119,6 +120,9 @@ func TestStandardFields(t *testing.T) { require.Equal(t, "Create", l.OperationType) require.Equal(t, "1234", l.CoreIndex) require.Equal(t, 300, l.MaxOperationsToRepost) + require.Equal(t, 11, l.MaxActivitiesToSync) + require.Equal(t, "3s", l.NextActivitySyncInterval) + require.Equal(t, 123, l.NumActivitiesSynced) }) t.Run("json fields 2", func(t *testing.T) { @@ -319,132 +323,135 @@ type logData struct { Msg string `json:"msg"` Error string `json:"error"` - MessageID string `json:"messageId"` - Data string `json:"data"` - ActorID string `json:"actorId"` - ActivityID string `json:"activityId"` - ActivityType string `json:"activityType"` - ServiceIri string `json:"serviceIri"` - Service string `json:"service"` - ServiceEndpoint string `json:"serviceEndpoint"` - Size int `json:"size"` - CacheExpiration string `json:"cacheExpiration"` - Target string `json:"target"` - Parameter string `json:"parameter"` - ReferenceType string `json:"referenceType"` - URI string `json:"uri"` - URIs []string `json:"uris"` - Sender string `json:"sender"` - AnchorURI string `json:"anchorUri"` - AnchorEventURI string `json:"anchorEventUri"` - Config *mockObject `json:"config"` - AcceptListType string `json:"acceptListType"` - Additions []string `json:"additions"` - Deletions []string `json:"deletions"` - RequestURL string `json:"requestUrl"` - RequestHeaders map[string][]string `json:"requestHeaders"` - RequestBody string `json:"requestBody"` - ObjectIRI string `json:"objectIri"` - Reference string `json:"reference"` - KeyID string `json:"keyId"` - KeyOwnerID string `json:"keyOwner"` - KeyType string `json:"keyType"` - Current string `json:"current"` - Next string `json:"next"` - Total int `json:"total"` - Minimum int `json:"minimum"` - Type string `json:"type"` - Query *mockObject `json:"query"` - AnchorHash string `json:"anchorHash"` - Suffix string `json:"suffix"` - VerifiableCredential string `json:"vc"` - VerifiableCredentialID string `json:"vcId"` - Hashlink string `json:"hashlink"` - Parent string `json:"parent"` - Parents []string `json:"parents"` - Proof string `json:"proof"` - CreatedTime string `json:"createdTime"` - WitnessURI string `json:"witnessUri"` - WitnessURIs []string `json:"WitnessURIs"` //nolint:tagliatelle - WitnessPolicy string `json:"witnessPolicy"` - AnchorOrigin string `json:"anchorOrigin"` - OperationType string `json:"operationType"` - CoreIndex string `json:"coreIndex"` - Hash string `json:"hash"` - AnchorOriginEndpoint *mockObject `json:"anchorOriginEndpoint"` - Key string `json:"key"` - CID string `json:"cid"` - ResolvedCID string `json:"resolvedCid"` - AnchorCID string `json:"anchorCid"` - CIDVersion 
int `json:"cidVersion"` - Multihash string `json:"multihash"` - CASData string `json:"casData"` - Domain string `json:"domain"` - Link string `json:"link"` - Links []string `json:"links"` - TaskMgrInstanceID string `json:"taskMgrInstanceId"` - Retries int `json:"retries"` - MaxRetries int `json:"maxRetries"` - SubscriberPoolSize int `json:"subscriberPoolSize"` - TaskMonitorInterval string `json:"taskMonitorInterval"` - TaskExpiration string `json:"taskExpiration"` - DeliveryDelay string `json:"deliveryDelay"` - OperationID string `json:"operationId"` - PermitHolder string `json:"permitHolder"` - TimeSinceLastUpdate string `json:"timeSinceLastUpdate"` - GenesisTime int `json:"genesisTime"` - DID string `json:"did"` - HRef string `json:"href"` - ID string `json:"id"` - Resource string `json:"resource"` - ResolutionResult *mockObject `json:"resolutionResult"` - ResolutionModel *mockObject `json:"resolutionModel"` - ResolutionEndpoints []string `json:"resolutionEndpoints"` - Metadata *mockObject `json:"metadata"` - SidetreeProtocol *mockObject `json:"sidetreeProtocol"` - OriginActorID string `json:"originActorId"` - Targets []string `json:"targets"` - HTTPMethod string `json:"httpMethod"` - Suffixes []string `json:"suffixes"` - LocalHashlink string `json:"localHashlink"` - AuthToken string `json:"authToken"` - AuthTokens []string `json:"authTokens"` - Address string `json:"address"` - AttributedTo string `json:"attributedTo"` - AnchorLinkset string `json:"anchorLinkset"` - Version string `json:"version"` - MaxSize int `json:"maxSize"` - Parameters *mockObject `json:"parameters"` - URL string `json:"url"` - AnchorURIs []string `json:"anchorURIs"` //nolint:tagliatelle - Operation *mockObject `json:"operation"` - Value string `json:"value"` - TaskID string `json:"taskId"` - SidetreeTxn *mockObject `json:"sidetreeTxn"` - AnchorLink string `json:"anchorLink"` - DeliveryAttempts int `json:"deliveryAttempts"` - Property string `json:"property"` - StoreName string `json:"storeName"` - Issuer string `json:"issuer"` - Status string `json:"status"` - LogURL string `json:"logUrl"` - Namespace string `json:"namespace"` - CanonicalRef string `json:"canonicalRef"` - AnchorString string `json:"anchorString"` - JRD *mockObject `json:"jrd"` - Backoff string `json:"backoff"` - Timeout string `json:"timeout"` - LogMonitor *mockObject `json:"logMonitor"` - LogMonitors []*mockObject `json:"logMonitors"` - MaxTime string `json:"maxTime"` - Index int `json:"index"` - FromIndex int `json:"fromIndex"` - ToIndex int `json:"toIndex"` - Source string `json:"source"` - Age string `json:"age"` - MinAge string `json:"minAge"` - LogSpec string `json:"logSpec"` - MaxOperationsToRepost int `json:"maxOperationsToRepost"` + MessageID string `json:"messageId"` + Data string `json:"data"` + ActorID string `json:"actorId"` + ActivityID string `json:"activityId"` + ActivityType string `json:"activityType"` + ServiceIri string `json:"serviceIri"` + Service string `json:"service"` + ServiceEndpoint string `json:"serviceEndpoint"` + Size int `json:"size"` + CacheExpiration string `json:"cacheExpiration"` + Target string `json:"target"` + Parameter string `json:"parameter"` + ReferenceType string `json:"referenceType"` + URI string `json:"uri"` + URIs []string `json:"uris"` + Sender string `json:"sender"` + AnchorURI string `json:"anchorUri"` + AnchorEventURI string `json:"anchorEventUri"` + Config *mockObject `json:"config"` + AcceptListType string `json:"acceptListType"` + Additions []string `json:"additions"` + Deletions []string 
`json:"deletions"` + RequestURL string `json:"requestUrl"` + RequestHeaders map[string][]string `json:"requestHeaders"` + RequestBody string `json:"requestBody"` + ObjectIRI string `json:"objectIri"` + Reference string `json:"reference"` + KeyID string `json:"keyId"` + KeyOwnerID string `json:"keyOwner"` + KeyType string `json:"keyType"` + Current string `json:"current"` + Next string `json:"next"` + Total int `json:"total"` + Minimum int `json:"minimum"` + Type string `json:"type"` + Query *mockObject `json:"query"` + AnchorHash string `json:"anchorHash"` + Suffix string `json:"suffix"` + VerifiableCredential string `json:"vc"` + VerifiableCredentialID string `json:"vcId"` + Hashlink string `json:"hashlink"` + Parent string `json:"parent"` + Parents []string `json:"parents"` + Proof string `json:"proof"` + CreatedTime string `json:"createdTime"` + WitnessURI string `json:"witnessUri"` + WitnessURIs []string `json:"WitnessURIs"` //nolint:tagliatelle + WitnessPolicy string `json:"witnessPolicy"` + AnchorOrigin string `json:"anchorOrigin"` + OperationType string `json:"operationType"` + CoreIndex string `json:"coreIndex"` + Hash string `json:"hash"` + AnchorOriginEndpoint *mockObject `json:"anchorOriginEndpoint"` + Key string `json:"key"` + CID string `json:"cid"` + ResolvedCID string `json:"resolvedCid"` + AnchorCID string `json:"anchorCid"` + CIDVersion int `json:"cidVersion"` + Multihash string `json:"multihash"` + CASData string `json:"casData"` + Domain string `json:"domain"` + Link string `json:"link"` + Links []string `json:"links"` + TaskMgrInstanceID string `json:"taskMgrInstanceId"` + Retries int `json:"retries"` + MaxRetries int `json:"maxRetries"` + SubscriberPoolSize int `json:"subscriberPoolSize"` + TaskMonitorInterval string `json:"taskMonitorInterval"` + TaskExpiration string `json:"taskExpiration"` + DeliveryDelay string `json:"deliveryDelay"` + OperationID string `json:"operationId"` + PermitHolder string `json:"permitHolder"` + TimeSinceLastUpdate string `json:"timeSinceLastUpdate"` + GenesisTime int `json:"genesisTime"` + DID string `json:"did"` + HRef string `json:"href"` + ID string `json:"id"` + Resource string `json:"resource"` + ResolutionResult *mockObject `json:"resolutionResult"` + ResolutionModel *mockObject `json:"resolutionModel"` + ResolutionEndpoints []string `json:"resolutionEndpoints"` + Metadata *mockObject `json:"metadata"` + SidetreeProtocol *mockObject `json:"sidetreeProtocol"` + OriginActorID string `json:"originActorId"` + Targets []string `json:"targets"` + HTTPMethod string `json:"httpMethod"` + Suffixes []string `json:"suffixes"` + LocalHashlink string `json:"localHashlink"` + AuthToken string `json:"authToken"` + AuthTokens []string `json:"authTokens"` + Address string `json:"address"` + AttributedTo string `json:"attributedTo"` + AnchorLinkset string `json:"anchorLinkset"` + Version string `json:"version"` + MaxSize int `json:"maxSize"` + Parameters *mockObject `json:"parameters"` + URL string `json:"url"` + AnchorURIs []string `json:"anchorURIs"` //nolint:tagliatelle + Operation *mockObject `json:"operation"` + Value string `json:"value"` + TaskID string `json:"taskId"` + SidetreeTxn *mockObject `json:"sidetreeTxn"` + AnchorLink string `json:"anchorLink"` + DeliveryAttempts int `json:"deliveryAttempts"` + Property string `json:"property"` + StoreName string `json:"storeName"` + Issuer string `json:"issuer"` + Status string `json:"status"` + LogURL string `json:"logUrl"` + Namespace string `json:"namespace"` + CanonicalRef string 
`json:"canonicalRef"` + AnchorString string `json:"anchorString"` + JRD *mockObject `json:"jrd"` + Backoff string `json:"backoff"` + Timeout string `json:"timeout"` + LogMonitor *mockObject `json:"logMonitor"` + LogMonitors []*mockObject `json:"logMonitors"` + MaxTime string `json:"maxTime"` + Index int `json:"index"` + FromIndex int `json:"fromIndex"` + ToIndex int `json:"toIndex"` + Source string `json:"source"` + Age string `json:"age"` + MinAge string `json:"minAge"` + LogSpec string `json:"logSpec"` + MaxOperationsToRepost int `json:"maxOperationsToRepost"` + MaxActivitiesToSync int `json:"maxActivitiesToSync"` + NextActivitySyncInterval string `json:"nextActivitySyncInterval"` + NumActivitiesSynced int `json:"numActivitiesSynced"` } func unmarshalLogData(t *testing.T, b []byte) *logData { diff --git a/pkg/activitypub/service/anchorsynctask/activitysynctask.go b/pkg/activitypub/service/anchorsynctask/activitysynctask.go index 6cc678f09..eb9fb52aa 100644 --- a/pkg/activitypub/service/anchorsynctask/activitysynctask.go +++ b/pkg/activitypub/service/anchorsynctask/activitysynctask.go @@ -10,6 +10,7 @@ import ( "context" "errors" "fmt" + "math" "net/url" "time" @@ -32,8 +33,10 @@ const logModule = "activity_sync" var logger = log.New(logModule) const ( - defaultInterval = time.Minute - defaultMinActivityAge = time.Minute + defaultInterval = time.Minute + defaultAcceleratedInterval = 15 * time.Second + defaultMinActivityAge = time.Minute + defaultMaxActivitiesToSync = math.MaxInt taskName = "activity-sync" ) @@ -51,60 +54,53 @@ type activityPubClient interface { } type taskManager interface { - RegisterTask(taskType string, interval time.Duration, task func()) + RegisterTaskEx(taskType string, interval time.Duration, task func() time.Duration) } // Config contains configuration parameters for the anchor event synchronization task. type Config struct { - ServiceIRI *url.URL - Interval time.Duration - MinActivityAge time.Duration + ServiceIRI *url.URL + Interval time.Duration + AcceleratedInterval time.Duration + MinActivityAge time.Duration + MaxActivitiesToSync int } type task struct { - serviceIRI *url.URL - apClient activityPubClient - store *syncStore - getHandler func() spi.InboxHandler - activityPubStore store.Store - closed chan struct{} - minActivityAge time.Duration - tracer trace.Tracer + serviceIRI *url.URL + apClient activityPubClient + store *syncStore + getHandler func() spi.InboxHandler + activityPubStore store.Store + closed chan struct{} + minActivityAge time.Duration + maxActivitiesToSync int + acceleratedInterval time.Duration + tracer trace.Tracer } // Register registers the anchor event synchronization task. 
func Register(cfg Config, taskMgr taskManager, apClient activityPubClient, apStore store.Store, storageProvider storage.Provider, handlerFactory func() spi.InboxHandler, ) error { - interval := cfg.Interval + config := resolveConfig(&cfg) - if interval == 0 { - interval = defaultInterval - } - - minActivityAge := cfg.MinActivityAge - - if minActivityAge == 0 { - minActivityAge = defaultMinActivityAge - } - - t, err := newTask(cfg.ServiceIRI, apClient, apStore, storageProvider, minActivityAge, handlerFactory) + t, err := newTask(config, apClient, apStore, storageProvider, handlerFactory) if err != nil { return fmt.Errorf("create task: %w", err) } logger.Info("Registering activity-sync task.", - logfields.WithServiceIRI(cfg.ServiceIRI), logfields.WithTaskMonitorInterval(interval), - logfields.WithMinAge(minActivityAge)) + logfields.WithServiceIRI(config.ServiceIRI), logfields.WithTaskMonitorInterval(config.Interval), + logfields.WithMinAge(config.MinActivityAge), logfields.WithMaxActivitiesToSync(config.MaxActivitiesToSync)) - taskMgr.RegisterTask(taskName, interval, t.run) + taskMgr.RegisterTaskEx(taskName, config.Interval, t.run) return nil } -func newTask(serviceIRI *url.URL, apClient activityPubClient, apStore store.Store, - storageProvider storage.Provider, minActivityAge time.Duration, - handlerFactory func() spi.InboxHandler, +func newTask(cfg *Config, apClient activityPubClient, apStore store.Store, + storageProvider storage.Provider, handlerFactory func() spi.InboxHandler, ) (*task, error) { s, err := newSyncStore(storageProvider) if err != nil { @@ -112,68 +108,132 @@ func newTask(serviceIRI *url.URL, apClient activityPubClient, apStore store.Stor } return &task{ - serviceIRI: serviceIRI, - apClient: apClient, - store: s, - activityPubStore: apStore, - getHandler: handlerFactory, - minActivityAge: minActivityAge, - closed: make(chan struct{}), - tracer: tracing.Tracer(tracing.SubsystemActivityPub), + serviceIRI: cfg.ServiceIRI, + apClient: apClient, + store: s, + activityPubStore: apStore, + getHandler: handlerFactory, + minActivityAge: cfg.MinActivityAge, + maxActivitiesToSync: cfg.MaxActivitiesToSync, + acceleratedInterval: cfg.AcceleratedInterval, + closed: make(chan struct{}), + tracer: tracing.Tracer(tracing.SubsystemActivityPub), }, nil } -func (m *task) run() { +func (m *task) run() time.Duration { + numFromFollowers, err := m.syncFollowers(m.maxActivitiesToSync) + if err != nil { + logger.Error("Error synchronizing activities", log.WithError(err)) + + return 0 + } + + var numFromFollowing int + + if numFromFollowers < m.maxActivitiesToSync { + numFromFollowing, err = m.syncFollowing(m.maxActivitiesToSync - numFromFollowers) + if err != nil { + logger.Error("Error synchronizing activities", log.WithError(err)) + + return 0 + } + } + + numTotal := numFromFollowers + numFromFollowing + + if numTotal > 0 { + if numTotal >= m.maxActivitiesToSync { + logger.Info("Reached the maximum number of activities to sync. 
Will continue syncing in the next run.", + logfields.WithNumActivitiesSynced(numTotal), logfields.WithNextActivitySyncInterval(m.acceleratedInterval)) + + return m.acceleratedInterval + } + + logger.Info("Done synchronizing activities", logfields.WithNumActivitiesSynced(numTotal)) + } + + return 0 +} + +func (m *task) syncFollowers(maxActivitiesToSync int) (int, error) { followers, err := m.getServices(store.Follower) if err != nil { logger.Error("Error retrieving my followers list", log.WithError(err)) - return + return 0, err } - if len(followers) > 0 { - for _, serviceIRI := range followers { - err = m.sync(serviceIRI, inbox, func(a *vocab.ActivityType) bool { - // Only sync Create activities that were originated by this service. - return a.Type().Is(vocab.TypeCreate) && a.Actor().String() == m.serviceIRI.String() - }) - if err != nil { - logger.Warn("Error processing activities from inbox of service", - logfields.WithServiceIRI(serviceIRI), log.WithError(err)) + if len(followers) == 0 { + return 0, nil + } + + var numProcessed int + + for _, serviceIRI := range followers { + num, err := m.sync(serviceIRI, inbox, maxActivitiesToSync-numProcessed, func(a *vocab.ActivityType) bool { + // Only sync Create activities that were originated by this service. + return a.Type().Is(vocab.TypeCreate) && a.Actor().String() == m.serviceIRI.String() + }) + if err != nil { + logger.Warn("Error processing activities from inbox of service", + logfields.WithServiceIRI(serviceIRI), log.WithError(err)) + } else { + numProcessed += num + + if numProcessed >= maxActivitiesToSync { + break } } - - logger.Debug("Done synchronizing activities with services that are following me.", - logfields.WithTotal(len(followers))) } + logger.Debug("Done synchronizing activities with services that are following me.", logfields.WithTotal(len(followers)), + logfields.WithNumActivitiesSynced(numProcessed)) + + return numProcessed, nil +} + +func (m *task) syncFollowing(maxActivitiesToSync int) (int, error) { following, err := m.getServices(store.Following) if err != nil { - logger.Error("Error retrieving my following list", log.WithError(err)) + return 0, fmt.Errorf("retrieve following list: %w", err) + } - return + if len(following) == 0 { + return 0, nil } - if len(following) > 0 { - for _, serviceIRI := range following { - err = m.sync(serviceIRI, outbox, func(a *vocab.ActivityType) bool { - return a.Type().IsAny(vocab.TypeCreate, vocab.TypeAnnounce) - }) - if err != nil { - logger.Warn("Error processing activities from outbox of service", - logfields.WithServiceIRI(serviceIRI), log.WithError(err)) + var numProcessed int + + for _, serviceIRI := range following { + num, err := m.sync(serviceIRI, outbox, maxActivitiesToSync-numProcessed, func(a *vocab.ActivityType) bool { + return a.Type().IsAny(vocab.TypeCreate, vocab.TypeAnnounce) + }) + if err != nil { + logger.Warn("Error processing activities from outbox of service", + logfields.WithServiceIRI(serviceIRI), log.WithError(err)) + } else { + numProcessed += num + + if numProcessed >= maxActivitiesToSync { + break } } - - logger.Debug("Done synchronizing activities with services that I'm following.", logfields.WithTotal(len(following))) } + + logger.Debug("Done synchronizing activities with services that I'm following.", logfields.WithTotal(len(following)), + logfields.WithNumActivitiesSynced(numProcessed)) + + return numProcessed, nil } //nolint:cyclop -func (m *task) sync(serviceIRI *url.URL, src activitySource, shouldSync func(*vocab.ActivityType) bool) error { +func (m 
*task) sync(serviceIRI *url.URL, src activitySource, maxNumActivitiesToProcess int, + shouldSync func(*vocab.ActivityType) bool, +) (int, error) { it, lastSyncedPage, lastSyncedIndex, err := m.getNewActivities(serviceIRI, src) if err != nil { - return fmt.Errorf("get new activities: %w", err) + return 0, fmt.Errorf("get new activities: %w", err) } page, index := lastSyncedPage, lastSyncedIndex @@ -192,7 +252,7 @@ func (m *task) sync(serviceIRI *url.URL, src activitySource, shouldSync func(*vo break } - return fmt.Errorf("next activity: %w", e) + return numProcessed, fmt.Errorf("next activity: %w", e) } currentPage := it.CurrentPage() @@ -219,7 +279,7 @@ func (m *task) sync(serviceIRI *url.URL, src activitySource, shouldSync func(*vo n, e := m.syncActivity(span.Start("sync activities"), serviceIRI, currentPage, a) if e != nil { - return fmt.Errorf("sync activity [%s]: %w", a.ID(), e) + return numProcessed, fmt.Errorf("sync activity [%s]: %w", a.ID(), e) } numProcessed += n @@ -227,6 +287,10 @@ func (m *task) sync(serviceIRI *url.URL, src activitySource, shouldSync func(*vo progress.Log(n, page, currentPage) page, index = currentPage, it.NextIndex()-1 + + if numProcessed >= maxNumActivitiesToProcess { + break + } } if page.String() != lastSyncedPage.String() || index != lastSyncedIndex { @@ -241,7 +305,7 @@ func (m *task) sync(serviceIRI *url.URL, src activitySource, shouldSync func(*vo err = m.store.PutLastSyncedPage(serviceIRI, src, page, index) if err != nil { - return fmt.Errorf("update last synced page [%s] at index [%d]: %w", page, index, err) + return numProcessed, fmt.Errorf("update last synced page [%s] at index [%d]: %w", page, index, err) } } else { logger.Debug("Processed missing anchor events ending at page/index.", @@ -249,7 +313,7 @@ func (m *task) sync(serviceIRI *url.URL, src activitySource, shouldSync func(*vo logfields.WithURL(page), logfields.WithIndex(index)) } - return nil + return numProcessed, nil } func (m *task) syncActivity(ctx context.Context, serviceIRI, currentPage *url.URL, a *vocab.ActivityType) (int, error) { @@ -393,6 +457,28 @@ func (m *task) getLastSyncedPage(serviceIRI *url.URL, src activitySource) (*url. 
return actor.Outbox(), 0, nil } +func resolveConfig(cfg *Config) *Config { + config := *cfg + + if config.Interval == 0 { + config.Interval = defaultInterval + } + + if config.AcceleratedInterval == 0 { + config.AcceleratedInterval = defaultAcceleratedInterval + } + + if config.MinActivityAge == 0 { + config.MinActivityAge = defaultMinActivityAge + } + + if config.MaxActivitiesToSync == 0 { + config.MaxActivitiesToSync = defaultMaxActivitiesToSync + } + + return &config +} + type progressLogger struct { numProcessedInPage int } diff --git a/pkg/activitypub/service/anchorsynctask/activitysynctask_test.go b/pkg/activitypub/service/anchorsynctask/activitysynctask_test.go index a99f72337..afd52a90f 100644 --- a/pkg/activitypub/service/anchorsynctask/activitysynctask_test.go +++ b/pkg/activitypub/service/anchorsynctask/activitysynctask_test.go @@ -102,12 +102,19 @@ func TestRun(t *testing.T) { WithActivities(activities) t.Run("Success", func(t *testing.T) { + cfg := resolveConfig(&Config{ + ServiceIRI: serviceIRI, + MinActivityAge: time.Second, + MaxActivitiesToSync: 2, + AcceleratedInterval: time.Millisecond, + }) + handler := &mockHandler{} handler.duplicateAnchors = append(handler.duplicateAnchors, announceActivities[1], createActivities[1]) task, err := newTask( - serviceIRI, apClient, apStore, storage.NewMockStoreProvider(), time.Second, + cfg, apClient, apStore, storage.NewMockStoreProvider(), func() spi.InboxHandler { return handler }, @@ -122,12 +129,19 @@ func TestRun(t *testing.T) { time.Sleep(time.Second) + // Split into two runs with MaxActivitiesToSync=2. + task.run() task.run() require.Equal(t, 3, len(handler.activities)) }) t.Run("QueryReferences error", func(t *testing.T) { + cfg := resolveConfig(&Config{ + ServiceIRI: serviceIRI, + MinActivityAge: time.Nanosecond, + }) + errExpected := errors.New("injected query error") s := &mocks.ActivityStore{} @@ -136,7 +150,7 @@ func TestRun(t *testing.T) { handler := &mockHandler{} task, err := newTask( - serviceIRI, apClient, s, storage.NewMockStoreProvider(), time.Nanosecond, + cfg, apClient, s, storage.NewMockStoreProvider(), func() spi.InboxHandler { return handler }, @@ -150,6 +164,11 @@ func TestRun(t *testing.T) { }) t.Run("ReferenceIterator error", func(t *testing.T) { + cfg := resolveConfig(&Config{ + ServiceIRI: serviceIRI, + MinActivityAge: time.Nanosecond, + }) + errExpected := errors.New("injected iterator error") it := &mocks2.ReferenceIterator{} @@ -161,7 +180,7 @@ func TestRun(t *testing.T) { handler := &mockHandler{} task, err := newTask( - serviceIRI, apClient, s, storage.NewMockStoreProvider(), time.Nanosecond, + cfg, apClient, s, storage.NewMockStoreProvider(), func() spi.InboxHandler { return handler }, @@ -175,6 +194,11 @@ func TestRun(t *testing.T) { }) t.Run("GetActor error", func(t *testing.T) { + cfg := resolveConfig(&Config{ + ServiceIRI: serviceIRI, + MinActivityAge: time.Nanosecond, + }) + errExpected := errors.New("injected client error") apClient := mocks.NewActivitPubClient().WithError(errExpected) @@ -182,7 +206,7 @@ func TestRun(t *testing.T) { handler := &mockHandler{} task, err := newTask( - serviceIRI, apClient, apStore, storage.NewMockStoreProvider(), time.Nanosecond, + cfg, apClient, apStore, storage.NewMockStoreProvider(), func() spi.InboxHandler { return handler }, diff --git a/pkg/activitypub/service/mocks/taskmgr.go b/pkg/activitypub/service/mocks/taskmgr.go index 2a15adfa2..24eaccb62 100644 --- a/pkg/activitypub/service/mocks/taskmgr.go +++ b/pkg/activitypub/service/mocks/taskmgr.go @@ -77,7 
+77,7 @@ func (m *TaskManager) stop() { type task struct { interval time.Duration - run func() + run func() time.Duration lastRunTime time.Time } @@ -95,7 +95,16 @@ func (m *TaskManager) InstanceID() string { } // RegisterTask registers the given task to be run at the given interval. -func (m *TaskManager) RegisterTask(_ string, interval time.Duration, run func()) { +func (m *TaskManager) RegisterTask(id string, interval time.Duration, run func()) { + m.RegisterTaskEx(id, interval, func() time.Duration { + run() + + return 0 + }) +} + +// RegisterTaskEx registers the given task to be run at the given interval. +func (m *TaskManager) RegisterTaskEx(_ string, interval time.Duration, run func() time.Duration) { m.mutex.Lock() defer m.mutex.Unlock() diff --git a/pkg/taskmgr/taskmgr.go b/pkg/taskmgr/taskmgr.go index fbdc739d9..6121834e2 100644 --- a/pkg/taskmgr/taskmgr.go +++ b/pkg/taskmgr/taskmgr.go @@ -11,7 +11,6 @@ import ( "errors" "fmt" "sync" - "sync/atomic" "time" "github.com/google/uuid" @@ -103,14 +102,26 @@ func (s *Manager) InstanceID() string { } // RegisterTask registers a task to be periodically run at the given interval. -func (s *Manager) RegisterTask(id string, interval time.Duration, task func()) { +func (s *Manager) RegisterTask(id string, interval time.Duration, run func()) { + s.RegisterTaskEx(id, interval, func() time.Duration { + run() + + return 0 + }) +} + +// RegisterTaskEx registers a task to be periodically run at the given interval. +// The task returns an override of the default interval. For example, if 5s is returned then the task +// will run again in 5 seconds. If 0 is returned then the task will run at the default interval. +func (s *Manager) RegisterTaskEx(id string, defaultInterval time.Duration, task func() time.Duration) { s.mutex.Lock() defer s.mutex.Unlock() s.tasks[id] = ®istration{ - handle: task, - id: id, - interval: interval, + handle: task, + id: id, + defaultInterval: defaultInterval, + nextInterval: defaultInterval, } } @@ -153,7 +164,7 @@ func (s *Manager) stop() { } func (s *Manager) run(t *registration) error { - if t.isRunning() { + if t.Running() { s.logger.Debug("Task is still running. Updating timestamp in the permit to tell others that I'm still alive.", logfields.WithTaskID(t.id)) @@ -228,10 +239,10 @@ func (s *Manager) shouldRun(t *registration) (bool, error) { timeSinceLastUpdate := time.Since(timeOfLastUpdate).Truncate(time.Second) if currentPermit.CurrentHolder == s.instanceID { - if timeSinceLastUpdate < t.interval { + if timeSinceLastUpdate < t.NextInterval() { s.logger.Debug("It's currently my duty to run this task but it's not time for it to run.", logfields.WithTaskID(t.id), logfields.WithTimeSinceLastUpdate(timeSinceLastUpdate), - logfields.WithTaskMonitorInterval(t.interval)) + logfields.WithTaskMonitorInterval(t.NextInterval())) return false, nil } @@ -249,7 +260,7 @@ func (s *Manager) shouldRun(t *registration) (bool, error) { // within the cluster have the same interval setting (which they should). // So, "unusually long time" means that the 'last update' time is greater than the Task Manager check interval plus // the task's run interval, in which case we'll assume that the other instance is dead and will take over. 
- maxTime := s.interval + t.interval + maxTime := s.interval + t.DefaultInterval() if timeSinceLastUpdate > maxTime { s.logger.Info("The current permit holder for this task has not updated the permit in an "+ @@ -299,23 +310,48 @@ func getPermitKey(taskID string) string { } type registration struct { - handle func() - running uint32 - id string - interval time.Duration + handle func() time.Duration + running bool + id string + defaultInterval time.Duration + nextInterval time.Duration + mutex sync.RWMutex } func (r *registration) run() { - if !atomic.CompareAndSwapUint32(&r.running, 0, 1) { + if r.Running() { // Already running. return } - r.handle() + nextInterval := r.handle() + + if nextInterval == 0 { + nextInterval = r.defaultInterval + } + + r.mutex.Lock() + + r.running = false + r.nextInterval = nextInterval - atomic.StoreUint32(&r.running, 0) + r.mutex.Unlock() } -func (r *registration) isRunning() bool { - return atomic.LoadUint32(&r.running) == 1 +func (r *registration) Running() bool { + r.mutex.RLock() + defer r.mutex.RUnlock() + + return r.running +} + +func (r *registration) DefaultInterval() time.Duration { + return r.defaultInterval +} + +func (r *registration) NextInterval() time.Duration { + r.mutex.RLock() + defer r.mutex.RUnlock() + + return r.nextInterval } diff --git a/pkg/taskmgr/taskmgr_test.go b/pkg/taskmgr/taskmgr_test.go index e8dda48f8..c1fa24b4a 100644 --- a/pkg/taskmgr/taskmgr_test.go +++ b/pkg/taskmgr/taskmgr_test.go @@ -67,9 +67,9 @@ func TestService(t *testing.T) { taskMgr := New(coordinationStore, time.Millisecond) err := taskMgr.run(®istration{ - handle: func() {}, - id: "test-task", - interval: time.Millisecond, + handle: func() time.Duration { return 0 }, + id: "test-task", + defaultInterval: time.Millisecond, }) require.Error(t, err) require.Contains(t, err.Error(), "get permit from DB for task [test-task]: get error") @@ -83,9 +83,9 @@ func TestService(t *testing.T) { taskMgr := New(coordinationStore, time.Millisecond) err := taskMgr.run(®istration{ - handle: func() {}, - id: "test-task", - interval: time.Millisecond, + handle: func() time.Duration { return 0 }, + id: "test-task", + defaultInterval: time.Millisecond, }) require.Error(t, err) require.Contains(t, err.Error(), diff --git a/test/bdd/fixtures/docker-compose.yml b/test/bdd/fixtures/docker-compose.yml index 4b05838fc..d48582bec 100644 --- a/test/bdd/fixtures/docker-compose.yml +++ b/test/bdd/fixtures/docker-compose.yml @@ -15,7 +15,7 @@ services: - ORB_SYNC_TIMEOUT=3 - ORB_KMS_TYPE=web - ORB_KMS_ENDPOINT=http://orb.kms:7878 - - LOG_LEVEL=metrics=INFO:nodeinfo=WARNING:activitypub_store=INFO:expiry-service=INFO:task-manager=INFO:watermill=INFO:DEBUG + - LOG_LEVEL=metrics=INFO:nodeinfo=WARNING:activitypub_store=INFO:expiry-service=INFO:task-manager=INFO:watermill=INFO:INFO - VCT_ENABLED=true - ORB_HOST_URL=172.20.0.23:443 - ORB_METRICS_PROVIDER_NAME=prometheus @@ -119,10 +119,18 @@ services: # we're following. # Default value: 1m. - ANCHOR_EVENT_SYNC_INTERVAL=1m + # ANCHOR_EVENT_SYNC_MAX_ACTIVITIES is the maximum number of activities to be synchronized in a single task run. + # Default value: 500. + - ANCHOR_EVENT_SYNC_MAX_ACTIVITIES=5 + # ANCHOR_EVENT_SYNC_ACCELERATED_INTERVAL is the interval in which to run the anchor sync task after the maximum number of + # activities (specified by ANCHOR_EVENT_SYNC_MAX_ACTIVITIES) have been processed for the current task run. This + # should be smaller than the default interval in order to accelerate processing. + # Default value: 15s. 
+ - ANCHOR_EVENT_SYNC_ACCELERATED_INTERVAL=15s # ANCHOR_EVENT_SYNC_MIN_ACTIVITY_AGE is the minimum age of an activity to be synchronized. The activity will be processed # only if its age is greater than this value. - # Default value: 1m - - ANCHOR_EVENT_SYNC_MIN_ACTIVITY_AGE=1m + # Default value: 10m + - ANCHOR_EVENT_SYNC_MIN_ACTIVITY_AGE=20s # VCT_MONITORING_INTERVAL is the interval (period) in which proofs are monitored from various VCTs that promised # to anchor a VC by a certain time. # Default value: 10s. @@ -171,7 +179,7 @@ services: - ORB_SYNC_TIMEOUT=3 - ORB_KMS_TYPE=web - ORB_KMS_ENDPOINT=http://orb.kms:7878 - - LOG_LEVEL=metrics=INFO:nodeinfo=WARNING:activitypub_store=INFO:expiry-service=INFO:task-manager=INFO:watermill=INFO:DEBUG + - LOG_LEVEL=metrics=INFO:nodeinfo=WARNING:activitypub_store=INFO:expiry-service=INFO:task-manager=INFO:watermill=INFO:INFO - VCT_ENABLED=true - ORB_HOST_URL=172.20.0.2:443 - ORB_METRICS_PROVIDER_NAME=prometheus @@ -262,10 +270,18 @@ services: # we're following. # Default value: 1m. - ANCHOR_EVENT_SYNC_INTERVAL=1m + # ANCHOR_EVENT_SYNC_MAX_ACTIVITIES is the maximum number of activities to be synchronized in a single task run. + # Default value: 500. + - ANCHOR_EVENT_SYNC_MAX_ACTIVITIES=5 + # ANCHOR_EVENT_SYNC_ACCELERATED_INTERVAL is the interval in which to run the anchor sync task after the maximum number of + # activities (specified by ANCHOR_EVENT_SYNC_MAX_ACTIVITIES) have been processed for the current task run. This + # should be smaller than the default interval in order to accelerate processing. + # Default value: 15s. + - ANCHOR_EVENT_SYNC_ACCELERATED_INTERVAL=15s # ANCHOR_EVENT_SYNC_MIN_ACTIVITY_AGE is the minimum age of an activity to be synchronized. The activity will be processed # only if its age is greater than this value. - # Default value: 1m - - ANCHOR_EVENT_SYNC_MIN_ACTIVITY_AGE=1m + # Default value: 10m + - ANCHOR_EVENT_SYNC_MIN_ACTIVITY_AGE=20s # VCT_MONITORING_INTERVAL is the interval (period) in which proofs are monitored from various VCTs that promised # to anchor a VC by a certain time. # Default value: 10s. @@ -327,7 +343,7 @@ services: - ORB_SYNC_TIMEOUT=3 - ORB_KMS_TYPE=web - ORB_KMS_ENDPOINT=http://orb.kms:7878 - - LOG_LEVEL=metrics=INFO:nodeinfo=WARNING:activitypub_store=INFO:expiry-service=INFO:task-manager=INFO:watermill=INFO:DEBUG + - LOG_LEVEL=metrics=INFO:nodeinfo=WARNING:activitypub_store=INFO:expiry-service=INFO:task-manager=INFO:watermill=INFO:INFO - ORB_HOST_URL=172.20.0.4:80 - ORB_METRICS_PROVIDER_NAME=prometheus - ORB_PROM_HTTP_URL=172.20.0.4:48827 @@ -405,10 +421,18 @@ services: # we're following. # Default value: 1m. - ANCHOR_EVENT_SYNC_INTERVAL=1m + # ANCHOR_EVENT_SYNC_MAX_ACTIVITIES is the maximum number of activities to be synchronized in a single task run. + # Default value: 500. + - ANCHOR_EVENT_SYNC_MAX_ACTIVITIES=5 + # ANCHOR_EVENT_SYNC_ACCELERATED_INTERVAL is the interval in which to run the anchor sync task after the maximum number of + # activities (specified by ANCHOR_EVENT_SYNC_MAX_ACTIVITIES) have been processed for the current task run. This + # should be smaller than the default interval in order to accelerate processing. + # Default value: 15s. + - ANCHOR_EVENT_SYNC_ACCELERATED_INTERVAL=15s # ANCHOR_EVENT_SYNC_MIN_ACTIVITY_AGE is the minimum age of an activity to be synchronized. The activity will be processed # only if its age is greater than this value. 
- # Default value: 1m - - ANCHOR_EVENT_SYNC_MIN_ACTIVITY_AGE=1m + # Default value: 10m + - ANCHOR_EVENT_SYNC_MIN_ACTIVITY_AGE=20s - ANCHOR_DATA_URI_MEDIA_TYPE=application/json - ORB_REQUEST_TOKENS=vct-read=vctread,vct-write=vctwrite ports: @@ -435,7 +459,7 @@ services: - ORB_SYNC_TIMEOUT=3 - ORB_KMS_TYPE=web - ORB_KMS_ENDPOINT=http://orb.kms:7878 - - LOG_LEVEL=metrics=INFO:nodeinfo=WARNING:activitypub_store=INFO:expiry-service=INFO:task-manager=INFO:watermill=INFO:DEBUG + - LOG_LEVEL=metrics=INFO:nodeinfo=WARNING:activitypub_store=INFO:expiry-service=INFO:task-manager=INFO:watermill=INFO:INFO - ORB_HOST_URL=172.20.0.5:80 - ORB_METRICS_PROVIDER_NAME=prometheus - ORB_PROM_HTTP_URL=172.20.0.5:48927 @@ -513,10 +537,18 @@ services: # we're following. # Default value: 1m. - ANCHOR_EVENT_SYNC_INTERVAL=1m + # ANCHOR_EVENT_SYNC_MAX_ACTIVITIES is the maximum number of activities to be synchronized in a single task run. + # Default value: 500. + - ANCHOR_EVENT_SYNC_MAX_ACTIVITIES=5 + # ANCHOR_EVENT_SYNC_ACCELERATED_INTERVAL is the interval in which to run the anchor sync task after the maximum number of + # activities (specified by ANCHOR_EVENT_SYNC_MAX_ACTIVITIES) have been processed for the current task run. This + # should be smaller than the default interval in order to accelerate processing. + # Default value: 15s. + - ANCHOR_EVENT_SYNC_ACCELERATED_INTERVAL=15s # ANCHOR_EVENT_SYNC_MIN_ACTIVITY_AGE is the minimum age of an activity to be synchronized. The activity will be processed # only if its age is greater than this value. - # Default value: 1m - - ANCHOR_EVENT_SYNC_MIN_ACTIVITY_AGE=1m + # Default value: 10m + - ANCHOR_EVENT_SYNC_MIN_ACTIVITY_AGE=20s - ANCHOR_DATA_URI_MEDIA_TYPE=application/json - ORB_REQUEST_TOKENS=vct-read=vctread,vct-write=vctwrite ports: @@ -543,7 +575,7 @@ services: - ORB_SYNC_TIMEOUT=3 - ORB_KMS_TYPE=web - ORB_KMS_ENDPOINT=http://orb.kms:7878 - - LOG_LEVEL=metrics=INFO:nodeinfo=WARNING:activitypub_store=INFO:expiry-service=INFO:task-manager=INFO:watermill=INFO:DEBUG + - LOG_LEVEL=metrics=INFO:nodeinfo=WARNING:activitypub_store=INFO:expiry-service=INFO:task-manager=INFO:watermill=INFO:INFO - VCT_ENABLED=true - ORB_HOST_URL=172.20.0.6:443 - ORB_METRICS_PROVIDER_NAME=prometheus @@ -632,10 +664,18 @@ services: # we're following. # Default value: 1m. - ANCHOR_EVENT_SYNC_INTERVAL=1m + # ANCHOR_EVENT_SYNC_MAX_ACTIVITIES is the maximum number of activities to be synchronized in a single task run. + # Default value: 500. + - ANCHOR_EVENT_SYNC_MAX_ACTIVITIES=5 + # ANCHOR_EVENT_SYNC_ACCELERATED_INTERVAL is the interval in which to run the anchor sync task after the maximum number of + # activities (specified by ANCHOR_EVENT_SYNC_MAX_ACTIVITIES) have been processed for the current task run. This + # should be smaller than the default interval in order to accelerate processing. + # Default value: 15s. + - ANCHOR_EVENT_SYNC_ACCELERATED_INTERVAL=15s # ANCHOR_EVENT_SYNC_MIN_ACTIVITY_AGE is the minimum age of an activity to be synchronized. The activity will be processed # only if its age is greater than this value. 
- # Default value: 1m - - ANCHOR_EVENT_SYNC_MIN_ACTIVITY_AGE=1m + # Default value: 10m + - ANCHOR_EVENT_SYNC_MIN_ACTIVITY_AGE=20s - ORB_REQUEST_TOKENS=vct-read=vctread,vct-write=vctwrite - VCT_LOG_ENTRIES_STORE_ENABLED=true ports: @@ -667,7 +707,7 @@ services: - ORB_HTTP_SIGN_ACTIVE_KEY_ID=aws-kms://arn:aws:kms:ca-central-1:111122223333:alias/http-sign - AWS_ACCESS_KEY_ID=mock - AWS_SECRET_ACCESS_KEY=mock - - LOG_LEVEL=metrics=INFO:nodeinfo=WARNING:activitypub_store=INFO:expiry-service=INFO:task-manager=INFO:watermill=INFO:DEBUG + - LOG_LEVEL=metrics=INFO:nodeinfo=WARNING:activitypub_store=INFO:expiry-service=INFO:task-manager=INFO:watermill=INFO:INFO - ORB_HOST_URL=172.20.0.7:443 - ORB_METRICS_PROVIDER_NAME=prometheus - ORB_PROM_HTTP_URL=172.20.0.7:48727 @@ -756,10 +796,18 @@ services: # we're following. # Default value: 1m. - ANCHOR_EVENT_SYNC_INTERVAL=1m + # ANCHOR_EVENT_SYNC_MAX_ACTIVITIES is the maximum number of activities to be synchronized in a single task run. + # Default value: 500. + - ANCHOR_EVENT_SYNC_MAX_ACTIVITIES=5 + # ANCHOR_EVENT_SYNC_ACCELERATED_INTERVAL is the interval in which to run the anchor sync task after the maximum number of + # activities (specified by ANCHOR_EVENT_SYNC_MAX_ACTIVITIES) have been processed for the current task run. This + # should be smaller than the default interval in order to accelerate processing. + # Default value: 15s. + - ANCHOR_EVENT_SYNC_ACCELERATED_INTERVAL=15s # ANCHOR_EVENT_SYNC_MIN_ACTIVITY_AGE is the minimum age of an activity to be synchronized. The activity will be processed # only if its age is greater than this value. - # Default value: 1m - - ANCHOR_EVENT_SYNC_MIN_ACTIVITY_AGE=1m + # Default value: 10m + - ANCHOR_EVENT_SYNC_MIN_ACTIVITY_AGE=20s - ORB_REQUEST_TOKENS=vct-read=vctread,vct-write=vctwrite ports: - 48726:443 @@ -785,7 +833,7 @@ services: - ORB_SYNC_TIMEOUT=3 - ORB_KMS_TYPE=web - ORB_KMS_ENDPOINT=http://orb5.kms:7878 - - LOG_LEVEL=metrics=INFO:nodeinfo=WARNING:activitypub_store=INFO:expiry-service=INFO:task-manager=INFO:watermill=INFO:DEBUG + - LOG_LEVEL=metrics=INFO:nodeinfo=WARNING:activitypub_store=INFO:expiry-service=INFO:task-manager=INFO:watermill=INFO:INFO - ORB_HOST_URL=172.20.0.31:443 - ORB_METRICS_PROVIDER_NAME=prometheus - ORB_PROM_HTTP_URL=172.20.0.31:48727 @@ -881,10 +929,18 @@ services: # ANCHOR_EVENT_SYNC_INTERVAL is the interval in which anchor events are synchronized with other services that # we're following. # Default value: 1m. - - ANCHOR_EVENT_SYNC_INTERVAL=10s + - ANCHOR_EVENT_SYNC_INTERVAL=20s + # ANCHOR_EVENT_SYNC_MAX_ACTIVITIES is the maximum number of activities to be synchronized in a single task run. + # Default value: 500. + - ANCHOR_EVENT_SYNC_MAX_ACTIVITIES=5 + # ANCHOR_EVENT_SYNC_ACCELERATED_INTERVAL is the interval in which to run the anchor sync task after the maximum number of + # activities (specified by ANCHOR_EVENT_SYNC_MAX_ACTIVITIES) have been processed for the current task run. This + # should be smaller than the default interval in order to accelerate processing. + # Default value: 15s. + - ANCHOR_EVENT_SYNC_ACCELERATED_INTERVAL=3s # ANCHOR_EVENT_SYNC_MIN_ACTIVITY_AGE is the minimum age of an activity to be synchronized. The activity will be processed # only if its age is greater than this value. - # Default value: 1m + # Default value: 10m - ANCHOR_EVENT_SYNC_MIN_ACTIVITY_AGE=20s # WITNESS_POLICY_CACHE_EXPIRATION sets the expiration time of witness policy cache. # Default value: 30s. 
Set the cache expiration very low since we're updating the policy frequently during the test. @@ -1002,7 +1058,7 @@ services: - KMS_DATABASE_PREFIX=keystore_ - KMS_SECRET_LOCK_TYPE=local - KMS_SECRET_LOCK_KEY_PATH=/etc/kms/secret-lock.key - - KMS_LOG_LEVEL=debug + - KMS_LOG_LEVEL=error ports: - 7878:7878 volumes: @@ -1026,7 +1082,7 @@ services: - KMS_DATABASE_PREFIX=keystore_ - KMS_SECRET_LOCK_TYPE=local - KMS_SECRET_LOCK_KEY_PATH=/etc/kms/secret-lock.key - - KMS_LOG_LEVEL=debug + - KMS_LOG_LEVEL=error ports: - 7978:7878 volumes:
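
Note (illustrative, not part of the patch): the mechanism the change relies on is the new RegisterTaskEx contract in pkg/taskmgr — a task handler returns 0 to keep its default interval, or a non-zero duration to request an earlier follow-up run, which activitysynctask.go returns as AcceleratedInterval once MaxActivitiesToSync is reached. The standalone Go sketch below demonstrates only that return-value contract with hypothetical values (a pending counter, a cap of 5, a 15s accelerated interval); it does not use the Orb task manager or its coordination store.

// sketch.go — minimal illustration of the "return an override interval" pattern.
package main

import (
	"fmt"
	"time"
)

const (
	defaultInterval     = time.Minute      // normal scheduling interval
	acceleratedInterval = 15 * time.Second // requested when a run hits the cap
	maxPerRun           = 5                // stands in for MaxActivitiesToSync
)

// syncBatch stands in for one task run: it drains up to maxPerRun pending
// items and, mirroring task.run() in activitysynctask.go, returns the
// accelerated interval when the cap was reached so the scheduler runs it
// again sooner; returning 0 means "use the default interval".
func syncBatch(pending *int) time.Duration {
	n := *pending
	if n > maxPerRun {
		n = maxPerRun
	}

	*pending -= n

	if n >= maxPerRun {
		return acceleratedInterval // more work likely remains
	}

	return 0 // under the cap: fall back to the default interval
}

func main() {
	pending := 12 // hypothetical backlog of unsynced activities

	for run := 1; ; run++ {
		next := syncBatch(&pending)
		if next == 0 {
			next = defaultInterval
		}

		fmt.Printf("run %d: %d activities remaining, next run in %s\n", run, pending, next)

		if pending == 0 {
			break
		}
	}
}

With a backlog of 12 and a cap of 5, the first two runs report a 15s follow-up and the third, which processes the remaining 2, reverts to the 1m default — the same accelerated-then-normal cadence the BDD fixtures exercise with ANCHOR_EVENT_SYNC_MAX_ACTIVITIES=5.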