Skip to content

Commit

Permalink
Merge pull request #1611 from 0chain/hotfix/reader-mem
Browse files Browse the repository at this point in the history
add mempool for reader and download blocks
  • Loading branch information
dabasov authored Sep 20, 2024
2 parents 030499e + ac0e501 commit 21ce33a
Show file tree
Hide file tree
Showing 13 changed files with 158 additions and 71 deletions.
4 changes: 4 additions & 0 deletions core/common/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,12 @@ import (
"fmt"
"path"
"strings"

"github.com/valyala/bytebufferpool"
)

var MemPool bytebufferpool.Pool

func GetPathFields(p string) ([]string, error) {
if p == "" || p == "/" {
return nil, nil
Expand Down
27 changes: 26 additions & 1 deletion core/sys/fs_mem.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,20 @@ import (
"os"
"path/filepath"
"strings"
"sync"
"syscall/js"
"time"

"github.com/0chain/gosdk/core/common"
"github.com/0chain/gosdk/wasmsdk/jsbridge"
"github.com/valyala/bytebufferpool"
)

// MemFS implement file system on memory
type MemFS struct {
files map[string]*MemFile
dirs map[string]js.Value
sync.Mutex
}

// NewMemFS create MemFS instance
Expand All @@ -34,6 +38,8 @@ func NewMemFS() FS {
// descriptor has mode O_RDONLY.
// If there is an error, it will be of type *PathError.
func (mfs *MemFS) Open(name string) (File, error) {
mfs.Lock()
defer mfs.Unlock()
file := mfs.files[name]
if file != nil {
return file, nil
Expand All @@ -49,6 +55,8 @@ func (mfs *MemFS) Open(name string) (File, error) {
}

func (mfs *MemFS) OpenFile(name string, flag int, perm os.FileMode) (File, error) {
mfs.Lock()
defer mfs.Unlock()
file := mfs.files[name]
if file != nil {
return file, nil
Expand All @@ -65,6 +73,8 @@ func (mfs *MemFS) OpenFile(name string, flag int, perm os.FileMode) (File, error

// ReadFile reads the file named by filename and returns the contents.
func (mfs *MemFS) ReadFile(name string) ([]byte, error) {
mfs.Lock()
defer mfs.Unlock()
file, ok := mfs.files[name]
if ok {
return file.Buffer, nil
Expand All @@ -77,7 +87,8 @@ func (mfs *MemFS) ReadFile(name string) ([]byte, error) {
func (mfs *MemFS) WriteFile(name string, data []byte, perm os.FileMode) error {
fileName := filepath.Base(name)
file := &MemFile{Name: fileName, Mode: perm, ModTime: time.Now()}

mfs.Lock()
defer mfs.Unlock()
mfs.files[name] = file

return nil
Expand All @@ -86,6 +97,18 @@ func (mfs *MemFS) WriteFile(name string, data []byte, perm os.FileMode) error {
// Remove removes the named file or (empty) directory.
// If there is an error, it will be of type *PathError.
func (mfs *MemFS) Remove(name string) error {
mfs.Lock()
defer mfs.Unlock()
f, ok := mfs.files[name]
if ok {
b := f.Buffer
if len(b) > 0 {
buff := &bytebufferpool.ByteBuffer{
B: b,
}
common.MemPool.Put(buff)
}
}
delete(mfs.files, name)
return nil
}
Expand All @@ -98,6 +121,8 @@ func (mfs *MemFS) MkdirAll(path string, perm os.FileMode) error {
// Stat returns a FileInfo describing the named file.
// If there is an error, it will be of type *PathError.
func (mfs *MemFS) Stat(name string) (fs.FileInfo, error) {
mfs.Lock()
defer mfs.Unlock()
file, ok := mfs.files[name]
if ok {
return file.Stat()
Expand Down
27 changes: 22 additions & 5 deletions core/sys/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ import (
"io/fs"
"os"
"time"

"github.com/0chain/gosdk/core/common"
"github.com/valyala/bytebufferpool"
)

// MemFile represents a file totally loaded in memory
Expand Down Expand Up @@ -52,7 +55,11 @@ func (f *MemFile) WriteAt(p []byte, offset int64) (n int, err error) {

// InitBuffer initializes the buffer with a specific size
func (f *MemFile) InitBuffer(size int) {
f.Buffer = make([]byte, size)
buff := common.MemPool.Get()
if cap(buff.B) < size {
buff.B = make([]byte, size)
}
f.Buffer = buff.B[:size]
}

// Sync not implemented
Expand Down Expand Up @@ -146,7 +153,7 @@ func (f *MemChanFile) Stat() (fs.FileInfo, error) {

// Read reads data from the file through the buffer channel
// It returns io.EOF when the buffer channel is closed.
// - p: file in bytes loaded from the buffer channel
// - p: file in bytes loaded from the buffer channel
func (f *MemChanFile) Read(p []byte) (int, error) {
select {
case err := <-f.ErrChan:
Expand All @@ -166,15 +173,19 @@ func (f *MemChanFile) Read(p []byte) (int, error) {
// Write writes data to the file through the buffer channel
// It writes the data to the buffer channel in chunks of ChunkWriteSize.
// If ChunkWriteSize is 0, it writes the data as a whole.
// - p: file in bytes to write to the buffer channel
// - p: file in bytes to write to the buffer channel
func (f *MemChanFile) Write(p []byte) (n int, err error) {
if f.ChunkWriteSize == 0 {
data := make([]byte, len(p))
copy(data, p)
f.Buffer <- data
} else {
if cap(f.data) == 0 {
f.data = make([]byte, 0, len(p))
bbuf := common.MemPool.Get()
if cap(bbuf.B) < len(p) {
bbuf.B = make([]byte, 0, len(p))
}
f.data = bbuf.B
}
f.data = append(f.data, p...)
}
Expand All @@ -193,7 +204,7 @@ func (f *MemChanFile) Sync() error {
}
f.Buffer <- f.data[current:end]
}
f.data = nil
f.data = f.data[:0]
return nil
}

Expand All @@ -205,6 +216,12 @@ func (f *MemChanFile) Seek(offset int64, whence int) (ret int64, err error) {
// Close closes the buffer channel
func (f *MemChanFile) Close() error {
close(f.Buffer)
if cap(f.data) > 0 {
bbuf := &bytebufferpool.ByteBuffer{
B: f.data,
}
common.MemPool.Put(bbuf)
}
return nil
}

Expand Down
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ require (
github.com/hashicorp/go-retryablehttp v0.7.2
github.com/hashicorp/golang-lru/v2 v2.0.1
github.com/herumi/bls-go-binary v1.31.0
github.com/hitenjain14/fasthttp v0.0.0-20240527123209-06019e79bff9
github.com/hitenjain14/fasthttp v0.0.0-20240916135632-f9303a91736c
github.com/influxdata/influxdb v1.8.3
github.com/klauspost/reedsolomon v1.11.8
github.com/lithammer/shortuuid/v3 v3.0.7
Expand Down Expand Up @@ -47,6 +47,7 @@ require (
github.com/hack-pad/go-webworkers v0.1.0
github.com/hashicorp/golang-lru v0.5.5-0.20210104140557-80c98217689d
github.com/minio/sha256-simd v1.0.1
github.com/valyala/bytebufferpool v1.0.0
github.com/ybbus/jsonrpc/v3 v3.1.5
)

Expand Down Expand Up @@ -116,7 +117,6 @@ require (
github.com/tklauser/go-sysconf v0.3.11 // indirect
github.com/tklauser/numcpus v0.6.0 // indirect
github.com/urfave/cli/v2 v2.17.2-0.20221006022127-8f469abc00aa // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect
github.com/valyala/fasthttp v1.51.0 // indirect
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect
go.dedis.ch/fixbuf v1.0.3 // indirect
Expand All @@ -143,4 +143,4 @@ require (
google.golang.org/genproto v0.0.0-20230216225411-c8e22ba71e44 // indirect
)

//replace github.com/ethereum/go-ethereum => github.com/certifaction/go-ethereum v1.10.3-wasm
//replace github.com/ethereum/go-ethereum => github.com/certifaction/go-ethereum v1.10.3-wasm
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -308,8 +308,8 @@ github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
github.com/herumi/bls-go-binary v1.31.0 h1:L1goQ2tMtGgpXCg5AwHAdJQpLs/pfnWWEc3Wog6OhmI=
github.com/herumi/bls-go-binary v1.31.0/go.mod h1:O4Vp1AfR4raRGwFeQpr9X/PQtncEicMoOe6BQt1oX0Y=
github.com/hitenjain14/fasthttp v0.0.0-20240527123209-06019e79bff9 h1:Z6Mu2JCsW2hbqx91L0HNPRPQ10RyAFvPocQHlrRo1Jk=
github.com/hitenjain14/fasthttp v0.0.0-20240527123209-06019e79bff9/go.mod h1:RZMcXy7u4S+E97IXYTe7WHZ3+mCYOh4vys8PkIGZeXk=
github.com/hitenjain14/fasthttp v0.0.0-20240916135632-f9303a91736c h1:lDSIbcLu5TdT+uwb4wPzZgo1pQvKjP/tArL5QKjDJdI=
github.com/hitenjain14/fasthttp v0.0.0-20240916135632-f9303a91736c/go.mod h1:RZMcXy7u4S+E97IXYTe7WHZ3+mCYOh4vys8PkIGZeXk=
github.com/holiman/bloomfilter/v2 v2.0.3 h1:73e0e/V0tCydx14a0SCYS/EWCxgwLZ18CZcZKVu0fao=
github.com/holiman/bloomfilter/v2 v2.0.3/go.mod h1:zpoh+gs7qcpqrHr3dB55AMiJwo0iURXE7ZOP9L9hSkA=
github.com/holiman/uint256 v1.2.2-0.20230321075855-87b91420868c h1:DZfsyhDK1hnSS5lH8l+JggqzEleHteTYfutAiVlSUM8=
Expand Down
18 changes: 9 additions & 9 deletions wasmsdk/blobber.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (

"github.com/0chain/gosdk/constants"
"github.com/0chain/gosdk/core/common"
"github.com/0chain/gosdk/core/encryption"
"github.com/0chain/gosdk/core/pathutil"
"github.com/0chain/gosdk/core/sys"
"github.com/hack-pad/safejs"
Expand Down Expand Up @@ -777,10 +778,11 @@ func multiUpload(jsonBulkUploadOptions string) (MultiUploadResult, error) {
RemotePath: fullRemotePath,
CustomMeta: option.CustomMeta,
}
numBlocks := option.NumBlocks
if numBlocks <= 1 {
numBlocks = 100
}
// numBlocks := option.NumBlocks
// if numBlocks <= 1 {
// numBlocks = 100
// }
numBlocks := 60

options := []sdk.ChunkedUploadOption{
sdk.WithThumbnail(option.ThumbnailBytes.Buffer),
Expand Down Expand Up @@ -1016,10 +1018,8 @@ func downloadBlocks(allocId string, remotePath, authTicket, lookupHash string, s
statusBar = &StatusBar{wg: wg, totalBytesMap: make(map[string]int)}
)

fileName := strings.Replace(path.Base(remotePath), "/", "-", -1)
localPath := alloc.ID + "-" + fmt.Sprintf("%v-%s", startBlock, fileName)

fs, err := sys.Files.Open(localPath)
pathHash := encryption.FastHash(remotePath)
fs, err := sys.Files.Open(pathHash)
if err != nil {
return nil, fmt.Errorf("could not open local file: %v", err)
}
Expand All @@ -1029,7 +1029,7 @@ func downloadBlocks(allocId string, remotePath, authTicket, lookupHash string, s
return nil, fmt.Errorf("invalid memfile")
}

defer sys.Files.Remove(localPath) //nolint
defer sys.Files.Remove(pathHash) //nolint

wg.Add(1)
if authTicket != "" {
Expand Down
74 changes: 47 additions & 27 deletions wasmsdk/jsbridge/file_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,19 @@ import (
"errors"
"io"
"syscall/js"

"github.com/0chain/gosdk/core/common"
"github.com/valyala/bytebufferpool"
)

type FileReader struct {
size int64
offset int64
readChunk js.Value
buf []byte
bufOffset int
endOfFile bool
size int64
offset int64
readChunk js.Value
buf []byte
bufOffset int
chunkReadSize int64
endOfFile bool
}

const (
Expand All @@ -24,34 +28,19 @@ const (

func NewFileReader(readChunkFuncName string, fileSize, chunkReadSize int64) (*FileReader, error) {
readChunk := js.Global().Get(readChunkFuncName)
var buf []byte
if bufferSize > fileSize {
buf = make([]byte, fileSize)
} else {
bufSize := (chunkReadSize * (bufferSize / chunkReadSize))
buf = make([]byte, bufSize)
}
result, err := Await(readChunk.Invoke(0, len(buf)))
if len(err) > 0 && !err[0].IsNull() {
return nil, errors.New("file_reader: " + err[0].String())
}
chunk := result[0]
n := js.CopyBytesToGo(buf, chunk)
if n < len(buf) {
return nil, errors.New("file_reader: failed to read first chunk")
}
return &FileReader{
size: fileSize,
offset: int64(n),
readChunk: readChunk,
buf: buf,
endOfFile: n == int(fileSize),
size: fileSize,
readChunk: readChunk,
chunkReadSize: chunkReadSize,
}, nil
}

func (r *FileReader) Read(p []byte) (int, error) {
//js.Value doesn't work in parallel invoke
size := len(p)
if len(r.buf) == 0 && !r.endOfFile {
r.initBuffer()
}

if len(r.buf)-r.bufOffset < size && !r.endOfFile {
r.bufOffset = 0 //reset buffer offset
Expand All @@ -74,12 +63,43 @@ func (r *FileReader) Read(p []byte) (int, error) {
n := copy(p, r.buf[r.bufOffset:])
r.bufOffset += n
if r.endOfFile && r.bufOffset == len(r.buf) {
buff := &bytebufferpool.ByteBuffer{
B: r.buf,
}
common.MemPool.Put(buff)
return n, io.EOF
}

return n, nil
}

func (r *FileReader) initBuffer() error {
bufSize := r.size
if bufferSize < bufSize {
bufSize = (r.chunkReadSize * (bufferSize / r.chunkReadSize))
}
buff := common.MemPool.Get()
if cap(buff.B) < int(bufSize) {
buff.B = make([]byte, bufSize)
}
r.buf = buff.B[:bufSize]
result, err := Await(r.readChunk.Invoke(0, len(r.buf)))

if len(err) > 0 && !err[0].IsNull() {
return errors.New("file_reader: " + err[0].String())
}

chunk := result[0]

n := js.CopyBytesToGo(r.buf, chunk)
r.offset += int64(n)
if n < len(r.buf) {
r.buf = r.buf[:n]
}
r.endOfFile = len(r.buf) == int(r.size)
return nil
}

func (r *FileReader) Seek(offset int64, whence int) (int64, error) {

var abs int64
Expand Down
8 changes: 4 additions & 4 deletions wasmsdk/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -481,8 +481,8 @@ func main() {

setWallet(clientID, clientKey, peerPublicKey, publicKey, privateKey, mnemonic, isSplit)
hideLogs()
debug.SetGCPercent(40)
debug.SetMemoryLimit(300 * 1024 * 1024) //300MB
debug.SetGCPercent(75)
debug.SetMemoryLimit(1 * 1024 * 1024 * 1024) //1GB
err = startListener(respChan)
if err != nil {
fmt.Println("Error starting listener", err)
Expand All @@ -491,8 +491,8 @@ func main() {
}

hideLogs()
debug.SetGCPercent(40)
debug.SetMemoryLimit(2.5 * 1024 * 1024 * 1024) //2.5 GB
debug.SetGCPercent(75)
debug.SetMemoryLimit(3.5 * 1024 * 1024 * 1024) //3.5 GB

<-make(chan bool)

Expand Down
Loading

0 comments on commit 21ce33a

Please sign in to comment.