Skip to content

Commit

Permalink
Support for partitioning scans.
Browse files Browse the repository at this point in the history
  • Loading branch information
Mark Hale committed Jan 11, 2024
1 parent 61ab43b commit 9cbfc20
Show file tree
Hide file tree
Showing 5 changed files with 102 additions and 12 deletions.
4 changes: 2 additions & 2 deletions common/src/main/java/com/msd/gin/halyard/common/RDFValue.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ public static <V extends Value, T extends SPOC<V>> boolean matches(V value, RDFV
}


protected RDFValue(RDFRole.Name role, V val, RDFFactory valueIO) {
protected RDFValue(RDFRole.Name role, V val, RDFFactory rdfFactory) {
super(role);
this.val = Objects.requireNonNull(val);
this.rdfFactory = Objects.requireNonNull(valueIO);
this.rdfFactory = Objects.requireNonNull(rdfFactory);
}

boolean isWellKnownIRI() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,30 @@ public int size() {
};
}

/**
 * Builds a copy of {@code bseq} whose leading {@code nbits} bits are overwritten with the
 * {@code nbits} low-order bits of {@code partition}, most significant bit first.
 * All bits after the first {@code nbits} keep the value they have in {@code bseq}.
 *
 * @param partition partition number, in the range {@code [0, 2^nbits)}
 * @param nbits number of leading bits to overwrite; must not be negative
 * @param bseq byte sequence providing the bytes to be prefixed; it is copied, not modified
 * @return a new {@code ByteArray} holding the prefixed bytes
 * @throws IllegalArgumentException if {@code nbits} or {@code partition} is negative, if
 *         {@code partition} does not fit in {@code nbits} bits, or if {@code bseq} is too
 *         short to hold {@code nbits} bits
 */
static ByteSequence prefixWithPartition(int partition, int nbits, ByteSequence bseq) {
    if (nbits < 0) {
        throw new IllegalArgumentException(String.format("Number of partition bits must not be negative: %d", nbits));
    }
    if (partition < 0) {
        // a negative value would pass the upper-bound check and silently select another partition
        throw new IllegalArgumentException(String.format("Partition number must not be negative: %d", partition));
    }
    // 1 << nbits overflows int once nbits reaches 31; from there on every non-negative int fits
    if (nbits < Integer.SIZE - 1) {
        int numPartitions = (1 << nbits);
        if (partition >= numPartitions) {
            throw new IllegalArgumentException(String.format("Partition number %d must be less than %d (%d bits)", partition, numPartitions, nbits));
        }
    }
    int nbytes = nbits / Byte.SIZE;
    int rem = nbits % Byte.SIZE;
    byte[] b = bseq.copyBytes();
    int requiredBytes = nbytes + (rem > 0 ? 1 : 0);
    if (b.length < requiredBytes) {
        throw new IllegalArgumentException(String.format("Byte array not long enough to hold %d bits", nbits));
    }
    // whole bytes of the prefix
    for (int i = 0; i < nbytes; i++) {
        int shift = nbits - Byte.SIZE * (i + 1);
        // int shift distances are masked to 5 bits, so distances of 32 or more need an explicit zero
        b[i] = (shift < Integer.SIZE) ? (byte) (partition >> shift) : 0;
    }
    // trailing partial byte: the remaining prefix bits go into its top 'rem' bits
    if (rem > 0) {
        int shift = Byte.SIZE - rem;
        // zero top bits
        b[nbytes] &= (byte) ((1 << shift) - 1);
        // replace top bits
        b[nbytes] |= (byte) (partition << shift);
    }
    return new ByteArray(b);
}

private final Name name;
final byte prefix;
final RDFRole<T1> role1;
Expand Down Expand Up @@ -367,20 +391,26 @@ Scan scan(ByteSequence k1Start, ByteSequence k2Start, ByteSequence k3Start, Byte
}

