Skip to content

Commit

Permalink
[Java] Complete adding header and version information to the recordin…
Browse files Browse the repository at this point in the history
…g log.
  • Loading branch information
mikeb01 committed Sep 2, 2024
1 parent 4b94275 commit b82ecfb
Show file tree
Hide file tree
Showing 5 changed files with 129 additions and 95 deletions.
138 changes: 86 additions & 52 deletions aeron-cluster/src/main/java/io/aeron/cluster/RecordingLog.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.agrona.DirectBuffer;
import org.agrona.LangUtil;
import org.agrona.MutableDirectBuffer;
import org.agrona.SemanticVersion;
import org.agrona.Strings;
import org.agrona.collections.IntArrayList;
import org.agrona.collections.Long2LongHashMap;
Expand Down Expand Up @@ -50,6 +51,9 @@
import static io.aeron.archive.client.AeronArchive.NULL_POSITION;
import static java.lang.Math.max;
import static java.nio.ByteOrder.LITTLE_ENDIAN;
import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
import static java.nio.file.StandardCopyOption.COPY_ATTRIBUTES;
import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
import static java.nio.file.StandardOpenOption.*;
import static org.agrona.BitUtil.*;

Expand All @@ -68,12 +72,13 @@
* 0 1 2 3
* 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
* +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
* | 0xFFA3F010 (Magic Number) |
* | |
* | 0xFFA3F010 (Magic Number) |
* | 0x00000000 |
* +---------------------------------------------------------------+
* | Version |
* +---------------------------------------------------------------+
* | Reserved |
* | Reserved (52 bytes) ...
* ... |
* +---------------------------------------------------------------+
* </pre>
* Recording log entry as follows:
Expand Down Expand Up @@ -117,7 +122,13 @@
public final class RecordingLog implements AutoCloseable
{
public static final long MAGIC_NUMBER = 0xFFA3F010_00000000L;
private static final int HEADER_SIZE = 64;
public static final int HEADER_SIZE = 64;
public static final int MAJOR_VERSION = 0;
public static final int MINOR_VERSION = 1;
public static final int PATCH_VERSION = 0;
public static final int SEMANTIC_VERSION = SemanticVersion.compose(MAJOR_VERSION, MINOR_VERSION, PATCH_VERSION);
private static final int MAGIC_NUMBER_OFFSET = 0;
private static final int VERSION_OFFSET = MAGIC_NUMBER_OFFSET + SIZE_OF_LONG;

/**
* Representation of the entry in the {@link RecordingLog}.
Expand Down Expand Up @@ -646,6 +657,9 @@ public String toString()
*/
public static final String RECORDING_LOG_FILE_NAME = "recording.log";

static final String RECORDING_LOG_MIGRATED_FILE_NAME = RECORDING_LOG_FILE_NAME + ".migrated";
static final String RECORDING_LOG_NEW_FILE_NAME = RECORDING_LOG_FILE_NAME + ".new";

/**
* The log entry is for a recording of messages within a leadership term to the log.
*/
Expand Down Expand Up @@ -785,6 +799,7 @@ public RecordingLog(final File parentDir, final boolean createNew)
if (isNewFile)
{
syncDirectory(parentDir);
writeHeader(fileChannel);
}
else
{
Expand All @@ -802,54 +817,6 @@ public RecordingLog(final File parentDir, final boolean createNew)
}
}

private void checkForVersionAndMigrate(final File logFile) throws IOException
{
if (requiresMigration(logFile))
{
final File oldMigratedFile = new File(logFile.getParentFile(), RECORDING_LOG_FILE_NAME + ".migrated");
if (!logFile.renameTo(oldMigratedFile))
{
throw new IOException("Unable to backup old file to new one");
}

final File newFile = new File(logFile.getParentFile(), RECORDING_LOG_FILE_NAME);
final MutableDirectBuffer header = new UnsafeBuffer(new byte[HEADER_SIZE]);
// TODO: Offset constants
header.putLong(0, MAGIC_NUMBER, LITTLE_ENDIAN);
header.putInt(8, 0 /* TODO: version */, LITTLE_ENDIAN);

try (FileOutputStream outputStream = new FileOutputStream(newFile))
{
outputStream.write(header.byteArray());
Files.copy(oldMigratedFile.toPath(), outputStream);
}
}
}

private boolean requiresMigration(final File logFile) throws IOException
{
if (logFile.length() < HEADER_SIZE)
{
return true;
}

try (FileChannel fileChannel = FileChannel.open(logFile.toPath(), READ))
{
return isMagicNumberInvalid(fileChannel);
}
}

private static boolean isMagicNumberInvalid(final FileChannel fileChannel) throws IOException
{
final DirectBuffer header = new UnsafeBuffer(ByteBuffer.allocateDirect(HEADER_SIZE));
if (HEADER_SIZE != fileChannel.read(header.byteBuffer()))
{
throw new IOException("Unable to read header");
}
final long magicNumber = header.getLong(0, LITTLE_ENDIAN);
return magicNumber != MAGIC_NUMBER;
}

