Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make I/O during indexing generic #438

Merged
merged 3 commits into from
Jun 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 78 additions & 32 deletions src/main/java/com/github/dbmdz/solrocr/model/SourcePointer.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
Expand All @@ -22,31 +23,58 @@
public class SourcePointer {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

public static class FileSource {
/** Kinds of sources that a source pointer target can refer to. */
public enum SourceType {
  /** The target is a path to a file on the filesystem. */
  FILESYSTEM
}

public final Path path;
public static class Source {

public final SourceType type;
public final String target;
public List<Region> regions;
public boolean isAscii;

public FileSource(Path path, List<Region> regions, boolean isAscii) throws IOException {
this.path = path;
if (!path.toFile().exists()) {
throw new FileNotFoundException(
String.format(Locale.US, "File at %s does not exist.", path));
}
if (path.toFile().length() == 0) {
throw new IOException(String.format(Locale.US, "File at %s is empty.", path));
}
public Source(String target, List<Region> regions, boolean isAscii) throws IOException {
this.type = determineType(target);
Source.validateTarget(target, this.type);
this.target = target;
this.regions = regions;
this.isAscii = isAscii;
}

static FileSource parse(String pointer) {
/**
 * Determine which kind of source a pointer target refers to.
 *
 * <p>Targets starting with {@code /} are classified as filesystem targets without touching the
 * disk. Any other target is only accepted if it names an existing filesystem entry.
 *
 * @param target the target part of a source pointer
 * @return the detected source type
 * @throws IOException if the target is not a supported kind of source, including targets that
 *     cannot be converted to a filesystem path on this platform
 */
static SourceType determineType(String target) throws IOException {
  if (target.startsWith("/")) {
    return SourceType.FILESYSTEM;
  }
  try {
    if (Files.exists(Paths.get(target))) {
      return SourceType.FILESYSTEM;
    }
  } catch (java.nio.file.InvalidPathException e) {
    // Paths.get rejects strings that are not syntactically valid paths with an unchecked
    // exception. Such a target cannot be a filesystem target, so report it through the
    // same checked exception as every other unsupported target instead of leaking it.
    throw new IOException(
        String.format(Locale.US, "Target %s is currently not supported.", target), e);
  }
  throw new IOException(
      String.format(Locale.US, "Target %s is currently not supported.", target));
}

/**
 * Check that a target of the given type can be used as a source.
 *
 * <p>For filesystem targets the file must exist and must not be empty. No other source type is
 * accepted.
 *
 * @param target the target part of a source pointer
 * @param type the source type previously determined for {@code target}
 * @throws FileNotFoundException if a filesystem target does not exist
 * @throws IOException if the type is unsupported, the target is not a valid path, the file is
 *     empty, or its size cannot be read
 */
static void validateTarget(String target, SourceType type) throws IOException {
  if (type != SourceType.FILESYSTEM) {
    throw new IOException(
        String.format(Locale.US, "Target %s is currently not supported.", target));
  }
  final Path path;
  try {
    path = Paths.get(target);
  } catch (java.nio.file.InvalidPathException e) {
    // Paths.get signals syntactically invalid paths with an unchecked exception; convert it
    // so callers only have to deal with the declared IOException.
    throw new IOException(
        String.format(Locale.US, "Target %s is not a valid file path.", target), e);
  }
  if (!Files.exists(path)) {
    throw new FileNotFoundException(
        String.format(Locale.US, "File at %s does not exist.", target));
  }
  if (Files.size(path) == 0) {
    throw new IOException(String.format(Locale.US, "File at %s is empty.", target));
  }
}

static Source parse(String pointer) {
Matcher m = POINTER_PAT.matcher(pointer);
if (!m.find()) {
throw new RuntimeException("Could not parse source pointer from '" + pointer + ".");
}
Path sourcePath = Paths.get(m.group("path"));
String target = m.group("target");
List<Region> regions = ImmutableList.of();
if (m.group("regions") != null) {
regions =
Expand All @@ -56,16 +84,25 @@ static FileSource parse(String pointer) {
.collect(Collectors.toList());
}
try {
return new FileSource(sourcePath, regions, m.group("isAscii") != null);
return new Source(target, regions, m.group("isAscii") != null);
} catch (FileNotFoundException e) {
throw new RuntimeException("Could not locate file at '" + sourcePath + ".");
throw new RuntimeException("Could not locate file at '" + target + ".");
} catch (IOException e) {
throw new RuntimeException("Could not read file at '" + sourcePath + ".");
throw new RuntimeException("Could not read target at '" + target + ".");
}
}

/**
 * Create a reader for the data behind this single source.
 *
 * @param sectionSize passed through unchanged to the created reader
 * @param maxCacheEntries passed through unchanged to the created reader
 * @return a {@link FileSourceReader} for filesystem sources
 * @throws IOException declared for the reader construction
 * @throws UnsupportedOperationException if this source is not a filesystem source
 */
public SourceReader getReader(int sectionSize, int maxCacheEntries) throws IOException {
  if (this.type != SourceType.FILESYSTEM) {
    throw new UnsupportedOperationException("Unsupported source type '" + this.type + "'.");
  }
  Path filePath = Paths.get(this.target);
  // NOTE(review): the pointer handed to the reader is re-parsed from the bare target string,
  // so it presumably carries neither this source's regions nor its ascii flag; confirm that
  // this is intended.
  SourcePointer targetPointer = SourcePointer.parse(this.target);
  return new FileSourceReader(filePath, targetPointer, sectionSize, maxCacheEntries);
}

public String toString() {
StringBuilder sb = new StringBuilder(path.toString());
StringBuilder sb = new StringBuilder(target);
if (isAscii) {
sb.append("{ascii}");
}
Expand Down Expand Up @@ -117,9 +154,9 @@ public String toString() {
}

static final Pattern POINTER_PAT =
Pattern.compile("^(?<path>.+?)(?<isAscii>\\{ascii})?(?:\\[(?<regions>[0-9:,]+)])?$");
Pattern.compile("^(?<target>.+?)(?<isAscii>\\{ascii})?(?:\\[(?<regions>[0-9:,]+)])?$");

public final List<FileSource> sources;
public final List<Source> sources;

public static boolean isPointer(String pointer) {
if (pointer.startsWith("<")) {
Expand All @@ -134,34 +171,43 @@ public static SourcePointer parse(String pointer) {
throw new RuntimeException("Could not parse pointer: " + pointer);
}
String[] sourceTokens = pointer.split("\\+");
List<FileSource> fileSources =
Arrays.stream(sourceTokens).map(FileSource::parse).collect(Collectors.toList());
if (fileSources.isEmpty()) {
List<Source> sources =
Arrays.stream(sourceTokens).map(Source::parse).collect(Collectors.toList());
if (sources.isEmpty()) {
return null;
} else {
return new SourcePointer(fileSources);
return new SourcePointer(sources);
}
}

public SourcePointer(List<FileSource> sources) {
public SourcePointer(List<Source> sources) {
this.sources = sources;
}

@Override
public String toString() {
return sources.stream().map(FileSource::toString).collect(Collectors.joining("+"));
return sources.stream().map(Source::toString).collect(Collectors.joining("+"));
}

/** Create a reader for the data pointed at by this source pointer. */
public SourceReader getReader(int sectionSize, int maxCacheEntries) throws IOException {
if (this.sources.size() == 1) {
return new FileSourceReader(this.sources.get(0).path, this, sectionSize, maxCacheEntries);
if (this.sources.stream().allMatch(s -> s.type == SourceType.FILESYSTEM)) {
if (this.sources.size() == 1) {
return new FileSourceReader(
Paths.get(this.sources.get(0).target), this, sectionSize, maxCacheEntries);
} else {
return new MultiFileSourceReader(
this.sources.stream().map(s -> Paths.get(s.target)).collect(Collectors.toList()),
this,
sectionSize,
maxCacheEntries);
}
} else {
return new MultiFileSourceReader(
this.sources.stream().map(s -> s.path).collect(Collectors.toList()),
this,
sectionSize,
maxCacheEntries);
throw new IOException(
String.format(
Locale.US,
"Pointer %s contains unsupported target types or a mix of target types.",
this));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,6 @@ public BaseSourceReader(SourcePointer pointer, int sectionSize, int maxCacheEntr
this.maxCacheEntries = maxCacheEntries;
}

/**
* Read {@param len} bytes starting at {@param start} from the source into the buffer {@param dst}
* starting at offset {@param dstOffset}, returning the number of bytes read.
*/
protected abstract int readBytes(byte[] dst, int dstOffset, int start, int len)
throws IOException;

@Override
public abstract int length() throws IOException;

Expand Down Expand Up @@ -127,7 +120,7 @@ public String readAsciiString(int start, int len) throws IOException {
if (start + len > this.length()) {
len = this.length() - start;
}
StringBuilder sb = new StringBuilder();
StringBuilder sb = new StringBuilder(len);
int numRead = 0;
while (numRead < len) {
Section section = getAsciiSection(start + numRead);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import com.github.dbmdz.solrocr.model.SourcePointer;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SeekableByteChannel;
import java.util.Locale;
import org.apache.lucene.index.QueryTimeout;

Expand Down Expand Up @@ -74,4 +76,17 @@ public Section getAsciiSection(int offset) throws IOException {
checkAndThrow();
return input.getAsciiSection(offset);
}

/**
 * Delegate a read into {@code dst}, starting at source offset {@code start}, to the wrapped
 * reader. {@code checkAndThrow()} is invoked first, so it can abort the call before any data is
 * read.
 *
 * @return whatever the wrapped reader returns (presumably the number of bytes read; confirm
 *     against the interface)
 */
@Override
public int readBytes(ByteBuffer dst, int start) throws IOException {
  checkAndThrow();
  final int result = input.readBytes(dst, start);
  return result;
}

/**
 * Hand out the byte channel of the wrapped reader.
 *
 * <p>In contrast to the read methods of this class, no {@code checkAndThrow()} call is made
 * here.
 */
@Override
public SeekableByteChannel getByteChannel() throws IOException {
  // Only implemented for completeness: this method matters during indexing, and this type is
  // not used there.
  final SeekableByteChannel delegateChannel = input.getByteChannel();
  return delegateChannel;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.SeekableByteChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
Expand All @@ -22,8 +23,8 @@ public FileSourceReader(Path path, SourcePointer ptr, int sectionSize, int maxCa
}

@Override
protected int readBytes(byte[] dst, int dstOffset, int start, int len) throws IOException {
return this.chan.read(ByteBuffer.wrap(dst, dstOffset, len), start);
public int readBytes(ByteBuffer dst, int start) throws IOException {
return this.chan.read(dst, start);
}

@Override
Expand All @@ -43,4 +44,9 @@ public void close() throws IOException {
public String getIdentifier() {
return this.path.toString();
}

/**
 * Expose the channel this reader uses for its own reads.
 *
 * <p>The reader's own channel object is returned, not a new one.
 */
@Override
public SeekableByteChannel getByteChannel() throws IOException {
  final SeekableByteChannel channel = this.chan;
  return channel;
}
}
74 changes: 0 additions & 74 deletions src/main/java/com/github/dbmdz/solrocr/reader/MultiFileReader.java

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ private OpenFile(Path p, int startOffset) throws IOException {
this.startOffset = startOffset;
}

public int read(byte[] dst, int dstOffset, int start, int len) throws IOException {
return this.channel.read(ByteBuffer.wrap(dst, dstOffset, len), start);
public int read(ByteBuffer dst, int start) throws IOException {
return this.channel.read(dst, start);
}

public void close() throws IOException {
Expand Down Expand Up @@ -68,7 +68,7 @@ public MultiFileSourceReader(
}

@Override
protected int readBytes(byte[] dst, int dstOffset, int start, int len) throws IOException {
public int readBytes(ByteBuffer dst, int start) throws IOException {
int fileIdx = ArrayUtils.binaryFloorIdxSearch(startOffsets, start);
if (fileIdx < 0) {
throw new RuntimeException(String.format("Offset %d is out of bounds", start));
Expand All @@ -79,9 +79,10 @@ protected int readBytes(byte[] dst, int dstOffset, int start, int len) throws IO
}
OpenFile file = openFiles[fileIdx];

int len = dst.remaining();
int numRead = 0;
while (numRead < len) {
numRead += file.read(dst, dstOffset + numRead, (start + numRead) - fileOffset, len - numRead);
numRead += file.read(dst, (start + numRead) - fileOffset);
if (numRead < len) {
fileIdx++;
if (fileIdx >= paths.length) {
Expand Down
Loading
Loading