Skip to content

Commit

Permalink
feat: read in range
Browse files Browse the repository at this point in the history
  • Loading branch information
gc-garcol committed Dec 29, 2024
1 parent 8903c3c commit 9deed88
Show file tree
Hide file tree
Showing 5 changed files with 154 additions and 35 deletions.
2 changes: 1 addition & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ plugins {
}

group = "io.github.gc-garcol"
version = "0.4.0"
version = "1.0.0"

java {
withJavadocJar()
Expand Down
2 changes: 1 addition & 1 deletion wal-core/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ plugins {
}

group = "io.github.gc-garcol"
version = "0.4.0"
version = "1.0.0"

java {
toolchain {
Expand Down
26 changes: 26 additions & 0 deletions wal-core/src/main/java/gc/garcol/walcore/LogReader.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package gc.garcol.walcore;

import java.nio.ByteBuffer;

/**
* The `LogReader` interface defines a functional interface for handling log entry data.
*
* <p> Implementations of this interface provide a method to process log entry data stored in a `ByteBuffer`.
* This interface is used in conjunction with the `LogRepository` class to read and process log entries.
*
* @author thaivc
* @since 2024
*/
@FunctionalInterface
public interface LogReader
{
/**
* Handles log entry data.
*
* <p> This method is called to process log entry data stored in the provided `ByteBuffer`.
* Implementations should define the logic to handle the log entry data.
*
* @param readBuffer the buffer containing the log entry data
*/
void handle(ByteBuffer readBuffer);
}
155 changes: 122 additions & 33 deletions wal-core/src/main/java/gc/garcol/walcore/LogRepository.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,15 @@
import java.util.Optional;

/**
* The `LogRepository` class provides methods to read, write, truncate, and get the total records of log files.
* It uses `RandomAccessFile` and `FileChannel` to perform file operations.
* The `LogRepository` class provides methods to read, write, truncate, and get the total records of
* log files. It uses `RandomAccessFile` and `FileChannel` to perform file operations.
*
* @author thaivc
* @since 2024
*/
public class LogRepository {
public class LogRepository
{

/**
* The first segment number.
*/
Expand All @@ -45,7 +47,8 @@ public class LogRepository {
*
* @param baseLogDir the base directory for log files
*/
public LogRepository(String baseLogDir) {
public LogRepository(String baseLogDir)
{
LogUtil.createDirectoryNX(baseLogDir);
this.baseLogDir = baseLogDir;
}
Expand All @@ -54,20 +57,22 @@ public LogRepository(String baseLogDir) {
* Retrieves the latest log segment number from the base log directory.
*
* <p> This method scans the base log directory for log files, sorts them by file name,
* and returns the segment number of the most recent log file. If no log files are found,
* it returns 0.
* and returns the segment number of the most recent log file. If no log files are found, it
* returns 0.
*
* @return the segment number of the latest log file, or 0 if no log files are found
* @throws IOException if an I/O error occurs while accessing the log directory
*/
public long getLatestSegment() throws IOException {
public long getLatestSegment() throws IOException
{
Optional<Path> lastFile = Files.list(Paths.get(baseLogDir))
.filter(Files::isRegularFile)
.max(Comparator.comparing(Path::getFileName));
return lastFile.map(path -> LogUtil.segment(path.getFileName().toString())).orElse(-1L);
}

private void generateFiles(long segment) throws IOException {
private void generateFiles(long segment) throws IOException
{
indexFile = new RandomAccessFile(indexPath(segment), "rw");
logFile = new RandomAccessFile(logPath(segment), "rw");
currentIndex = totalRecords(currentSegment);
Expand All @@ -80,16 +85,19 @@ private void generateFiles(long segment) throws IOException {
/**
* Switches to the specified log segment and initializes the corresponding log and index files.
*
* <p> This method updates the current segment to the specified segment, generates the necessary log and index files,
* <p> This method updates the current segment to the specified segment, generates the necessary
* log and index files,
* and sets the current index to the total number of records in the new segment.
*
* @param segment the segment number to switch to
* @throws IOException if an I/O error occurs during the file operations
* @throws IllegalArgumentException if the specified segment is older than the current segment
*/
public void switchToSegment(long segment) throws IOException {
public void switchToSegment(long segment) throws IOException
{
long latestSegment = getLatestSegment();
if (segment < latestSegment) {
if (segment < latestSegment)
{
throw new IllegalArgumentException("Cannot switch to an older segment");
}
currentSegment = segment;
Expand All @@ -98,41 +106,103 @@ public void switchToSegment(long segment) throws IOException {
}

/**
* Reads a log entry from the specified path and index into the provided `ByteBuffer`.
* Reads a log entry from the specified segment and index into the provided `ByteBuffer`.
*
* @param segment the segment of the log files
* @param index the index of the log entry to read
* @param readerBuffer the buffer to read the log entry into
* @throws IOException if an I/O error occurs
*/
public void read(long segment, long index, ByteBuffer readerBuffer) throws IOException {
RandomAccessFile indexFileRead = indexFile != null && segment == currentSegment ? indexFile : new RandomAccessFile(indexPath(segment), "r");
RandomAccessFile logFileRead = logFile != null && segment == currentSegment ? logFile : new RandomAccessFile(logPath(segment), "r");
public void read(long segment, long index, ByteBuffer readerBuffer) throws IOException
{
RandomAccessFile indexFileRead = indexFile != null && segment == currentSegment ? indexFile
: new RandomAccessFile(indexPath(segment), "r");
RandomAccessFile logFileRead = logFile != null && segment == currentSegment ? logFile
: new RandomAccessFile(logPath(segment), "r");

indexFileRead.seek(index * LogIndex.SIZE);
var logIndex = new LogIndex(indexFileRead.readLong(), indexFileRead.readInt());
logFileRead.seek(logIndex.index());
var buffer = new byte[logIndex.entryLength()];
logFileRead.readFully(buffer);

long logIndex = indexFileRead.readLong();
int logLength = indexFileRead.readInt();
readerBuffer.clear();
readerBuffer.limit(logIndex.entryLength());
readerBuffer.limit(logLength);
var logChannel = logFileRead.getChannel();
logChannel.read(readerBuffer, logIndex.index());
logChannel.position(logIndex);
logChannel.read(readerBuffer);
readerBuffer.flip();
}

/**
* Reads log entries from the specified segment, reading from fromIndex to toIndex, the log entries are then handled by a `LogReader`.
*
* @param segment the segment of the log files
* @param fromIndex the starting index of the log entries to read
* @param toIndex the ending index of the log entries to read
* @param readBuffer the buffer to read the log entries into
* @param logReader the log reader to handle the log entries
* @throws IOException if an I/O error occurs
*/
public void read(long segment, long fromIndex, long toIndex, ByteBuffer readBuffer, LogReader logReader) throws IOException
{
RandomAccessFile indexFileRead = indexFile != null && segment == currentSegment ? indexFile
: new RandomAccessFile(indexPath(segment), "r");
RandomAccessFile logFileRead = logFile != null && segment == currentSegment ? logFile
: new RandomAccessFile(logPath(segment), "r");

readLogInRange(fromIndex, toIndex, readBuffer, logReader, indexFileRead, logFileRead);
}

/**
* Reads log entries from the specified segment and index range, reading from fromIndex to the end of log segment, the log entries are then handled by a `LogReader`.
*
* @param segment the segment of the log files
* @param fromIndex the starting index of the log entries to read
* @param readBuffer the buffer to read the log entries into
* @param logReader the log reader to handle the log entries
* @throws IOException if an I/O error occurs
*/
public void read(long segment, long fromIndex, ByteBuffer readBuffer, LogReader logReader) throws IOException
{
RandomAccessFile indexFileRead = indexFile != null && segment == currentSegment ? indexFile
: new RandomAccessFile(indexPath(segment), "r");
RandomAccessFile logFileRead = logFile != null && segment == currentSegment ? logFile
: new RandomAccessFile(logPath(segment), "r");

long endIndex = totalRecords(indexFileRead) - 1;

readLogInRange(fromIndex, endIndex, readBuffer, logReader, indexFileRead, logFileRead);
}

private void readLogInRange(final long fromIndex, final long toIndex, final ByteBuffer readBuffer, final LogReader logReader, final RandomAccessFile indexFileRead, final RandomAccessFile logFileRead) throws IOException
{
FileChannel logChannel = logFileRead.getChannel();
for (long index = fromIndex; index <= toIndex; index++)
{
indexFileRead.seek(index * LogIndex.SIZE);
long logIndex = indexFileRead.readLong();
int logLength = indexFileRead.readInt();
logFileRead.seek(logIndex);
readBuffer.clear();
readBuffer.limit(logLength);
logChannel.position(logIndex);
logChannel.read(readBuffer);
readBuffer.flip();
logReader.handle(readBuffer);
}
}

/**
* Appends log entries to the current log segment.
*
* <p> This method writes the provided log entries to the current log file and updates the index file accordingly.
* The log entries are written at the end of the current log file, and the index file is updated with the position
* and length of each log entry.
* <p> This method writes the provided log entries to the current log file and updates the index
* file accordingly.
* The log entries are written at the end of the current log file, and the index file is updated
* with the position and length of each log entry.
*
* @param logs the buffer containing the log entries to append
* @throws IOException if an I/O error occurs during the write operation
*/
public void append(ByteBuffer logs) throws IOException {
public void append(ByteBuffer logs) throws IOException
{
indexBufferWriter.clear();
indexBufferWriter.putLong(logFile.length());
indexBufferWriter.putInt(logs.limit());
Expand All @@ -154,16 +224,19 @@ public void append(ByteBuffer logs) throws IOException {
* @param fromIndex the index from which to truncate
* @throws IOException if an I/O error occurs
*/
public void truncate(long segment, long fromIndex) throws IOException {
public void truncate(long segment, long fromIndex) throws IOException
{
File indexFileSys = new File(indexPath(segment));
if (!indexFileSys.exists()) {
if (!indexFileSys.exists())
{
return;
}

try (
RandomAccessFile indexFile = new RandomAccessFile(indexPath(segment), "rw");
RandomAccessFile logFile = new RandomAccessFile(logPath(segment), "rw")
) {
)
{
indexBufferReader.clear();
var indexChannel = indexFile.getChannel();
indexChannel.read(indexBufferReader, fromIndex * LogIndex.SIZE);
Expand All @@ -181,16 +254,32 @@ public void truncate(long segment, long fromIndex) throws IOException {
* @return the total number of records
* @throws IOException if an I/O error occurs
*/
public long totalRecords(long segment) throws IOException {
RandomAccessFile indexFileRead = segment == currentSegment ? indexFile : new RandomAccessFile(indexPath(segment), "r");
public long totalRecords(long segment) throws IOException
{
RandomAccessFile indexFileRead =
segment == currentSegment ? indexFile : new RandomAccessFile(indexPath(segment), "r");
return indexFileRead.length() / LogIndex.SIZE;
}

/**
* Returns the total number of records in the specified index file.
*
* @param indexFileRead the index file to read
* @return the total number of records
* @throws IOException if an I/O error occurs
*/
public long totalRecords(RandomAccessFile indexFileRead) throws IOException
{
return indexFileRead.length() / LogIndex.SIZE;
}

private String indexPath(long segment) {
private String indexPath(long segment)
{
return baseLogDir + "/" + LogUtil.indexName(segment);
}

private String logPath(long segment) {
private String logPath(long segment)
{
return baseLogDir + "/" + LogUtil.logName(segment);
}
}
4 changes: 4 additions & 0 deletions wal-core/src/main/java/gc/garcol/walcore/LogUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@
public class LogUtil
{

private LogUtil()
{
}

private static final int LONG_LENGTH = String.valueOf(Long.MAX_VALUE).length();
private static final String LOG_FORMAT = "%0" + LONG_LENGTH + "d" + ".dat";
private static final String INDEX_FORMAT = "%0" + LONG_LENGTH + "d" + ".index.dat";
Expand Down

0 comments on commit 9deed88

Please sign in to comment.