Skip to content

Commit

Permalink
New setting for chunk size when getting data, redid passing to HTTPRe…
Browse files Browse the repository at this point in the history
…ader to support that
  • Loading branch information
pontus committed Jan 14, 2025
1 parent 693b330 commit a634609
Show file tree
Hide file tree
Showing 5 changed files with 136 additions and 76 deletions.
23 changes: 21 additions & 2 deletions cmd/sdafs/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ import (

var credentialsFile, rootURL, logFile string
var foreground, open bool
var maxRetries int
var maxRetries uint
var chunkSize uint

var Version string = "development"

Expand Down Expand Up @@ -49,9 +50,12 @@ func getConfigs() mainConfig {
flag.StringVar(&logFile, "log", "", "File to send logs to instead of stderr,"+
" defaults to sdafs.log if detached, empty string means stderr which is default for foreground")

flag.IntVar(&maxRetries, "maxretries", 7, "Max number retries for failed transfers. Retries will be done with some form of backoff")
flag.UintVar(&maxRetries, "maxretries", 7, "Max number retries for failed transfers. "+
"Retries will be done with some form of backoff. Max 60")
flag.BoolVar(&foreground, "foreground", false, "Do not detach, run in foreground and send log output to stdout")
flag.BoolVar(&open, "open", false, "Set permissions allowing access by others than the user")
flag.UintVar(&chunkSize, "chunksize", 5120, "Chunk size (in kb) used when fetching data. "+
"Higher values likely to give better throughput but higher latency. Min 64 Max 16384.")

flag.Parse()

Expand All @@ -60,6 +64,19 @@ func getConfigs() mainConfig {
usage()
}

// Some sanity checks
if chunkSize > 16384 || chunkSize < 64 {
fmt.Printf("Chunk size %d is not allowed, valid values are 64 to 16384\n\n",
chunkSize)
usage()
}

if maxRetries > 60 {
fmt.Printf("Max retries %d is not allowed, valid values are 0 to 60\n\n ",
maxRetries)
usage()
}

useLogFile := logFile
// Background and logfile not specified
if !foreground && logFile == "" {
Expand All @@ -71,6 +88,8 @@ func getConfigs() mainConfig {
RootURL: rootURL,
CredentialsFile: credentialsFile,
SkipLevels: 0,
ChunkSize: int(chunkSize),
MaxRetries: int(maxRetries),
}

if open {
Expand Down
31 changes: 29 additions & 2 deletions cmd/sdafs/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@ import (
"github.com/tj/assert"
)

// The handling of checking things that should exit is based on
// https://stackoverflow.com/a/33404435

func TestConfOptionNoMountPoint(t *testing.T) {
// https://stackoverflow.com/a/33404435
if os.Getenv("BE_CRASHER") == "1" {
getConfigs()
return
Expand All @@ -21,6 +23,30 @@ func TestConfOptionNoMountPoint(t *testing.T) {
runExiting(t, "TestConfOptionNoMountPoint")
}

func TestBadChunkSize(t *testing.T) {
if os.Getenv("BE_CRASHER") == "1" {
os.Args = []string{"binary", "-chunksize", "30", "mountpoint"}
flag.CommandLine = flag.NewFlagSet("test", flag.ContinueOnError)
flag.Parse()
getConfigs()
return
}

runExiting(t, "TestBadChunkSize")
}

func TestBadMaxRetries(t *testing.T) {
if os.Getenv("BE_CRASHER") == "1" {
os.Args = []string{"binary", "-maxretries", "90", "mountpoint"}
flag.CommandLine = flag.NewFlagSet("test", flag.ContinueOnError)
flag.Parse()
getConfigs()
return
}

runExiting(t, "TestBadMaxRetries")
}

func TestConfOptions(t *testing.T) {
safeArgs := os.Args

Expand Down Expand Up @@ -134,7 +160,8 @@ func runExiting(t *testing.T, testName string) {
return
}

t.Fatalf("process when it should not for test %s %v, want exit status 1",
t.Fatalf("process succeeded when it should not for test %s"+
" %v, want exit status 1",
testName,
err)

Expand Down
75 changes: 36 additions & 39 deletions internal/httpreader/httpreader.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,19 @@ import (
"time"
)

type Conf struct {
Token string
Client *http.Client
Headers *http.Header
MaxRetries int
ChunkSize int
}

type Request struct {
FileURL string
ObjectSize uint64
}

// TODO: Make something better that does TTLing

var cache map[string][]CacheBlock
Expand Down Expand Up @@ -54,38 +67,22 @@ func (r *HTTPReader) initCache() {

// HTTPReader is the vehicle to keep track of needed state for the reader
type HTTPReader struct {
client *http.Client
conf *Conf
currentOffset uint64

fileURL string
objectSize uint64
lock sync.Mutex
token string
extraHeaders *http.Header

maxRetries int
fileURL string
objectSize uint64
lock sync.Mutex
}

func NewHTTPReader(fileURL, token string,
objectSize uint64,
client *http.Client,
headers *http.Header,
maxRetries int) (*HTTPReader, error) {

if maxRetries == 0 {
maxRetries = 6
}
func NewHTTPReader(conf *Conf, request *Request,
) (*HTTPReader, error) {

log.Printf("Creating reader for %v, object size %v", fileURL, objectSize)
log.Printf("Creating reader for %v, object size %v", request.FileURL, request.ObjectSize)
reader := &HTTPReader{
client: client,
currentOffset: 0,
fileURL: fileURL,
objectSize: objectSize,
lock: sync.Mutex{},
token: token,
extraHeaders: headers,
maxRetries: maxRetries,
conf: conf,
fileURL: request.FileURL,
objectSize: request.ObjectSize,
lock: sync.Mutex{},
}

reader.initCache()
Expand Down Expand Up @@ -147,19 +144,19 @@ func (r *HTTPReader) doFetch(rangeSpec string) ([]byte, error) {
r.fileURL, err)
}

req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", r.token))
req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", r.conf.Token))

if rangeSpec != "" {
req.Header.Add("Range", rangeSpec)
}

if r.extraHeaders != nil {
for h := range *r.extraHeaders {
req.Header.Add(h, r.extraHeaders.Get(h))
if r.conf.Headers != nil {
for h := range *r.conf.Headers {
req.Header.Add(h, r.conf.Headers.Get(h))
}
}

resp, err := r.client.Do(req)
resp, err := r.conf.Client.Do(req)
if err != nil {
return nil, fmt.Errorf(
"Error while making request for %s: %v",
Expand Down Expand Up @@ -244,7 +241,7 @@ func (r *HTTPReader) prefetchAt(waitBefore time.Duration, offset uint64) {
waitBefore = 2 * waitBefore
}

if waitBefore < time.Duration(math.Pow(2, float64(r.maxRetries)))*time.Second {
if waitBefore < time.Duration(math.Pow(2, float64(r.conf.MaxRetries)))*time.Second {
r.prefetchAt(waitBefore, offset)
}

Expand Down Expand Up @@ -387,14 +384,14 @@ func (r *HTTPReader) doRequest() (*http.Response, error) {
r.fileURL, err)
}

if r.extraHeaders != nil {
for h := range *r.extraHeaders {
req.Header.Add(h, r.extraHeaders.Get(h))
if r.conf.Headers != nil {
for h := range *r.conf.Headers {
req.Header.Add(h, r.conf.Headers.Get(h))
}
}

req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", r.token))
return r.client.Do(req)
req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", r.conf.Token))
return r.conf.Client.Do(req)
}

func (r *HTTPReader) Read(dst []byte) (n int, err error) {
Expand Down Expand Up @@ -450,7 +447,7 @@ func (r *HTTPReader) Read(dst []byte) (n int, err error) {

var data []byte
var wait time.Duration = 1
for tries := 0; tries <= r.maxRetries; tries++ {
for tries := 0; tries <= r.conf.MaxRetries; tries++ {
r.addToOutstanding(start)
data, err = r.doFetch(wantedRange)
r.removeFromOutstanding(start)
Expand Down
71 changes: 39 additions & 32 deletions internal/httpreader/httpreader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,15 @@ func clientPublicKeyHeader() *http.Header {
return &h
}

func useConf() *Conf {
c := Conf{
Client: http.DefaultClient,
Headers: clientPublicKeyHeader(),
MaxRetries: 7,
Token: "token"}

return &c
}
func TestHTTPReader(t *testing.T) {
httpmock.Activate()
defer httpmock.DeactivateAndReset()
Expand All @@ -94,11 +103,10 @@ func TestHTTPReader(t *testing.T) {
httpmock.RegisterResponder("GET", url,
testDataResponder)

reader, err := NewHTTPReader(url, "",
14000,
http.DefaultClient,
clientPublicKeyHeader(),
0)
reader, err := NewHTTPReader(
useConf(),
&Request{FileURL: url,
ObjectSize: 14000})
assert.Nil(t, err, "Backend failed")

if reader == nil {
Expand Down Expand Up @@ -200,12 +208,12 @@ func TestHTTPReaderPrefetches(t *testing.T) {
testDataResponder)

var readBackBuffer [4096]byte
seeker, err := NewHTTPReader(url,
"token",
14000,
http.DefaultClient,
clientPublicKeyHeader(),
0)

seeker, err := NewHTTPReader(
useConf(),
&Request{FileURL: url,
ObjectSize: 14000})

assert.Nil(t, err, "New reader failed unexpectedly")

_, err = seeker.Read(readBackBuffer[0:4096])
Expand All @@ -215,12 +223,11 @@ func TestHTTPReaderPrefetches(t *testing.T) {
err = seeker.Close()
assert.Nil(t, err, "unexpected error when closing")

reader, err := NewHTTPReader(url,
"token",
14000,
http.DefaultClient,
clientPublicKeyHeader(),
0)
reader, err := NewHTTPReader(
useConf(),
&Request{FileURL: url,
ObjectSize: 14000})

assert.Nil(t, err, "unexpected error when creating reader")
assert.NotNil(t, reader, "unexpected error when creating reader")

Expand Down Expand Up @@ -266,13 +273,14 @@ func TestHTTPReaderFailures(t *testing.T) {

var readBackBuffer [4096]byte

c := useConf()
c.MaxRetries = 2
// test first where the
failreader, err := NewHTTPReader(failurl,
"token",
14000,
http.DefaultClient,
clientPublicKeyHeader(),
2)
failreader, err := NewHTTPReader(
c,
&Request{FileURL: failurl,
ObjectSize: 14000})

// We shouldn't see failures yet
assert.Nil(t, err, "unexpected error when creating reader")
assert.NotNil(t, failreader, "unexpected error when creating reader")
Expand All @@ -297,12 +305,11 @@ func TestHTTPReaderFailures(t *testing.T) {
failCounter = 0
failFirst = 10

serverFailReader, err := NewHTTPReader(failurl+"2",
"token",
14000,
http.DefaultClient,
clientPublicKeyHeader(),
2)
serverFailReader, err := NewHTTPReader(
c,
&Request{FileURL: failurl + "2",
ObjectSize: 14000})

// We shouldn't see failures yet
assert.Nil(t, err, "unexpected error when creating reader")
assert.NotNil(t, serverFailReader, "unexpected error when creating reader")
Expand Down Expand Up @@ -358,10 +365,10 @@ func TestDoRequest(t *testing.T) {
httpmock.RegisterResponder("GET", checkurl,
testDoRequestResponder)

c := useConf()
r := HTTPReader{
conf: c,
fileURL: checkurl,
token: "token",
client: http.DefaultClient,
}
resp, err := r.doRequest()

Expand All @@ -373,7 +380,7 @@ func TestDoRequest(t *testing.T) {
assert.Nil(t, err, "Unexpected error from doRequest")

h := http.Header{}
r.extraHeaders = &h
r.conf.Headers = &h
h.Add("HeaderA", "SomeGoose")

resp, err = r.doRequest()
Expand Down
12 changes: 11 additions & 1 deletion internal/sdafs/sdafs.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ type Conf struct {
DirPerms os.FileMode
FilePerms os.FileMode
HTTPClient *http.Client
ChunkSize int
}

// inode is the struct to manage a directory entry
Expand Down Expand Up @@ -740,7 +741,16 @@ func (s *SDAfs) OpenFile(
return fuse.EINVAL
}

r, err := httpreader.NewHTTPReader(s.getFileURL(in), s.token, s.getTotalSize(in), s.client, &s.keyHeader, s.conf.MaxRetries)
conf := httpreader.Conf{
Token: s.token,
Client: s.client,
Headers: &s.keyHeader,
MaxRetries: s.conf.MaxRetries,
ChunkSize: s.conf.ChunkSize,
}
r, err := httpreader.NewHTTPReader(&conf,
&httpreader.Request{FileURL: s.getFileURL(in),
ObjectSize: s.getTotalSize(in)})
if err != nil {
log.Printf("OpenFile failed: reader for %s gave: %v", in.key, err)
return fuse.EIO
Expand Down

0 comments on commit a634609

Please sign in to comment.