-
Notifications
You must be signed in to change notification settings - Fork 18
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Usage of InputStream instead of file path #16
Comments
Note that InputStream does not have a way to do seeks/positioning in a stream, and that's vital for many formats. You're welcome to contribute an implementation of KaitaiStream which will use InputStream (for example, you can throw RuntimeExceptions on attempts to seek, or try some magic with |
Consider the code below: import io.kaitai.struct.ByteBufferKaitaiStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import org.agrona.LangUtil;
public class KaitaiInputStream extends ByteBufferKaitaiStream {
private static final int BUFFER_SIZE = 32 * 1024;
private final ReadableByteChannel channel;
private final ByteBuffer buffer;
private final ByteBuffer helper;
private long pos;
public static KaitaiInputStream fromStream(InputStream is) {
final var channel = Channels.newChannel(is);
final var buffer = ByteBuffer.allocateDirect(BUFFER_SIZE);
return new KaitaiInputStream(channel, buffer);
}
private KaitaiInputStream(ReadableByteChannel ch, ByteBuffer buffer) {
super(buffer);
this.channel = ch;
this.buffer = buffer;
this.buffer.limit(0);
this.helper = ByteBuffer.allocateDirect(buffer.capacity());
this.helper.limit(0);
}
private void readBytesToBuffer(int count) {
int unused = buffer.capacity() - buffer.limit();
if (unused > count) {
append(buffer, count);
return;
}
pos += buffer.position();
moveRemainingToFront();
append(buffer, count);
}
private void moveRemainingToFront() {
helper.clear();
helper.put(buffer);
helper.flip();
buffer.clear();
buffer.put(helper);
buffer.flip();
}
private void append(ByteBuffer dst, int count) {
int mark = dst.position();
dst.position(dst.limit());
dst.limit(dst.capacity());
fillBuffer(dst, count);
dst.position(mark);
}
private void fillBuffer(ByteBuffer dst, int count) {
int remaining = count;
try {
while (remaining > 0 && channel.isOpen()) {
int bytesRead = channel.read(dst);
if (bytesRead == -1) {
throw new IOException("End of stream");
}
if (bytesRead > 0) {
remaining -= bytesRead;
}
}
dst.flip();
} catch (IOException ex) {
LangUtil.rethrowUnchecked(ex);
}
}
private void ensureBytes(int byteCount) {
if (buffer.remaining() >= byteCount) {
return;
}
readBytesToBuffer(byteCount - buffer.remaining());
}
@Override
public void close() throws IOException {
channel.close();
}
@Override
public boolean isEof() {
return !channel.isOpen();
}
@Override
public void seek(int i) {
ensureBytes(i);
buffer.position(buffer.position() + i);
pos += i;
}
@Override
public void seek(long l) {
if (l > Integer.MAX_VALUE) {
throw new UnsupportedOperationException("Not supported yet.");
}
seek((int) l);
}
@Override
public int pos() {
return (int) pos;
}
@Override
public long size() {
return Integer.MAX_VALUE;
}
@Override
public byte readS1() {
ensureBytes(1);
return super.readS1();
}
@Override
public short readS2be() {
ensureBytes(2);
return super.readS2be();
}
@Override
public int readS4be() {
ensureBytes(4);
return super.readS4be();
}
@Override
public long readS8be() {
ensureBytes(8);
return super.readS8be();
}
@Override
public short readS2le() {
ensureBytes(2);
return super.readS2le();
}
@Override
public int readS4le() {
ensureBytes(4);
return super.readS4le();
}
@Override
public long readS8le() {
ensureBytes(8);
return super.readS8le();
}
@Override
public int readU1() {
ensureBytes(1);
return super.readU1();
}
@Override
public int readU2be() {
ensureBytes(2);
return super.readU2be();
}
@Override
public long readU4be() {
ensureBytes(4);
return super.readU4be();
}
@Override
public int readU2le() {
ensureBytes(2);
return super.readU2le();
}
@Override
public long readU4le() {
ensureBytes(4);
return super.readU4le();
}
@Override
public float readF4be() {
ensureBytes(4);
return super.readF4be();
}
@Override
public double readF8be() {
ensureBytes(8);
return super.readF8be();
}
@Override
public float readF4le() {
ensureBytes(4);
return super.readF4le();
}
@Override
public double readF8le() {
ensureBytes(8);
return super.readF8le();
}
@Override
public byte[] readBytes(long n) {
ensureBytes(toByteArrayLength(n));
return super.readBytes(n);
}
@Override
public byte[] readBytesFull() {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public byte[] readBytesTerm(
byte term, boolean includeTerm, boolean consumeTerm, boolean eosError) {
throw new UnsupportedOperationException("Not supported yet.");
}
} with the above, we can have a streaming public class StreamingPCap extends Pcap implements Iterable<Packet> {
private Pcap.Header hdr;
public StreamingPCap(KaitaiStream io) {
super(io);
hdr = new Pcap.Header(this._io, this, this);
}
@Override
public Header hdr() {
return hdr;
}
@Override
protected void _read() {
// do nothing
}
public Stream<Packet> packetStream() {
return StreamSupport.stream(this.spliterator(), false);
}
@Override
public Iterator<Packet> iterator() {
return new PacketIterator(_io, this);
}
private static class PacketIterator implements Iterator<Packet> {
private final KaitaiStream io;
private final Pcap parent;
private PacketIterator(KaitaiStream io, Pcap parent) {
this.io = io;
this.parent = parent;
}
@Override
public boolean hasNext() {
return !io.isEof();
}
@Override
public Packet next() {
return new Packet(this.io, parent, parent);
}
}
} |
maybe generated code can be more flexible with usage of of InputStream instead of file path
e.g.
it won't give chance to process files that comes from network(gdrive) for example without saving on disk
The text was updated successfully, but these errors were encountered: