Skip to content

Commit

Permalink
feat: parallel ftp
Browse files Browse the repository at this point in the history
  • Loading branch information
Vilsol committed Dec 27, 2023
1 parent dd40fef commit 2bfc225
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 62 deletions.
170 changes: 109 additions & 61 deletions cli/disk/ftp.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,28 @@ package disk

import (
"bytes"
"context"
"fmt"
"io"
"log"
"log/slog"
"net/url"
"path/filepath"
"strings"
"sync"
"time"

"github.com/jackc/puddle/v2"
"github.com/jlaffaye/ftp"
)

// TODO Make configurable
const connectionCount = 5

var _ Disk = (*ftpDisk)(nil)

type ftpDisk struct {
client *ftp.ServerConn
path string
stepLock sync.Mutex
pool *puddle.Pool[*ftp.ServerConn]
path string
}

type ftpEntry struct {
Expand All @@ -40,19 +44,31 @@ func newFTP(path string) (Disk, error) {
return nil, fmt.Errorf("failed to parse ftp url: %w", err)
}

c, failedHidden, _ := testFTP(u, ftp.DialWithTimeout(time.Second*5), ftp.DialWithForceListHidden(true))
if failedHidden {
c, _, err = testFTP(u, ftp.DialWithTimeout(time.Second*5))
if err != nil {
return nil, err
}
}
pool, err := puddle.NewPool(&puddle.Config[*ftp.ServerConn]{
Constructor: func(ctx context.Context) (*ftp.ServerConn, error) {
c, failedHidden, err := testFTP(u, ftp.DialWithTimeout(time.Second*5), ftp.DialWithForceListHidden(true))
if failedHidden {
c, _, err = testFTP(u, ftp.DialWithTimeout(time.Second*5))
if err != nil {
return nil, err
}
} else if err != nil {
return nil, err
}

slog.Info("logged into ftp", slog.Bool("hidden-files", !failedHidden))

slog.Info("logged into ftp", slog.Bool("hidden-files", !failedHidden))
return c, nil
},
MaxSize: connectionCount,
})
if err != nil {
log.Fatal(err)
}

return &ftpDisk{
path: u.Path,
client: c,
path: u.Path,
pool: pool,
}, nil
}

Expand All @@ -76,25 +92,23 @@ func testFTP(u *url.URL, options ...ftp.DialOption) (*ftp.ServerConn, bool, erro
}

