diff --git a/charts/beskar-ostree/values.yaml b/charts/beskar-ostree/values.yaml index 9c19d10..d5eaca7 100644 --- a/charts/beskar-ostree/values.yaml +++ b/charts/beskar-ostree/values.yaml @@ -131,4 +131,4 @@ configData: sync: timeout: 3600s # 1 hour - max_worker_count: 10 \ No newline at end of file + max_worker_count: 100 diff --git a/internal/pkg/beskar/plugin.go b/internal/pkg/beskar/plugin.go index 1d8bb9a..115d189 100644 --- a/internal/pkg/beskar/plugin.go +++ b/internal/pkg/beskar/plugin.go @@ -60,7 +60,8 @@ type pluginManager struct { func newPluginManager(registry distribution.Namespace, logger *logrus.Entry) *pluginManager { transport := http.DefaultTransport.(*http.Transport).Clone() - transport.IdleConnTimeout = 30 * time.Second + transport.IdleConnTimeout = 10 * time.Second + transport.MaxIdleConns = 0 transport.MaxIdleConnsPerHost = 16 reverseProxy := &httputil.ReverseProxy{ diff --git a/internal/pkg/beskar/registry.go b/internal/pkg/beskar/registry.go index b83a5f6..469b044 100644 --- a/internal/pkg/beskar/registry.go +++ b/internal/pkg/beskar/registry.go @@ -69,6 +69,18 @@ type Registry struct { hashedHostname string } +//nolint:gochecknoinits +func init() { + // Set http transport to avoid idle connections + // This propagates to all http clients created by the registry as they use http.DefaultTransport. + transport := http.DefaultTransport.(*http.Transport).Clone() + transport.MaxIdleConns = 0 + transport.MaxIdleConnsPerHost = 16 + transport.IdleConnTimeout = 10 * time.Second + + http.DefaultTransport = transport +} + func New(beskarConfig *config.BeskarConfig) (context.Context, *Registry, error) { beskarRegistry := &Registry{ beskarConfig: beskarConfig, diff --git a/internal/pkg/config/sync.go b/internal/pkg/config/sync.go index ecc5e4c..a27aa39 100644 --- a/internal/pkg/config/sync.go +++ b/internal/pkg/config/sync.go @@ -5,30 +5,24 @@ package config import ( "time" - - "google.golang.org/protobuf/types/known/durationpb" ) const ( DefaultSyncTimeout = time.Hour - DefaultSyncMaxWorkerCount = 10 + DefaultSyncMaxWorkerCount = 100 ) type SyncConfig struct { - Timeout *durationpb.Duration `yaml:"timeout"` - MaxWorkerCount int `yaml:"max_worker_count"` + Timeout time.Duration `yaml:"timeout"` + MaxWorkerCount int `yaml:"max_worker_count"` } func (sc *SyncConfig) GetTimeout() time.Duration { - if sc.Timeout == nil { - return DefaultSyncTimeout - } - - if !sc.Timeout.IsValid() || sc.Timeout.GetSeconds() <= 0 { + if sc.Timeout <= 0 { return DefaultSyncTimeout } - return sc.Timeout.AsDuration() + return sc.Timeout } func (sc *SyncConfig) GetMaxWorkerCount() int { diff --git a/internal/pkg/pluginsrv/service.go b/internal/pkg/pluginsrv/service.go index d11e255..073ba82 100644 --- a/internal/pkg/pluginsrv/service.go +++ b/internal/pkg/pluginsrv/service.go @@ -300,6 +300,9 @@ func getBeskarTransport(caPEM *mtls.CAPEM, beskarMeta *gossip.BeskarMeta) (http. beskarAddr := net.JoinHostPort(beskarMeta.Hostname, strconv.Itoa(int(beskarMeta.RegistryPort))) transport := http.DefaultTransport.(*http.Transport).Clone() + transport.MaxIdleConns = 0 + transport.MaxIdleConnsPerHost = 16 + transport.IdleConnTimeout = 10 * time.Second transport.DialContext = func(ctx context.Context, network, addr string) (net.Conn, error) { if addr == beskarAddr { diff --git a/internal/plugins/ostree/api.go b/internal/plugins/ostree/api.go index bb4e6f7..6fd64d2 100644 --- a/internal/plugins/ostree/api.go +++ b/internal/plugins/ostree/api.go @@ -8,6 +8,7 @@ import ( "github.com/RussellLuo/kun/pkg/werror" "github.com/RussellLuo/kun/pkg/werror/gcode" + "google.golang.org/protobuf/types/known/durationpb" apiv1 "go.ciq.dev/beskar/pkg/plugins/ostree/api/v1" ) @@ -73,7 +74,7 @@ func (p *Plugin) SyncRepository(ctx context.Context, repository string, properti } if properties.Timeout == nil || !properties.Timeout.IsValid() || properties.Timeout.GetSeconds() <= 0 { - properties.Timeout = p.beskarOSTreeConfig.Sync.Timeout + properties.Timeout = durationpb.New(p.handlerParams.Sync.GetTimeout()) } return p.repositoryManager.Get(ctx, repository).SyncRepository(ctx, properties) diff --git a/internal/plugins/ostree/pkg/libostree/options.go b/internal/plugins/ostree/pkg/libostree/options.go index 90f0735..e9dbdde 100644 --- a/internal/plugins/ostree/pkg/libostree/options.go +++ b/internal/plugins/ostree/pkg/libostree/options.go @@ -51,6 +51,10 @@ func gVariantBuilderAddVariant(builder *C.GVariantBuilder, key *C.gchar, variant C.g_variant_builder_add_variant(builder, key, variant) } +func gVariantBuilderAddStringTuple(builder *C.GVariantBuilder, key, value *C.gchar) { + C.g_variant_builder_add_string_tuple(builder, key, value) +} + // NoGPGVerify sets the gpg-verify option to false in the pull options. func NoGPGVerify() Option { return func(builder *C.GVariantBuilder, deferFree deferredFreeFn) { diff --git a/internal/plugins/ostree/pkg/libostree/options.go.h b/internal/plugins/ostree/pkg/libostree/options.go.h index 20b89d9..f1f4a98 100644 --- a/internal/plugins/ostree/pkg/libostree/options.go.h +++ b/internal/plugins/ostree/pkg/libostree/options.go.h @@ -12,3 +12,12 @@ g_variant_builder_add_variant( ) { g_variant_builder_add(builder, "{s@v}", key, value); } + +void +g_variant_builder_add_string_tuple( + GVariantBuilder *builder, + const gchar *key, + const char *value +) { + g_variant_builder_add(builder, "(ss)", key, value); +} diff --git a/internal/plugins/ostree/pkg/libostree/pull.go b/internal/plugins/ostree/pkg/libostree/pull.go index bb05bf3..ee471ee 100644 --- a/internal/plugins/ostree/pkg/libostree/pull.go +++ b/internal/plugins/ostree/pkg/libostree/pull.go @@ -211,3 +211,51 @@ func NetworkRetries(n int) Option { ) } } + +// MaxOutstandingFetcherRequests sets the max-outstanding-fetcher-requests option to the given value in the pull options. +// The max amount of concurrent connections allowed. +func MaxOutstandingFetcherRequests(n uint32) Option { + return func(builder *C.GVariantBuilder, deferFree deferredFreeFn) { + key := C.CString("max-outstanding-fetcher-requests") + deferFree(unsafe.Pointer(key)) + gVariantBuilderAddVariant( + builder, + key, + C.g_variant_new_variant(C.g_variant_new_uint32(C.guint32(n))), + ) + } +} + +// HTTPHeaders sets the http-headers option to the given value in the pull options. +// Additional HTTP headers to send with requests. +func HTTPHeaders(headers map[string]string) Option { + return func(builder *C.GVariantBuilder, deferFree deferredFreeFn) { + // Array of string tuples + typeStr := C.CString("a(ss)") + defer C.free(unsafe.Pointer(typeStr)) + variantType := C.g_variant_type_new(typeStr) + + // NOTE THE USE OF A NESTED BUILDER HERE - BE CAREFUL! + // The builder is freed by g_variant_builder_end below. + // See https://docs.gtk.org/glib/method.VariantBuilder.init.html + var hdrBuilder C.GVariantBuilder + C.g_variant_builder_init(&hdrBuilder, variantType) + + // Add headers to hdrBuilder (not builder) + for key, value := range headers { + gVariantBuilderAddStringTuple( + &hdrBuilder, + C.CString(key), + C.CString(value), + ) + } + + key := C.CString("http-headers") + deferFree(unsafe.Pointer(key)) + gVariantBuilderAddVariant( + builder, + key, + C.g_variant_new_variant(C.g_variant_builder_end(&hdrBuilder)), + ) + } +} diff --git a/internal/plugins/ostree/pkg/ostreerepository/api.go b/internal/plugins/ostree/pkg/ostreerepository/api.go index 7dddda3..6eaa3b6 100644 --- a/internal/plugins/ostree/pkg/ostreerepository/api.go +++ b/internal/plugins/ostree/pkg/ostreerepository/api.go @@ -86,7 +86,7 @@ func (h *Handler) DeleteRepository(ctx context.Context) (err error) { // Create a worker pool to deleting each file in the repository concurrently. // ctx will be cancelled on error, and the error will be returned. eg, ctx := errgroup.WithContext(ctx) - eg.SetLimit(100) + eg.SetLimit(h.Params.Sync.GetMaxWorkerCount()) // Walk the directory tree, skipping directories and deleting each file. if err := filepath.WalkDir(h.repoDir, func(path string, d os.DirEntry, err error) error { @@ -278,10 +278,7 @@ func (h *Handler) SyncRepository(_ context.Context, properties *apiv1.OSTreeRepo err = h.BeginLocalRepoTransaction(ctx, func(ctx context.Context, repo *libostree.Repo) (commit bool, transactionFnErr error) { // Pull the latest changes from the remote. - opts := []libostree.Option{ - libostree.Depth(properties.Depth), - libostree.Flags(libostree.Mirror | libostree.TrustedHTTP), - } + opts := h.standardPullOptions(libostree.Depth(properties.Depth)) if len(properties.Refs) > 0 { opts = append(opts, libostree.Refs(properties.Refs...)) } diff --git a/internal/plugins/ostree/pkg/ostreerepository/local.go b/internal/plugins/ostree/pkg/ostreerepository/local.go index c098b40..549ca3e 100644 --- a/internal/plugins/ostree/pkg/ostreerepository/local.go +++ b/internal/plugins/ostree/pkg/ostreerepository/local.go @@ -104,8 +104,11 @@ func (h *Handler) BeginLocalRepoTransaction(ctx context.Context, tFn Transaction if err := repo.Pull( ctx, beskarRemoteName, - libostree.NoGPGVerify(), - libostree.Flags(libostree.Mirror|libostree.TrustedHTTP), + h.standardPullOptions( + libostree.HTTPHeaders(map[string]string{ + "Connection": "close", + }), + )..., ); err != nil { return ctl.Errf("pulling ostree repository from %s: %s", beskarRemoteName, err) } @@ -142,3 +145,12 @@ func (h *Handler) BeginLocalRepoTransaction(ctx context.Context, tFn Transaction return nil } + +func (h *Handler) standardPullOptions(more ...libostree.Option) []libostree.Option { + return append([]libostree.Option{ + libostree.NoGPGVerify(), + libostree.Flags(libostree.Mirror | libostree.TrustedHTTP), + libostree.MaxOutstandingFetcherRequests(uint32(h.Params.Sync.GetMaxWorkerCount())), + libostree.AppendUserAgent("beskar-ostree"), // TODO: Inject version here + }, more...) +} diff --git a/internal/plugins/ostree/plugin.go b/internal/plugins/ostree/plugin.go index 1904ed4..1be0e88 100644 --- a/internal/plugins/ostree/plugin.go +++ b/internal/plugins/ostree/plugin.go @@ -40,9 +40,8 @@ var routerRego []byte var routerData []byte type Plugin struct { - ctx context.Context - config pluginsrv.Config - beskarOSTreeConfig *config.BeskarOSTreeConfig + ctx context.Context + config pluginsrv.Config repositoryManager *repository.Manager[*ostreerepository.Handler] handlerParams *repository.HandlerParams @@ -70,7 +69,8 @@ func New(ctx context.Context, beskarOSTreeConfig *config.BeskarOSTreeConfig) (*P } params := &repository.HandlerParams{ - Dir: filepath.Join(beskarOSTreeConfig.DataDir, "_repohandlers_"), + Dir: filepath.Join(beskarOSTreeConfig.DataDir, "_repohandlers_"), + Sync: beskarOSTreeConfig.Sync, } return &Plugin{ @@ -89,8 +89,7 @@ func New(ctx context.Context, beskarOSTreeConfig *config.BeskarOSTreeConfig) (*P }, }, }, - beskarOSTreeConfig: beskarOSTreeConfig, - handlerParams: params, + handlerParams: params, repositoryManager: repository.NewManager[*ostreerepository.Handler]( params, ostreerepository.NewHandler,