From 56793db16a11698c7776aac1d898fab25f794037 Mon Sep 17 00:00:00 2001 From: Russell Tran Date: Sun, 2 Jun 2024 18:26:06 -0700 Subject: [PATCH] Implement flush to disk but not yet cache miss --- go/cmd/gcsobjtable/main.go | 34 +++++++++++++-- go/cmd/gcsobjtable/main_test.go | 76 ++++++++++++++++++++++++++++++++- 2 files changed, 105 insertions(+), 5 deletions(-) diff --git a/go/cmd/gcsobjtable/main.go b/go/cmd/gcsobjtable/main.go index 613d9045..0f27eb8b 100644 --- a/go/cmd/gcsobjtable/main.go +++ b/go/cmd/gcsobjtable/main.go @@ -38,12 +38,11 @@ func main() { } _ = lis s := grpc.NewServer() - pb.RegisterGCSObjServer(s, NewGCSObjServer()) + pb.RegisterGCSObjServer(s, NewGCSObjServer(cfg.GCS.FlushIntervalSec)) log.Printf("server listening at %v", lis.Addr()) if err := s.Serve(lis); err != nil { log.Fatalf("failed to serve: %v", err) } - } // server is used to implement your gRPC service. @@ -53,9 +52,11 @@ type GCSObjServer struct { waitlist map[uint64][]string // object uid -> list of IP addresses as string mu sync.Mutex // lock should be used for both objectLocations and waitlist database *sql.DB // connection to SQLite persistent datastore + ticker *time.Ticker // for GCS flushing } -func NewGCSObjServer() *GCSObjServer { +/* set flushIntervalSec to -1 to disable GCS flushing */ +func NewGCSObjServer(flushIntervalSec int) *GCSObjServer { /* Set up SQLite */ // Note: You don't need to call database.Close() in Golang: https://stackoverflow.com/a/50788205 database, err := sql.Open("sqlite3", "./gcsobjtable.db") // TODO: Remove hardcode to config @@ -71,10 +72,37 @@ func NewGCSObjServer() *GCSObjServer { waitlist: make(map[uint64][]string), mu: sync.Mutex{}, database: database, + ticker: nil, + } + if flushIntervalSec != -1 { + // Launch periodic disk flushing + interval := time.Duration(flushIntervalSec) * time.Second + server.ticker = time.NewTicker(interval) + go func() { + for range server.ticker.C { + err := server.flushToDisk() + if err != nil { + log.Printf("Error flushing to disk: %v", err) + } + } + }() } return server } +func (s *GCSObjServer) flushToDisk() error { + s.mu.Lock() + defer s.mu.Unlock() + + err := insertOrUpdateObjectLocations(s.database, s.objectLocations) + if err != nil { + return err + } + // Completely delete the current map in memory and start blank + s.objectLocations = make(map[uint64][]uint64) // orphaning the old map will get it garbage collected in Go + return nil +} + /* Returns a nodeId that has object uid. If it doesn't exist anywhere, then the second return value will be false. diff --git a/go/cmd/gcsobjtable/main_test.go b/go/cmd/gcsobjtable/main_test.go index 2e890a82..8ccf785b 100644 --- a/go/cmd/gcsobjtable/main_test.go +++ b/go/cmd/gcsobjtable/main_test.go @@ -3,6 +3,7 @@ package main import ( "bytes" "context" + "database/sql" "fmt" "log" "math/rand" @@ -27,7 +28,7 @@ func init() { cfg = config.GetConfig() // Load configuration lis = bufconn.Listen(bufSize) s := grpc.NewServer() - pb.RegisterGCSObjServer(s, NewGCSObjServer()) + pb.RegisterGCSObjServer(s, NewGCSObjServer(-1)) go func() { if err := s.Serve(lis); err != nil { log.Fatalf("Server exited with error: %v", err) @@ -93,7 +94,7 @@ func TestGetNodeId(t *testing.T) { // Test the getNodeId function func TestGetNodeId_2(t *testing.T) { // Initialize the server - server := NewGCSObjServer() + server := NewGCSObjServer(-1) // Seed the random number generator to produce consistent results rand.Seed(time.Now().UnixNano()) @@ -566,3 +567,74 @@ func contains(slice []uint64, value uint64) bool { } return false } + +func TestFlushToDisk(t *testing.T) { + // Set up in-memory SQLite database for testing + database, err := sql.Open("sqlite3", ":memory:") + if err != nil { + t.Fatalf("Failed to open SQLite database: %v", err) + } + defer database.Close() + + // Create the table schema + createObjectLocationsTable(database) + + // Create the server object with a short flush interval for testing + server := &GCSObjServer{ + objectLocations: make(map[uint64][]uint64), + waitlist: make(map[uint64][]string), + mu: sync.Mutex{}, + database: database, + ticker: nil, + } + + // Populate the objectLocations map + server.objectLocations[1] = []uint64{100, 101, 102} + server.objectLocations[2] = []uint64{200, 201} + + // Call the flushToDisk method + err = server.flushToDisk() + if err != nil { + t.Fatalf("flushToDisk failed: %v", err) + } + + // Verify the data has been written to the database + rows, err := database.Query("SELECT object_uid, node_id FROM object_locations") + if err != nil { + t.Fatalf("Failed to query database: %v", err) + } + defer rows.Close() + + // Read the results + results := make(map[uint64][]uint64) + for rows.Next() { + var objectUID uint64 + var nodeID uint64 + if err := rows.Scan(&objectUID, &nodeID); err != nil { + t.Fatalf("Failed to scan row: %v", err) + } + results[objectUID] = append(results[objectUID], nodeID) + } + + // Expected results + expectedResults := map[uint64][]uint64{ + 1: {100, 101, 102}, + 2: {200, 201}, + } + + // Compare the results + if len(results) != len(expectedResults) { + t.Fatalf("Expected %d results, got %d", len(expectedResults), len(results)) + } + + for objectUID, nodeIDs := range expectedResults { + if len(results[objectUID]) != len(nodeIDs) { + t.Fatalf("Expected %d nodeIDs for object %d, got %d", len(nodeIDs), objectUID, len(results[objectUID])) + } + for i, nodeID := range nodeIDs { + if results[objectUID][i] != nodeID { + t.Fatalf("Expected nodeID %d for object %d, got %d", nodeID, objectUID, results[objectUID][i]) + } + } + } +}