From d7e886785a1d1d3342e9f724c571557f16ba2fe3 Mon Sep 17 00:00:00 2001 From: Mick Stanciu Date: Tue, 30 May 2023 14:20:03 +1000 Subject: [PATCH] INTG-2821 configure the concurrency (#418) --- .../cmd/export/export.go | 1 + cmd/safetyculture-exporter/cmd/root.go | 2 ++ pkg/api/configuration_manager.go | 13 +++++--- pkg/internal/feed/feed_exporter.go | 32 +++++++++++-------- 4 files changed, 30 insertions(+), 18 deletions(-) diff --git a/cmd/safetyculture-exporter/cmd/export/export.go b/cmd/safetyculture-exporter/cmd/export/export.go index 387768d6..c6ba476d 100644 --- a/cmd/safetyculture-exporter/cmd/export/export.go +++ b/cmd/safetyculture-exporter/cmd/export/export.go @@ -151,6 +151,7 @@ func NewSafetyCultureExporter(v *viper.Viper) *exporterAPI.SafetyCultureExporter // MapViperConfigToExporterConfiguration maps Viper config to ExporterConfiguration structure func MapViperConfigToExporterConfiguration(v *viper.Viper, cfg *exporterAPI.ExporterConfiguration) { cfg.AccessToken = v.GetString("access_token") + cfg.API.MaxConcurrency = v.GetInt("api.max_concurrency") cfg.SheqsyUsername = v.GetString("sheqsy_username") cfg.SheqsyCompanyID = v.GetString("sheqsy_company_id") cfg.Db.Dialect = v.GetString("db.dialect") diff --git a/cmd/safetyculture-exporter/cmd/root.go b/cmd/safetyculture-exporter/cmd/root.go index 850b53d3..911d3b74 100644 --- a/cmd/safetyculture-exporter/cmd/root.go +++ b/cmd/safetyculture-exporter/cmd/root.go @@ -99,6 +99,7 @@ func configFlags() { connectionFlags.Bool("tls-skip-verify", false, "Skip verification of API TLS certificates") connectionFlags.String("tls-cert", "", "Custom root CA certificate to use when making API requests") connectionFlags.String("proxy-url", "", "Proxy URL for making API requests through") + connectionFlags.Int("max-concurrency", 10, "Maximum number of concurrent API requests (defaults to max 10)") dbFlags = flag.NewFlagSet("db", flag.ContinueOnError) dbFlags.String("db-dialect", "mysql", "Database dialect. mysql, postgres and sqlserver are the only valid options.") @@ -161,6 +162,7 @@ func bindFlags() { util.Check(viper.BindPFlag("api.tls_skip_verify", connectionFlags.Lookup("tls-skip-verify")), "while binding flag") util.Check(viper.BindPFlag("api.tls_cert", connectionFlags.Lookup("tls-cert")), "while binding flag") util.Check(viper.BindPFlag("api.proxy_url", connectionFlags.Lookup("proxy-url")), "while binding flag") + util.Check(viper.BindPFlag("api.max_concurrency", connectionFlags.Lookup("max-concurrency")), "while binding flag") util.Check(viper.BindPFlag("db.dialect", dbFlags.Lookup("db-dialect")), "while binding flag") util.Check(viper.BindPFlag("db.connection_string", dbFlags.Lookup("db-connection-string")), "while binding flag") diff --git a/pkg/api/configuration_manager.go b/pkg/api/configuration_manager.go index 38aab01a..aace050e 100644 --- a/pkg/api/configuration_manager.go +++ b/pkg/api/configuration_manager.go @@ -17,11 +17,12 @@ import ( type ExporterConfiguration struct { AccessToken string `yaml:"access_token"` API struct { - ProxyURL string `yaml:"proxy_url"` - SheqsyURL string `yaml:"sheqsy_url"` - TLSCert string `yaml:"tls_cert"` - TLSSkipVerify bool `yaml:"tls_skip_verify"` - URL string `yaml:"url"` + ProxyURL string `yaml:"proxy_url"` + SheqsyURL string `yaml:"sheqsy_url"` + TLSCert string `yaml:"tls_cert"` + TLSSkipVerify bool `yaml:"tls_skip_verify"` + URL string `yaml:"url"` + MaxConcurrency int `yaml:"max_concurrency"` } `yaml:"api"` Csv struct { MaxRowsPerFile int `yaml:"max_rows_per_file"` @@ -249,6 +250,7 @@ func BuildConfigurationWithDefaults() *ExporterConfiguration { cfg := &ExporterConfiguration{} cfg.API.SheqsyURL = "https://app.sheqsy.com" cfg.API.URL = "https://api.safetyculture.io" + cfg.API.MaxConcurrency = 10 cfg.Csv.MaxRowsPerFile = 1000000 cfg.Db.Dialect = "mysql" cfg.Export.Tables = []string{} @@ -339,6 +341,7 @@ func (ec *ExporterConfiguration) ToExporterConfig() *feed.ExporterFeedCfg { ExportSiteIncludeFullHierarchy: ec.Export.Site.IncludeFullHierarchy, ExportIssueLimit: ec.Export.Issue.Limit, ExportAssetLimit: ec.Export.Asset.Limit, + MaxConcurrentGoRoutines: ec.API.MaxConcurrency, } } diff --git a/pkg/internal/feed/feed_exporter.go b/pkg/internal/feed/feed_exporter.go index 4c1e138d..5d5e95b5 100644 --- a/pkg/internal/feed/feed_exporter.go +++ b/pkg/internal/feed/feed_exporter.go @@ -7,6 +7,7 @@ import ( "sync" "time" + "github.com/MickStanciu/go-fn/fn" "github.com/SafetyCulture/safetyculture-exporter/pkg/httpapi" "github.com/SafetyCulture/safetyculture-exporter/pkg/internal/events" "github.com/SafetyCulture/safetyculture-exporter/pkg/logger" @@ -59,6 +60,7 @@ type ExporterFeedCfg struct { ExportSiteIncludeFullHierarchy bool ExportIssueLimit int ExportAssetLimit int + MaxConcurrentGoRoutines int } func NewExporterApp(scApiClient *httpapi.Client, sheqsyApiClient *httpapi.Client, cfg *ExporterFeedCfg) *ExporterFeedClient { @@ -99,8 +101,12 @@ func (e *ExporterFeedClient) ExportFeeds(exporter Exporter, ctx context.Context) tablesMap[table] = true } + maxConcurrentRoutines := fn.GetOrElse(e.configuration.MaxConcurrentGoRoutines, maxConcurrentGoRoutines, func(i int) bool { + return i > 0 + }) + var wg sync.WaitGroup - semaphore := make(chan int, maxConcurrentGoRoutines) + semaphore := make(chan int, maxConcurrentRoutines) atLeastOneRun := false @@ -227,17 +233,7 @@ func (e *ExporterFeedClient) ExportFeeds(exporter Exporter, ctx context.Context) func (e *ExporterFeedClient) GetFeeds() []Feed { return []Feed{ e.getInspectionFeed(), - &InspectionItemFeed{ - SkipIDs: e.configuration.ExportInspectionSkipIds, - ModifiedAfter: e.configuration.ExportModifiedAfterTime, - TemplateIDs: e.configuration.ExportTemplateIds, - Archived: e.configuration.ExportInspectionArchived, - Completed: e.configuration.ExportInspectionCompleted, - IncludeInactive: e.configuration.ExportInspectionIncludedInactiveItems, - Incremental: e.configuration.ExportIncremental, - Limit: e.configuration.ExportInspectionLimit, - ExportMedia: e.configuration.ExportMedia, - }, + &UserFeed{}, &TemplateFeed{ Incremental: e.configuration.ExportIncremental, }, @@ -249,7 +245,6 @@ func (e *ExporterFeedClient) GetFeeds() []Feed { IncludeFullHierarchy: e.configuration.ExportSiteIncludeFullHierarchy, }, &SiteMemberFeed{}, - &UserFeed{}, &GroupFeed{}, &GroupUserFeed{}, &ScheduleFeed{ @@ -275,6 +270,17 @@ func (e *ExporterFeedClient) GetFeeds() []Feed { Incremental: e.configuration.ExportIncremental, Limit: e.configuration.ExportActionLimit, }, + &InspectionItemFeed{ + SkipIDs: e.configuration.ExportInspectionSkipIds, + ModifiedAfter: e.configuration.ExportModifiedAfterTime, + TemplateIDs: e.configuration.ExportTemplateIds, + Archived: e.configuration.ExportInspectionArchived, + Completed: e.configuration.ExportInspectionCompleted, + IncludeInactive: e.configuration.ExportInspectionIncludedInactiveItems, + Incremental: e.configuration.ExportIncremental, + Limit: e.configuration.ExportInspectionLimit, + ExportMedia: e.configuration.ExportMedia, + }, &IssueFeed{ Incremental: false, // this was disabled on request. Issues API doesn't support modified After filters Limit: e.configuration.ExportIssueLimit,