func (l *ftpDisk) Exists(path string) (bool, error) {
l.stepLock.Lock()
defer l.stepLock.Unlock()

slog.Debug("checking if file exists", slog.String("path", clean(path)), slog.String("schema", "ftp"))

slog.Debug("going to root directory", slog.String("schema", "ftp"))
err := l.client.ChangeDir("/")
res, err := l.acquire()
if err != nil {
return false, fmt.Errorf("failed to change directory: %w", err)
return false, err
}

defer res.Release()

slog.Debug("checking if file exists", slog.String("path", clean(path)), slog.String("schema", "ftp"))

split := strings.Split(clean(path)[1:], "/")
for _, s := range split[:len(split)-1] {
dir, err := l.ReadDirLock("", false)
dir, err := l.readDirLock(res, "")
if err != nil {
return false, err
}

currentDir, _ := l.client.CurrentDir()
currentDir, _ := res.Value().CurrentDir()

foundDir := false
for _, entry := range dir {
Expand All @@ -109,12 +123,12 @@ func (l *ftpDisk) Exists(path string) (bool, error) {
}

slog.Debug("entering directory", slog.String("dir", s), slog.String("cwd", currentDir), slog.String("schema", "ftp"))
if err := l.client.ChangeDir(s); err != nil {
if err := res.Value().ChangeDir(s); err != nil {
return false, fmt.Errorf("failed to enter directory: %w", err)
}
}

dir, err := l.ReadDirLock("", false)
dir, err := l.readDirLock(res, "")
if err != nil {
return false, fmt.Errorf("failed listing directory: %w", err)
}
Expand All @@ -131,12 +145,16 @@ func (l *ftpDisk) Exists(path string) (bool, error) {
}

func (l *ftpDisk) Read(path string) ([]byte, error) {
l.stepLock.Lock()
defer l.stepLock.Unlock()
res, err := l.acquire()
if err != nil {
return nil, err
}

defer res.Release()

slog.Debug("reading file", slog.String("path", clean(path)), slog.String("schema", "ftp"))

f, err := l.client.Retr(clean(path))
f, err := res.Value().Retr(clean(path))
if err != nil {
return nil, fmt.Errorf("failed to retrieve path: %w", err)
}
Expand All @@ -152,30 +170,32 @@ func (l *ftpDisk) Read(path string) ([]byte, error) {
}

func (l *ftpDisk) Write(path string, data []byte) error {
l.stepLock.Lock()
defer l.stepLock.Unlock()
res, err := l.acquire()
if err != nil {
return err
}

defer res.Release()

slog.Debug("writing to file", slog.String("path", clean(path)), slog.String("schema", "ftp"))
if err := l.client.Stor(clean(path), bytes.NewReader(data)); err != nil {
if err := res.Value().Stor(clean(path), bytes.NewReader(data)); err != nil {
return fmt.Errorf("failed to write file: %w", err)
}

return nil
}

func (l *ftpDisk) Remove(path string) error {
l.stepLock.Lock()
defer l.stepLock.Unlock()

slog.Debug("going to root directory", slog.String("schema", "ftp"))
err := l.client.ChangeDir("/")
res, err := l.acquire()
if err != nil {
return fmt.Errorf("failed to change directory: %w", err)
return err
}

defer res.Release()

slog.Debug("deleting path", slog.String("path", clean(path)), slog.String("schema", "ftp"))
if err := l.client.Delete(clean(path)); err != nil {
if err := l.client.RemoveDirRecur(clean(path)); err != nil {
if err := res.Value().Delete(clean(path)); err != nil {
if err := res.Value().RemoveDirRecur(clean(path)); err != nil {
return fmt.Errorf("failed to delete path: %w", err)
}
}
Expand All @@ -184,23 +204,21 @@ func (l *ftpDisk) Remove(path string) error {
}

func (l *ftpDisk) MkDir(path string) error {
l.stepLock.Lock()
defer l.stepLock.Unlock()

slog.Debug("going to root directory", slog.String("schema", "ftp"))
err := l.client.ChangeDir("/")
res, err := l.acquire()
if err != nil {
return fmt.Errorf("failed to change directory: %w", err)
return err
}

defer res.Release()

split := strings.Split(clean(path)[1:], "/")
for _, s := range split {
dir, err := l.ReadDirLock("", false)
dir, err := l.readDirLock(res, "")
if err != nil {
return err
}

currentDir, _ := l.client.CurrentDir()
currentDir, _ := res.Value().CurrentDir()

foundDir := false
for _, entry := range dir {
Expand All @@ -212,13 +230,13 @@ func (l *ftpDisk) MkDir(path string) error {

if !foundDir {
slog.Debug("making directory", slog.String("dir", s), slog.String("cwd", currentDir), slog.String("schema", "ftp"))
if err := l.client.MakeDir(s); err != nil {
if err := res.Value().MakeDir(s); err != nil {
return fmt.Errorf("failed to make directory: %w", err)
}
}

slog.Debug("entering directory", slog.String("dir", s), slog.String("cwd", currentDir), slog.String("schema", "ftp"))
if err := l.client.ChangeDir(s); err != nil {
if err := res.Value().ChangeDir(s); err != nil {
return fmt.Errorf("failed to enter directory: %w", err)
}
}
Expand All @@ -227,18 +245,20 @@ func (l *ftpDisk) MkDir(path string) error {
}

func (l *ftpDisk) ReadDir(path string) ([]Entry, error) {
return l.ReadDirLock(path, true)
}

func (l *ftpDisk) ReadDirLock(path string, lock bool) ([]Entry, error) {
if lock {
l.stepLock.Lock()
defer l.stepLock.Unlock()
res, err := l.acquire()
if err != nil {
return nil, err
}

defer res.Release()

return l.readDirLock(res, path)
}

func (l *ftpDisk) readDirLock(res *puddle.Resource[*ftp.ServerConn], path string) ([]Entry, error) {
slog.Debug("reading directory", slog.String("path", clean(path)), slog.String("schema", "ftp"))

dir, err := l.client.List(clean(path))
dir, err := res.Value().List(clean(path))
if err != nil {
return nil, fmt.Errorf("failed to list files in directory: %w", err)
}
Expand All @@ -254,15 +274,19 @@ func (l *ftpDisk) ReadDirLock(path string, lock bool) ([]Entry, error) {
}

func (l *ftpDisk) Open(path string, _ int) (io.WriteCloser, error) {
res, err := l.acquire()
if err != nil {
return nil, err
}

reader, writer := io.Pipe()

slog.Debug("opening for writing", slog.String("path", clean(path)), slog.String("schema", "ftp"))

go func() {
l.stepLock.Lock()
defer l.stepLock.Unlock()
defer res.Release()

err := l.client.Stor(clean(path), reader)
err := res.Value().Stor(clean(path), reader)
if err != nil {
slog.Error("failed to store file", slog.Any("err", err))
}
Expand All @@ -271,3 +295,27 @@ func (l *ftpDisk) Open(path string, _ int) (io.WriteCloser, error) {

return writer, nil
}

func (l *ftpDisk) goHome(res *puddle.Resource[*ftp.ServerConn]) error {
slog.Debug("going to root directory", slog.String("schema", "ftp"))

err := res.Value().ChangeDir("/")
if err != nil {
return fmt.Errorf("failed to change directory: %w", err)
}

return nil
}

func (l *ftpDisk) acquire() (*puddle.Resource[*ftp.ServerConn], error) {
res, err := l.pool.Acquire(context.TODO())
if err != nil {
return nil, fmt.Errorf("failed acquiring connection: %w", err)
}

if err := l.goHome(res); err != nil {
return nil, err
}

return res, nil
}
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ require (
github.com/charmbracelet/glamour v0.6.0
github.com/charmbracelet/lipgloss v0.9.1
github.com/charmbracelet/x/exp/teatest v0.0.0-20231215171016-7ba2b450712d
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc
github.com/jackc/puddle/v2 v2.2.1
github.com/jlaffaye/ftp v0.2.0
github.com/lmittmann/tint v1.0.3
github.com/muesli/reflow v0.3.0
Expand Down Expand Up @@ -48,6 +48,7 @@ require (
github.com/charmbracelet/harmonica v0.2.0 // indirect
github.com/containerd/console v1.0.4-0.20230313162750-1ae8d489ac81 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.3 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/dlclark/regexp2 v1.10.0 // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/gookit/color v1.5.4 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8=
github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw=
github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk=
github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
github.com/jlaffaye/ftp v0.2.0 h1:lXNvW7cBu7R/68bknOX3MrRIIqZ61zELs1P2RAiA3lg=
github.com/jlaffaye/ftp v0.2.0/go.mod h1:is2Ds5qkhceAPy2xD6RLI6hmp/qysSoymZ+Z2uTnspI=
github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
Expand Down

0 comments on commit 2bfc225

Please sign in to comment.