/**
* {@inheritDoc}
*/
Expand Down Expand Up @@ -1884,6 +1851,73 @@ private int captureEntriesFromBuffer(
return consumed;
}

private void applyHeader(final MutableDirectBuffer header)
{
header.putLong(MAGIC_NUMBER_OFFSET, MAGIC_NUMBER, LITTLE_ENDIAN);
header.putInt(VERSION_OFFSET, SEMANTIC_VERSION, LITTLE_ENDIAN);
}

private void writeHeader(final FileChannel fileChannel) throws IOException
{
final ByteBuffer headerBuffer = ByteBuffer.allocateDirect(HEADER_SIZE);
final MutableDirectBuffer header = new UnsafeBuffer(headerBuffer);
applyHeader(header);

if (HEADER_SIZE != fileChannel.write(headerBuffer))
{
throw new IOException("Failed to write full header");
}
}

private void checkForVersionAndMigrate(final File logFile) throws IOException
{
if (requiresMigration(logFile))
{
final File oldMigratedFile = new File(logFile.getParentFile(), RECORDING_LOG_MIGRATED_FILE_NAME);
Files.copy(logFile.toPath(), oldMigratedFile.toPath(), COPY_ATTRIBUTES, REPLACE_EXISTING);

final File newFile = new File(logFile.getParentFile(), RECORDING_LOG_NEW_FILE_NAME);
Files.deleteIfExists(newFile.toPath());

final MutableDirectBuffer header = new UnsafeBuffer(new byte[HEADER_SIZE]);
applyHeader(header);

try (FileOutputStream outputStream = new FileOutputStream(newFile, false))
{
outputStream.write(header.byteArray());
Files.copy(oldMigratedFile.toPath(), outputStream);
}

Files.move(
newFile.toPath(), new File(logFile.getParentFile(), RECORDING_LOG_FILE_NAME).toPath(),
ATOMIC_MOVE, REPLACE_EXISTING);
}
}

private boolean requiresMigration(final File logFile) throws IOException
{
if (logFile.length() < HEADER_SIZE)
{
return true;
}

try (FileChannel fileChannel = FileChannel.open(logFile.toPath(), READ))
{
return isMagicNumberInvalid(fileChannel);
}
}

private static boolean isMagicNumberInvalid(final FileChannel fileChannel) throws IOException
{
final DirectBuffer header = new UnsafeBuffer(ByteBuffer.allocateDirect(HEADER_SIZE));
if (HEADER_SIZE != fileChannel.read(header.byteBuffer()))
{
throw new IOException("Unable to read header");
}
final long magicNumber = header.getLong(0, LITTLE_ENDIAN);
return magicNumber != MAGIC_NUMBER;
}

