Skip to content

Commit

Permalink
plumbing: format/packfile, performance optimizations for reading large commit histories (src-d#963)
Browse files Browse the repository at this point in the history

Signed-off-by: Filip Navara <[email protected]>
  • Loading branch information
filipnavara authored and mcuadros committed Nov 28, 2018
1 parent 3dbfb89 commit 8f52c50
Show file tree
Hide file tree
Showing 7 changed files with 126 additions and 61 deletions.
2 changes: 1 addition & 1 deletion plumbing/format/packfile/fsobject.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func NewFSObject(
// Reader implements the plumbing.EncodedObject interface.
func (o *FSObject) Reader() (io.ReadCloser, error) {
obj, ok := o.cache.Get(o.hash)
if ok {
if ok && obj != o {
reader, err := obj.Reader()
if err != nil {
return nil, err
Expand Down
51 changes: 28 additions & 23 deletions plumbing/format/packfile/packfile.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,16 @@ var (
ErrZLib = NewError("zlib reading error")
)

// smallObjectThreshold is the size (in bytes) below which a non-delta
// object read from a packfile is decoded fully into memory and stored in
// the cache instead of being wrapped in an FSObject.
//
// When reading small objects from a packfile it is beneficial to do so in
// one go, to exploit the buffered I/O: in many cases the object bytes are
// already in memory by the time its header has been parsed. Wrapping such
// an object in an FSObject would throw that buffered data away and
// re-read it later, with the additional seeking causing reloads from
// disk.
const smallObjectThreshold = 16 * 1024

// Packfile allows retrieving information from inside a packfile.
type Packfile struct {
idxfile.Index
Expand Down Expand Up @@ -79,15 +89,7 @@ func (p *Packfile) GetByOffset(o int64) (plumbing.EncodedObject, error) {
}
}

if _, err := p.s.SeekFromStart(o); err != nil {
if err == io.EOF || isInvalid(err) {
return nil, plumbing.ErrObjectNotFound
}

return nil, err
}

return p.nextObject()
return p.objectAtOffset(o)
}

// GetSizeByOffset retrieves the size of the encoded object from the
Expand All @@ -108,6 +110,12 @@ func (p *Packfile) GetSizeByOffset(o int64) (size int64, err error) {
return h.Length, nil
}

// objectHeaderAtOffset seeks the underlying scanner to the given offset
// and reads the object header found there. Any pending object held by the
// scanner is discarded so subsequent reads start from the new position.
func (p *Packfile) objectHeaderAtOffset(offset int64) (*ObjectHeader, error) {
	header, err := p.s.SeekObjectHeader(offset)
	p.s.pendingObject = nil
	return header, err
}

func (p *Packfile) nextObjectHeader() (*ObjectHeader, error) {
h, err := p.s.NextObjectHeader()
p.s.pendingObject = nil
Expand Down Expand Up @@ -154,11 +162,7 @@ func (p *Packfile) getObjectType(h *ObjectHeader) (typ plumbing.ObjectType, err
if baseType, ok := p.offsetToType[offset]; ok {
typ = baseType
} else {
if _, err = p.s.SeekFromStart(offset); err != nil {
return
}

h, err = p.nextObjectHeader()
h, err = p.objectHeaderAtOffset(offset)
if err != nil {
return
}
Expand All @@ -175,8 +179,8 @@ func (p *Packfile) getObjectType(h *ObjectHeader) (typ plumbing.ObjectType, err
return
}

func (p *Packfile) nextObject() (plumbing.EncodedObject, error) {
h, err := p.nextObjectHeader()
func (p *Packfile) objectAtOffset(offset int64) (plumbing.EncodedObject, error) {
h, err := p.objectHeaderAtOffset(offset)
if err != nil {
if err == io.EOF || isInvalid(err) {
return nil, plumbing.ErrObjectNotFound
Expand All @@ -190,6 +194,13 @@ func (p *Packfile) nextObject() (plumbing.EncodedObject, error) {
return p.getNextObject(h)
}

// If the object is not a delta and it's small enough then read it
// completely into memory now since it is already read from disk
// into buffer anyway.
if h.Length <= smallObjectThreshold && h.Type != plumbing.OFSDeltaObject && h.Type != plumbing.REFDeltaObject {
return p.getNextObject(h)
}

hash, err := p.FindHash(h.Offset)
if err != nil {
return nil, err
Expand Down Expand Up @@ -233,11 +244,7 @@ func (p *Packfile) getObjectContent(offset int64) (io.ReadCloser, error) {
}
}

if _, err := p.s.SeekFromStart(offset); err != nil {
return nil, err
}

h, err := p.nextObjectHeader()
h, err := p.objectHeaderAtOffset(offset)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -329,8 +336,6 @@ func (p *Packfile) fillOFSDeltaObjectContent(obj plumbing.EncodedObject, offset
if err != nil {
return err
}

p.cachePut(base)
}

obj.SetType(base.Type())
Expand Down
6 changes: 1 addition & 5 deletions plumbing/format/packfile/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,11 +398,7 @@ func (p *Parser) readData(o *objectInfo) ([]byte, error) {
return data, nil
}

if _, err := p.scanner.SeekFromStart(o.Offset); err != nil {
return nil, err
}

if _, err := p.scanner.NextObjectHeader(); err != nil {
if _, err := p.scanner.SeekObjectHeader(o.Offset); err != nil {
return nil, err
}

Expand Down
46 changes: 42 additions & 4 deletions plumbing/format/packfile/scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,14 +138,52 @@ func (s *Scanner) readCount() (uint32, error) {
return binary.ReadUint32(s.r)
}

// SeekObjectHeader moves the scanner to the given offset from the start
// of the packfile and returns the ObjectHeader of the object located
// there.
func (s *Scanner) SeekObjectHeader(offset int64) (*ObjectHeader, error) {
	// A caller seeking straight to an object is not interested in the
	// packfile header, so assume the supported version without reading it.
	if s.version == 0 {
		s.version = VersionSupported
	}

	if _, err := s.r.Seek(offset, io.SeekStart); err != nil {
		return nil, err
	}

	header, err := s.nextObjectHeader()
	if err != nil {
		return nil, err
	}

	header.Offset = offset
	return header, nil
}

// NextObjectHeader returns the ObjectHeader for the next object in the reader
func (s *Scanner) NextObjectHeader() (*ObjectHeader, error) {
defer s.Flush()

if err := s.doPending(); err != nil {
return nil, err
}

offset, err := s.r.Seek(0, io.SeekCurrent)
if err != nil {
return nil, err
}

h, err := s.nextObjectHeader()
if err != nil {
return nil, err
}

h.Offset = offset
return h, nil
}

// nextObjectHeader returns the ObjectHeader for the next object in the reader
// without the Offset field
func (s *Scanner) nextObjectHeader() (*ObjectHeader, error) {
defer s.Flush()

s.crc.Reset()

h := &ObjectHeader{}
Expand Down Expand Up @@ -308,7 +346,7 @@ var byteSlicePool = sync.Pool{
// SeekFromStart sets a new offset from start, returns the old position before
// the change.
func (s *Scanner) SeekFromStart(offset int64) (previous int64, err error) {
// if seeking we assume that you are not interested on the header
// if seeking we assume that you are not interested in the header
if s.version == 0 {
s.version = VersionSupported
}
Expand Down Expand Up @@ -385,7 +423,7 @@ type bufferedSeeker struct {
}

func (r *bufferedSeeker) Seek(offset int64, whence int) (int64, error) {
if whence == io.SeekCurrent {
if whence == io.SeekCurrent && offset == 0 {
current, err := r.r.Seek(offset, whence)
if err != nil {
return current, err
Expand Down
17 changes: 17 additions & 0 deletions plumbing/format/packfile/scanner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,23 @@ func (s *ScannerSuite) TestNextObjectHeaderWithOutReadObjectNonSeekable(c *C) {
c.Assert(n, Equals, f.PackfileHash)
}

func (s *ScannerSuite) TestSeekObjectHeader(c *C) {
	// Seeking directly to the fifth object must yield exactly its header.
	packfile := fixtures.Basic().One().Packfile()
	scanner := NewScanner(packfile)

	header, err := scanner.SeekObjectHeader(expectedHeadersOFS[4].Offset)
	c.Assert(err, IsNil)
	c.Assert(header, DeepEquals, &expectedHeadersOFS[4])
}

func (s *ScannerSuite) TestSeekObjectHeaderNonSeekable(c *C) {
	// Wrapping the packfile in a MultiReader hides its io.Seeker
	// implementation, so SeekObjectHeader must report that seeking is
	// unsupported.
	reader := io.MultiReader(fixtures.Basic().One().Packfile())
	scanner := NewScanner(reader)

	_, err := scanner.SeekObjectHeader(expectedHeadersOFS[4].Offset)
	c.Assert(err, Equals, ErrSeekNotSupported)
}

var expectedHeadersOFS = []ObjectHeader{
{Type: plumbing.CommitObject, Offset: 12, Length: 254},
{Type: plumbing.OFSDeltaObject, Offset: 186, Length: 93, OffsetReference: 12},
Expand Down
59 changes: 36 additions & 23 deletions storage/filesystem/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,24 +20,25 @@ import (
type ObjectStorage struct {
options Options

// deltaBaseCache is an object cache uses to cache delta's bases when
deltaBaseCache cache.Object
// objectCache is an object cache uses to cache delta's bases and also recently
// loaded loose objects
objectCache cache.Object

dir *dotgit.DotGit
index map[plumbing.Hash]idxfile.Index
}

// NewObjectStorage creates a new ObjectStorage with the given .git directory and cache.
func NewObjectStorage(dir *dotgit.DotGit, cache cache.Object) *ObjectStorage {
return NewObjectStorageWithOptions(dir, cache, Options{})
func NewObjectStorage(dir *dotgit.DotGit, objectCache cache.Object) *ObjectStorage {
return NewObjectStorageWithOptions(dir, objectCache, Options{})
}

// NewObjectStorageWithOptions creates a new ObjectStorage with the given .git directory, cache and extra options
func NewObjectStorageWithOptions(dir *dotgit.DotGit, cache cache.Object, ops Options) *ObjectStorage {
func NewObjectStorageWithOptions(dir *dotgit.DotGit, objectCache cache.Object, ops Options) *ObjectStorage {
return &ObjectStorage{
options: ops,
deltaBaseCache: cache,
dir: dir,
options: ops,
objectCache: objectCache,
dir: dir,
}
}

Expand Down Expand Up @@ -206,7 +207,7 @@ func (s *ObjectStorage) encodedObjectSizeFromPackfile(h plumbing.Hash) (
idx := s.index[pack]
hash, err := idx.FindHash(offset)
if err == nil {
obj, ok := s.deltaBaseCache.Get(hash)
obj, ok := s.objectCache.Get(hash)
if ok {
return obj.Size(), nil
}
Expand All @@ -215,8 +216,8 @@ func (s *ObjectStorage) encodedObjectSizeFromPackfile(h plumbing.Hash) (
}

var p *packfile.Packfile
if s.deltaBaseCache != nil {
p = packfile.NewPackfileWithCache(idx, s.dir.Fs(), f, s.deltaBaseCache)
if s.objectCache != nil {
p = packfile.NewPackfileWithCache(idx, s.dir.Fs(), f, s.objectCache)
} else {
p = packfile.NewPackfile(idx, s.dir.Fs(), f)
}
Expand All @@ -241,9 +242,19 @@ func (s *ObjectStorage) EncodedObjectSize(h plumbing.Hash) (
// EncodedObject returns the object with the given hash, by searching for it in
// the packfile and the git object directories.
func (s *ObjectStorage) EncodedObject(t plumbing.ObjectType, h plumbing.Hash) (plumbing.EncodedObject, error) {
obj, err := s.getFromUnpacked(h)
if err == plumbing.ErrObjectNotFound {
var obj plumbing.EncodedObject
var err error

if s.index != nil {
obj, err = s.getFromPackfile(h, false)
if err == plumbing.ErrObjectNotFound {
obj, err = s.getFromUnpacked(h)
}
} else {
obj, err = s.getFromUnpacked(h)
if err == plumbing.ErrObjectNotFound {
obj, err = s.getFromPackfile(h, false)
}
}

// If the error is still object not found, check if it's a shared object
Expand All @@ -254,7 +265,7 @@ func (s *ObjectStorage) EncodedObject(t plumbing.ObjectType, h plumbing.Hash) (p
// Create a new object storage with the DotGit(s) and check for the
// required hash object. Skip when not found.
for _, dg := range dotgits {
o := NewObjectStorage(dg, s.deltaBaseCache)
o := NewObjectStorage(dg, s.objectCache)
enobj, enerr := o.EncodedObject(t, h)
if enerr != nil {
continue
Expand Down Expand Up @@ -296,6 +307,10 @@ func (s *ObjectStorage) DeltaObject(t plumbing.ObjectType,
}

func (s *ObjectStorage) getFromUnpacked(h plumbing.Hash) (obj plumbing.EncodedObject, err error) {
if cacheObj, found := s.objectCache.Get(h); found {
return cacheObj, nil
}

f, err := s.dir.Object(h)
if err != nil {
if os.IsNotExist(err) {
Expand Down Expand Up @@ -327,6 +342,8 @@ func (s *ObjectStorage) getFromUnpacked(h plumbing.Hash) (obj plumbing.EncodedOb
return nil, err
}

s.objectCache.Put(obj)

_, err = io.Copy(w, r)
return obj, err
}
Expand Down Expand Up @@ -369,7 +386,7 @@ func (s *ObjectStorage) decodeObjectAt(
) (plumbing.EncodedObject, error) {
hash, err := idx.FindHash(offset)
if err == nil {
obj, ok := s.deltaBaseCache.Get(hash)
obj, ok := s.objectCache.Get(hash)
if ok {
return obj, nil
}
Expand All @@ -380,8 +397,8 @@ func (s *ObjectStorage) decodeObjectAt(
}

var p *packfile.Packfile
if s.deltaBaseCache != nil {
p = packfile.NewPackfileWithCache(idx, s.dir.Fs(), f, s.deltaBaseCache)
if s.objectCache != nil {
p = packfile.NewPackfileWithCache(idx, s.dir.Fs(), f, s.objectCache)
} else {
p = packfile.NewPackfile(idx, s.dir.Fs(), f)
}
Expand All @@ -400,11 +417,7 @@ func (s *ObjectStorage) decodeDeltaObjectAt(
}

p := packfile.NewScanner(f)
if _, err := p.SeekFromStart(offset); err != nil {
return nil, err
}

header, err := p.NextObjectHeader()
header, err := p.SeekObjectHeader(offset)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -495,7 +508,7 @@ func (s *ObjectStorage) buildPackfileIters(
}
return newPackfileIter(
s.dir.Fs(), pack, t, seen, s.index[h],
s.deltaBaseCache, s.options.KeepDescriptors,
s.objectCache, s.options.KeepDescriptors,
)
},
}, nil
Expand Down
6 changes: 1 addition & 5 deletions storage/filesystem/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,7 @@ func NewStorageWithOptions(fs billy.Filesystem, cache cache.Object, ops Options)
fs: fs,
dir: dir,

ObjectStorage: ObjectStorage{
options: ops,
deltaBaseCache: cache,
dir: dir,
},
ObjectStorage: *NewObjectStorageWithOptions(dir, cache, ops),
ReferenceStorage: ReferenceStorage{dir: dir},
IndexStorage: IndexStorage{dir: dir},
ShallowStorage: ShallowStorage{dir: dir},
Expand Down

0 comments on commit 8f52c50

Please sign in to comment.