From 8af61aff87d6cad6b38d67a181a9e32e9249897a Mon Sep 17 00:00:00 2001 From: Seth Erickson Date: Tue, 12 Nov 2024 15:19:42 -0800 Subject: [PATCH 1/6] bump version --- ocfl.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ocfl.go b/ocfl.go index 73da0d63..bc987fd1 100644 --- a/ocfl.go +++ b/ocfl.go @@ -15,7 +15,7 @@ import ( const ( // package version - Version = "0.4.0" + Version = "0.5.0" LogsDir = "logs" ExtensionsDir = "extensions" ) From 2daa149add5580469ca69f5650fac52fe489d59d Mon Sep 17 00:00:00 2001 From: Seth Erickson Date: Tue, 12 Nov 2024 15:32:44 -0800 Subject: [PATCH 2/6] github actions workflow --- .github/workflows/go.yml | 29 +++++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) create mode 100644 .github/workflows/go.yml diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml new file mode 100644 index 00000000..210214d8 --- /dev/null +++ b/.github/workflows/go.yml @@ -0,0 +1,29 @@ +name: 'go' +on: # rebuild any PRs and main branch changes + pull_request: + push: + branches: + - main + +jobs: + build: + runs-on: ubuntu-latest + steps: + + - name: Checkout (GitHub) + uses: actions/checkout@v4 + + - name: Login to GitHub Container Registry + uses: docker/login-action@v3 + with: + registry: ghcr.io + username: ${{ github.repository_owner }} + password: ${{ secrets.GITHUB_TOKEN }} + + - name: Build devcontainer and run tests + uses: devcontainers/ci@v0.3 + with: + imageName: ghcr.io/${{ github.repository_owner }}/ocfl-go-devcontainer + eventFilterForPush: pull_request + runCmd: | + go test ./... 
-count=5 -race \ No newline at end of file From beb4dd7bde83f073b072928bd06f584edd2fb0c0 Mon Sep 17 00:00:00 2001 From: Seth Erickson Date: Wed, 13 Nov 2024 22:33:20 +0000 Subject: [PATCH 3/6] generic helpers errors in iterators --- files.go | 80 +++++++++++++++++++++++++++----------------------------- 1 file changed, 39 insertions(+), 41 deletions(-) diff --git a/files.go b/files.go index f1564d87..62bb4f8a 100644 --- a/files.go +++ b/files.go @@ -230,36 +230,16 @@ type FileErrSeq iter.Seq2[*FileRef, error] // UntilErr returns a new iterator yielding *FileRef values from seq that // terminates on the first non-nil error in seq. The terminating error is // returned by errFn. -func (seq FileErrSeq) UntilErr() (files FileSeq, errFn func() error) { - var firstErr error - files = func(yield func(*FileRef) bool) { - for file, err := range seq { - if err != nil { - firstErr = err - break - } - if !yield(file) { - break - } - } - } - errFn = func() error { return firstErr } - return +func (fileErrs FileErrSeq) UntilErr() (FileSeq, func() error) { + files, errFn := seqUntilErr(iter.Seq2[*FileRef, error](fileErrs)) + return FileSeq(files), errFn } // IgnoreErr returns an iterator of *[FileRef]s in seq that are not associated // with an error. -func (seq FileErrSeq) IgnoreErr() (files FileSeq) { - return func(yield func(*FileRef) bool) { - for file, err := range seq { - if err != nil { - continue - } - if !yield(file) { - break - } - } - } +func (fileErrs FileErrSeq) IgnoreErr() FileSeq { + files := seqIgnoreErr(iter.Seq2[*FileRef, error](fileErrs)) + return FileSeq(files) } // FileDigests is a [FileRef] plus digest values of the file contents. @@ -386,21 +366,9 @@ func (digests FileDigestsSeq) Stage() (*Stage, error) { // UntilErr returns an iterator of *FileDigests from dfs that terminates on the // first non-nil error in dfs. 
The terminating error is returned by errFn -func (dfs FileDigestsErrSeq) UntilErr() (digests FileDigestsSeq, errFn func() error) { - var firstErr error - digests = func(yield func(*FileDigests) bool) { - for file, err := range dfs { - if err != nil { - firstErr = err - break - } - if !yield(file) { - break - } - } - } - errFn = func() error { return firstErr } - return +func (dfs FileDigestsErrSeq) UntilErr() (FileDigestsSeq, func() error) { + seq, errFn := seqUntilErr(iter.Seq2[*FileDigests, error](dfs)) + return FileDigestsSeq(seq), errFn } func walkFiles(ctx context.Context, fsys FS, dir string) (FileSeq, func() error) { @@ -471,3 +439,33 @@ var errBreakFileWalk = errors.New("break") func validFileType(mode fs.FileMode) bool { return mode.IsDir() || mode.IsRegular() || mode.Type() == fs.ModeIrregular } + +func seqUntilErr[T any](inSeq iter.Seq2[T, error]) (outSeq iter.Seq[T], errFn func() error) { + var firstErr error + outSeq = func(yield func(T) bool) { + for v, err := range inSeq { + if err != nil { + firstErr = err + break + } + if !yield(v) { + break + } + } + } + errFn = func() error { return firstErr } + return +} + +func seqIgnoreErr[T any](inSeq iter.Seq2[T, error]) iter.Seq[T] { + return func(yield func(T) bool) { + for v, err := range inSeq { + if err != nil { + continue + } + if !yield(v) { + break + } + } + } +} From b403e2214767e74726c0217831b8f78aa34004f8 Mon Sep 17 00:00:00 2001 From: Seth Erickson Date: Wed, 13 Nov 2024 22:34:21 +0000 Subject: [PATCH 4/6] cleanup storage root directory walk --- object.go | 29 --------------------- objectstate.go | 10 +++++++ root.go | 71 ++++++++++++++++++++++++++++++++++++-------------- root_test.go | 12 ++++----- 4 files changed, 67 insertions(+), 55 deletions(-) diff --git a/object.go b/object.go index 88248726..da4d794d 100644 --- a/object.go +++ b/object.go @@ -357,32 +357,3 @@ func objectExpectedID(id string) ObjectOption { o.expectID = id } } - -// ObjectPaths searches dir in fsys (and its subdirectories) 
for OCFL object -// declarations and returns an iterator that yields each object path it finds. -func ObjectPaths(ctx context.Context, fsys FS, dir string) func(yield func(string, error) bool) { - return func(yield func(string, error) bool) { - objectPathsWalk(ctx, fsys, dir, yield) - } -} - -func objectPathsWalk(ctx context.Context, fsys FS, dir string, yield func(string, error) bool) bool { - entries, err := fsys.ReadDir(ctx, dir) - if err != nil { - yield("", err) - return false - } - state := ParseObjectDir(entries) - if state.HasNamaste() { - return yield(dir, nil) - } - for _, e := range entries { - if e.IsDir() { - subdir := path.Join(dir, e.Name()) - if !objectPathsWalk(ctx, fsys, subdir, yield) { - return false - } - } - } - return true -} diff --git a/objectstate.go b/objectstate.go index 1671ee44..620e23ed 100644 --- a/objectstate.go +++ b/objectstate.go @@ -138,3 +138,13 @@ func (state ObjectState) HasVersionDir(v VNum) bool { func (state ObjectState) Empty() bool { return state.Flags == 0 && len(state.VersionDirs) == 0 && len(state.Invalid) == 0 } + +// Namaste return the ObjectState's Namaste value, which may be a zero value. +func (state ObjectState) Namaste() Namaste { + var n Namaste + if state.HasNamaste() { + n.Version = state.Spec + n.Type = NamasteTypeObject + } + return n +} diff --git a/root.go b/root.go index 6931a902..a4cd254b 100644 --- a/root.go +++ b/root.go @@ -8,6 +8,7 @@ import ( "fmt" "io" "io/fs" + "iter" "path" "github.com/srerickson/ocfl-go/extension" @@ -136,30 +137,26 @@ func (r *Root) ResolveID(id string) (string, error) { return objPath, nil } -// ObjectPaths returns an iterator of object paths for the objects in the -// storage root. -func (r *Root) ObjectPaths(ctx context.Context) func(func(string, error) bool) { - return ObjectPaths(ctx, r.fs, r.dir) -} - -// Objects returns an iterator of objects in the storage root. 
-func (r *Root) Objects(ctx context.Context) func(func(*Object, error) bool) { - return func(yield func(*Object, error) bool) { - for dir, err := range r.ObjectPaths(ctx) { +// WalkObjectDirs returns an iterator that walks r's directory structure, +// yielding paths and directory entries for all objects. If an error is +// encountered, iteration terminates. The terminating error is accessed with the +// returned error function. +func (r *Root) WalkObjectDirs(ctx context.Context) (iter.Seq2[string, []fs.DirEntry], func() error) { + var walkErr error + dirs := func(yield func(string, []fs.DirEntry) bool) { + for dir, err := range r.walkDirs(ctx) { if err != nil { - yield(nil, err) - return + walkErr = err + break } - obj, err := NewObject(ctx, r.fs, dir) - if err != nil { - yield(nil, err) - return - } - if !yield(obj, nil) { - return + if ParseObjectDir(dir.entries).HasNamaste() { + if !yield(dir.path, dir.entries) { + break + } } } } + return dirs, func() error { return walkErr } } // Path returns the root's dir relative to its FS @@ -367,3 +364,39 @@ func InitRoot(spec Spec, layoutDesc string, extensions ...extension.Extension) R } } } + +func (root *Root) walkDirs(ctx context.Context) iter.Seq2[*rootDirRef, error] { + return func(yield func(*rootDirRef, error) bool) { + root.walkDir(ctx, &rootDirRef{path: "."}, yield) + } +} + +// rootDirRef is a directory in a storage root +type rootDirRef struct { + path string // path relative to the storage root + entries []fs.DirEntry +} + +// walkDir reads dir, yields the result, and calls walkDir on each subdirectory +// unless dir is an object root +func (root *Root) walkDir(ctx context.Context, ref *rootDirRef, yield func(*rootDirRef, error) bool) { + var err error + ref.entries, err = root.fs.ReadDir(ctx, path.Join(root.dir, ref.path)) + if !yield(ref, err) { + return + } + if len(ref.entries) < 1 { + return + } + if ParseObjectDir(ref.entries).HasNamaste() { + // don't descend below object root directory + return + } + 
// TODO: don't descend below extension directories + for _, e := range ref.entries { + if e.IsDir() { + next := &rootDirRef{path: path.Join(ref.path, e.Name())} + root.walkDir(ctx, next, yield) + } + } +} diff --git a/root_test.go b/root_test.go index 4773fcfd..58754ab6 100644 --- a/root_test.go +++ b/root_test.go @@ -77,7 +77,6 @@ func TestRoot(t *testing.T) { be.NilErr(t, ocfl.ValidateObject(ctx, obj.FS(), obj.Path()).Err()) be.Equal(t, objID, sameObj.Inventory().ID()) }) - t.Run("Objects", func(t *testing.T) { t.Run("simple-root", func(t *testing.T) { fsys := ocfl.DirFS(storeFixturePath) @@ -85,15 +84,14 @@ func TestRoot(t *testing.T) { root, err := ocfl.NewRoot(ctx, fsys, dir) be.NilErr(t, err) count := 0 - for _, err := range root.Objects(ctx) { - be.NilErr(t, err) + dirs, walkErr := root.WalkObjectDirs(ctx) + for dir := range dirs { + valid := root.ValidateObjectDir(ctx, dir) + be.NilErr(t, valid.Err()) count++ } be.Equal(t, 3, count) - - valid := root.ValidateObjectDir(ctx, "http%3A%2F%2Fexample.org%2Fminimal_mixed_digests") - be.NilErr(t, valid.Err()) - + be.NilErr(t, walkErr()) }) }) From ad16ecdcb0e973a05f806dd00722ac57d7a7b2a5 Mon Sep 17 00:00:00 2001 From: Seth Erickson Date: Thu, 14 Nov 2024 23:40:00 +0000 Subject: [PATCH 5/6] remove unused internal package --- internal/walkdirs/flowmatic.go | 125 --------------------------------- internal/walkdirs/walkdirs.go | 68 ------------------ 2 files changed, 193 deletions(-) delete mode 100644 internal/walkdirs/flowmatic.go delete mode 100644 internal/walkdirs/walkdirs.go diff --git a/internal/walkdirs/flowmatic.go b/internal/walkdirs/flowmatic.go deleted file mode 100644 index 092d718a..00000000 --- a/internal/walkdirs/flowmatic.go +++ /dev/null @@ -1,125 +0,0 @@ -package walkdirs - -// Code in this file was adapted from Carl Johnson's "flowmatic" package, -// distibuted with the following license. 
- -// MIT License - -// Copyright (c) 2022 Carl Johnson - -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: - -// The above copyright notice and this permission notice shall be included in all -// copies or substantial portions of the Software. - -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -// SOFTWARE. - -import ( - "runtime" - "sync" - - "github.com/carlmjohnson/deque" -) - -// Manager is a function that serially examines Task results to see if it produced any new Inputs. -// Returning false will halt the processing of future tasks. -type Manager[Input, Output any] func(Input, Output, error) (tasks []Input, ok bool) - -// Task is a function that can concurrently transform an input into an output. -type Task[Input, Output any] func(in Input) (out Output, err error) - -// // DoTasks does tasks using n concurrent workers (or GOMAXPROCS workers if n < -// 1) which produce output consumed by a serially run manager. The manager -// should return a slice of new task inputs based on prior task results, or -// return false to halt processing. If a task panics during execution, the panic -// will be caught and rethrown in the parent Goroutine. 
-// -// DoTailingTasks is the same as DoTasks except tasks in the task queue are -// evaluated in last in, first out order. -func DoTailingTasks[Input, Output any](n int, task Task[Input, Output], manager Manager[Input, Output], initial ...Input) { - in, out := start(n, task) - defer func() { - close(in) - // drain any waiting tasks - for range out { - } - }() - queue := deque.Of(initial...) - inflight := 0 - for inflight > 0 || queue.Len() > 0 { - inch := in - item, ok := queue.Tail() - if !ok { - inch = nil - } - select { - case inch <- item: - inflight++ - queue.PopTail() - case r := <-out: - inflight-- - if r.Panic != nil { - panic(r.Panic) - } - items, ok := manager(r.In, r.Out, r.Err) - if !ok { - return - } - queue.Append(items...) - } - } -} - -// result is the type returned by the output channel of Start. -type result[Input, Output any] struct { - In Input - Out Output - Err error - Panic any -} - -// start n workers (or GOMAXPROCS workers if n < 1) which consume -// the in channel, execute task, and send the Result on the out channel. -// Callers should close the in channel to stop the workers from waiting for tasks. -// The out channel will be closed once the last result has been sent. 
-func start[Input, Output any](n int, task Task[Input, Output]) (in chan<- Input, out <-chan result[Input, Output]) { - if n < 1 { - n = runtime.GOMAXPROCS(0) - } - inch := make(chan Input) - ouch := make(chan result[Input, Output], n) - var wg sync.WaitGroup - wg.Add(n) - for i := 0; i < n; i++ { - go func() { - defer wg.Done() - defer func() { - pval := recover() - if pval == nil { - return - } - ouch <- result[Input, Output]{Panic: pval} - }() - for inval := range inch { - outval, err := task(inval) - ouch <- result[Input, Output]{inval, outval, err, nil} - } - }() - } - go func() { - wg.Wait() - close(ouch) - }() - return inch, ouch -} diff --git a/internal/walkdirs/walkdirs.go b/internal/walkdirs/walkdirs.go deleted file mode 100644 index dfd3bb55..00000000 --- a/internal/walkdirs/walkdirs.go +++ /dev/null @@ -1,68 +0,0 @@ -package walkdirs - -import ( - "context" - "errors" - "io/fs" - "path" - "runtime" -) - -type FS interface { - ReadDir(ctx context.Context, name string) ([]fs.DirEntry, error) -} - -type SkipFunc func(string) bool - -// ErrSkipDirs can be returned by a WalkDirsFunc to prevent WalkDirs from -// walking subdirectories. -var ErrSkipDirs = errors.New("skip subdirectories") - -// WalkDirsFunc is a function called for each directory by WalkDirs. If -// the function returns ErrSkipDirs, none of the directory's subdirectories -// are walked. -type WalkDirsFunc func(name string, entries []fs.DirEntry, err error) error - -// WalkDirs is a directory-oriented FS walker. It walks the FS starting at root, -// calling fn for each directory. If fn returns an error (other than -// ErrSkipDirs), the walk is canceled. WalkDirs reads directory entries in -// concurrent goroutines, the number of which is configurable. Each call to the -// WalkDirsFunc occurs from the same goroutine. The directory structure is -// walked depth-first order and in lexical order if concurrency is 1. 
-func WalkDirs(ctx context.Context, fsys FS, dir string, skipfn SkipFunc, fn WalkDirsFunc, gos int) error { - if gos < 1 { - gos = runtime.NumCPU() - } - readDirTask := func(dir string) ([]fs.DirEntry, error) { - return fsys.ReadDir(ctx, dir) - } - var walkErr error - // walkMgr is called for each directory and returns slice of paths to walk - walkMgr := func(dir string, entries []fs.DirEntry, err error) ([]string, bool) { - if fnErr := fn(dir, entries, err); fnErr != nil { - if errors.Is(fnErr, ErrSkipDirs) { - // don't add this directory's sub-directories - return nil, true - } - walkErr = fnErr - return nil, false - } - var subDirs []string // paths to continue search - // evaluate entries in reverse order so they are in lexical order for - // DoTailingTasks (which is LIFO). Note, - for i := len(entries); i > 0; i-- { - e := entries[i-1] - if !e.IsDir() { - continue - } - subdir := path.Join(dir, e.Name()) - if skipfn != nil && skipfn(subdir) { - continue - } - subDirs = append(subDirs, subdir) - } - return subDirs, true - } - DoTailingTasks(gos, readDirTask, walkMgr, dir) - return walkErr -} From 29f468553333f266a3af0804dfcd909253a45b0f Mon Sep 17 00:00:00 2001 From: Seth Erickson Date: Thu, 14 Nov 2024 23:40:48 +0000 Subject: [PATCH 6/6] use object declarations iterator for Root.Objects() --- examples/listobjects/main.go | 57 ++++++------------------- namaste.go | 2 +- objectstate.go | 2 +- root.go | 83 ++++++++++++++++++++++++++++-------- root_test.go | 24 ++++++++--- 5 files changed, 98 insertions(+), 70 deletions(-) diff --git a/examples/listobjects/main.go b/examples/listobjects/main.go index df9be78b..eec80849 100644 --- a/examples/listobjects/main.go +++ b/examples/listobjects/main.go @@ -33,24 +33,22 @@ func main() { logger.Error("missing required storage root URI") os.Exit(1) } - fsys, dir, err := parseStoreConn(ctx, storeConn) - if err != nil { - logger.Error("can't parse storage root argument", "err", err) - os.Exit(1) - } - if err := 
listObjects2(ctx, fsys, dir, numgos, logger); err != nil { + if err := listObjects(ctx, storeConn, numgos, logger); err != nil { logger.Error("exit with errors", "err", err) os.Exit(1) } } -func listObjects2(ctx context.Context, fsys ocfl.FS, dir string, numgos int, log *slog.Logger) (err error) { - allFiles, walkErrFn := ocfl.WalkFiles(ctx, fsys, dir) - defer func() { - err = walkErrFn() - }() - decls := allFiles.Filter(func(f *ocfl.FileRef) bool { return f.Namaste().IsObject() }) - for obj, err := range decls.OpenObjectsBatch(ctx, numgos) { +func listObjects(ctx context.Context, storeConn string, numgos int, log *slog.Logger) (err error) { + fsys, dir, err := parseStoreConn(ctx, storeConn) + if err != nil { + return fmt.Errorf("can't parse storage root argument: %w", err) + } + root, err := ocfl.NewRoot(ctx, fsys, dir) + if err != nil { + return fmt.Errorf("can't open storage root: %w", err) + } + for obj, err := range root.ObjectsBatch(ctx, numgos) { if err != nil { log.Error(err.Error()) continue @@ -58,40 +56,9 @@ func listObjects2(ctx context.Context, fsys ocfl.FS, dir string, numgos int, log id := obj.Inventory().ID() fmt.Println(id) } - return + return nil } -// func listObjects(ctx context.Context, fsys ocfl.FS, dir string, gos int, _ *slog.Logger) error { -// objectDirs := func(yield func(string) bool) { -// for dir, err := range ocfl.ObjectPaths(ctx, fsys, dir) { -// if err != nil { -// break -// } -// if !yield(dir) { -// break -// } -// } -// } -// getID := func(dir string) (ocfl.ReadInventory, error) { -// obj, err := ocfl.NewObject(ctx, fsys, dir) -// if err != nil { -// return nil, err -// } -// return obj.Inventory(), nil -// } -// var err error -// resultIter := pipeline.Results(objectDirs, getID, gos) -// resultIter(func(r pipeline.Result[string, ocfl.ReadInventory]) bool { -// if r.Err != nil { -// err = r.Err -// return false -// } -// fmt.Println(r.In, r.Out.ID()) -// return true -// }) -// return err -// } - func parseStoreConn(ctx context.Context, name string) (ocfl.FS, string, 
error) { //if we were using s3-based backend: rl, err := url.Parse(name) diff --git a/namaste.go b/namaste.go index 941d82cf..73e53324 100644 --- a/namaste.go +++ b/namaste.go @@ -51,7 +51,7 @@ func FindNamaste(items []fs.DirEntry) (Namaste, error) { } } -// Name returns the filename for d (0=TYPE_VERSION) or an empty string if d is +// Name returns the filename for n ('0=TYPE_VERSION') or an empty string if n is // empty func (n Namaste) Name() string { if n.Type == "" || n.Version.Empty() { diff --git a/objectstate.go b/objectstate.go index 620e23ed..c8ce982c 100644 --- a/objectstate.go +++ b/objectstate.go @@ -139,7 +139,7 @@ func (state ObjectState) Empty() bool { return state.Flags == 0 && len(state.VersionDirs) == 0 && len(state.Invalid) == 0 } -// Namaste return the ObjectState's Namaste value, which may be a zero value. +// Namaste returns state's Namaste value, which may be a zero value. func (state ObjectState) Namaste() Namaste { var n Namaste if state.HasNamaste() { diff --git a/root.go b/root.go index a4cd254b..711b36b0 100644 --- a/root.go +++ b/root.go @@ -37,9 +37,9 @@ type Root struct { initArgs *initRootArgs } -// NewRoot returns a new *Root for working with the OCFL storage root at +// NewRoot returns a new *[Root] for working with the OCFL storage root at // directory dir in fsys. It can be used to initialize new storage roots if the -// InitRoot option is used, fsys is an ocfl.WriteFS, and dir is a non-existing +// [InitRoot] option is used, fsys is an ocfl.WriteFS, and dir is a non-existing // or empty directory. func NewRoot(ctx context.Context, fsys FS, dir string, opts ...RootOption) (*Root, error) { r := &Root{fs: fsys, dir: dir} @@ -137,26 +137,48 @@ func (r *Root) ResolveID(id string) (string, error) { return objPath, nil } -// WalkObjectDirs returns an iterator that walks r's directory structure, -// yielding paths and directory entries for all objects. If an error is -// encountered, iteration terminates. 
The terminating error is accessed with the -// returned error function. -func (r *Root) WalkObjectDirs(ctx context.Context) (iter.Seq2[string, []fs.DirEntry], func() error) { - var walkErr error - dirs := func(yield func(string, []fs.DirEntry) bool) { - for dir, err := range r.walkDirs(ctx) { - if err != nil { - walkErr = err - break +// ObjectDeclarations returns an iterator that yields all OCFL object +// declaration files in r. If an error occurs during iteration, it is returned +// by the error function. +func (r *Root) ObjectDeclarations(ctx context.Context) (FileSeq, func() error) { + allFiles, errFn := WalkFiles(ctx, r.fs, r.dir) + decls := allFiles.Filter(func(f *FileRef) bool { return f.Namaste().IsObject() }) + return decls, errFn +} + +// Objects returns an iterator that yields objects or an error for every object +// declaration file in the root. +func (r *Root) Objects(ctx context.Context) iter.Seq2[*Object, error] { + return func(yield func(*Object, error) bool) { + decls, listErr := r.ObjectDeclarations(ctx) + for obj, err := range decls.OpenObjects(ctx) { + if !yield(obj, err) { + return } - if ParseObjectDir(dir.entries).HasNamaste() { - if !yield(dir.path, dir.entries) { - break - } + } + if err := listErr(); err != nil { + yield(nil, err) + } + } +} + +// ObjectsBatch returns an iterator that uses [FileSeq.OpenObjectsBatch] to open +// objects in the root in numgos separate goroutines, yielding the results +func (r *Root) ObjectsBatch(ctx context.Context, numgos int) iter.Seq2[*Object, error] { + return func(yield func(*Object, error) bool) { + allFiles, listErr := WalkFiles(ctx, r.fs, r.dir) + objs := allFiles. + Filter(func(f *FileRef) bool { return f.Namaste().IsObject() }). 
+ OpenObjectsBatch(ctx, numgos) + for obj, err := range objs { + if !yield(obj, err) { + return } } + if err := listErr(); err != nil { + yield(nil, err) + } } - return dirs, func() error { return walkErr } } // Path returns the root's dir relative to its FS @@ -345,6 +367,7 @@ func writeExtensionConfig(ctx context.Context, fsys WriteFS, root string, config return nil } +// RootOption is used to configure the behavior of [NewRoot]() type RootOption func(*Root) type initRootArgs struct { @@ -365,6 +388,30 @@ func InitRoot(spec Spec, layoutDesc string, extensions ...extension.Extension) R } } +// TODO: export a function for iterating over object root directory entries. +// +// walkObjectDirs returns an iterator that walks r's directory structure, +// yielding paths and directory entries for all objects. If an error is +// encountered, iteration terminates. The terminating error is accessed with the +// returned error function. +func (r *Root) walkObjectDirs(ctx context.Context) (iter.Seq2[string, []fs.DirEntry], func() error) { + var walkErr error + dirs := func(yield func(string, []fs.DirEntry) bool) { + for dir, err := range r.walkDirs(ctx) { + if err != nil { + walkErr = err + break + } + if ParseObjectDir(dir.entries).HasNamaste() { + if !yield(dir.path, dir.entries) { + break + } + } + } + } + return dirs, func() error { return walkErr } +} + func (root *Root) walkDirs(ctx context.Context) iter.Seq2[*rootDirRef, error] { return func(yield func(*rootDirRef, error) bool) { root.walkDir(ctx, &rootDirRef{path: "."}, yield) diff --git a/root_test.go b/root_test.go index 58754ab6..d7ee4015 100644 --- a/root_test.go +++ b/root_test.go @@ -84,14 +84,28 @@ func TestRoot(t *testing.T) { root, err := ocfl.NewRoot(ctx, fsys, dir) be.NilErr(t, err) count := 0 - dirs, walkErr := root.WalkObjectDirs(ctx) - for dir := range dirs { - valid := root.ValidateObjectDir(ctx, dir) - be.NilErr(t, valid.Err()) + for obj, err := range root.Objects(ctx) { + be.NilErr(t, err) count++ + 
be.True(t, obj.Exists()) + } + be.Equal(t, 3, count) + }) + }) + + t.Run("ObjectsBatch", func(t *testing.T) { + t.Run("simple-root", func(t *testing.T) { + fsys := ocfl.DirFS(storeFixturePath) + dir := `1.0/good-stores/simple-root` + root, err := ocfl.NewRoot(ctx, fsys, dir) + be.NilErr(t, err) + count := 0 + for obj, err := range root.ObjectsBatch(ctx, 2) { + be.NilErr(t, err) + count++ + be.True(t, obj.Exists()) } be.Equal(t, 3, count) - be.NilErr(t, walkErr()) }) })