private static void syncDirectory(final File dir)
{
try (FileChannel fileChannel = FileChannel.open(dir.toPath()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -670,6 +670,7 @@ void entriesInTheRecordingLogShouldBeSorted()

recordingLog.reload();

assertEquals(sortedList.size(), recordingLog.entries().size());
assertEquals(sortedList, recordingLog.entries()); // reload from disc and re-sort
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,30 +15,34 @@
*/
package io.aeron.cluster;

import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Random;

import static io.aeron.cluster.RecordingLog.ENTRY_TYPE_SNAPSHOT;
import static io.aeron.cluster.RecordingLog.ENTRY_TYPE_STANDBY_SNAPSHOT;
import static io.aeron.cluster.RecordingLog.ENTRY_TYPE_TERM;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class RecordingLogVersioningTest
{
private static final byte[] CHARACTER_TABLE = new byte[27];
public static final long FIXED_SEED_FOR_CONSISTENT_DATA = 892374458763L;

@TempDir File tempDirA;
@TempDir File tempDirB;

static
{
for (int i = 0; i < 26; i++)
Expand All @@ -60,7 +64,7 @@ private static class TestEntry
private final int serviceId;
private final String endpoint;

public TestEntry(
TestEntry(
final int entryType,
final long recordingId,
final long leadershipTermId,
Expand Down Expand Up @@ -140,13 +144,19 @@ private List<TestEntry> generateData()
return entries;
}

@SuppressWarnings("checkstyle:MethodName")
@Test
@Disabled // Used to generate existing data.
void generateRecordingLogForVersionTest_1_45()
void verifyDataRemainsConsistent()
{
assertEquals(generateData(), generateData());
}

@ParameterizedTest
@ValueSource(booleans = { true, false })
void shouldLoadOldVersionAndMigrate(final boolean hasPartiallyProcessedFiles) throws IOException
{
final File parentDir = new File("src/test/resources/v1_45_x");
try (UnversionedRecordingLog recordingLog = new UnversionedRecordingLog(parentDir, true))
assertNotEquals(tempDirA, tempDirB);

try (UnversionedRecordingLog recordingLog = new UnversionedRecordingLog(tempDirA, true))
{
for (final TestEntry testEntry : generateData())
{
Expand All @@ -160,17 +170,20 @@ void generateRecordingLogForVersionTest_1_45()
testEntry.serviceId,
testEntry.endpoint);
}
final List<UnversionedRecordingLog.Entry> entries = recordingLog.entries();
final String s = entries.toString();
System.out.println(s);
}
}

private static void appendToRecordingLog(final RecordingLog recordingLog, final List<TestEntry> testEntries)
{
for (final TestEntry testEntry : testEntries)
if (hasPartiallyProcessedFiles)
{
Files.createFile(new File(tempDirA, RecordingLog.RECORDING_LOG_MIGRATED_FILE_NAME).toPath());
Files.createFile(new File(tempDirA, RecordingLog.RECORDING_LOG_NEW_FILE_NAME).toPath());
}

final RecordingLog recordingLogMigrated = new RecordingLog(tempDirA, false);
final RecordingLog recordingLogNew = new RecordingLog(tempDirB, true);

for (final TestEntry testEntry : generateData())
{
recordingLog.append(
recordingLogNew.append(
testEntry.entryType,
testEntry.recordingId,
testEntry.leadershipTermId,
Expand All @@ -180,28 +193,6 @@ private static void appendToRecordingLog(final RecordingLog recordingLog, final
testEntry.serviceId,
testEntry.endpoint);
}
}

@Test
void verifyDataRemainsConsistent()
{
assertEquals(generateData(), generateData());
}

@Test
void shouldLoadOldVersionAndMigrate(@TempDir final File tempDirA, @TempDir final File tempDirB) throws IOException
{
assertNotEquals(tempDirA, tempDirB);

final File parentDir = new File("src/test/resources/v1_45_x");
final File oldFile = new File(parentDir, RecordingLog.RECORDING_LOG_FILE_NAME);
final File tempOldFile = new File(tempDirA, RecordingLog.RECORDING_LOG_FILE_NAME);

Files.copy(oldFile.toPath(), tempOldFile.toPath());

final RecordingLog recordingLogMigrated = new RecordingLog(tempDirA, false);
final RecordingLog recordingLogNew = new RecordingLog(tempDirB, true);
appendToRecordingLog(recordingLogNew, generateData());

assertEquals(recordingLogMigrated.entries(), recordingLogNew.entries());
}
Expand All @@ -213,16 +204,16 @@ private int recordingType(final Random r)
switch (type)
{
case 0: return ENTRY_TYPE_TERM;
case 1: return RecordingLog.ENTRY_TYPE_SNAPSHOT;
case 2: return RecordingLog.ENTRY_TYPE_STANDBY_SNAPSHOT;
case 1: return ENTRY_TYPE_SNAPSHOT;
case 2: return ENTRY_TYPE_STANDBY_SNAPSHOT;
}

throw new IllegalStateException();
}

private String endpoint(final Random r, final int type, final byte[] bs)
{
if (RecordingLog.ENTRY_TYPE_STANDBY_SNAPSHOT == type)
if (ENTRY_TYPE_STANDBY_SNAPSHOT == type)
{
final int length = 50 + r.nextInt(50);
for (int i = 0; i < length; i++)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import io.aeron.test.SystemTestWatcher;
import io.aeron.test.cluster.TestCluster;
import io.aeron.test.cluster.TestNode;
import org.agrona.MutableDirectBuffer;
import org.agrona.concurrent.UnsafeBuffer;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.RegisterExtension;
Expand All @@ -38,6 +40,7 @@
import java.util.regex.Pattern;

import static io.aeron.test.cluster.TestCluster.aCluster;
import static java.nio.ByteOrder.LITTLE_ENDIAN;
import static java.nio.charset.StandardCharsets.US_ASCII;
import static java.nio.file.StandardOpenOption.CREATE_NEW;
import static java.nio.file.StandardOpenOption.WRITE;
Expand Down Expand Up @@ -220,7 +223,12 @@ void sortRecordingLogIsANoOpIfRecordLogIsEmpty(final @TempDir Path emptyClusterD
final boolean result = ClusterTool.sortRecordingLog(clusterDir);

assertFalse(result);
assertArrayEquals(new byte[0], Files.readAllBytes(logFile));

final MutableDirectBuffer header = new UnsafeBuffer(new byte[RecordingLog.HEADER_SIZE]);
header.putLong(0, RecordingLog.MAGIC_NUMBER, LITTLE_ENDIAN);
header.putInt(8, RecordingLog.SEMANTIC_VERSION, LITTLE_ENDIAN);

assertArrayEquals(header.byteArray(), Files.readAllBytes(logFile));
}

@Test
Expand Down

0 comments on commit b82ecfb

Please sign in to comment.