Skip to content

Commit

Permalink
feat: Limit to number of anchors to sync per run
Browse files Browse the repository at this point in the history
Added a configurable upper limit to the number of anchor activities to synchronize per task run. When the limit is reached then the task is run at an accelerated interval until all activities are processed.

closes #1550

Signed-off-by: Bob Stasyszyn <[email protected]>
  • Loading branch information
bstasyszyn committed May 9, 2023
1 parent f77ef5f commit 25418ee
Show file tree
Hide file tree
Showing 10 changed files with 677 additions and 405 deletions.
76 changes: 55 additions & 21 deletions cmd/orb-server/startcmd/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,9 @@ const (
defaultTaskMgrCheckInterval = 10 * time.Second
defaultDataExpiryCheckInterval = time.Minute
defaultAnchorSyncInterval = time.Minute
defaultAnchorSyncMinActivityAge = time.Minute
defaultAnchorSyncAcceleratedInterval = 15 * time.Second
defaultAnchorSyncMinActivityAge = 10 * time.Minute
defaultAnchorSyncMaxActivities = 500
defaultVCTProofMonitoringInterval = 10 * time.Second
defaultVCTLogMonitoringInterval = 10 * time.Second
defaultVCTLogMonitoringMaxTreeSize = 50000
Expand Down Expand Up @@ -632,10 +634,22 @@ const (
"this service is following. Defaults to 1m if not set. " +
commonEnvVarUsageText + anchorSyncIntervalEnvKey

anchorSyncMaxActivitiesFlagName = "sync-max-activities"
anchorSyncMaxActivitiesEnvKey = "ANCHOR_EVENT_SYNC_MAX_ACTIVITIES"
anchorSyncMaxActivitiesFlagUsage = "The maximum number of activities to be synchronized in a single task run. Defaults to 500 if not set. " +
commonEnvVarUsageText + anchorSyncMaxActivitiesEnvKey

anchorSyncAcceleratedIntervalFlagName = "sync-accelerated-interval"
anchorSyncAcceleratedIntervalEnvKey = "ANCHOR_EVENT_SYNC_ACCELERATED_INTERVAL"
anchorSyncNextIntervalFlagUsage = "The interval in which to run the activity sync task after the maximum number of activities " +
"(specified by sync-max-activities) have been processed for the current task run. This should be smaller than the default interval " +
"in order to accelerate processing. Defaults to 15s if not set. " +
commonEnvVarUsageText + anchorSyncAcceleratedIntervalEnvKey

anchorSyncMinActivityAgeFlagName = "sync-min-activity-age"
anchorSyncMinActivityAgeEnvKey = "ANCHOR_EVENT_SYNC_MIN_ACTIVITY_AGE"
anchorSyncMinActivityAgeFlagUsage = "The minimum age of an activity to be synchronized. The activity will be " +
"processed only if its age is greater than this value. Defaults to 1m if not set. " +
"processed only if its age is greater than this value. Defaults to 10m if not set. " +
commonEnvVarUsageText + anchorSyncMinActivityAgeEnvKey

activityPubClientCacheSizeFlagName = "apclient-cache-size"
Expand Down Expand Up @@ -1584,13 +1598,15 @@ func getVCTParams(cmd *cobra.Command) (*vctParams, error) {
}

type activityPubParams struct {
pageSize int
anchorSyncPeriod time.Duration
anchorSyncMinActivityAge time.Duration
clientCacheSize int
clientCacheExpiration time.Duration
iriCacheSize int
iriCacheExpiration time.Duration
pageSize int
anchorSyncPeriod time.Duration
anchorSyncAcceleratedPeriod time.Duration
anchorSyncMinActivityAge time.Duration
anchorSyncMaxActivities int
clientCacheSize int
clientCacheExpiration time.Duration
iriCacheSize int
iriCacheExpiration time.Duration
}

func getActivityPubParams(cmd *cobra.Command) (*activityPubParams, error) {
Expand All @@ -1599,7 +1615,7 @@ func getActivityPubParams(cmd *cobra.Command) (*activityPubParams, error) {
return nil, fmt.Errorf("%s: %w", activityPubPageSizeFlagName, err)
}

syncPeriod, minActivityAge, err := getAnchorSyncParameters(cmd)
syncPeriod, acceleratedSyncPeriod, minActivityAge, maxActivities, err := getAnchorSyncParameters(cmd)
if err != nil {
return nil, err
}
Expand All @@ -1615,13 +1631,15 @@ func getActivityPubParams(cmd *cobra.Command) (*activityPubParams, error) {
}

return &activityPubParams{
pageSize: activityPubPageSize,
anchorSyncPeriod: syncPeriod,
anchorSyncMinActivityAge: minActivityAge,
clientCacheSize: apClientCacheSize,
clientCacheExpiration: apClientCacheExpiration,
iriCacheSize: apIRICacheSize,
iriCacheExpiration: apIRICacheExpiration,
pageSize: activityPubPageSize,
anchorSyncPeriod: syncPeriod,
anchorSyncAcceleratedPeriod: acceleratedSyncPeriod,
anchorSyncMinActivityAge: minActivityAge,
anchorSyncMaxActivities: maxActivities,
clientCacheSize: apClientCacheSize,
clientCacheExpiration: apClientCacheExpiration,
iriCacheSize: apIRICacheSize,
iriCacheExpiration: apIRICacheExpiration,
}, nil
}

Expand Down Expand Up @@ -2182,19 +2200,33 @@ func getActivityPubCacheParameters(cmd *cobra.Command, params *cacheParams) (int
return cacheSize, cacheExpiration, nil
}

func getAnchorSyncParameters(cmd *cobra.Command) (syncPeriod, minActivityAge time.Duration, err error) {
func getAnchorSyncParameters(cmd *cobra.Command) (syncPeriod, acceleratedSyncPeriod,
minActivityAge time.Duration, maxActivities int, err error,
) {
syncPeriod, err = cmdutil.GetDuration(cmd, anchorSyncIntervalFlagName, anchorSyncIntervalEnvKey, defaultAnchorSyncInterval)
if err != nil {
return 0, 0, fmt.Errorf("%s: %w", anchorSyncIntervalFlagName, err)
return 0, 0, 0, 0, fmt.Errorf("%s: %w", anchorSyncIntervalFlagName, err)
}

acceleratedSyncPeriod, err = cmdutil.GetDuration(cmd, anchorSyncAcceleratedIntervalFlagName, anchorSyncAcceleratedIntervalEnvKey,
defaultAnchorSyncAcceleratedInterval)
if err != nil {
return 0, 0, 0, 0, fmt.Errorf("%s: %w", anchorSyncIntervalFlagName, err)
}

minActivityAge, err = cmdutil.GetDuration(cmd, anchorSyncMinActivityAgeFlagName, anchorSyncMinActivityAgeEnvKey,
defaultAnchorSyncMinActivityAge)
if err != nil {
return 0, 0, fmt.Errorf("%s: %w", anchorSyncMinActivityAgeFlagName, err)
return 0, 0, 0, 0, fmt.Errorf("%s: %w", anchorSyncMinActivityAgeFlagName, err)
}

maxActivities, err = cmdutil.GetInt(cmd, anchorSyncMaxActivitiesFlagName, anchorSyncMaxActivitiesEnvKey,
defaultAnchorSyncMaxActivities)
if err != nil {
return 0, 0, 0, 0, fmt.Errorf("%s: %w", anchorSyncMinActivityAgeFlagName, err)
}

return syncPeriod, minActivityAge, nil
return syncPeriod, acceleratedSyncPeriod, minActivityAge, maxActivities, nil
}

func newAPServiceParams(apServiceID, externalEndpoint string,
Expand Down Expand Up @@ -2377,6 +2409,8 @@ func createFlags(startCmd *cobra.Command) {
startCmd.Flags().StringP(httpTimeoutFlagName, "", "", httpTimeoutFlagUsage)
startCmd.Flags().StringP(httpDialTimeoutFlagName, "", "", httpDialTimeoutFlagUsage)
startCmd.Flags().StringP(anchorSyncIntervalFlagName, anchorSyncIntervalFlagShorthand, "", anchorSyncIntervalFlagUsage)
startCmd.Flags().StringP(anchorSyncAcceleratedIntervalFlagName, "", "", anchorSyncNextIntervalFlagUsage)
startCmd.Flags().StringP(anchorSyncMaxActivitiesFlagName, "", "", anchorSyncMaxActivitiesFlagUsage)
startCmd.Flags().StringP(anchorSyncMinActivityAgeFlagName, "", "", anchorSyncMinActivityAgeFlagUsage)
startCmd.Flags().StringP(vctProofMonitoringIntervalFlagName, "", "", vctProofMonitoringIntervalFlagUsage)
startCmd.Flags().StringP(vctProofMonitoringExpiryPeriodFlagName, "", "", vctProofMonitoringExpiryPeriodFlagUsage)
Expand Down
8 changes: 5 additions & 3 deletions cmd/orb-server/startcmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -873,9 +873,11 @@ func startOrbServices(parameters *orbParameters) error {

err = anchorsynctask.Register(
anchorsynctask.Config{
ServiceIRI: parameters.apServiceParams.serviceIRI(),
Interval: parameters.activityPub.anchorSyncPeriod,
MinActivityAge: parameters.activityPub.anchorSyncMinActivityAge,
ServiceIRI: parameters.apServiceParams.serviceIRI(),
Interval: parameters.activityPub.anchorSyncPeriod,
AcceleratedInterval: parameters.activityPub.anchorSyncAcceleratedPeriod,
MinActivityAge: parameters.activityPub.anchorSyncMinActivityAge,
MaxActivitiesToSync: parameters.activityPub.anchorSyncMaxActivities,
},
taskMgr, apClient, apStore, storeProviders.provider,
func() apspi.InboxHandler {
Expand Down
Loading

0 comments on commit 25418ee

Please sign in to comment.