Skip to content

Commit

Permalink
start implementing ourdb sync
Browse files Browse the repository at this point in the history
  • Loading branch information
timurgordon committed Feb 26, 2025
1 parent 17979b4 commit a798b23
Show file tree
Hide file tree
Showing 2 changed files with 177 additions and 0 deletions.
94 changes: 94 additions & 0 deletions lib/data/ourdb/sync.v
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
module ourdb

import encoding.binary

// SyncRecord represents a single database update for synchronization
struct SyncRecord {
id u32
data []u8
}

// get_last_index returns the highest ID currently in use in the database
pub fn (mut db OurDB) get_last_index() !u32 {
return db.lookup.get_next_id()! - 1
}

// push_updates serializes all updates from the given index onwards
pub fn (mut db OurDB) push_updates(index u32) ![]u8 {
mut updates := []u8{}
last_index := db.get_last_index()!

// No updates if requested index is at or beyond our last index
if index >= last_index {
return updates
}

// Write the number of updates as u32
update_count := last_index - index
mut count_bytes := []u8{len: 4}
binary.little_endian_put_u32(mut count_bytes, update_count)
updates << count_bytes

// Collect and serialize all updates after the given index
for i := index + 1; i <= last_index; i++ {
// Get data for this ID
data := db.get(i) or { continue }

// Write ID (u32)
mut id_bytes := []u8{len: 4}
binary.little_endian_put_u32(mut id_bytes, i)
updates << id_bytes

// Write data length (u32)
mut len_bytes := []u8{len: 4}
binary.little_endian_put_u32(mut len_bytes, u32(data.len))
updates << len_bytes

// Write data
updates << data
}

return updates
}

// sync_updates applies received updates to the database
pub fn (mut db OurDB) sync_updates(bytes []u8) ! {
if bytes.len < 4 {
return error('invalid update data: too short')
}

mut pos := 0

// Read number of updates
update_count := binary.little_endian_u32(bytes[pos..pos + 4])
pos += 4

// Process each update
for _ in 0 .. update_count {
if pos + 8 > bytes.len {
return error('invalid update data: truncated header')
}

// Read ID
id := binary.little_endian_u32(bytes[pos..pos + 4])
pos += 4

// Read data length
data_len := binary.little_endian_u32(bytes[pos..pos + 4])
pos += 4

if pos + int(data_len) > bytes.len {
return error('invalid update data: truncated content')
}

// Read data
data := bytes[pos..pos + int(data_len)]
pos += int(data_len)

// Apply update
db.set(OurDBSetArgs{
id: id
data: data.clone()
})!
}
}
83 changes: 83 additions & 0 deletions lib/data/ourdb/sync_test.v
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
module ourdb

fn test_db_sync() ! {
// Create two database instances
mut db1 := new_test_db('sync_test_db1')!
mut db2 := new_test_db('sync_test_db2')!

defer {
db1.destroy()!
db2.destroy()!
}

// Initial state - both DBs are synced
db1.set(OurDBSetArgs{id: 1, data: 'initial data'.bytes()})!
db2.set(OurDBSetArgs{id: 1, data: 'initial data'.bytes()})!

assert db1.get(1)! == 'initial data'.bytes()
assert db2.get(1)! == 'initial data'.bytes()

// Make updates to db1
db1.set(OurDBSetArgs{id: 2, data: 'second update'.bytes()})!
db1.set(OurDBSetArgs{id: 3, data: 'third update'.bytes()})!

// Verify db1 has the updates
assert db1.get(2)! == 'second update'.bytes()
assert db1.get(3)! == 'third update'.bytes()

// Verify db2 is behind
assert db1.get_last_index()! == 3
assert db2.get_last_index()! == 1

// Sync db2 with updates from db1
last_synced_index := db2.get_last_index()!
updates := db1.push_updates(last_synced_index)!
db2.sync_updates(updates)!

// Verify db2 is now synced
assert db2.get_last_index()! == 3
assert db2.get(2)! == 'second update'.bytes()
assert db2.get(3)! == 'third update'.bytes()
}

fn test_db_sync_empty_updates() ! {
mut db1 := new_test_db('sync_test_db1_empty')!
mut db2 := new_test_db('sync_test_db2_empty')!

defer {
db1.destroy()!
db2.destroy()!
}

// Both DBs are at the same index
db1.set(OurDBSetArgs{id: 1, data: 'test'.bytes()})!
db2.set(OurDBSetArgs{id: 1, data: 'test'.bytes()})!

last_index := db2.get_last_index()!
updates := db1.push_updates(last_index)!

// Should get empty updates since DBs are synced
assert updates.len == 0

db2.sync_updates(updates)!
assert db2.get_last_index()! == 1
}

fn test_db_sync_invalid_data() ! {
mut db := new_test_db('sync_test_db_invalid')!

defer {
db.destroy()!
}

// Test with empty data
if _ := db.sync_updates([]u8{}) {
assert false, 'should fail with empty data'
}

// Test with invalid data length
invalid_data := []u8{len: 2, init: 0}
if _ := db.sync_updates(invalid_data) {
assert false, 'should fail with invalid data length'
}
}

0 comments on commit a798b23

Please sign in to comment.