public Scan scan() {
return scanWithConstraint(null);
return scanWithConstraint(0, 0, null);
}
public Scan scanWithConstraint(@Nullable ValueConstraint constraint1) {
public Scan scanWithConstraint(int partition, int nbits, @Nullable ValueConstraint constraint1) {
int cardinality = cardinality1*cardinality2*cardinality3*cardinality4;
ByteSequence start1 = role1.startKey();
ByteSequence stop1 = role1.stopKey();
if (nbits > 0) {
start1 = prefixWithPartition(partition, nbits, start1);
stop1 = prefixWithPartition(partition, nbits, stop1);
}
Scan scan = scan(
role1.startKey(), role2.startKey(), role3.startKey(), role4.startKey(),
role1.stopKey(), role2.stopKey(), role3.stopKey(), role4.stopKey(),
start1, role2.startKey(), role3.startKey(), role4.startKey(),
stop1, role2.stopKey(), role3.stopKey(), role4.stopKey(),
cardinality,
true
);
if (constraint1 != null) {
List<Filter> filters = new ArrayList<>();
appendValueConstraintFilters(ByteSequence.EMPTY, null,
role1.startKey(), role1.stopKey(),
start1, stop1,
concat3(role2.startKey(), role3.startKey(), role4.startKey()),
concat3(role2.stopKey(), role3.stopKey(), role4.stopKey()),
constraint1, filters);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ private Scan scanAllLiterals() {
true
);
} else {
return index.scanWithConstraint(new ValueConstraint(ValueType.LITERAL));
return index.scanWithConstraint(0, 0, new ValueConstraint(ValueType.LITERAL));
}
}

Expand Down Expand Up @@ -378,7 +378,7 @@ private Scan scanWithSubjectConstraint(@Nonnull ValueConstraint subjConstraint,
if (ctx == null) {
if (pred == null) {
if (obj == null) {
return spo.scanWithConstraint(subjConstraint);
return spo.scanWithConstraint(0, 0, subjConstraint);
} else {
return osp.scanWithConstraint(obj, subjConstraint);
}
Expand All @@ -404,7 +404,7 @@ private Scan scanWithObjectConstraint(@Nullable RDFSubject subj, @Nullable RDFPr
if (ctx == null) {
if (subj == null) {
if (pred == null) {
return osp.scanWithConstraint(objConstraint);
return osp.scanWithConstraint(0, 0, objConstraint);
} else {
return pos.scanWithConstraint(pred, objConstraint);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.eclipse.rdf4j.model.IRI;
import org.eclipse.rdf4j.model.Literal;
import org.eclipse.rdf4j.model.Resource;
Expand Down Expand Up @@ -251,6 +252,26 @@ public void testAllTermScan() throws Exception {
}
}

/**
 * Scans the SPO index once per partition and checks that the statements
 * collected across all partition scans match {@code allStatements}.
 */
@Test
public void testScanAllPartitions() throws Exception {
    final int nbits = 1;
    final int partitionCount = (1 << nbits);
    Set<Statement> found = new HashSet<>();
    for (int partition = 0; partition < partitionCount; partition++) {
        Scan partitionScan = stmtIndices.getSPOIndex().scanWithConstraint(partition, nbits, null);
        // print the row range covered by this partition
        System.err.println(Bytes.toHex(partitionScan.getStartRow()) + "\n" + Bytes.toHex(partitionScan.getStopRow()));
        try (ResultScanner scanner = keyspaceConn.getScanner(partitionScan)) {
            for (Result row = scanner.next(); row != null; row = scanner.next()) {
                for (Statement stmt : parseStatements(null, null, null, null, row)) {
                    found.add(stmt);
                }
            }
        }
    }
    assertSets(allStatements, found);
}

@Test
public void testScanStringLiterals_SPO() throws Exception {
Resource subj = vf.createIRI(SUBJ);
Expand Down Expand Up @@ -372,7 +393,7 @@ public void testScanTriples_POS() throws Exception {
@Test
public void testScanStringLiterals_OSP() throws Exception {
Set<Literal> actual = new HashSet<>();
Scan scan = stmtIndices.getOSPIndex().scanWithConstraint(new LiteralConstraint(XSD.STRING));
Scan scan = stmtIndices.getOSPIndex().scanWithConstraint(0, 0, new LiteralConstraint(XSD.STRING));
try (ResultScanner rs = keyspaceConn.getScanner(scan)) {
Result r;
while ((r = rs.next()) != null) {
Expand Down Expand Up @@ -407,7 +428,7 @@ public void testScanNonStringLiterals_COSP() throws Exception {
@Test
public void testScanTriples_OSP() throws Exception {
Set<Triple> actual = new HashSet<>();
Scan scan = stmtIndices.getOSPIndex().scanWithConstraint(new ValueConstraint(ValueType.TRIPLE));
Scan scan = stmtIndices.getOSPIndex().scanWithConstraint(0, 0, new ValueConstraint(ValueType.TRIPLE));
try (ResultScanner rs = keyspaceConn.getScanner(scan)) {
Result r;
while ((r = rs.next()) != null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package com.msd.gin.halyard.common;

import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.*;

/**
 * Unit tests for {@code StatementIndex.prefixWithPartition}.
 */
public class StatementIndexTest {
    /** A prefix shorter than one byte only replaces the top bits of the first byte. */
    @Test
    public void testPrefixWithPartition_subByte() {
        ByteSequence bseq = new ByteFiller((byte)0xFF, 6);
        ByteSequence actual = StatementIndex.prefixWithPartition(5, 3, bseq);
        // 5 = 0b101 in the top 3 bits, followed by the original 0b11111
        ByteSequence expected = new ByteArray(new byte[] {(byte)0xBF, (byte)0xFF, (byte)0xFF, (byte)0xFF, (byte)0xFF, (byte)0xFF});
        assertEquals(expected, actual);
    }

    /** A prefix spanning one whole byte plus part of the next. */
    @Test
    public void testPrefixWithPartition() {
        ByteSequence bseq = new ByteFiller((byte)0xFF, 6);
        ByteSequence actual = StatementIndex.prefixWithPartition(581, 13, bseq);
        // 581 = 0b0001001000101 (13 bits): 0x12, then 0b00101 above the original 0b111
        ByteSequence expected = new ByteArray(new byte[] {(byte)0x12, (byte)0x2F, (byte)0xFF, (byte)0xFF, (byte)0xFF, (byte)0xFF});
        assertEquals(expected, actual);
    }

    /** A prefix that is an exact number of bytes leaves the following bytes untouched. */
    @Test
    public void testPrefixWithPartition_byteAligned() {
        ByteSequence bseq = new ByteFiller((byte)0xFF, 6);
        ByteSequence actual = StatementIndex.prefixWithPartition(5, 16, bseq);
        ByteSequence expected = new ByteArray(new byte[] {(byte)0x00, (byte)0x05, (byte)0xFF, (byte)0xFF, (byte)0xFF, (byte)0xFF});
        assertEquals(expected, actual);
    }

    /** A 10-bit prefix cannot be written into a single-byte sequence. */
    @Test
    public void testPartitionSizeTooBig() {
        ByteSequence bseq = new ByteFiller((byte)0xFF, 1);
        assertThrows(IllegalArgumentException.class, () ->
            StatementIndex.prefixWithPartition(500, 10, bseq)
        );
    }

    /** A partition number that needs more than {@code nbits} bits is rejected. */
    @Test
    public void testPartitionNumberOutOfRange() {
        ByteSequence bseq = new ByteFiller((byte)0xFF, 6);
        // 3 bits allow partitions 0..7 only
        assertThrows(IllegalArgumentException.class, () ->
            StatementIndex.prefixWithPartition(8, 3, bseq)
        );
    }
}

0 comments on commit 9cbfc20

Please sign in to comment.