Skip to content

Commit

Permalink
Implement synthetic columns and ORDER BY BM25 (#1434)
Browse files Browse the repository at this point in the history
### What is the issue
riptano/cndb#11725

# New functionality

ORDER BY BM25, enabled by SAI Version.EC

# Enhanced functionality

ORDER BY ANN can also used the synthetic score column so that coordinator does not need to recompute similarity for every row returned by the different replicas.  Controlled by SelectStatement.ANN_USE_SYNTHETIC_SCORE (default false)
  • Loading branch information
jbellis authored Feb 27, 2025
1 parent 303c520 commit b0cdc37
Show file tree
Hide file tree
Showing 111 changed files with 3,937 additions and 1,025 deletions.
1 change: 1 addition & 0 deletions src/antlr/Lexer.g
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ K_DROPPED: D R O P P E D;
K_COLUMN: C O L U M N;
K_RECORD: R E C O R D;
K_ANN: A N N;
K_BM25: B M '2' '5';

// Case-insensitive alpha characters
fragment A: ('a'|'A');
Expand Down
15 changes: 10 additions & 5 deletions src/antlr/Parser.g
Original file line number Diff line number Diff line change
Expand Up @@ -459,14 +459,18 @@ customIndexExpression [WhereClause.Builder clause]
;

orderByClause[List<Ordering.Raw> orderings]
@init{
@init {
Ordering.Direction direction = Ordering.Direction.ASC;
Ordering.Raw.Expression expr = null;
}
: c=cident (K_ANN K_OF t=term)? (K_ASC | K_DESC { direction = Ordering.Direction.DESC; })?
: c=cident
( K_ANN K_OF t=term { expr = new Ordering.Raw.Ann(c, t); }
| K_BM25 K_OF t=term { expr = new Ordering.Raw.Bm25(c, t); }
)?
(K_ASC | K_DESC { direction = Ordering.Direction.DESC; })?
{
Ordering.Raw.Expression expr = (t == null)
? new Ordering.Raw.SingleColumn(c)
: new Ordering.Raw.Ann(c, t);
if (expr == null)
expr = new Ordering.Raw.SingleColumn(c);
orderings.add(new Ordering.Raw(expr, direction));
}
;
Expand Down Expand Up @@ -1969,6 +1973,7 @@ basic_unreserved_keyword returns [String str]
| K_COLUMN
| K_RECORD
| K_ANN
| K_BM25
| K_OFFSET
) { $str = $k.text; }
;
6 changes: 6 additions & 0 deletions src/java/org/apache/cassandra/cql3/GeoDistanceRelation.java
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,12 @@ protected Restriction newAnnRestriction(TableMetadata table, VariableSpecificati
throw invalidRequest("%s cannot be used with the GEO_DISTANCE function", operator());
}

@Override
protected Restriction newBm25Restriction(TableMetadata table, VariableSpecifications boundNames)
{
throw invalidRequest("%s cannot be used with the GEO_DISTANCE function", operator());
}

@Override
protected Restriction newAnalyzerMatchesRestriction(TableMetadata table, VariableSpecifications boundNames)
{
Expand Down
6 changes: 6 additions & 0 deletions src/java/org/apache/cassandra/cql3/MultiColumnRelation.java
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,12 @@ protected Restriction newAnnRestriction(TableMetadata table, VariableSpecificati
throw invalidRequest("%s cannot be used for multi-column relations", operator());
}

@Override
protected Restriction newBm25Restriction(TableMetadata table, VariableSpecifications boundNames)
{
throw invalidRequest("%s cannot be used for multi-column relations", operator());
}

@Override
protected Restriction newAnalyzerMatchesRestriction(TableMetadata table, VariableSpecifications boundNames)
{
Expand Down
21 changes: 20 additions & 1 deletion src/java/org/apache/cassandra/cql3/Operator.java
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ public boolean isSatisfiedBy(AbstractType<?> type,
@Nullable Index.Analyzer indexAnalyzer,
@Nullable Index.Analyzer queryAnalyzer)
{
return true;
throw new UnsupportedOperationException();
}
},
NOT_IN(16)
Expand Down Expand Up @@ -523,6 +523,7 @@ private boolean hasToken(AbstractType<?> type, List<ByteBuffer> tokens, ByteBuff
return false;
}
},

/**
* An operator that performs a distance bounded approximate nearest neighbor search against a vector column such
* that all result vectors are within a given distance of the query vector. The notable difference between this
Expand Down Expand Up @@ -584,6 +585,24 @@ public boolean isSatisfiedBy(AbstractType<?> type,
{
throw new UnsupportedOperationException();
}
},
BM25(104)
{
@Override
public String toString()
{
return "BM25";
}

@Override
public boolean isSatisfiedBy(AbstractType<?> type,
ByteBuffer leftOperand,
ByteBuffer rightOperand,
@Nullable Index.Analyzer indexAnalyzer,
@Nullable Index.Analyzer queryAnalyzer)
{
throw new UnsupportedOperationException();
}
};

/**
Expand Down
75 changes: 75 additions & 0 deletions src/java/org/apache/cassandra/cql3/Ordering.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.cassandra.cql3.restrictions.SingleColumnRestriction;
import org.apache.cassandra.cql3.restrictions.SingleRestriction;
import org.apache.cassandra.cql3.statements.SelectStatement;
import org.apache.cassandra.schema.ColumnMetadata;
import org.apache.cassandra.schema.TableMetadata;

Expand Down Expand Up @@ -48,6 +49,11 @@ public interface Expression
SingleRestriction toRestriction();

ColumnMetadata getColumn();

default boolean isScored()
{
return false;
}
}

/**
Expand Down Expand Up @@ -118,6 +124,54 @@ public ColumnMetadata getColumn()
{
return column;
}

@Override
public boolean isScored()
{
return SelectStatement.ANN_USE_SYNTHETIC_SCORE;
}
}

/**
* An expression used in BM25 ordering.
* <code>ORDER BY column BM25 OF value</code>
*/
public static class Bm25 implements Expression
{
final ColumnMetadata column;
final Term queryValue;
final Direction direction;

public Bm25(ColumnMetadata column, Term queryValue, Direction direction)
{
this.column = column;
this.queryValue = queryValue;
this.direction = direction;
}

@Override
public boolean hasNonClusteredOrdering()
{
return true;
}

@Override
public SingleRestriction toRestriction()
{
return new SingleColumnRestriction.Bm25Restriction(column, queryValue);
}

@Override
public ColumnMetadata getColumn()
{
return column;
}

@Override
public boolean isScored()
{
return true;
}
}

public enum Direction
Expand Down Expand Up @@ -190,6 +244,27 @@ public Ordering.Expression bind(TableMetadata table, VariableSpecifications boun
return new Ordering.Ann(column, value, direction);
}
}

