From a634609fb1350b6332edd363b7de3b3e7b09821e Mon Sep 17 00:00:00 2001
From: Pontus Freyhult
Date: Tue, 14 Jan 2025 09:26:07 +0100
Subject: [PATCH] Add a setting for chunk size when getting data; rework
 argument passing to HTTPReader to support it

---
 cmd/sdafs/main.go                      | 23 +++++++-
 cmd/sdafs/main_test.go                 | 31 ++++++++++-
 internal/httpreader/httpreader.go      | 75 +++++++++++++-------------
 internal/httpreader/httpreader_test.go | 71 +++++++++++++-----------
 internal/sdafs/sdafs.go                | 12 ++++-
 5 files changed, 136 insertions(+), 76 deletions(-)

diff --git a/cmd/sdafs/main.go b/cmd/sdafs/main.go
index a0a0b9d..3ec3632 100644
--- a/cmd/sdafs/main.go
+++ b/cmd/sdafs/main.go
@@ -15,7 +15,8 @@ import (

 var credentialsFile, rootURL, logFile string
 var foreground, open bool
-var maxRetries int
+var maxRetries uint
+var chunkSize uint

 var Version string = "development"

@@ -49,9 +50,12 @@ func getConfigs() mainConfig {
 	flag.StringVar(&logFile, "log", "", "File to send logs to instead of stderr,"+
 		" defaults to sdafs.log if detached, empty string means stderr which is default for foreground")

-	flag.IntVar(&maxRetries, "maxretries", 7, "Max number retries for failed transfers. Retries will be done with some form of backoff")
+	flag.UintVar(&maxRetries, "maxretries", 7, "Max number of retries for failed transfers. "+
+		"Retries will be done with exponential backoff. Max 60")
 	flag.BoolVar(&foreground, "foreground", false, "Do not detach, run in foreground and send log output to stdout")
 	flag.BoolVar(&open, "open", false, "Set permissions allowing access by others than the user")
+	flag.UintVar(&chunkSize, "chunksize", 5120, "Chunk size (in kb) used when fetching data. "+
+		"Higher values are likely to give better throughput but higher latency. Min 64, max 16384.")

 	flag.Parse()

@@ -60,6 +64,19 @@ func getConfigs() mainConfig {
 		usage()
 	}

+	// Some sanity checks
+	if chunkSize > 16384 || chunkSize < 64 {
+		fmt.Printf("Chunk size %d is not allowed, valid values are 64 to 16384\n\n",
+			chunkSize)
+		usage()
+	}
+
+	if maxRetries > 60 {
+		fmt.Printf("Max retries %d is not allowed, valid values are 0 to 60\n\n",
+			maxRetries)
+		usage()
+	}
+
 	useLogFile := logFile
 	// Background and logfile not specified
 	if !foreground && logFile == "" {
@@ -71,6 +88,8 @@
 		RootURL:         rootURL,
 		CredentialsFile: credentialsFile,
 		SkipLevels:      0,
+		ChunkSize:       int(chunkSize),
+		MaxRetries:      int(maxRetries),
 	}

 	if open {
diff --git a/cmd/sdafs/main_test.go b/cmd/sdafs/main_test.go
index a9cbf52..41dd353 100644
--- a/cmd/sdafs/main_test.go
+++ b/cmd/sdafs/main_test.go
@@ -11,8 +11,10 @@ import (
 	"github.com/tj/assert"
 )

+// The handling of tests that check for expected exits is based on
+// https://stackoverflow.com/a/33404435
+
 func TestConfOptionNoMountPoint(t *testing.T) {
-	// https://stackoverflow.com/a/33404435
 	if os.Getenv("BE_CRASHER") == "1" {
 		getConfigs()
 		return
@@ -21,6 +23,30 @@
 	runExiting(t, "TestConfOptionNoMountPoint")
 }

+func TestBadChunkSize(t *testing.T) {
+	if os.Getenv("BE_CRASHER") == "1" {
+		os.Args = []string{"binary", "-chunksize", "30", "mountpoint"}
+		flag.CommandLine = flag.NewFlagSet("test", flag.ContinueOnError)
+		flag.Parse()
+		getConfigs()
+		return
+	}
+
+	runExiting(t, "TestBadChunkSize")
+}
+
+func TestBadMaxRetries(t *testing.T) {
+	if os.Getenv("BE_CRASHER") == "1" {
+		os.Args = []string{"binary", "-maxretries", "90", "mountpoint"}
+		flag.CommandLine = flag.NewFlagSet("test", flag.ContinueOnError)
+		flag.Parse()
+		getConfigs()
+		return
+	}
+
+	runExiting(t, "TestBadMaxRetries")
+}
+
 func TestConfOptions(t *testing.T) {
 	safeArgs := os.Args
@@ -134,7 +160,8 @@ func runExiting(t *testing.T, testName string) {
 		return
 	}

-	t.Fatalf("process when it should not for test %s %v, want exit status 1",
+	t.Fatalf("process succeeded when it should not for test %s"+
+		" %v, want exit status 1",
 		testName, err)
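The new tests above use the subprocess pattern from the referenced Stack Overflow answer: the test re-executes its own binary with BE_CRASHER=1 so that usage(), which exits the process, only terminates the child. The runExiting helper itself is not shown in this patch; a minimal sketch of how such a helper is commonly written (an assumed implementation for illustration, not the project's actual code) could be:

package main

import (
	"os"
	"os/exec"
	"testing"
)

// runExiting re-runs a single test in a subprocess with BE_CRASHER=1
// and passes only if that subprocess exits with a non-zero status.
// Assumed implementation, shown only to illustrate the pattern.
func runExiting(t *testing.T, testName string) {
	cmd := exec.Command(os.Args[0], "-test.run="+testName)
	cmd.Env = append(os.Environ(), "BE_CRASHER=1")
	err := cmd.Run()

	// A non-zero exit shows up as an *exec.ExitError with Success() == false.
	if e, ok := err.(*exec.ExitError); ok && !e.Success() {
		return
	}

	t.Fatalf("process succeeded when it should not for test %s"+
		" %v, want exit status 1", testName, err)
}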
diff --git a/internal/httpreader/httpreader.go b/internal/httpreader/httpreader.go
index 63a4f3b..8146ce2 100644
--- a/internal/httpreader/httpreader.go
+++ b/internal/httpreader/httpreader.go
@@ -13,6 +13,19 @@ import (
 	"time"
 )

+type Conf struct {
+	Token      string
+	Client     *http.Client
+	Headers    *http.Header
+	MaxRetries int
+	ChunkSize  int
+}
+
+type Request struct {
+	FileURL    string
+	ObjectSize uint64
+}
+
 // TODO: Make something better that does TTLing
 var cache map[string][]CacheBlock

@@ -54,38 +67,22 @@ func (r *HTTPReader) initCache() {
 // HTTPReader is the vehicle to keep track of needed state for the reader
 type HTTPReader struct {
-	client *http.Client
+	conf *Conf
 	currentOffset uint64
-
-	fileURL string
-	objectSize uint64
-	lock sync.Mutex
-	token string
-	extraHeaders *http.Header
-
-	maxRetries int
+	fileURL string
+	objectSize uint64
+	lock sync.Mutex
 }

-func NewHTTPReader(fileURL, token string,
-	objectSize uint64,
-	client *http.Client,
-	headers *http.Header,
-	maxRetries int) (*HTTPReader, error) {
-
-	if maxRetries == 0 {
-		maxRetries = 6
-	}
+func NewHTTPReader(conf *Conf, request *Request,
+) (*HTTPReader, error) {

-	log.Printf("Creating reader for %v, object size %v", fileURL, objectSize)
+	log.Printf("Creating reader for %v, object size %v", request.FileURL, request.ObjectSize)

 	reader := &HTTPReader{
-		client: client,
-		currentOffset: 0,
-		fileURL: fileURL,
-		objectSize: objectSize,
-		lock: sync.Mutex{},
-		token: token,
-		extraHeaders: headers,
-		maxRetries: maxRetries,
+		conf: conf,
+		fileURL: request.FileURL,
+		objectSize: request.ObjectSize,
+		lock: sync.Mutex{},
 	}

 	reader.initCache()
@@ -147,19 +144,19 @@ func (r *HTTPReader) doFetch(rangeSpec string) ([]byte, error) {
 			r.fileURL, err)
 	}

-	req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", r.token))
+	req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", r.conf.Token))

 	if rangeSpec != "" {
 		req.Header.Add("Range", rangeSpec)
 	}

-	if r.extraHeaders != nil {
-		for h := range *r.extraHeaders {
-			req.Header.Add(h, r.extraHeaders.Get(h))
+	if r.conf.Headers != nil {
+		for h := range *r.conf.Headers {
+			req.Header.Add(h, r.conf.Headers.Get(h))
 		}
 	}

-	resp, err := r.client.Do(req)
+	resp, err := r.conf.Client.Do(req)
 	if err != nil {
 		return nil, fmt.Errorf(
 			"Error while making request for %s: %v",
@@ -244,7 +241,7 @@ func (r *HTTPReader) prefetchAt(waitBefore time.Duration, offset uint64) {
 		waitBefore = 2 * waitBefore
 	}

-	if waitBefore < time.Duration(math.Pow(2, float64(r.maxRetries)))*time.Second {
+	if waitBefore < time.Duration(math.Pow(2, float64(r.conf.MaxRetries)))*time.Second {
 		r.prefetchAt(waitBefore, offset)
 	}
@@ -387,14 +384,14 @@ func (r *HTTPReader) doRequest() (*http.Response, error) {
 			r.fileURL, err)
 	}

-	if r.extraHeaders != nil {
-		for h := range *r.extraHeaders {
-			req.Header.Add(h, r.extraHeaders.Get(h))
+	if r.conf.Headers != nil {
+		for h := range *r.conf.Headers {
+			req.Header.Add(h, r.conf.Headers.Get(h))
 		}
 	}

-	req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", r.token))
-	return r.client.Do(req)
+	req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", r.conf.Token))
+	return r.conf.Client.Do(req)
 }

 func (r *HTTPReader) Read(dst []byte) (n int, err error) {
@@ -450,7 +447,7 @@ func (r *HTTPReader) Read(dst []byte) (n int, err error) {
 	var data []byte
 	var wait time.Duration = 1

-	for tries := 0; tries <= r.maxRetries; tries++ {
+	for tries := 0; tries <= r.conf.MaxRetries; tries++ {
 		r.addToOutstanding(start)
 		data, err = r.doFetch(wantedRange)
 		r.removeFromOutstanding(start)
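After this refactoring, everything that used to be individual constructor arguments lives in a Conf that can be shared between readers, while the per-object parameters travel in a Request. A minimal usage sketch of the new API (the import path, URL, and all values here are illustrative assumptions):

package main

import (
	"log"
	"net/http"

	// Assumed import path; internal packages are only importable
	// from within the sdafs module itself.
	"github.com/NBISweden/sdafs/internal/httpreader"
)

func main() {
	// One Conf can be shared by every reader against the same backend.
	conf := &httpreader.Conf{
		Token:      "some-access-token",
		Client:     http.DefaultClient,
		Headers:    nil, // or extra headers, e.g. a client public key
		MaxRetries: 7,
		ChunkSize:  5120,
	}

	// Each Request describes one object to read.
	r, err := httpreader.NewHTTPReader(conf, &httpreader.Request{
		FileURL:    "https://download.example.com/dataset/file1.c4gh",
		ObjectSize: 14000,
	})
	if err != nil {
		log.Fatalf("creating reader failed: %v", err)
	}
	defer r.Close()

	buf := make([]byte, 4096)
	n, err := r.Read(buf)
	log.Printf("read %d bytes, err=%v", n, err)
}

Note that the constructor no longer substitutes a default when MaxRetries is zero (the removed "maxRetries == 0" branch), so callers are now responsible for choosing a sensible value.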
diff --git a/internal/httpreader/httpreader_test.go b/internal/httpreader/httpreader_test.go
index bf8bbcd..2f70c50 100644
--- a/internal/httpreader/httpreader_test.go
+++ b/internal/httpreader/httpreader_test.go
@@ -86,6 +86,15 @@ func clientPublicKeyHeader() *http.Header {
 	return &h
 }

+func useConf() *Conf {
+	c := Conf{
+		Client: http.DefaultClient,
+		Headers: clientPublicKeyHeader(),
+		MaxRetries: 7,
+		Token: "token"}
+
+	return &c
+}
 func TestHTTPReader(t *testing.T) {
 	httpmock.Activate()
 	defer httpmock.DeactivateAndReset()
@@ -94,11 +103,10 @@ func TestHTTPReader(t *testing.T) {
 	httpmock.RegisterResponder("GET", url,
 		testDataResponder)

-	reader, err := NewHTTPReader(url, "",
-		14000,
-		http.DefaultClient,
-		clientPublicKeyHeader(),
-		0)
+	reader, err := NewHTTPReader(
+		useConf(),
+		&Request{FileURL: url,
+			ObjectSize: 14000})
 	assert.Nil(t, err, "Backend failed")

 	if reader == nil {
@@ -200,12 +208,12 @@
 		testDataResponder)

 	var readBackBuffer [4096]byte
-	seeker, err := NewHTTPReader(url,
-		"token",
-		14000,
-		http.DefaultClient,
-		clientPublicKeyHeader(),
-		0)
+
+	seeker, err := NewHTTPReader(
+		useConf(),
+		&Request{FileURL: url,
+			ObjectSize: 14000})
+
 	assert.Nil(t, err, "New reader failed unexpectedly")

 	_, err = seeker.Read(readBackBuffer[0:4096])
@@ -215,12 +223,11 @@
 	err = seeker.Close()
 	assert.Nil(t, err, "unexpected error when closing")

-	reader, err := NewHTTPReader(url,
-		"token",
-		14000,
-		http.DefaultClient,
-		clientPublicKeyHeader(),
-		0)
+	reader, err := NewHTTPReader(
+		useConf(),
+		&Request{FileURL: url,
+			ObjectSize: 14000})
+
 	assert.Nil(t, err, "unexpected error when creating reader")
 	assert.NotNil(t, reader, "unexpected error when creating reader")
@@ -266,13 +273,14 @@
 	var readBackBuffer [4096]byte

+	c := useConf()
+	c.MaxRetries = 2
 	// test first where the
-	failreader, err := NewHTTPReader(failurl,
-		"token",
-		14000,
-		http.DefaultClient,
-		clientPublicKeyHeader(),
-		2)
+	failreader, err := NewHTTPReader(
+		c,
+		&Request{FileURL: failurl,
+			ObjectSize: 14000})
+
 	// We shouldn't see failures yet
 	assert.Nil(t, err, "unexpected error when creating reader")
 	assert.NotNil(t, failreader, "unexpected error when creating reader")
@@ -297,12 +305,11 @@
 	failCounter = 0
 	failFirst = 10

-	serverFailReader, err := NewHTTPReader(failurl+"2",
-		"token",
-		14000,
-		http.DefaultClient,
-		clientPublicKeyHeader(),
-		2)
+	serverFailReader, err := NewHTTPReader(
+		c,
+		&Request{FileURL: failurl + "2",
+			ObjectSize: 14000})
+
 	// We shouldn't see failures yet
 	assert.Nil(t, err, "unexpected error when creating reader")
 	assert.NotNil(t, serverFailReader, "unexpected error when creating reader")
@@ -358,10 +365,10 @@ func TestDoRequest(t *testing.T) {
 	httpmock.RegisterResponder("GET", checkurl,
 		testDoRequestResponder)

+	c := useConf()
 	r := HTTPReader{
+		conf: c,
 		fileURL: checkurl,
-		token: "token",
-		client: http.DefaultClient,
 	}

 	resp, err := r.doRequest()
@@ -373,7 +380,7 @@
 	assert.Nil(t, err, "Unexpected error from doRequest")

 	h := http.Header{}
-	r.extraHeaders = &h
+	r.conf.Headers = &h
 	h.Add("HeaderA", "SomeGoose")

 	resp, err = r.doRequest()
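MaxRetries, which these tests lower to 2 to fail fast, bounds the reader in two places: the fetch loop in Read makes at most MaxRetries+1 attempts, and prefetchAt keeps doubling its delay until the wait reaches 2^MaxRetries seconds. A standalone sketch of that growth, starting from one second purely for illustration (the actual starting delay in the reader may differ):

package main

import (
	"fmt"
	"math"
	"time"
)

func main() {
	maxRetries := 7 // the CLI default; the new flag allows 0-60

	// Delays double as in prefetchAt, stopping once the next wait
	// would reach the 2^maxRetries seconds cap.
	limit := time.Duration(math.Pow(2, float64(maxRetries))) * time.Second
	total := time.Duration(0)
	for wait := time.Second; wait < limit; wait *= 2 {
		total += wait
		fmt.Printf("wait %v (accumulated %v)\n", wait, total)
	}
}

This cap is presumably also why the new -maxretries flag rejects values above 60: the value feeds directly into the exponent of that math.Pow call, so unbounded settings would make the computed limit meaningless.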
diff --git a/internal/sdafs/sdafs.go b/internal/sdafs/sdafs.go
index 4f34a38..d700e72 100644
--- a/internal/sdafs/sdafs.go
+++ b/internal/sdafs/sdafs.go
@@ -70,6 +70,7 @@ type Conf struct {
 	DirPerms os.FileMode
 	FilePerms os.FileMode
 	HTTPClient *http.Client
+	ChunkSize int
 }

 // inode is the struct to manage a directory entry
@@ -740,7 +741,16 @@ func (s *SDAfs) OpenFile(
 		return fuse.EINVAL
 	}

-	r, err := httpreader.NewHTTPReader(s.getFileURL(in), s.token, s.getTotalSize(in), s.client, &s.keyHeader, s.conf.MaxRetries)
+	conf := httpreader.Conf{
+		Token: s.token,
+		Client: s.client,
+		Headers: &s.keyHeader,
+		MaxRetries: s.conf.MaxRetries,
+		ChunkSize: s.conf.ChunkSize,
+	}
+	r, err := httpreader.NewHTTPReader(&conf,
+		&httpreader.Request{FileURL: s.getFileURL(in),
+			ObjectSize: s.getTotalSize(in)})
 	if err != nil {
 		log.Printf("OpenFile failed: reader for %s gave: %v", in.key, err)
 		return fuse.EIO
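End to end, the two new knobs flow from the command-line flags into sdafs.Conf, and OpenFile then copies them into the httpreader.Conf it builds per file. A condensed sketch of the top of that chain (import path and values are assumptions; the field set is reduced to what this patch touches):

package main

import (
	// Assumed import path for the sdafs package within this module.
	"github.com/NBISweden/sdafs/internal/sdafs"
)

func main() {
	// Mirrors what getConfigs() builds after validating the flags:
	// chunkSize must be 64-16384 and maxRetries at most 60.
	conf := sdafs.Conf{
		RootURL:         "https://download.example.com",
		CredentialsFile: "credentials.conf", // hypothetical path
		SkipLevels:      0,
		ChunkSize:       5120, // -chunksize default, in kb
		MaxRetries:      7,    // -maxretries default
	}
	_ = conf // handed to the sdafs filesystem constructor (not shown)
}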