Skip to content

Commit

Permalink
initial sectorbuilder FS refactor integration
Browse files Browse the repository at this point in the history
  • Loading branch information
magik6k committed Jan 28, 2020
1 parent eb4b85a commit 5af64c5
Show file tree
Hide file tree
Showing 12 changed files with 45 additions and 28 deletions.
6 changes: 3 additions & 3 deletions cmd/lotus-bench/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func main() {
Miner: maddr,
SectorSize: sectorSize,
WorkerThreads: 2,
Dir: sbdir,
Paths: sectorbuilder.SimplePath(sbdir),
}

if robench == "" {
Expand Down Expand Up @@ -174,7 +174,7 @@ func main() {

r := rand.New(rand.NewSource(100 + int64(i)))

pi, err := sb.AddPiece(dataSize, i, r, nil)
pi, err := sb.AddPiece(context.TODO(), dataSize, i, r, nil)
if err != nil {
return err
}
Expand Down Expand Up @@ -225,7 +225,7 @@ func main() {

if !c.Bool("skip-unseal") {
log.Info("Unsealing sector")
rc, err := sb.ReadPieceFromSealedSector(1, 0, dataSize, ticket.TicketBytes[:], commD[:])
rc, err := sb.ReadPieceFromSealedSector(context.TODO(), 1, 0, dataSize, ticket.TicketBytes[:], commD[:])
if err != nil {
return err
}
Expand Down
1 change: 0 additions & 1 deletion cmd/lotus-chainwatch/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ func runSyncer(ctx context.Context, api api.FullNode, st *storage) {
go subMpool(ctx, api, st)
go subBlocks(ctx, api, st)
}

}
}
}()
Expand Down
2 changes: 1 addition & 1 deletion cmd/lotus-seal-worker/sub.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func acceptJobs(ctx context.Context, api lapi.StorageMiner, endpoint string, aut
SectorSize: ssize,
Miner: act,
WorkerThreads: 1,
Dir: repo,
Paths: sectorbuilder.SimplePath(repo),
})
if err != nil {
return err
Expand Down
4 changes: 2 additions & 2 deletions cmd/lotus-seed/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ var aggregateSectorDirsCmd = &cli.Command{
agsb, err := sectorbuilder.New(&sectorbuilder.Config{
Miner: maddr,
SectorSize: ssize,
Dir: destdir,
Paths: sectorbuilder.SimplePath(destdir),
WorkerThreads: 2,
}, namespace.Wrap(agmds, datastore.NewKey("/sectorbuilder")))
if err != nil {
Expand Down Expand Up @@ -257,7 +257,7 @@ var aggregateSectorDirsCmd = &cli.Command{
sb, err := sectorbuilder.New(&sectorbuilder.Config{
Miner: maddr,
SectorSize: genm.SectorSize,
Dir: dir,
Paths: sectorbuilder.SimplePath(dir),
WorkerThreads: 2,
}, namespace.Wrap(mds, datastore.NewKey("/sectorbuilder")))
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions cmd/lotus-seed/seed/seed.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func PreSeal(maddr address.Address, ssize uint64, offset uint64, sectors int, sb
Miner: maddr,
SectorSize: ssize,
FallbackLastID: offset,
Dir: sbroot,
Paths: sectorbuilder.SimplePath(sbroot),
WorkerThreads: 2,
}

Expand All @@ -59,7 +59,7 @@ func PreSeal(maddr address.Address, ssize uint64, offset uint64, sectors int, sb
return nil, err
}

pi, err := sb.AddPiece(size, sid, rand.Reader, nil)
pi, err := sb.AddPiece(context.TODO(), size, sid, rand.Reader, nil)
if err != nil {
return nil, err
}
Expand All @@ -76,7 +76,7 @@ func PreSeal(maddr address.Address, ssize uint64, offset uint64, sectors int, sb
return nil, xerrors.Errorf("commit: %w", err)
}

if err := sb.TrimCache(sid); err != nil {
if err := sb.TrimCache(context.TODO(), sid); err != nil {
return nil, xerrors.Errorf("trim cache: %w", err)
}

Expand Down
4 changes: 2 additions & 2 deletions cmd/lotus-storage-miner/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ var initCmd = &cli.Command{
oldsb, err := sectorbuilder.New(&sectorbuilder.Config{
SectorSize: ssize,
WorkerThreads: 2,
Dir: pssb,
Paths: sectorbuilder.SimplePath(pssb),
}, namespace.Wrap(oldmds, datastore.NewKey("/sectorbuilder")))
if err != nil {
return xerrors.Errorf("failed to open up preseal sectorbuilder: %w", err)
Expand All @@ -186,7 +186,7 @@ var initCmd = &cli.Command{
nsb, err := sectorbuilder.New(&sectorbuilder.Config{
SectorSize: ssize,
WorkerThreads: 2,
Dir: lr.Path(),
Paths: sectorbuilder.SimplePath(lr.Path()),
}, namespace.Wrap(mds, datastore.NewKey("/sectorbuilder")))
if err != nil {
return xerrors.Errorf("failed to open up sectorbuilder: %w", err)
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -113,3 +113,5 @@ replace github.com/golangci/golangci-lint => github.com/golangci/golangci-lint v
replace github.com/filecoin-project/filecoin-ffi => ./extern/filecoin-ffi

replace github.com/coreos/go-systemd => github.com/coreos/go-systemd/v22 v22.0.0

replace github.com/filecoin-project/go-sectorbuilder => /home/magik6k/gohack/github.com/filecoin-project/go-sectorbuilder
39 changes: 27 additions & 12 deletions node/impl/storminer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,16 @@ import (
"mime"
"net/http"
"os"

"github.com/filecoin-project/lotus/api/apistruct"
"strconv"

"github.com/gorilla/mux"
files "github.com/ipfs/go-ipfs-files"

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-sectorbuilder"
"github.com/filecoin-project/go-sectorbuilder/fs"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/api/apistruct"
"github.com/filecoin-project/lotus/lib/tarutil"
"github.com/filecoin-project/lotus/miner"
"github.com/filecoin-project/lotus/storage"
Expand Down Expand Up @@ -43,8 +44,8 @@ func (sm *StorageMinerAPI) ServeRemote(w http.ResponseWriter, r *http.Request) {

mux := mux.NewRouter()

mux.HandleFunc("/remote/{type}/{sname}", sm.remoteGetSector).Methods("GET")
mux.HandleFunc("/remote/{type}/{sname}", sm.remotePutSector).Methods("PUT")
mux.HandleFunc("/remote/{type}/{id}", sm.remoteGetSector).Methods("GET")
mux.HandleFunc("/remote/{type}/{id}", sm.remotePutSector).Methods("PUT")

log.Infof("SERVEGETREMOTE %s", r.URL)

Expand All @@ -54,14 +55,21 @@ func (sm *StorageMinerAPI) ServeRemote(w http.ResponseWriter, r *http.Request) {
func (sm *StorageMinerAPI) remoteGetSector(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)

path, err := sm.SectorBuilder.GetPath(vars["type"], vars["sname"])
id, err := strconv.ParseUint(vars["id"], 10, 64)
if err != nil {
log.Error("parsing sector id: ", err)
w.WriteHeader(500)
return
}

path, err := sm.SectorBuilder.SectorPath(fs.DataType(vars["type"]), id)
if err != nil {
log.Error(err)
w.WriteHeader(500)
return
}

stat, err := os.Stat(path)
stat, err := os.Stat(string(path))
if err != nil {
log.Error(err)
w.WriteHeader(500)
Expand All @@ -70,10 +78,10 @@ func (sm *StorageMinerAPI) remoteGetSector(w http.ResponseWriter, r *http.Reques

var rd io.Reader
if stat.IsDir() {
rd, err = tarutil.TarDirectory(path)
rd, err = tarutil.TarDirectory(string(path))
w.Header().Set("Content-Type", "application/x-tar")
} else {
rd, err = os.OpenFile(path, os.O_RDONLY, 0644)
rd, err = os.OpenFile(string(path), os.O_RDONLY, 0644)
w.Header().Set("Content-Type", "application/octet-stream")
}
if err != nil {
Expand All @@ -92,7 +100,14 @@ func (sm *StorageMinerAPI) remoteGetSector(w http.ResponseWriter, r *http.Reques
func (sm *StorageMinerAPI) remotePutSector(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)

path, err := sm.SectorBuilder.GetPath(vars["type"], vars["sname"])
id, err := strconv.ParseUint(vars["id"], 10, 64)
if err != nil {
log.Error("parsing sector id: ", err)
w.WriteHeader(500)
return
}

path, err := sm.SectorBuilder.SectorPath(fs.DataType(vars["type"]), id)
if err != nil {
log.Error(err)
w.WriteHeader(500)
Expand All @@ -106,21 +121,21 @@ func (sm *StorageMinerAPI) remotePutSector(w http.ResponseWriter, r *http.Reques
return
}

if err := os.RemoveAll(path); err != nil {
if err := os.RemoveAll(string(path)); err != nil {
log.Error(err)
w.WriteHeader(500)
return
}

switch mediatype {
case "application/x-tar":
if err := tarutil.ExtractTar(r.Body, path); err != nil {
if err := tarutil.ExtractTar(r.Body, string(path)); err != nil {
log.Error(err)
w.WriteHeader(500)
return
}
default:
if err := files.WriteTo(files.NewReaderFile(r.Body), path); err != nil {
if err := files.WriteTo(files.NewReaderFile(r.Body), string(path)); err != nil {
log.Error(err)
w.WriteHeader(500)
return
Expand Down
2 changes: 1 addition & 1 deletion node/modules/storageminer.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func SectorBuilderConfig(storagePath string, threads uint, noprecommit, nocommit
NoPreCommit: noprecommit,
NoCommit: nocommit,

Dir: sp,
Paths: sectorbuilder.SimplePath(sp),
}

return sb, nil
Expand Down
4 changes: 2 additions & 2 deletions storage/sealing/garbage.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,9 @@ func (m *Sealing) pledgeSector(ctx context.Context, sectorID uint64, existingPie
out := make([]Piece, len(sizes))

for i, size := range sizes {
ppi, err := m.sb.AddPiece(size, sectorID, io.LimitReader(rand.New(rand.NewSource(42)), int64(size)), existingPieceSizes)
ppi, err := m.sb.AddPiece(ctx, size, sectorID, io.LimitReader(rand.New(rand.NewSource(42)), int64(size)), existingPieceSizes)
if err != nil {
return nil, err
return nil, xerrors.Errorf("add piece: %w", err)
}

existingPieceSizes = append(existingPieceSizes, size)
Expand Down
2 changes: 1 addition & 1 deletion storage/sealing/sealing.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func (m *Sealing) AllocatePiece(size uint64) (sectorID uint64, offset uint64, er
func (m *Sealing) SealPiece(ctx context.Context, size uint64, r io.Reader, sectorID uint64, dealID uint64) error {
log.Infof("Seal piece for deal %d", dealID)

ppi, err := m.sb.AddPiece(size, sectorID, r, []uint64{})
ppi, err := m.sb.AddPiece(ctx, size, sectorID, r, []uint64{})
if err != nil {
return xerrors.Errorf("adding piece to sector: %w", err)
}
Expand Down
1 change: 1 addition & 0 deletions storage/sectorblocks/blockstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ func (s *SectorBlockStore) Get(c cid.Cid) (blocks.Block, error) {
log.Infof("reading block %s from sector %d(+%d;%d)", c, best.SectorID, best.Offset, best.Size)

r, err := s.sectorBlocks.sb.ReadPieceFromSealedSector(
context.TODO(),
best.SectorID,
best.Offset,
best.Size,
Expand Down

0 comments on commit 5af64c5

Please sign in to comment.