public static class Bm25 implements Expression
{
final ColumnIdentifier columnId;
final Term.Raw queryValue;

Bm25(ColumnIdentifier column, Term.Raw queryValue)
{
this.columnId = column;
this.queryValue = queryValue;
}

@Override
public Ordering.Expression bind(TableMetadata table, VariableSpecifications boundNames, Direction direction)
{
ColumnMetadata column = table.getExistingColumn(columnId);
Term value = queryValue.prepare(table.keyspace, column);
value.collectMarkerSpecification(boundNames);
return new Ordering.Bm25(column, value, direction);
}
}
}
}

Expand Down
7 changes: 7 additions & 0 deletions src/java/org/apache/cassandra/cql3/Relation.java
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,8 @@ public final Restriction toRestriction(TableMetadata table, VariableSpecificatio
return newLikeRestriction(table, boundNames, relationType);
case ANN:
return newAnnRestriction(table, boundNames);
case BM25:
return newBm25Restriction(table, boundNames);
case ANALYZER_MATCHES:
return newAnalyzerMatchesRestriction(table, boundNames);
default: throw invalidRequest("Unsupported \"!=\" relation: %s", this);
Expand Down Expand Up @@ -296,6 +298,11 @@ protected abstract Restriction newSliceRestriction(TableMetadata table,
*/
protected abstract Restriction newAnnRestriction(TableMetadata table, VariableSpecifications boundNames);

/**
* Creates a new BM25 restriction instance.
*/
protected abstract Restriction newBm25Restriction(TableMetadata table, VariableSpecifications boundNames);

/**
* Creates a new Analyzer Matches restriction instance.
*/
Expand Down
36 changes: 35 additions & 1 deletion src/java/org/apache/cassandra/cql3/SingleColumnRelation.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,11 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

import org.apache.cassandra.db.marshal.VectorType;
import org.apache.cassandra.index.IndexRegistry;
import org.apache.cassandra.index.sai.analyzer.AnalyzerEqOperatorSupport;
import org.apache.cassandra.schema.ColumnMetadata;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.cql3.Term.Raw;
Expand All @@ -33,6 +36,7 @@
import org.apache.cassandra.db.marshal.ListType;
import org.apache.cassandra.db.marshal.MapType;
import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.service.ClientWarn;

import static org.apache.cassandra.cql3.statements.RequestValidations.checkFalse;
import static org.apache.cassandra.cql3.statements.RequestValidations.checkTrue;
Expand Down Expand Up @@ -191,7 +195,29 @@ protected Restriction newEQRestriction(TableMetadata table, VariableSpecificatio
if (mapKey == null)
{
Term term = toTerm(toReceivers(columnDef), value, table.keyspace, boundNames);
return new SingleColumnRestriction.EQRestriction(columnDef, term);
// Leave the restriction as EQ if no analyzed index in backwards compatibility mode is present
var ebi = IndexRegistry.obtain(table).getEqBehavior(columnDef);
if (ebi.behavior == IndexRegistry.EqBehavior.EQ)
return new SingleColumnRestriction.EQRestriction(columnDef, term);

// the index is configured to transform EQ into MATCH for backwards compatibility
var matchIndexName = ebi.matchIndex.getIndexMetadata() == null ? "Unknown" : ebi.matchIndex.getIndexMetadata().name;
if (ebi.behavior == IndexRegistry.EqBehavior.MATCH)
{
ClientWarn.instance.warn(String.format(AnalyzerEqOperatorSupport.EQ_RESTRICTION_ON_ANALYZED_WARNING,
columnDef.toString(),
matchIndexName),
columnDef);
return new SingleColumnRestriction.AnalyzerMatchesRestriction(columnDef, term);
}

// multiple indexes support EQ, this is unsupported
assert ebi.behavior == IndexRegistry.EqBehavior.AMBIGUOUS;
var eqIndexName = ebi.eqIndex.getIndexMetadata() == null ? "Unknown" : ebi.eqIndex.getIndexMetadata().name;
throw invalidRequest(AnalyzerEqOperatorSupport.EQ_AMBIGUOUS_ERROR,
columnDef.toString(),
matchIndexName,
eqIndexName);
}
List<? extends ColumnSpecification> receivers = toReceivers(columnDef);
Term entryKey = toTerm(Collections.singletonList(receivers.get(0)), mapKey, table.keyspace, boundNames);
Expand Down Expand Up @@ -333,6 +359,14 @@ protected Restriction newAnnRestriction(TableMetadata table, VariableSpecificati
return new SingleColumnRestriction.AnnRestriction(columnDef, term);
}

@Override
protected Restriction newBm25Restriction(TableMetadata table, VariableSpecifications boundNames)
{
ColumnMetadata columnDef = table.getExistingColumn(entity);
Term term = toTerm(toReceivers(columnDef), value, table.keyspace, boundNames);
return new SingleColumnRestriction.Bm25Restriction(columnDef, term);
}

@Override
protected Restriction newAnalyzerMatchesRestriction(TableMetadata table, VariableSpecifications boundNames)
{
Expand Down
8 changes: 7 additions & 1 deletion src/java/org/apache/cassandra/cql3/TokenRelation.java
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,13 @@ protected Restriction newLikeRestriction(TableMetadata table, VariableSpecificat
@Override
protected Restriction newAnnRestriction(TableMetadata table, VariableSpecifications boundNames)
{
throw invalidRequest("%s cannot be used for toekn relations", operator());
throw invalidRequest("%s cannot be used for token relations", operator());
}

@Override
protected Restriction newBm25Restriction(TableMetadata table, VariableSpecifications boundNames)
{
throw invalidRequest("%s cannot be used for token relations", operator());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ public ClusteringColumnRestrictions.Builder addRestriction(Restriction restricti
SingleRestriction lastRestriction = restrictions.lastRestriction();
ColumnMetadata lastRestrictionStart = lastRestriction.getFirstColumn();
ColumnMetadata newRestrictionStart = newRestriction.getFirstColumn();
restrictions.addRestriction(newRestriction, isDisjunction, indexRegistry);
restrictions.addRestriction(newRestriction, isDisjunction);

checkFalse(lastRestriction.isSlice() && newRestrictionStart.position() > lastRestrictionStart.position(),
"Clustering column \"%s\" cannot be restricted (preceding column \"%s\" is restricted by a non-EQ relation)",
Expand All @@ -205,7 +205,7 @@ public ClusteringColumnRestrictions.Builder addRestriction(Restriction restricti
}
else
{
restrictions.addRestriction(newRestriction, isDisjunction, indexRegistry);
restrictions.addRestriction(newRestriction, isDisjunction);
}

return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ public PartitionKeyRestrictions build(IndexRegistry indexRegistry, boolean isDis
if (restriction.isOnToken())
return buildWithTokens(restrictionSet, i, indexRegistry);

restrictionSet.addRestriction((SingleRestriction) restriction, isDisjunction, indexRegistry);
restrictionSet.addRestriction((SingleRestriction) restriction, isDisjunction);
}

return buildPartitionKeyRestrictions(restrictionSet);
Expand Down
35 changes: 13 additions & 22 deletions src/java/org/apache/cassandra/cql3/restrictions/RestrictionSet.java
Original file line number Diff line number Diff line change
Expand Up @@ -398,45 +398,36 @@ private Builder()
{
}

public void addRestriction(SingleRestriction restriction, boolean isDisjunction, IndexRegistry indexRegistry)
public void addRestriction(SingleRestriction restriction, boolean isDisjunction)
{
List<ColumnMetadata> columnDefs = restriction.getColumnDefs();

if (isDisjunction)
{
// If this restriction is part of a disjunction query then we don't want
// to merge the restrictions (if that is possible), we just add the
// restriction to the set of restrictions for the column.
// to merge the restrictions, we just add the new restriction
addRestrictionForColumns(columnDefs, restriction, null);
}
else
{
// In some special cases such as EQ in analyzed index we need to skip merging the restriction,
// so we can send multiple EQ restrictions to the index.
if (restriction.skipMerge(indexRegistry))
{
addRestrictionForColumns(columnDefs, restriction, null);
return;
}

// If this restriction isn't part of a disjunction then we need to get
// the set of existing restrictions for the column and merge them with the
// new restriction
// ANDed together restrictions against the same columns should be merged.
Set<SingleRestriction> existingRestrictions = getRestrictions(newRestrictions, columnDefs);

SingleRestriction merged = restriction;
Set<SingleRestriction> replacedRestrictions = new HashSet<>();

for (SingleRestriction existing : existingRestrictions)
// merge the new restriction into an existing one. note that there is only ever a single
// restriction (per column), UNLESS one is ORDER BY BM25 and the other is MATCH.
for (var existing : existingRestrictions)
{
if (!existing.skipMerge(indexRegistry))
// shouldMerge exists for the BM25/MATCH case
if (existing.shouldMerge(restriction))
{
merged = existing.mergeWith(merged);
replacedRestrictions.add(existing);
var merged = existing.mergeWith(restriction);
addRestrictionForColumns(merged.getColumnDefs(), merged, Set.of(existing));
return;
}
}

addRestrictionForColumns(merged.getColumnDefs(), merged, replacedRestrictions);
// no existing restrictions that we should merge the new one with, add a new one
addRestrictionForColumns(columnDefs, restriction, null);
}
}

Expand Down
Loading

0 comments on commit b0cdc37

Please sign in to comment.