From d2b2e7d47d22220e3c5d9431c81b56da756acf7a Mon Sep 17 00:00:00 2001 From: flowerinthenight Date: Mon, 15 Jul 2024 10:27:55 +0900 Subject: [PATCH] sos: rename DistMem to SoS, or Spillover-Store --- cmd/demo/main.go | 32 ++-- hedge.go | 29 ++-- proto/v1/default.pb.go | 46 +++--- proto/v1/default.proto | 6 +- proto/v1/default_grpc.pb.go | 100 ++++++------- service.go | 20 +-- distmem.go => sos.go | 282 ++++++++++++++++++------------------ 7 files changed, 258 insertions(+), 257 deletions(-) rename distmem.go => sos.go (59%) diff --git a/cmd/demo/main.go b/cmd/demo/main.go index 91f7d1d..52724f0 100644 --- a/cmd/demo/main.go +++ b/cmd/demo/main.go @@ -294,7 +294,7 @@ func main() { w.Write([]byte("OK")) }) - mux.HandleFunc("/distmem", func(w http.ResponseWriter, r *http.Request) { + mux.HandleFunc("/sos", func(w http.ResponseWriter, r *http.Request) { defer func(start time.Time) { slog.Info("distmem:", "duration", time.Since(start)) }(time.Now()) @@ -309,14 +309,14 @@ func main() { limit := 14_000 // 4 pods, all // limit := 2_500 - dm := func() *hedge.DistMem { - dm := op.NewDistMem(name, &hedge.DistMemOptions{ + sos := func() *hedge.SoS { + sos := op.NewSoS(name, &hedge.SoSOptions{ MemLimit: 150_000, DiskLimit: 120_000, Expiration: 30, }) - writer, err := dm.Writer() + writer, err := sos.Writer() if err != nil { slog.Error("Writer failed:", "err", err) return nil @@ -331,16 +331,16 @@ func main() { } slog.Info("write_dm:", "i", limit, "n", n, "write_err", writer.Err()) - return dm + return sos }() - if dm == nil { - slog.Error("failed in creating DistMem object") + if sos == nil { + slog.Error("failed in creating SoS object") return } func() { - reader, err := dm.Reader() + reader, err := sos.Reader() if err != nil { slog.Error(err.Error()) return @@ -384,11 +384,11 @@ func main() { slog.Info("read_dm:", "read_err", reader.Err()) }() - dm.Close() + sos.Close() w.Write([]byte("OK")) }) - mux.HandleFunc("/dmlocal", func(w http.ResponseWriter, r *http.Request) { + 
mux.HandleFunc("/soslocal", func(w http.ResponseWriter, r *http.Request) { defer func(start time.Time) { slog.Info("distmem:", "duration", time.Since(start)) }(time.Now()) @@ -419,13 +419,13 @@ func main() { slog.Info("start distmem:", "name", name) - dm := func() *hedge.DistMem { - dm := op.NewDistMem(name, &hedge.DistMemOptions{ + sos := func() *hedge.SoS { + sos := op.NewSoS(name, &hedge.SoSOptions{ MemLimit: 100_000, Expiration: 30, }) - writer, err := dm.Writer() + writer, err := sos.Writer() if err != nil { slog.Error("Writer failed:", "err", err) return nil @@ -467,11 +467,11 @@ func main() { "err", writer.Err(), ) - return dm + return sos }() func() { - reader, _ := dm.Reader() + reader, _ := sos.Reader() out := make(chan []byte) eg := new(errgroup.Group) eg.Go(func() error { @@ -503,7 +503,7 @@ func main() { slog.Info("read_dm:", "read_err", reader.Err()) }() - dm.Close() + sos.Close() w.Write([]byte("OK")) }) diff --git a/hedge.go b/hedge.go index fc83bfa..0760d3b 100644 --- a/hedge.go +++ b/hedge.go @@ -219,8 +219,8 @@ type Op struct { broadcastStreamIn chan *StreamMessage broadcastStreamOut chan *StreamMessage - dmsLock *sync.Mutex - dms map[string]*DistMem // distributed memory + sosLock *sync.Mutex + soss map[string]*SoS // distributed memory *spindle.Lock // handles our distributed lock members map[string]struct{} // key=id @@ -551,18 +551,19 @@ func (op *Op) NewSemaphore(ctx context.Context, name string, limit int) (*Semaph return &Semaphore{name, limit, op}, nil } -// NewDistMem returns an object for writing data to distributed memory and -// disk across the cluster. The order of writing is local memory, local -// disk, other pod's memory, other pod's disk, etc. -func (op *Op) NewDistMem(name string, opts ...*DistMemOptions) *DistMem { - op.dmsLock.Lock() - defer op.dmsLock.Unlock() - if _, ok := op.dms[name]; ok { - return op.dms[name] +// NewSoS returns an object for writing data to spill-over +// storage across the cluster. 
The order of writing is local +// memory, local disk, other pod's memory, other pod's disk, +// and so on. +func (op *Op) NewSoS(name string, opts ...*SoSOptions) *SoS { + op.sosLock.Lock() + defer op.sosLock.Unlock() + if _, ok := op.soss[name]; ok { + return op.soss[name] } - op.dms[name] = newDistMem(name, op, opts...) - return op.dms[name] + op.soss[name] = newSoS(name, op, opts...) + return op.soss[name] } // Get reads a key (or keys) from Op. @@ -1321,8 +1322,8 @@ func New(client *spanner.Client, hostPort, lockTable, lockName, logTable string, members: make(map[string]struct{}), ensureCh: make(chan string), ensureDone: make(chan struct{}, 1), - dmsLock: &sync.Mutex{}, - dms: map[string]*DistMem{}, + sosLock: &sync.Mutex{}, + soss: map[string]*SoS{}, Lock: &spindle.Lock{}, // init later } diff --git a/proto/v1/default.pb.go b/proto/v1/default.pb.go index e440ad6..f6d41c6 100644 --- a/proto/v1/default.pb.go +++ b/proto/v1/default.pb.go @@ -89,7 +89,7 @@ var file_default_proto_rawDesc = []byte{ 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x32, - 0xd6, 0x02, 0x0a, 0x05, 0x48, 0x65, 0x64, 0x67, 0x65, 0x12, 0x3e, 0x0a, 0x04, 0x53, 0x65, 0x6e, + 0xd3, 0x02, 0x0a, 0x05, 0x48, 0x65, 0x64, 0x67, 0x65, 0x12, 0x3e, 0x0a, 0x04, 0x53, 0x65, 0x6e, 0x64, 0x12, 0x17, 0x2e, 0x68, 0x65, 0x64, 0x67, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x76, 0x31, 0x2e, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x1a, 0x17, 0x2e, 0x68, 0x65, 0x64, 0x67, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x76, 0x31, 0x2e, 0x50, 0x61, 0x79, 0x6c, @@ -97,23 +97,23 @@ var file_default_proto_rawDesc = []byte{ 0x61, 0x64, 0x63, 0x61, 0x73, 0x74, 0x12, 0x17, 0x2e, 0x68, 0x65, 0x64, 0x67, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x76, 0x31, 0x2e, 0x50, 
0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x1a, 0x17, 0x2e, 0x68, 0x65, 0x64, 0x67, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x76, 0x31, - 0x2e, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x22, 0x00, 0x28, 0x01, 0x30, 0x01, 0x12, 0x43, - 0x0a, 0x09, 0x44, 0x4d, 0x65, 0x6d, 0x57, 0x72, 0x69, 0x74, 0x65, 0x12, 0x17, 0x2e, 0x68, 0x65, - 0x64, 0x67, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x76, 0x31, 0x2e, 0x50, 0x61, 0x79, - 0x6c, 0x6f, 0x61, 0x64, 0x1a, 0x17, 0x2e, 0x68, 0x65, 0x64, 0x67, 0x65, 0x2e, 0x70, 0x72, 0x6f, - 0x74, 0x6f, 0x2e, 0x76, 0x31, 0x2e, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x22, 0x00, 0x28, - 0x01, 0x30, 0x01, 0x12, 0x42, 0x0a, 0x08, 0x44, 0x4d, 0x65, 0x6d, 0x52, 0x65, 0x61, 0x64, 0x12, - 0x17, 0x2e, 0x68, 0x65, 0x64, 0x67, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x76, 0x31, - 0x2e, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x1a, 0x17, 0x2e, 0x68, 0x65, 0x64, 0x67, 0x65, - 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x76, 0x31, 0x2e, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, - 0x64, 0x22, 0x00, 0x28, 0x01, 0x30, 0x01, 0x12, 0x3f, 0x0a, 0x09, 0x44, 0x4d, 0x65, 0x6d, 0x43, - 0x6c, 0x6f, 0x73, 0x65, 0x12, 0x17, 0x2e, 0x68, 0x65, 0x64, 0x67, 0x65, 0x2e, 0x70, 0x72, 0x6f, - 0x74, 0x6f, 0x2e, 0x76, 0x31, 0x2e, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x1a, 0x17, 0x2e, + 0x2e, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x22, 0x00, 0x28, 0x01, 0x30, 0x01, 0x12, 0x42, + 0x0a, 0x08, 0x53, 0x6f, 0x53, 0x57, 0x72, 0x69, 0x74, 0x65, 0x12, 0x17, 0x2e, 0x68, 0x65, 0x64, + 0x67, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x76, 0x31, 0x2e, 0x50, 0x61, 0x79, 0x6c, + 0x6f, 0x61, 0x64, 0x1a, 0x17, 0x2e, 0x68, 0x65, 0x64, 0x67, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x2e, 0x76, 0x31, 0x2e, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x22, 0x00, 0x28, 0x01, + 0x30, 0x01, 0x12, 0x41, 0x0a, 0x07, 0x53, 0x6f, 0x53, 0x52, 0x65, 0x61, 0x64, 0x12, 0x17, 0x2e, 0x68, 0x65, 0x64, 0x67, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x76, 0x31, 0x2e, 0x50, - 
0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x22, 0x00, 0x42, 0x2c, 0x5a, 0x2a, 0x67, 0x69, 0x74, 0x68, - 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x66, 0x6c, 0x6f, 0x77, 0x65, 0x72, 0x69, 0x6e, 0x74, - 0x68, 0x65, 0x6e, 0x69, 0x67, 0x68, 0x74, 0x2f, 0x68, 0x65, 0x64, 0x67, 0x65, 0x2f, 0x70, 0x72, - 0x6f, 0x74, 0x6f, 0x2f, 0x76, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x1a, 0x17, 0x2e, 0x68, 0x65, 0x64, 0x67, 0x65, 0x2e, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x76, 0x31, 0x2e, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x22, + 0x00, 0x28, 0x01, 0x30, 0x01, 0x12, 0x3e, 0x0a, 0x08, 0x53, 0x6f, 0x53, 0x43, 0x6c, 0x6f, 0x73, + 0x65, 0x12, 0x17, 0x2e, 0x68, 0x65, 0x64, 0x67, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, + 0x76, 0x31, 0x2e, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x1a, 0x17, 0x2e, 0x68, 0x65, 0x64, + 0x67, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x76, 0x31, 0x2e, 0x50, 0x61, 0x79, 0x6c, + 0x6f, 0x61, 0x64, 0x22, 0x00, 0x42, 0x2c, 0x5a, 0x2a, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, + 0x63, 0x6f, 0x6d, 0x2f, 0x66, 0x6c, 0x6f, 0x77, 0x65, 0x72, 0x69, 0x6e, 0x74, 0x68, 0x65, 0x6e, + 0x69, 0x67, 0x68, 0x74, 0x2f, 0x68, 0x65, 0x64, 0x67, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x2f, 0x76, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -137,14 +137,14 @@ var file_default_proto_depIdxs = []int32{ 1, // 0: hedge.proto.v1.Payload.meta:type_name -> hedge.proto.v1.Payload.MetaEntry 0, // 1: hedge.proto.v1.Hedge.Send:input_type -> hedge.proto.v1.Payload 0, // 2: hedge.proto.v1.Hedge.Broadcast:input_type -> hedge.proto.v1.Payload - 0, // 3: hedge.proto.v1.Hedge.DMemWrite:input_type -> hedge.proto.v1.Payload - 0, // 4: hedge.proto.v1.Hedge.DMemRead:input_type -> hedge.proto.v1.Payload - 0, // 5: hedge.proto.v1.Hedge.DMemClose:input_type -> hedge.proto.v1.Payload + 0, // 3: hedge.proto.v1.Hedge.SoSWrite:input_type -> hedge.proto.v1.Payload + 0, // 4: hedge.proto.v1.Hedge.SoSRead:input_type -> 
hedge.proto.v1.Payload + 0, // 5: hedge.proto.v1.Hedge.SoSClose:input_type -> hedge.proto.v1.Payload 0, // 6: hedge.proto.v1.Hedge.Send:output_type -> hedge.proto.v1.Payload 0, // 7: hedge.proto.v1.Hedge.Broadcast:output_type -> hedge.proto.v1.Payload - 0, // 8: hedge.proto.v1.Hedge.DMemWrite:output_type -> hedge.proto.v1.Payload - 0, // 9: hedge.proto.v1.Hedge.DMemRead:output_type -> hedge.proto.v1.Payload - 0, // 10: hedge.proto.v1.Hedge.DMemClose:output_type -> hedge.proto.v1.Payload + 0, // 8: hedge.proto.v1.Hedge.SoSWrite:output_type -> hedge.proto.v1.Payload + 0, // 9: hedge.proto.v1.Hedge.SoSRead:output_type -> hedge.proto.v1.Payload + 0, // 10: hedge.proto.v1.Hedge.SoSClose:output_type -> hedge.proto.v1.Payload 6, // [6:11] is the sub-list for method output_type 1, // [1:6] is the sub-list for method input_type 1, // [1:1] is the sub-list for extension type_name diff --git a/proto/v1/default.proto b/proto/v1/default.proto index 72f8e34..5194e97 100644 --- a/proto/v1/default.proto +++ b/proto/v1/default.proto @@ -7,9 +7,9 @@ option go_package = "github.com/flowerinthenight/hedge/proto/v1"; service Hedge { rpc Send(stream Payload) returns (stream Payload) {} rpc Broadcast(stream Payload) returns (stream Payload) {} - rpc DMemWrite(stream Payload) returns (stream Payload) {} - rpc DMemRead(stream Payload) returns (stream Payload) {} - rpc DMemClose(Payload) returns (Payload) {} + rpc SoSWrite(stream Payload) returns (stream Payload) {} + rpc SoSRead(stream Payload) returns (stream Payload) {} + rpc SoSClose(Payload) returns (Payload) {} } message Payload { diff --git a/proto/v1/default_grpc.pb.go b/proto/v1/default_grpc.pb.go index 45e6516..e195f9d 100644 --- a/proto/v1/default_grpc.pb.go +++ b/proto/v1/default_grpc.pb.go @@ -20,9 +20,9 @@ const _ = grpc.SupportPackageIsVersion7 type HedgeClient interface { Send(ctx context.Context, opts ...grpc.CallOption) (Hedge_SendClient, error) Broadcast(ctx context.Context, opts ...grpc.CallOption) 
(Hedge_BroadcastClient, error) - DMemWrite(ctx context.Context, opts ...grpc.CallOption) (Hedge_DMemWriteClient, error) - DMemRead(ctx context.Context, opts ...grpc.CallOption) (Hedge_DMemReadClient, error) - DMemClose(ctx context.Context, in *Payload, opts ...grpc.CallOption) (*Payload, error) + SoSWrite(ctx context.Context, opts ...grpc.CallOption) (Hedge_SoSWriteClient, error) + SoSRead(ctx context.Context, opts ...grpc.CallOption) (Hedge_SoSReadClient, error) + SoSClose(ctx context.Context, in *Payload, opts ...grpc.CallOption) (*Payload, error) } type hedgeClient struct { @@ -95,30 +95,30 @@ func (x *hedgeBroadcastClient) Recv() (*Payload, error) { return m, nil } -func (c *hedgeClient) DMemWrite(ctx context.Context, opts ...grpc.CallOption) (Hedge_DMemWriteClient, error) { - stream, err := c.cc.NewStream(ctx, &Hedge_ServiceDesc.Streams[2], "/hedge.proto.v1.Hedge/DMemWrite", opts...) +func (c *hedgeClient) SoSWrite(ctx context.Context, opts ...grpc.CallOption) (Hedge_SoSWriteClient, error) { + stream, err := c.cc.NewStream(ctx, &Hedge_ServiceDesc.Streams[2], "/hedge.proto.v1.Hedge/SoSWrite", opts...) 
if err != nil { return nil, err } - x := &hedgeDMemWriteClient{stream} + x := &hedgeSoSWriteClient{stream} return x, nil } -type Hedge_DMemWriteClient interface { +type Hedge_SoSWriteClient interface { Send(*Payload) error Recv() (*Payload, error) grpc.ClientStream } -type hedgeDMemWriteClient struct { +type hedgeSoSWriteClient struct { grpc.ClientStream } -func (x *hedgeDMemWriteClient) Send(m *Payload) error { +func (x *hedgeSoSWriteClient) Send(m *Payload) error { return x.ClientStream.SendMsg(m) } -func (x *hedgeDMemWriteClient) Recv() (*Payload, error) { +func (x *hedgeSoSWriteClient) Recv() (*Payload, error) { m := new(Payload) if err := x.ClientStream.RecvMsg(m); err != nil { return nil, err @@ -126,30 +126,30 @@ func (x *hedgeDMemWriteClient) Recv() (*Payload, error) { return m, nil } -func (c *hedgeClient) DMemRead(ctx context.Context, opts ...grpc.CallOption) (Hedge_DMemReadClient, error) { - stream, err := c.cc.NewStream(ctx, &Hedge_ServiceDesc.Streams[3], "/hedge.proto.v1.Hedge/DMemRead", opts...) +func (c *hedgeClient) SoSRead(ctx context.Context, opts ...grpc.CallOption) (Hedge_SoSReadClient, error) { + stream, err := c.cc.NewStream(ctx, &Hedge_ServiceDesc.Streams[3], "/hedge.proto.v1.Hedge/SoSRead", opts...) 
if err != nil { return nil, err } - x := &hedgeDMemReadClient{stream} + x := &hedgeSoSReadClient{stream} return x, nil } -type Hedge_DMemReadClient interface { +type Hedge_SoSReadClient interface { Send(*Payload) error Recv() (*Payload, error) grpc.ClientStream } -type hedgeDMemReadClient struct { +type hedgeSoSReadClient struct { grpc.ClientStream } -func (x *hedgeDMemReadClient) Send(m *Payload) error { +func (x *hedgeSoSReadClient) Send(m *Payload) error { return x.ClientStream.SendMsg(m) } -func (x *hedgeDMemReadClient) Recv() (*Payload, error) { +func (x *hedgeSoSReadClient) Recv() (*Payload, error) { m := new(Payload) if err := x.ClientStream.RecvMsg(m); err != nil { return nil, err @@ -157,9 +157,9 @@ func (x *hedgeDMemReadClient) Recv() (*Payload, error) { return m, nil } -func (c *hedgeClient) DMemClose(ctx context.Context, in *Payload, opts ...grpc.CallOption) (*Payload, error) { +func (c *hedgeClient) SoSClose(ctx context.Context, in *Payload, opts ...grpc.CallOption) (*Payload, error) { out := new(Payload) - err := c.cc.Invoke(ctx, "/hedge.proto.v1.Hedge/DMemClose", in, out, opts...) + err := c.cc.Invoke(ctx, "/hedge.proto.v1.Hedge/SoSClose", in, out, opts...) 
if err != nil { return nil, err } @@ -172,9 +172,9 @@ func (c *hedgeClient) DMemClose(ctx context.Context, in *Payload, opts ...grpc.C type HedgeServer interface { Send(Hedge_SendServer) error Broadcast(Hedge_BroadcastServer) error - DMemWrite(Hedge_DMemWriteServer) error - DMemRead(Hedge_DMemReadServer) error - DMemClose(context.Context, *Payload) (*Payload, error) + SoSWrite(Hedge_SoSWriteServer) error + SoSRead(Hedge_SoSReadServer) error + SoSClose(context.Context, *Payload) (*Payload, error) mustEmbedUnimplementedHedgeServer() } @@ -188,14 +188,14 @@ func (UnimplementedHedgeServer) Send(Hedge_SendServer) error { func (UnimplementedHedgeServer) Broadcast(Hedge_BroadcastServer) error { return status.Errorf(codes.Unimplemented, "method Broadcast not implemented") } -func (UnimplementedHedgeServer) DMemWrite(Hedge_DMemWriteServer) error { - return status.Errorf(codes.Unimplemented, "method DMemWrite not implemented") +func (UnimplementedHedgeServer) SoSWrite(Hedge_SoSWriteServer) error { + return status.Errorf(codes.Unimplemented, "method SoSWrite not implemented") } -func (UnimplementedHedgeServer) DMemRead(Hedge_DMemReadServer) error { - return status.Errorf(codes.Unimplemented, "method DMemRead not implemented") +func (UnimplementedHedgeServer) SoSRead(Hedge_SoSReadServer) error { + return status.Errorf(codes.Unimplemented, "method SoSRead not implemented") } -func (UnimplementedHedgeServer) DMemClose(context.Context, *Payload) (*Payload, error) { - return nil, status.Errorf(codes.Unimplemented, "method DMemClose not implemented") +func (UnimplementedHedgeServer) SoSClose(context.Context, *Payload) (*Payload, error) { + return nil, status.Errorf(codes.Unimplemented, "method SoSClose not implemented") } func (UnimplementedHedgeServer) mustEmbedUnimplementedHedgeServer() {} @@ -262,25 +262,25 @@ func (x *hedgeBroadcastServer) Recv() (*Payload, error) { return m, nil } -func _Hedge_DMemWrite_Handler(srv interface{}, stream grpc.ServerStream) error { - return 
srv.(HedgeServer).DMemWrite(&hedgeDMemWriteServer{stream}) +func _Hedge_SoSWrite_Handler(srv interface{}, stream grpc.ServerStream) error { + return srv.(HedgeServer).SoSWrite(&hedgeSoSWriteServer{stream}) } -type Hedge_DMemWriteServer interface { +type Hedge_SoSWriteServer interface { Send(*Payload) error Recv() (*Payload, error) grpc.ServerStream } -type hedgeDMemWriteServer struct { +type hedgeSoSWriteServer struct { grpc.ServerStream } -func (x *hedgeDMemWriteServer) Send(m *Payload) error { +func (x *hedgeSoSWriteServer) Send(m *Payload) error { return x.ServerStream.SendMsg(m) } -func (x *hedgeDMemWriteServer) Recv() (*Payload, error) { +func (x *hedgeSoSWriteServer) Recv() (*Payload, error) { m := new(Payload) if err := x.ServerStream.RecvMsg(m); err != nil { return nil, err @@ -288,25 +288,25 @@ func (x *hedgeDMemWriteServer) Recv() (*Payload, error) { return m, nil } -func _Hedge_DMemRead_Handler(srv interface{}, stream grpc.ServerStream) error { - return srv.(HedgeServer).DMemRead(&hedgeDMemReadServer{stream}) +func _Hedge_SoSRead_Handler(srv interface{}, stream grpc.ServerStream) error { + return srv.(HedgeServer).SoSRead(&hedgeSoSReadServer{stream}) } -type Hedge_DMemReadServer interface { +type Hedge_SoSReadServer interface { Send(*Payload) error Recv() (*Payload, error) grpc.ServerStream } -type hedgeDMemReadServer struct { +type hedgeSoSReadServer struct { grpc.ServerStream } -func (x *hedgeDMemReadServer) Send(m *Payload) error { +func (x *hedgeSoSReadServer) Send(m *Payload) error { return x.ServerStream.SendMsg(m) } -func (x *hedgeDMemReadServer) Recv() (*Payload, error) { +func (x *hedgeSoSReadServer) Recv() (*Payload, error) { m := new(Payload) if err := x.ServerStream.RecvMsg(m); err != nil { return nil, err @@ -314,20 +314,20 @@ func (x *hedgeDMemReadServer) Recv() (*Payload, error) { return m, nil } -func _Hedge_DMemClose_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) 
(interface{}, error) { +func _Hedge_SoSClose_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(Payload) if err := dec(in); err != nil { return nil, err } if interceptor == nil { - return srv.(HedgeServer).DMemClose(ctx, in) + return srv.(HedgeServer).SoSClose(ctx, in) } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: "/hedge.proto.v1.Hedge/DMemClose", + FullMethod: "/hedge.proto.v1.Hedge/SoSClose", } handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(HedgeServer).DMemClose(ctx, req.(*Payload)) + return srv.(HedgeServer).SoSClose(ctx, req.(*Payload)) } return interceptor(ctx, in, info, handler) } @@ -340,8 +340,8 @@ var Hedge_ServiceDesc = grpc.ServiceDesc{ HandlerType: (*HedgeServer)(nil), Methods: []grpc.MethodDesc{ { - MethodName: "DMemClose", - Handler: _Hedge_DMemClose_Handler, + MethodName: "SoSClose", + Handler: _Hedge_SoSClose_Handler, }, }, Streams: []grpc.StreamDesc{ @@ -358,14 +358,14 @@ var Hedge_ServiceDesc = grpc.ServiceDesc{ ClientStreams: true, }, { - StreamName: "DMemWrite", - Handler: _Hedge_DMemWrite_Handler, + StreamName: "SoSWrite", + Handler: _Hedge_SoSWrite_Handler, ServerStreams: true, ClientStreams: true, }, { - StreamName: "DMemRead", - Handler: _Hedge_DMemRead_Handler, + StreamName: "SoSRead", + Handler: _Hedge_SoSRead_Handler, ServerStreams: true, ClientStreams: true, }, diff --git a/service.go b/service.go index 19e9ca3..eac8025 100644 --- a/service.go +++ b/service.go @@ -109,7 +109,7 @@ func (s *service) Broadcast(hs pb.Hedge_BroadcastServer) error { return g.Wait() } -func (s *service) DMemWrite(hs pb.Hedge_DMemWriteServer) error { +func (s *service) SoSWrite(hs pb.Hedge_SoSWriteServer) error { var err error ctx := hs.Context() var writer *Writer @@ -136,11 +136,11 @@ loop: } name := in.Meta[metaName] - if _, ok := s.op.dms[name]; !ok { + if _, ok := s.op.soss[name]; !ok { mlimit, _ := 
strconv.ParseUint(in.Meta[metaMemLimit], 10, 64) dlimit, _ := strconv.ParseUint(in.Meta[metaDiskLimit], 10, 64) age, _ := strconv.ParseInt(in.Meta[metaExpire], 10, 64) - s.op.dms[name] = s.op.NewDistMem(name, &DistMemOptions{ + s.op.soss[name] = s.op.NewSoS(name, &SoSOptions{ MemLimit: mlimit, DiskLimit: dlimit, Expiration: age, @@ -148,7 +148,7 @@ loop: } if writer == nil { - writer, _ = s.op.dms[name].Writer(&writerOptions{ + writer, _ = s.op.soss[name].Writer(&writerOptions{ LocalOnly: true, }) } @@ -164,7 +164,7 @@ loop: return err } -func (s *service) DMemRead(hs pb.Hedge_DMemReadServer) error { +func (s *service) SoSRead(hs pb.Hedge_SoSReadServer) error { var err error in, err := hs.Recv() if err == io.EOF { @@ -177,18 +177,18 @@ func (s *service) DMemRead(hs pb.Hedge_DMemReadServer) error { } name := in.Meta[metaName] - if _, ok := s.op.dms[name]; !ok { + if _, ok := s.op.soss[name]; !ok { mlimit, _ := strconv.ParseUint(in.Meta[metaMemLimit], 10, 64) dlimit, _ := strconv.ParseUint(in.Meta[metaDiskLimit], 10, 64) age, _ := strconv.ParseInt(in.Meta[metaExpire], 10, 64) - s.op.dms[name] = s.op.NewDistMem(name, &DistMemOptions{ + s.op.soss[name] = s.op.NewSoS(name, &SoSOptions{ MemLimit: mlimit, DiskLimit: dlimit, Expiration: age, }) } - reader, _ := s.op.dms[name].Reader(&readerOptions{LocalOnly: true}) + reader, _ := s.op.soss[name].Reader(&readerOptions{LocalOnly: true}) out := make(chan []byte) eg := new(errgroup.Group) eg.Go(func() error { @@ -212,8 +212,8 @@ func (s *service) DMemRead(hs pb.Hedge_DMemReadServer) error { return nil } -func (s *service) DMemClose(ctx context.Context, in *pb.Payload) (*pb.Payload, error) { +func (s *service) SoSClose(ctx context.Context, in *pb.Payload) (*pb.Payload, error) { name := in.Meta[metaName] - s.op.dms[name].Close() + s.op.soss[name].Close() return &pb.Payload{}, nil } diff --git a/distmem.go b/sos.go similarity index 59% rename from distmem.go rename to sos.go index 226956a..4157eb0 100644 --- a/distmem.go +++ 
b/sos.go @@ -27,7 +27,7 @@ const ( ) var ( - errNoInit = fmt.Errorf("distmem: not properly initialized") + errNoInit = fmt.Errorf("sos: not properly initialized") ) type metaT struct { @@ -36,11 +36,11 @@ type metaT struct { grpc atomic.Int32 conn *grpc.ClientConn client pb.HedgeClient - writer pb.Hedge_DMemWriteClient - reader pb.Hedge_DMemReadClient + writer pb.Hedge_SoSWriteClient + reader pb.Hedge_SoSReadClient } -type DistMemOptions struct { +type SoSOptions struct { // MemLimit sets the memory limit in bytes to be used per node. MemLimit uint64 @@ -57,13 +57,13 @@ type memT struct { mlocs []int } -// DistMem represents an object for distributed memory read/writes. -// Useful only for load-process-discard types of data processing. +// SoS (Spillover-Store) represents an object for spill-over (or stitched) +// storage. Useful only for load-process-discard types of data processing. // See limitation below. // -// Limitation: At the moment, it's not allowed to reuse a name for -// DistMem once it's used and closed within hedge's lifetime. -type DistMem struct { +// Limitation: At the moment, it's not allowed to reuse a name for SOS +// once it's used and closed within hedge's lifetime. 
+type SoS struct { sync.Mutex Name string // the name of this instance @@ -91,7 +91,7 @@ type DistMem struct { type Writer struct { sync.Mutex lo bool // local write only - dm *DistMem + sos *SoS ch chan []byte on atomic.Int32 err error @@ -117,15 +117,15 @@ func (w *Writer) Close() { close(w.ch) <-w.done // wait for start() w.on.Store(0) - w.dm.wrefs.Add(-1) - w.dm.wmtx.Unlock() + w.sos.wrefs.Add(-1) + w.sos.wmtx.Unlock() } func (w *Writer) start() { defer func() { w.done <- struct{}{} }() w.on.Store(1) ctx := context.Background() - node := w.dm.nodes[0] + node := w.sos.nodes[0] var file *os.File var allCount int @@ -141,14 +141,14 @@ func (w *Writer) start() { allCount++ var err error var nextName string - msize := w.dm.meta[node].msize.Load() - mlimit := w.dm.mlimit.Load() - dsize := w.dm.meta[node].dsize.Load() - dlimit := w.dm.dlimit.Load() + msize := w.sos.meta[node].msize.Load() + mlimit := w.sos.mlimit.Load() + dsize := w.sos.meta[node].dsize.Load() + dlimit := w.sos.dlimit.Load() // Local (or next hop) is full. Go to the next node. if !w.lo && ((msize + dsize) >= (mlimit + dlimit)) { - nextName, node = w.dm.nextNode() + nextName, node = w.sos.nextNode() if nextName == "" { failCount++ w.Lock() @@ -157,7 +157,7 @@ func (w *Writer) start() { continue } - if w.dm.meta[node].grpc.Load() == 0 { + if w.sos.meta[node].grpc.Load() == 0 { err = func() error { host, port, _ := net.SplitHostPort(nextName) pi, _ := strconv.Atoi(port) @@ -165,18 +165,18 @@ func (w *Writer) start() { var opts []grpc.DialOption opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials())) - w.dm.meta[node].conn, err = grpc.NewClient(nextName, opts...) + w.sos.meta[node].conn, err = grpc.NewClient(nextName, opts...) 
if err != nil { return fmt.Errorf("NewClient (%v) failed: %w", nextName, err) } - w.dm.meta[node].client = pb.NewHedgeClient(w.dm.meta[node].conn) - w.dm.meta[node].writer, err = w.dm.meta[node].client.DMemWrite(ctx) + w.sos.meta[node].client = pb.NewHedgeClient(w.sos.meta[node].conn) + w.sos.meta[node].writer, err = w.sos.meta[node].client.SoSWrite(ctx) if err != nil { return fmt.Errorf("DMemWrite (%v) failed: %w", nextName, err) } - w.dm.meta[node].grpc.Add(1) + w.sos.meta[node].grpc.Add(1) return nil }() @@ -189,14 +189,14 @@ func (w *Writer) start() { } switch { - case !w.lo && node != w.dm.me(): + case !w.lo && node != w.sos.me(): netCount++ - err := w.dm.meta[node].writer.Send(&pb.Payload{ + err := w.sos.meta[node].writer.Send(&pb.Payload{ Meta: map[string]string{ - metaName: w.dm.Name, - metaMemLimit: fmt.Sprintf("%v", w.dm.mlimit.Load()), - metaDiskLimit: fmt.Sprintf("%v", w.dm.dlimit.Load()), - metaExpire: fmt.Sprintf("%v", int64(w.dm.age.Seconds())), + metaName: w.sos.Name, + metaMemLimit: fmt.Sprintf("%v", w.sos.mlimit.Load()), + metaDiskLimit: fmt.Sprintf("%v", w.sos.dlimit.Load()), + metaExpire: fmt.Sprintf("%v", int64(w.sos.age.Seconds())), }, Data: data, }) @@ -207,37 +207,37 @@ func (w *Writer) start() { w.Unlock() } - w.dm.meta[node].msize.Add(uint64(len(data))) + w.sos.meta[node].msize.Add(uint64(len(data))) default: if msize < mlimit { memCount++ if !mlock { - w.dm.mlock.Lock() + w.sos.mlock.Lock() mlock = true } - if _, ok := w.dm.data[node]; !ok { - w.dm.data[node] = &memT{ + if _, ok := w.sos.data[node]; !ok { + w.sos.data[node] = &memT{ data: []byte{}, mlocs: []int{}, } } - w.dm.data[node].data = append(w.dm.data[node].data, data...) - w.dm.data[node].mlocs = append(w.dm.data[node].mlocs, len(data)) - w.dm.meta[node].msize.Add(uint64(len(data))) + w.sos.data[node].data = append(w.sos.data[node].data, data...) 
+ w.sos.data[node].mlocs = append(w.sos.data[node].mlocs, len(data)) + w.sos.meta[node].msize.Add(uint64(len(data))) } else { diskCount++ if !dlock { - w.dm.dlock.Lock() + w.sos.dlock.Lock() dlock = true } if file == nil { flag := os.O_WRONLY | os.O_CREATE | os.O_TRUNC - file, err = os.OpenFile(w.dm.localFile(), flag, 0644) + file, err = os.OpenFile(w.sos.localFile(), flag, 0644) if err != nil { - w.dm.op.logger.Println("OpenFile failed:", err) + w.sos.op.logger.Println("OpenFile failed:", err) } } @@ -247,8 +247,8 @@ func (w *Writer) start() { w.err = fmt.Errorf("Write failed: %w", err) w.Unlock() } else { - w.dm.dlocs = append(w.dm.dlocs, n) - w.dm.meta[node].dsize.Add(uint64(n)) + w.sos.dlocs = append(w.sos.dlocs, n) + w.sos.meta[node].dsize.Add(uint64(n)) } } } @@ -262,27 +262,27 @@ func (w *Writer) start() { // "disk", diskCount, // "net", netCount, // "fail", failCount, - // "nodes", w.dm.nodes, + // "nodes", w.sos.nodes, // ) if mlock { - w.dm.mlock.Unlock() + w.sos.mlock.Unlock() } file.Sync() file.Close() if dlock { - w.dm.dlock.Unlock() + w.sos.dlock.Unlock() } nodes := []uint64{} - for k := range w.dm.meta { + for k := range w.sos.meta { nodes = append(nodes, k) } for _, n := range nodes { - if w.dm.meta[n].writer != nil { - w.dm.meta[n].writer.CloseSend() + if w.sos.meta[n].writer != nil { + w.sos.meta[n].writer.CloseSend() } } } @@ -291,36 +291,36 @@ type writerOptions struct { LocalOnly bool } -// Writer returns a writer object for writing data to DistMem. The -// caller needs to call writer.Close() after use. Options is only -// used internally, not exposed to callers. -func (dm *DistMem) Writer(opts ...*writerOptions) (*Writer, error) { - if dm.on.Load() == 0 { +// Writer returns a writer object for writing data to SoS. The +// caller needs to call writer.Close() after use. Options is +// only used internally, not exposed to callers. 
+func (sos *SoS) Writer(opts ...*writerOptions) (*Writer, error) { + if sos.on.Load() == 0 { return nil, errNoInit } - dm.wmtx.Lock() + sos.wmtx.Lock() var localOnly bool if len(opts) > 0 { localOnly = opts[0].LocalOnly } - dm.writer = &Writer{ + sos.writer = &Writer{ lo: localOnly, - dm: dm, + sos: sos, ch: make(chan []byte), done: make(chan struct{}, 1), } - go dm.writer.start() - dm.wrefs.Add(1) - return dm.writer, nil + go sos.writer.start() + sos.wrefs.Add(1) + return sos.writer, nil } type Reader struct { sync.Mutex lo bool // local read only - dm *DistMem + sos *SoS on atomic.Int32 err error done chan struct{} @@ -332,12 +332,12 @@ func (r *Reader) Read(out chan []byte) { eg.Go(func() error { r.on.Store(1) ctx := context.Background() - for _, node := range r.dm.nodes { + for _, node := range r.sos.nodes { var err error switch { - case !r.lo && node != r.dm.me(): + case !r.lo && node != r.sos.me(): func() { - r.dm.meta[node].reader, err = r.dm.meta[node].client.DMemRead(ctx) + r.sos.meta[node].reader, err = r.sos.meta[node].client.SoSRead(ctx) if err != nil { r.Lock() r.err = fmt.Errorf("DMemRead failed: %v", err) @@ -346,12 +346,12 @@ func (r *Reader) Read(out chan []byte) { } }() - err = r.dm.meta[node].reader.Send(&pb.Payload{ + err = r.sos.meta[node].reader.Send(&pb.Payload{ Meta: map[string]string{ - metaName: r.dm.Name, - metaMemLimit: fmt.Sprintf("%v", r.dm.mlimit.Load()), - metaDiskLimit: fmt.Sprintf("%v", r.dm.dlimit.Load()), - metaExpire: fmt.Sprintf("%v", int64(r.dm.age.Seconds())), + metaName: r.sos.Name, + metaMemLimit: fmt.Sprintf("%v", r.sos.mlimit.Load()), + metaDiskLimit: fmt.Sprintf("%v", r.sos.dlimit.Load()), + metaExpire: fmt.Sprintf("%v", int64(r.sos.age.Seconds())), }, }) @@ -363,7 +363,7 @@ func (r *Reader) Read(out chan []byte) { } for { - in, err := r.dm.meta[node].reader.Recv() + in, err := r.sos.meta[node].reader.Recv() if err == io.EOF { break } @@ -379,23 +379,23 @@ func (r *Reader) Read(out chan []byte) { } default: func() { - 
r.dm.mlock.Lock() - defer r.dm.mlock.Unlock() + r.sos.mlock.Lock() + defer r.sos.mlock.Unlock() var n int - for _, off := range r.dm.data[node].mlocs { - out <- r.dm.data[node].data[n : n+off] + for _, off := range r.sos.data[node].mlocs { + out <- r.sos.data[node].data[n : n+off] n += off } }() func() { - r.dm.dlock.Lock() - defer r.dm.dlock.Unlock() - if len(r.dm.dlocs) == 0 { + r.sos.dlock.Lock() + defer r.sos.dlock.Unlock() + if len(r.sos.dlocs) == 0 { return } - ra, err := mmap.Open(r.dm.localFile()) + ra, err := mmap.Open(r.sos.localFile()) if err != nil { r.Lock() r.err = fmt.Errorf("Open failed: %v", err) @@ -405,7 +405,7 @@ func (r *Reader) Read(out chan []byte) { defer ra.Close() var off int64 - for _, loc := range r.dm.dlocs { + for _, loc := range r.sos.dlocs { buf := make([]byte, loc) n, err := ra.ReadAt(buf, off) if err != nil { @@ -443,7 +443,7 @@ func (r *Reader) Close() { } <-r.done // wait for loop() - r.dm.rrefs.Add(-1) + r.sos.rrefs.Add(-1) r.on.Store(0) } @@ -451,11 +451,11 @@ type readerOptions struct { LocalOnly bool } -// Reader returns a reader object for reading data from DistMem. -// The caller needs to call reader.Close() after use. Options is -// only used internally, not exposed to callers. -func (dm *DistMem) Reader(opts ...*readerOptions) (*Reader, error) { - if dm.on.Load() == 0 { +// Reader returns a reader object for reading data from SoS. The +// caller needs to call reader.Close() after use. Options is only +// used internally, not exposed to callers. +func (sos *SoS) Reader(opts ...*readerOptions) (*Reader, error) { + if sos.on.Load() == 0 { return nil, errNoInit } @@ -466,102 +466,102 @@ func (dm *DistMem) Reader(opts ...*readerOptions) (*Reader, error) { reader := &Reader{ lo: localOnly, - dm: dm, + sos: sos, done: make(chan struct{}, 1), } - dm.rrefs.Add(1) + sos.rrefs.Add(1) return reader, nil } -// Close closes the DistMem object. -func (dm *DistMem) Close() { - if dm.on.Load() == 0 { +// Close closes the SoS object. 
+func (sos *SoS) Close() { + if sos.on.Load() == 0 { return } - dm.Lock() - defer dm.Unlock() + sos.Lock() + defer sos.Unlock() nodes := []uint64{} - for k := range dm.meta { + for k := range sos.meta { nodes = append(nodes, k) } ctx := context.Background() for _, n := range nodes { - if dm.meta[n].conn != nil { - dm.meta[n].client.DMemClose(ctx, &pb.Payload{ - Meta: map[string]string{metaName: dm.Name}, + if sos.meta[n].conn != nil { + sos.meta[n].client.SoSClose(ctx, &pb.Payload{ + Meta: map[string]string{metaName: sos.Name}, }) } } - dm.on.Store(0) + sos.on.Store(0) } -func (dm *DistMem) nextNode() (string, uint64) { +func (sos *SoS) nextNode() (string, uint64) { var mb string - members := dm.op.Members() + members := sos.op.Members() for _, member := range members { - nn := dm.hasher.Sum64([]byte(member)) - if nn == dm.me() { + nn := sos.hasher.Sum64([]byte(member)) + if nn == sos.me() { continue } - if _, ok := dm.data[nn]; ok { + if _, ok := sos.data[nn]; ok { continue } mb = member - dm.nodes = append(dm.nodes, nn) - dm.meta[nn] = &metaT{} - dm.data[nn] = &memT{data: []byte{}, mlocs: []int{}} + sos.nodes = append(sos.nodes, nn) + sos.meta[nn] = &metaT{} + sos.data[nn] = &memT{data: []byte{}, mlocs: []int{}} break } - return mb, dm.nodes[len(dm.nodes)-1] + return mb, sos.nodes[len(sos.nodes)-1] } -func (dm *DistMem) me() uint64 { return dm.hasher.Sum64([]byte(dm.op.Name())) } +func (sos *SoS) me() uint64 { return sos.hasher.Sum64([]byte(sos.op.Name())) } -func (dm *DistMem) localFile() string { - name1 := fmt.Sprintf("%v", dm.me()) - name2 := dm.hasher.Sum64([]byte(dm.Name)) +func (sos *SoS) localFile() string { + name1 := fmt.Sprintf("%v", sos.me()) + name2 := sos.hasher.Sum64([]byte(sos.Name)) return fmt.Sprintf("%v_%v.dat", name1, name2) } -func (dm *DistMem) cleaner() { +func (sos *SoS) cleaner() { eg := new(errgroup.Group) eg.Go(func() error { - started := dm.start + started := sos.start for { time.Sleep(time.Second * 5) - wrefs := dm.wrefs.Load() - 
rrefs := dm.rrefs.Load() + wrefs := sos.wrefs.Load() + rrefs := sos.rrefs.Load() if (wrefs + rrefs) > 0 { started = time.Now() continue } - if time.Since(started) > dm.age { + if time.Since(started) > sos.age { func() { // Cleanup memory area: - dm.op.dms[dm.Name].mlock.Lock() - dm.op.dms[dm.Name].mlock.Unlock() - for _, node := range dm.op.dms[dm.Name].nodes { - dm.op.dms[dm.Name].data[node].data = []byte{} + sos.op.soss[sos.Name].mlock.Lock() + defer sos.op.soss[sos.Name].mlock.Unlock() + for _, node := range sos.op.soss[sos.Name].nodes { + sos.op.soss[sos.Name].data[node].data = []byte{} } }() // Cleanup disk area: - dm.op.dms[dm.Name].dlock.Lock() - os.Remove(dm.localFile()) - dm.op.dms[dm.Name].dlock.Unlock() + sos.op.soss[sos.Name].dlock.Lock() + os.Remove(sos.localFile()) + sos.op.soss[sos.Name].dlock.Unlock() // Remove the main map entry: - dm.op.dmsLock.Lock() - delete(dm.op.dms, dm.Name) - dm.op.dmsLock.Unlock() + sos.op.sosLock.Lock() + delete(sos.op.soss, sos.Name) + sos.op.sosLock.Unlock() break } } @@ -572,8 +572,8 @@ func (dm *DistMem) cleaner() { eg.Wait() } -func newDistMem(name string, op *Op, opts ...*DistMemOptions) *DistMem { - dm := &DistMem{ +func newSoS(name string, op *Op, opts ...*SoSOptions) *SoS { + sos := &SoS{ Name: name, op: op, meta: make(map[uint64]*metaT), @@ -584,37 +584,37 @@ func newDistMem(name string, op *Op, opts ...*DistMemOptions) *DistMem { wmtx: &sync.Mutex{}, } - dm.on.Store(1) - dm.nodes = []uint64{dm.me()} - dm.meta[dm.me()] = &metaT{} - dm.data[dm.me()] = &memT{ + sos.on.Store(1) + sos.nodes = []uint64{sos.me()} + sos.meta[sos.me()] = &metaT{} + sos.data[sos.me()] = &memT{ data: []byte{}, mlocs: []int{}, } if len(opts) > 0 { - dm.mlimit.Store(opts[0].MemLimit) - dm.dlimit.Store(opts[0].DiskLimit) + sos.mlimit.Store(opts[0].MemLimit) + sos.dlimit.Store(opts[0].DiskLimit) if opts[0].Expiration > 0 { - dm.age = time.Second * time.Duration(opts[0].Expiration) + sos.age = time.Second * time.Duration(opts[0].Expiration) } } - if 
dm.mlimit.Load() == 0 { + if sos.mlimit.Load() == 0 { si := syscall.Sysinfo_t{} syscall.Sysinfo(&si) - dm.mlimit.Store(si.Freeram / 2) // half of free mem + sos.mlimit.Store(si.Freeram / 2) // half of free mem } - if dm.dlimit.Load() == 0 { - dm.dlimit.Store(1 << 30) // 1GB by default + if sos.dlimit.Load() == 0 { + sos.dlimit.Store(1 << 30) // 1GB by default } - if dm.age == 0 { - dm.age = time.Hour * 1 + if sos.age == 0 { + sos.age = time.Hour * 1 } - dm.start = time.Now() - go dm.cleaner() - return dm + sos.start = time.Now() + go sos.cleaner() + return sos }