diff --git a/lib/data/ourdb/sync.v b/lib/data/ourdb/sync.v new file mode 100644 index 00000000..9fa1c688 --- /dev/null +++ b/lib/data/ourdb/sync.v @@ -0,0 +1,94 @@ +module ourdb + +import encoding.binary + +// SyncRecord represents a single database update for synchronization +struct SyncRecord { + id u32 + data []u8 +} + +// get_last_index returns the highest ID currently in use in the database +pub fn (mut db OurDB) get_last_index() !u32 { + return db.lookup.get_next_id()! - 1 +} + +// push_updates serializes all updates from the given index onwards +pub fn (mut db OurDB) push_updates(index u32) ![]u8 { + mut updates := []u8{} + last_index := db.get_last_index()! + + // No updates if requested index is at or beyond our last index + if index >= last_index { + return updates + } + + // Write the number of updates as u32 + update_count := last_index - index + mut count_bytes := []u8{len: 4} + binary.little_endian_put_u32(mut count_bytes, update_count) + updates << count_bytes + + // Collect and serialize all updates after the given index + for i := index + 1; i <= last_index; i++ { + // Get data for this ID + data := db.get(i) or { continue } + + // Write ID (u32) + mut id_bytes := []u8{len: 4} + binary.little_endian_put_u32(mut id_bytes, i) + updates << id_bytes + + // Write data length (u32) + mut len_bytes := []u8{len: 4} + binary.little_endian_put_u32(mut len_bytes, u32(data.len)) + updates << len_bytes + + // Write data + updates << data + } + + return updates +} + +// sync_updates applies received updates to the database +pub fn (mut db OurDB) sync_updates(bytes []u8) ! { + if bytes.len < 4 { + return error('invalid update data: too short') + } + + mut pos := 0 + + // Read number of updates + update_count := binary.little_endian_u32(bytes[pos..pos + 4]) + pos += 4 + + // Process each update + for _ in 0 .. update_count { + if pos + 8 > bytes.len { + return error('invalid update data: truncated header') + } + + // Read ID + id := binary.little_endian_u32(bytes[pos..pos + 4]) + pos += 4 + + // Read data length + data_len := binary.little_endian_u32(bytes[pos..pos + 4]) + pos += 4 + + if pos + int(data_len) > bytes.len { + return error('invalid update data: truncated content') + } + + // Read data + data := bytes[pos..pos + int(data_len)] + pos += int(data_len) + + // Apply update + db.set(OurDBSetArgs{ + id: id + data: data.clone() + })! + } +} diff --git a/lib/data/ourdb/sync_test.v b/lib/data/ourdb/sync_test.v new file mode 100644 index 00000000..5284be31 --- /dev/null +++ b/lib/data/ourdb/sync_test.v @@ -0,0 +1,83 @@ +module ourdb + +fn test_db_sync() ! { + // Create two database instances + mut db1 := new_test_db('sync_test_db1')! + mut db2 := new_test_db('sync_test_db2')! + + defer { + db1.destroy()! + db2.destroy()! + } + + // Initial state - both DBs are synced + db1.set(OurDBSetArgs{id: 1, data: 'initial data'.bytes()})! + db2.set(OurDBSetArgs{id: 1, data: 'initial data'.bytes()})! + + assert db1.get(1)! == 'initial data'.bytes() + assert db2.get(1)! == 'initial data'.bytes() + + // Make updates to db1 + db1.set(OurDBSetArgs{id: 2, data: 'second update'.bytes()})! + db1.set(OurDBSetArgs{id: 3, data: 'third update'.bytes()})! + + // Verify db1 has the updates + assert db1.get(2)! == 'second update'.bytes() + assert db1.get(3)! == 'third update'.bytes() + + // Verify db2 is behind + assert db1.get_last_index()! == 3 + assert db2.get_last_index()! == 1 + + // Sync db2 with updates from db1 + last_synced_index := db2.get_last_index()! + updates := db1.push_updates(last_synced_index)! + db2.sync_updates(updates)! + + // Verify db2 is now synced + assert db2.get_last_index()! == 3 + assert db2.get(2)! == 'second update'.bytes() + assert db2.get(3)! == 'third update'.bytes() +} + +fn test_db_sync_empty_updates() ! { + mut db1 := new_test_db('sync_test_db1_empty')! + mut db2 := new_test_db('sync_test_db2_empty')! + + defer { + db1.destroy()! + db2.destroy()! + } + + // Both DBs are at the same index + db1.set(OurDBSetArgs{id: 1, data: 'test'.bytes()})! + db2.set(OurDBSetArgs{id: 1, data: 'test'.bytes()})! + + last_index := db2.get_last_index()! + updates := db1.push_updates(last_index)! + + // Should get empty updates since DBs are synced + assert updates.len == 0 + + db2.sync_updates(updates)! + assert db2.get_last_index()! == 1 +} + +fn test_db_sync_invalid_data() ! { + mut db := new_test_db('sync_test_db_invalid')! + + defer { + db.destroy()! + } + + // Test with empty data + if _ := db.sync_updates([]u8{}) { + assert false, 'should fail with empty data' + } + + // Test with invalid data length + invalid_data := []u8{len: 2, init: 0} + if _ := db.sync_updates(invalid_data) { + assert false, 'should fail with invalid data length' + } +}