Skip to content

Commit

Permalink
Implement flush to disk but not yet cache miss
Browse files Browse the repository at this point in the history
  • Loading branch information
Russell-Tran committed Jun 3, 2024
1 parent 4484dee commit 56793db
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 5 deletions.
34 changes: 31 additions & 3 deletions go/cmd/gcsobjtable/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,11 @@ func main() {
}
_ = lis
s := grpc.NewServer()
pb.RegisterGCSObjServer(s, NewGCSObjServer())
pb.RegisterGCSObjServer(s, NewGCSObjServer(cfg.GCS.FlushIntervalSec))
log.Printf("server listening at %v", lis.Addr())
if err := s.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
}

}

// server is used to implement your gRPC service.
Expand All @@ -53,9 +52,11 @@ type GCSObjServer struct {
waitlist map[uint64][]string // object uid -> list of IP addresses as string
mu sync.Mutex // lock should be used for both objectLocations and waitlist
database *sql.DB // connection to SQLite persistent datastore
ticker *time.Ticker // for GCS flushing
}

func NewGCSObjServer() *GCSObjServer {
/* set flushIntervalSec to -1 to disable GCS flushing */
func NewGCSObjServer(flushIntervalSec int) *GCSObjServer {
/* Set up SQLite */
// Note: You don't need to call database.Close() in Golang: https://stackoverflow.com/a/50788205
database, err := sql.Open("sqlite3", "./gcsobjtable.db") // TODO: Remove hardcode to config
Expand All @@ -71,10 +72,37 @@ func NewGCSObjServer() *GCSObjServer {
waitlist: make(map[uint64][]string),
mu: sync.Mutex{},
database: database,
ticker: nil,
}
if flushIntervalSec != -1 {
// Launch periodic disk flushing
interval := time.Duration(flushIntervalSec) * time.Second
server.ticker = time.NewTicker(interval)
go func() {
for range server.ticker.C {
err := server.flushToDisk()
if err != nil {
log.Printf("Error flushing to disk: %v", err)
}
}
}()
}
return server
}

func (s *GCSObjServer) flushToDisk() error {
s.mu.Lock()
defer s.mu.Unlock()

err := insertOrUpdateObjectLocations(s.database, s.objectLocations)
if err != nil {
return err
}
// Completely delete the current map in memory and start blank
s.objectLocations = make(map[uint64][]uint64) // orphaning the old map will get it garbage collected in Go
return nil
}

/*
Returns a nodeId that has object uid. If it doesn't exist anywhere,
then the second return value will be false.
Expand Down
76 changes: 74 additions & 2 deletions go/cmd/gcsobjtable/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"bytes"
"context"
"database/sql"
"fmt"
"log"
"math/rand"
Expand All @@ -27,7 +28,7 @@ func init() {
cfg = config.GetConfig() // Load configuration
lis = bufconn.Listen(bufSize)
s := grpc.NewServer()
pb.RegisterGCSObjServer(s, NewGCSObjServer())
pb.RegisterGCSObjServer(s, NewGCSObjServer(-1))
go func() {
if err := s.Serve(lis); err != nil {
log.Fatalf("Server exited with error: %v", err)
Expand Down Expand Up @@ -93,7 +94,7 @@ func TestGetNodeId(t *testing.T) {
// Test the getNodeId function
func TestGetNodeId_2(t *testing.T) {
// Initialize the server
server := NewGCSObjServer()
server := NewGCSObjServer(-1)

// Seed the random number generator to produce consistent results
rand.Seed(time.Now().UnixNano())
Expand Down Expand Up @@ -566,3 +567,74 @@ func contains(slice []uint64, value uint64) bool {
}
return false
}

func TestFlushToDisk(t *testing.T) {
// Set up in-memory SQLite database for testing
database, err := sql.Open("sqlite3", ":memory:")
if err != nil {
t.Fatalf("Failed to open SQLite database: %v", err)
}
defer database.Close()

// Create the table schema
createObjectLocationsTable(database)

// Create the server object with a short flush interval for testing
server := &GCSObjServer{
objectLocations: make(map[uint64][]uint64),
waitlist: make(map[uint64][]string),
mu: sync.Mutex{},
database: database,
ticker: nil,
}

// Populate the objectLocations map
server.objectLocations[1] = []uint64{100, 101, 102}
server.objectLocations[2] = []uint64{200, 201}

// Call the flushToDisk method
err = server.flushToDisk()
if err != nil {
t.Fatalf("flushToDisk failed: %v", err)
}

// Verify the data has been written to the database
rows, err := database.Query("SELECT object_uid, node_id FROM object_locations")
if err != nil {
t.Fatalf("Failed to query database: %v", err)
}
defer rows.Close()

// Read the results
results := make(map[uint64][]uint64)
for rows.Next() {
var objectUID uint64
var nodeID uint64
if err := rows.Scan(&objectUID, &nodeID); err != nil {
t.Fatalf("Failed to scan row: %v", err)
}
results[objectUID] = append(results[objectUID], nodeID)
}

// Expected results
expectedResults := map[uint64][]uint64{
1: {100, 101, 102},
2: {200, 201},
}

// Compare the results
if len(results) != len(expectedResults) {
t.Fatalf("Expected %d results, got %d", len(expectedResults), len(results))
}

for objectUID, nodeIDs := range expectedResults {
if len(results[objectUID]) != len(nodeIDs) {
t.Fatalf("Expected %d nodeIDs for object %d, got %d", len(nodeIDs), objectUID, len(results[objectUID]))
}
for i, nodeID := range nodeIDs {
if results[objectUID][i] != nodeID {
t.Fatalf("Expected nodeID %d for object %d, got %d", nodeID, objectUID, results[objectUID][i])
}
}
}
}

0 comments on commit 56793db

Please sign in to comment.