Skip to content

Commit

Permalink
Merge pull request #17 from NBISweden/chunksize
Browse files Browse the repository at this point in the history
New setting for chunk size when getting data
  • Loading branch information
pontus authored Jan 14, 2025
2 parents 693b330 + a634609 commit 8b79f3e
Show file tree
Hide file tree
Showing 5 changed files with 136 additions and 76 deletions.
23 changes: 21 additions & 2 deletions cmd/sdafs/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ import (

var credentialsFile, rootURL, logFile string
var foreground, open bool
var maxRetries int
var maxRetries uint
var chunkSize uint

var Version string = "development"

Expand Down Expand Up @@ -49,9 +50,12 @@ func getConfigs() mainConfig {
flag.StringVar(&logFile, "log", "", "File to send logs to instead of stderr,"+
" defaults to sdafs.log if detached, empty string means stderr which is default for foreground")

flag.IntVar(&maxRetries, "maxretries", 7, "Max number retries for failed transfers. Retries will be done with some form of backoff")
flag.UintVar(&maxRetries, "maxretries", 7, "Max number retries for failed transfers. "+
"Retries will be done with some form of backoff. Max 60")
flag.BoolVar(&foreground, "foreground", false, "Do not detach, run in foreground and send log output to stdout")
flag.BoolVar(&open, "open", false, "Set permissions allowing access by others than the user")
flag.UintVar(&chunkSize, "chunksize", 5120, "Chunk size (in kb) used when fetching data. "+
"Higher values likely to give better throughput but higher latency. Min 64 Max 16384.")

flag.Parse()

Expand All @@ -60,6 +64,19 @@ func getConfigs() mainConfig {
usage()
}

// Some sanity checks
if chunkSize > 16384 || chunkSize < 64 {
fmt.Printf("Chunk size %d is not allowed, valid values are 64 to 16384\n\n",
chunkSize)
usage()
}

if maxRetries > 60 {
fmt.Printf("Max retries %d is not allowed, valid values are 0 to 60\n\n ",
maxRetries)
usage()
}

useLogFile := logFile
// Background and logfile not specified
if !foreground && logFile == "" {
Expand All @@ -71,6 +88,8 @@ func getConfigs() mainConfig {
RootURL: rootURL,
CredentialsFile: credentialsFile,
SkipLevels: 0,
ChunkSize: int(chunkSize),
MaxRetries: int(maxRetries),
}

if open {
Expand Down
31 changes: 29 additions & 2 deletions cmd/sdafs/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@ import (
"github.com/tj/assert"
)

// The handling of checking things that should exit is based on
// https://stackoverflow.com/a/33404435

func TestConfOptionNoMountPoint(t *testing.T) {
// https://stackoverflow.com/a/33404435
if os.Getenv("BE_CRASHER") == "1" {
getConfigs()
return
Expand All @@ -21,6 +23,30 @@ func TestConfOptionNoMountPoint(t *testing.T) {
runExiting(t, "TestConfOptionNoMountPoint")
}

// TestBadChunkSize verifies that getConfigs exits when the -chunksize
// flag is below the allowed minimum (valid range is 64-16384 KB).
// Uses the re-exec "crasher" pattern (https://stackoverflow.com/a/33404435):
// the subprocess branch runs getConfigs and is expected to exit non-zero.
func TestBadChunkSize(t *testing.T) {
	if os.Getenv("BE_CRASHER") == "1" {
		// Subprocess: supply a too-small chunk size and a fresh FlagSet
		// so getConfigs can register its flags without "flag redefined"
		// panics. getConfigs calls flag.Parse itself, so no premature
		// Parse (which would run against an empty flag set and print a
		// spurious "flag provided but not defined" error) is needed here.
		os.Args = []string{"binary", "-chunksize", "30", "mountpoint"}
		flag.CommandLine = flag.NewFlagSet("test", flag.ContinueOnError)
		getConfigs()
		return
	}

	runExiting(t, "TestBadChunkSize")
}

// TestBadMaxRetries verifies that getConfigs exits when the -maxretries
// flag exceeds the allowed maximum (valid range is 0-60).
// Uses the re-exec "crasher" pattern (https://stackoverflow.com/a/33404435):
// the subprocess branch runs getConfigs and is expected to exit non-zero.
func TestBadMaxRetries(t *testing.T) {
	if os.Getenv("BE_CRASHER") == "1" {
		// Subprocess: supply an out-of-range retry count and a fresh
		// FlagSet so getConfigs can register its flags cleanly.
		// getConfigs calls flag.Parse itself, so the premature Parse
		// (against an empty flag set, which only printed a spurious
		// "flag provided but not defined" error) has been dropped.
		os.Args = []string{"binary", "-maxretries", "90", "mountpoint"}
		flag.CommandLine = flag.NewFlagSet("test", flag.ContinueOnError)
		getConfigs()
		return
	}

	runExiting(t, "TestBadMaxRetries")
}

func TestConfOptions(t *testing.T) {
safeArgs := os.Args

Expand Down Expand Up @@ -134,7 +160,8 @@ func runExiting(t *testing.T, testName string) {
return
}

t.Fatalf("process when it should not for test %s %v, want exit status 1",
t.Fatalf("process succeeded when it should not for test %s"+
" %v, want exit status 1",
testName,
err)

Expand Down
75 changes: 36 additions & 39 deletions internal/httpreader/httpreader.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,19 @@ import (
"time"
)

// Conf carries the settings shared by HTTPReaders: credentials,
// the HTTP client and extra headers to use, how many times a failed
// transfer is retried, and the chunk size used when fetching data.
type Conf struct {
Token string // bearer token sent in the Authorization header
Client *http.Client // client used for all requests
Headers *http.Header // extra headers added to each request, may be nil
MaxRetries int // maximum number of retries for failed transfers
ChunkSize int // fetch chunk size — unit not visible here, presumably KB as in the -chunksize flag; TODO confirm
}

// Request identifies one object to read: its URL and total size.
type Request struct {
FileURL string // URL the object is fetched from
ObjectSize uint64 // total size of the object in bytes
}

// TODO: Make something better that does TTLing

var cache map[string][]CacheBlock
Expand Down Expand Up @@ -54,38 +67,22 @@ func (r *HTTPReader) initCache() {

// HTTPReader is the vehicle to keep track of needed state for the reader
type HTTPReader struct {
client *http.Client
conf *Conf
currentOffset uint64

fileURL string
objectSize uint64
lock sync.Mutex
token string
extraHeaders *http.Header

maxRetries int
fileURL string
objectSize uint64
lock sync.Mutex
}

func NewHTTPReader(fileURL, token string,
objectSize uint64,
client *http.Client,
headers *http.Header,
maxRetries int) (*HTTPReader, error) {

if maxRetries == 0 {
maxRetries = 6
}
func NewHTTPReader(conf *Conf, request *Request,
) (*HTTPReader, error) {

log.Printf("Creating reader for %v, object size %v", fileURL, objectSize)
log.Printf("Creating reader for %v, object size %v", request.FileURL, request.ObjectSize)
reader := &HTTPReader{
client: client,
currentOffset: 0,
fileURL: fileURL,
objectSize: objectSize,
lock: sync.Mutex{},
token: token,
extraHeaders: headers,
maxRetries: maxRetries,
conf: conf,
fileURL: request.FileURL,
objectSize: request.ObjectSize,
lock: sync.Mutex{},
}

reader.initCache()
Expand Down Expand Up @@ -147,19 +144,19 @@ func (r *HTTPReader) doFetch(rangeSpec string) ([]byte, error) {
r.fileURL, err)
}

req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", r.token))
req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", r.conf.Token))

if rangeSpec != "" {
req.Header.Add("Range", rangeSpec)
}

if r.extraHeaders != nil {
for h := range *r.extraHeaders {
req.Header.Add(h, r.extraHeaders.Get(h))
if r.conf.Headers != nil {
for h := range *r.conf.Headers {
req.Header.Add(h, r.conf.Headers.Get(h))
}
}

resp, err := r.client.Do(req)
resp, err := r.conf.Client.Do(req)
if err != nil {
return nil, fmt.Errorf(
"Error while making request for %s: %v",
Expand Down Expand Up @@ -244,7 +241,7 @@ func (r *HTTPReader) prefetchAt(waitBefore time.Duration, offset uint64) {
waitBefore = 2 * waitBefore
}

if waitBefore < time.Duration(math.Pow(2, float64(r.maxRetries)))*time.Second {
if waitBefore < time.Duration(math.Pow(2, float64(r.conf.MaxRetries)))*time.Second {
r.prefetchAt(waitBefore, offset)
}

Expand Down Expand Up @@ -387,14 +384,14 @@ func (r *HTTPReader) doRequest() (*http.Response, error) {
r.fileURL, err)
}

if r.extraHeaders != nil {
for h := range *r.extraHeaders {
req.Header.Add(h, r.extraHeaders.Get(h))
if r.conf.Headers != nil {
for h := range *r.conf.Headers {
req.Header.Add(h, r.conf.Headers.Get(h))
}
}

req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", r.token))
return r.client.Do(req)
req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", r.conf.Token))
return r.conf.Client.Do(req)
}

func (r *HTTPReader) Read(dst []byte) (n int, err error) {
Expand Down Expand Up @@ -450,7 +447,7 @@ func (r *HTTPReader) Read(dst []byte) (n int, err error) {

var data []byte
var wait time.Duration = 1
for tries := 0; tries <= r.maxRetries; tries++ {
for tries := 0; tries <= r.conf.MaxRetries; tries++ {
r.addToOutstanding(start)
data, err = r.doFetch(wantedRange)
r.removeFromOutstanding(start)
Expand Down
71 changes: 39 additions & 32 deletions internal/httpreader/httpreader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,15 @@ func clientPublicKeyHeader() *http.Header {
return &h
}

// useConf returns the reader configuration shared by the tests:
// the default HTTP client, the test key header, a retry limit of 7
// and a dummy bearer token.
func useConf() *Conf {
	return &Conf{
		Token:      "token",
		Client:     http.DefaultClient,
		Headers:    clientPublicKeyHeader(),
		MaxRetries: 7,
	}
}
func TestHTTPReader(t *testing.T) {
httpmock.Activate()
defer httpmock.DeactivateAndReset()
Expand All @@ -94,11 +103,10 @@ func TestHTTPReader(t *testing.T) {
httpmock.RegisterResponder("GET", url,
testDataResponder)

reader, err := NewHTTPReader(url, "",
14000,
http.DefaultClient,
clientPublicKeyHeader(),
0)
reader, err := NewHTTPReader(
useConf(),
&Request{FileURL: url,
ObjectSize: 14000})
assert.Nil(t, err, "Backend failed")

if reader == nil {
Expand Down Expand Up @@ -200,12 +208,12 @@ func TestHTTPReaderPrefetches(t *testing.T) {
testDataResponder)

var readBackBuffer [4096]byte
seeker, err := NewHTTPReader(url,
"token",
14000,
http.DefaultClient,
clientPublicKeyHeader(),
0)

seeker, err := NewHTTPReader(
useConf(),
&Request{FileURL: url,
ObjectSize: 14000})

assert.Nil(t, err, "New reader failed unexpectedly")

_, err = seeker.Read(readBackBuffer[0:4096])
Expand All @@ -215,12 +223,11 @@ func TestHTTPReaderPrefetches(t *testing.T) {
err = seeker.Close()
assert.Nil(t, err, "unexpected error when closing")

reader, err := NewHTTPReader(url,
"token",
14000,
http.DefaultClient,
clientPublicKeyHeader(),
0)
reader, err := NewHTTPReader(
useConf(),
&Request{FileURL: url,
ObjectSize: 14000})

assert.Nil(t, err, "unexpected error when creating reader")
assert.NotNil(t, reader, "unexpected error when creating reader")

Expand Down Expand Up @@ -266,13 +273,14 @@ func TestHTTPReaderFailures(t *testing.T) {

var readBackBuffer [4096]byte

c := useConf()
c.MaxRetries = 2
// test first where the
failreader, err := NewHTTPReader(failurl,
"token",
14000,
http.DefaultClient,
clientPublicKeyHeader(),
2)
failreader, err := NewHTTPReader(
c,
&Request{FileURL: failurl,
ObjectSize: 14000})

// We shouldn't see failures yet
assert.Nil(t, err, "unexpected error when creating reader")
assert.NotNil(t, failreader, "unexpected error when creating reader")
Expand All @@ -297,12 +305,11 @@ func TestHTTPReaderFailures(t *testing.T) {
failCounter = 0
failFirst = 10

serverFailReader, err := NewHTTPReader(failurl+"2",
"token",
14000,
http.DefaultClient,
clientPublicKeyHeader(),
2)
serverFailReader, err := NewHTTPReader(
c,
&Request{FileURL: failurl + "2",
ObjectSize: 14000})

// We shouldn't see failures yet
assert.Nil(t, err, "unexpected error when creating reader")
assert.NotNil(t, serverFailReader, "unexpected error when creating reader")
Expand Down Expand Up @@ -358,10 +365,10 @@ func TestDoRequest(t *testing.T) {
httpmock.RegisterResponder("GET", checkurl,
testDoRequestResponder)

c := useConf()
r := HTTPReader{
conf: c,
fileURL: checkurl,
token: "token",
client: http.DefaultClient,
}
resp, err := r.doRequest()

Expand All @@ -373,7 +380,7 @@ func TestDoRequest(t *testing.T) {
assert.Nil(t, err, "Unexpected error from doRequest")

h := http.Header{}
r.extraHeaders = &h
r.conf.Headers = &h
h.Add("HeaderA", "SomeGoose")

resp, err = r.doRequest()
Expand Down
12 changes: 11 additions & 1 deletion internal/sdafs/sdafs.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ type Conf struct {
DirPerms os.FileMode
FilePerms os.FileMode
HTTPClient *http.Client
ChunkSize int
}

// inode is the struct to manage a directory entry
Expand Down Expand Up @@ -740,7 +741,16 @@ func (s *SDAfs) OpenFile(
return fuse.EINVAL
}

r, err := httpreader.NewHTTPReader(s.getFileURL(in), s.token, s.getTotalSize(in), s.client, &s.keyHeader, s.conf.MaxRetries)
conf := httpreader.Conf{
Token: s.token,
Client: s.client,
Headers: &s.keyHeader,
MaxRetries: s.conf.MaxRetries,
ChunkSize: s.conf.ChunkSize,
}
r, err := httpreader.NewHTTPReader(&conf,
&httpreader.Request{FileURL: s.getFileURL(in),
ObjectSize: s.getTotalSize(in)})
if err != nil {
log.Printf("OpenFile failed: reader for %s gave: %v", in.key, err)
return fuse.EIO
Expand Down

0 comments on commit 8b79f3e

Please sign in to comment.