Skip to content

Commit

Permalink
Ostree http1 concurrency fix (#43)
Browse files Browse the repository at this point in the history
* makes standard pull options such as user agent reusable
adjusts http transport config to allow higher throughput
adds option to libostree to set concurrency limits
adds option to libostree to set httpheaders

* updates sync config to use protobuf duration type to allow for friendlier API

* Update internal/pkg/config/sync.go

Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>

* updates sync config to use protobuf duration type to allow for friendlier API

* updates sync config to use protobuf duration type to allow for friendlier API

* updates sync config to use protobuf duration type to allow for friendlier API

* removes durationpb from config (still in api)

* linter fixes
gofumpt

* makes standard pull options such as user agent reusable
adjusts http transport config to allow higher throughput
adds option to libostree to set concurrency limits
adds option to libostree to set httpheaders

* linter fixes
gofumpt

---------

Co-authored-by: kishie <[email protected]>
Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
  • Loading branch information
3 people authored Feb 28, 2024
1 parent d987884 commit 8cef299
Show file tree
Hide file tree
Showing 12 changed files with 107 additions and 27 deletions.
2 changes: 1 addition & 1 deletion charts/beskar-ostree/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -131,4 +131,4 @@ configData:

sync:
timeout: 3600s # 1 hour
max_worker_count: 10
max_worker_count: 100
3 changes: 2 additions & 1 deletion internal/pkg/beskar/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ type pluginManager struct {

func newPluginManager(registry distribution.Namespace, logger *logrus.Entry) *pluginManager {
transport := http.DefaultTransport.(*http.Transport).Clone()
transport.IdleConnTimeout = 30 * time.Second
transport.IdleConnTimeout = 10 * time.Second
transport.MaxIdleConns = 0
transport.MaxIdleConnsPerHost = 16

reverseProxy := &httputil.ReverseProxy{
Expand Down
12 changes: 12 additions & 0 deletions internal/pkg/beskar/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,18 @@ type Registry struct {
hashedHostname string
}

//nolint:gochecknoinits
func init() {
// Set http transport to avoid idle connections
// This propagates to all http clients created by the registry as they use http.DefaultTransport.
transport := http.DefaultTransport.(*http.Transport).Clone()
transport.MaxIdleConns = 0
transport.MaxIdleConnsPerHost = 16
transport.IdleConnTimeout = 10 * time.Second

http.DefaultTransport = transport
}

func New(beskarConfig *config.BeskarConfig) (context.Context, *Registry, error) {
beskarRegistry := &Registry{
beskarConfig: beskarConfig,
Expand Down
16 changes: 5 additions & 11 deletions internal/pkg/config/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,30 +5,24 @@ package config

import (
"time"

"google.golang.org/protobuf/types/known/durationpb"
)

const (
DefaultSyncTimeout = time.Hour
DefaultSyncMaxWorkerCount = 10
DefaultSyncMaxWorkerCount = 100
)

type SyncConfig struct {
Timeout *durationpb.Duration `yaml:"timeout"`
MaxWorkerCount int `yaml:"max_worker_count"`
Timeout time.Duration `yaml:"timeout"`
MaxWorkerCount int `yaml:"max_worker_count"`
}

func (sc *SyncConfig) GetTimeout() time.Duration {
if sc.Timeout == nil {
return DefaultSyncTimeout
}

if !sc.Timeout.IsValid() || sc.Timeout.GetSeconds() <= 0 {
if sc.Timeout <= 0 {
return DefaultSyncTimeout
}

return sc.Timeout.AsDuration()
return sc.Timeout
}

func (sc *SyncConfig) GetMaxWorkerCount() int {
Expand Down
3 changes: 3 additions & 0 deletions internal/pkg/pluginsrv/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,9 @@ func getBeskarTransport(caPEM *mtls.CAPEM, beskarMeta *gossip.BeskarMeta) (http.
beskarAddr := net.JoinHostPort(beskarMeta.Hostname, strconv.Itoa(int(beskarMeta.RegistryPort)))

transport := http.DefaultTransport.(*http.Transport).Clone()
transport.MaxIdleConns = 0
transport.MaxIdleConnsPerHost = 16
transport.IdleConnTimeout = 10 * time.Second

transport.DialContext = func(ctx context.Context, network, addr string) (net.Conn, error) {
if addr == beskarAddr {
Expand Down
3 changes: 2 additions & 1 deletion internal/plugins/ostree/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/RussellLuo/kun/pkg/werror"
"github.com/RussellLuo/kun/pkg/werror/gcode"
"google.golang.org/protobuf/types/known/durationpb"

apiv1 "go.ciq.dev/beskar/pkg/plugins/ostree/api/v1"
)
Expand Down Expand Up @@ -73,7 +74,7 @@ func (p *Plugin) SyncRepository(ctx context.Context, repository string, properti
}

if properties.Timeout == nil || !properties.Timeout.IsValid() || properties.Timeout.GetSeconds() <= 0 {
properties.Timeout = p.beskarOSTreeConfig.Sync.Timeout
properties.Timeout = durationpb.New(p.handlerParams.Sync.GetTimeout())
}

return p.repositoryManager.Get(ctx, repository).SyncRepository(ctx, properties)
Expand Down
4 changes: 4 additions & 0 deletions internal/plugins/ostree/pkg/libostree/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ func gVariantBuilderAddVariant(builder *C.GVariantBuilder, key *C.gchar, variant
C.g_variant_builder_add_variant(builder, key, variant)
}

func gVariantBuilderAddStringTuple(builder *C.GVariantBuilder, key, value *C.gchar) {
C.g_variant_builder_add_string_tuple(builder, key, value)
}

// NoGPGVerify sets the gpg-verify option to false in the pull options.
func NoGPGVerify() Option {
return func(builder *C.GVariantBuilder, deferFree deferredFreeFn) {
Expand Down
9 changes: 9 additions & 0 deletions internal/plugins/ostree/pkg/libostree/options.go.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,12 @@ g_variant_builder_add_variant(
) {
g_variant_builder_add(builder, "{s@v}", key, value);
}

void
g_variant_builder_add_string_tuple(
GVariantBuilder *builder,
const gchar *key,
const char *value
) {
g_variant_builder_add(builder, "(ss)", key, value);
}
48 changes: 48 additions & 0 deletions internal/plugins/ostree/pkg/libostree/pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,3 +211,51 @@ func NetworkRetries(n int) Option {
)
}
}

// MaxOutstandingFetcherRequests sets the max-outstanding-fetcher-requests option to the given value in the pull options.
// The max amount of concurrent connections allowed.
func MaxOutstandingFetcherRequests(n uint32) Option {
return func(builder *C.GVariantBuilder, deferFree deferredFreeFn) {
key := C.CString("max-outstanding-fetcher-requests")
deferFree(unsafe.Pointer(key))
gVariantBuilderAddVariant(
builder,
key,
C.g_variant_new_variant(C.g_variant_new_uint32(C.guint32(n))),
)
}
}

// HTTPHeaders sets the http-headers option to the given value in the pull options.
// Additional HTTP headers to send with requests.
func HTTPHeaders(headers map[string]string) Option {
return func(builder *C.GVariantBuilder, deferFree deferredFreeFn) {
// Array of string tuples
typeStr := C.CString("a(ss)")
defer C.free(unsafe.Pointer(typeStr))
variantType := C.g_variant_type_new(typeStr)

// NOTE THE USE OF A NESTED BUILDER HERE - BE CAREFUL!
// The builder is freed by g_variant_builder_end below.
// See https://docs.gtk.org/glib/method.VariantBuilder.init.html
var hdrBuilder C.GVariantBuilder
C.g_variant_builder_init(&hdrBuilder, variantType)

// Add headers to hdrBuilder (not builder)
for key, value := range headers {
gVariantBuilderAddStringTuple(
&hdrBuilder,
C.CString(key),
C.CString(value),
)
}

key := C.CString("http-headers")
deferFree(unsafe.Pointer(key))
gVariantBuilderAddVariant(
builder,
key,
C.g_variant_new_variant(C.g_variant_builder_end(&hdrBuilder)),
)
}
}
7 changes: 2 additions & 5 deletions internal/plugins/ostree/pkg/ostreerepository/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func (h *Handler) DeleteRepository(ctx context.Context) (err error) {
// Create a worker pool to deleting each file in the repository concurrently.
// ctx will be cancelled on error, and the error will be returned.
eg, ctx := errgroup.WithContext(ctx)
eg.SetLimit(100)
eg.SetLimit(h.Params.Sync.GetMaxWorkerCount())

// Walk the directory tree, skipping directories and deleting each file.
if err := filepath.WalkDir(h.repoDir, func(path string, d os.DirEntry, err error) error {
Expand Down Expand Up @@ -278,10 +278,7 @@ func (h *Handler) SyncRepository(_ context.Context, properties *apiv1.OSTreeRepo

err = h.BeginLocalRepoTransaction(ctx, func(ctx context.Context, repo *libostree.Repo) (commit bool, transactionFnErr error) {
// Pull the latest changes from the remote.
opts := []libostree.Option{
libostree.Depth(properties.Depth),
libostree.Flags(libostree.Mirror | libostree.TrustedHTTP),
}
opts := h.standardPullOptions(libostree.Depth(properties.Depth))
if len(properties.Refs) > 0 {
opts = append(opts, libostree.Refs(properties.Refs...))
}
Expand Down
16 changes: 14 additions & 2 deletions internal/plugins/ostree/pkg/ostreerepository/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,11 @@ func (h *Handler) BeginLocalRepoTransaction(ctx context.Context, tFn Transaction
if err := repo.Pull(
ctx,
beskarRemoteName,
libostree.NoGPGVerify(),
libostree.Flags(libostree.Mirror|libostree.TrustedHTTP),
h.standardPullOptions(
libostree.HTTPHeaders(map[string]string{
"Connection": "close",
}),
)...,
); err != nil {
return ctl.Errf("pulling ostree repository from %s: %s", beskarRemoteName, err)
}
Expand Down Expand Up @@ -142,3 +145,12 @@ func (h *Handler) BeginLocalRepoTransaction(ctx context.Context, tFn Transaction

return nil
}

func (h *Handler) standardPullOptions(more ...libostree.Option) []libostree.Option {
return append([]libostree.Option{
libostree.NoGPGVerify(),
libostree.Flags(libostree.Mirror | libostree.TrustedHTTP),
libostree.MaxOutstandingFetcherRequests(uint32(h.Params.Sync.GetMaxWorkerCount())),
libostree.AppendUserAgent("beskar-ostree"), // TODO: Inject version here
}, more...)
}
11 changes: 5 additions & 6 deletions internal/plugins/ostree/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,8 @@ var routerRego []byte
var routerData []byte

type Plugin struct {
ctx context.Context
config pluginsrv.Config
beskarOSTreeConfig *config.BeskarOSTreeConfig
ctx context.Context
config pluginsrv.Config

repositoryManager *repository.Manager[*ostreerepository.Handler]
handlerParams *repository.HandlerParams
Expand Down Expand Up @@ -70,7 +69,8 @@ func New(ctx context.Context, beskarOSTreeConfig *config.BeskarOSTreeConfig) (*P
}

params := &repository.HandlerParams{
Dir: filepath.Join(beskarOSTreeConfig.DataDir, "_repohandlers_"),
Dir: filepath.Join(beskarOSTreeConfig.DataDir, "_repohandlers_"),
Sync: beskarOSTreeConfig.Sync,
}

return &Plugin{
Expand All @@ -89,8 +89,7 @@ func New(ctx context.Context, beskarOSTreeConfig *config.BeskarOSTreeConfig) (*P
},
},
},
beskarOSTreeConfig: beskarOSTreeConfig,
handlerParams: params,
handlerParams: params,
repositoryManager: repository.NewManager[*ostreerepository.Handler](
params,
ostreerepository.NewHandler,
Expand Down

0 comments on commit 8cef299

Please sign in to comment.