Skip to content

Commit

Permalink
initial
Browse files Browse the repository at this point in the history
  • Loading branch information
chenhao-db committed Mar 18, 2024
1 parent 51e8634 commit e74886e
Show file tree
Hide file tree
Showing 13 changed files with 742 additions and 17 deletions.
6 changes: 6 additions & 0 deletions common/unsafe/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,12 @@
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-variant_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.scala-lang.modules</groupId>
<artifactId>scala-parallel-collections_${scala.binary.version}</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.spark.unsafe.types;

import org.apache.spark.unsafe.Platform;
import org.apache.spark.variant.Variant;

import java.io.Serializable;
import java.util.Arrays;
Expand Down Expand Up @@ -104,8 +105,7 @@ public String debugString() {
*/
@Override
public String toString() {
  // Decode the variant binary and render it as JSON text. Constructing the Variant validates the
  // metadata version and the size limit, so malformed or oversized input surfaces as an exception
  // here rather than as garbage text.
  return new Variant(value, metadata).toJson();
}

/**
Expand Down
12 changes: 12 additions & 0 deletions common/utils/src/main/resources/error/error-classes.json
Original file line number Diff line number Diff line change
Expand Up @@ -2876,6 +2876,12 @@
},
"sqlState" : "22023"
},
"MALFORMED_VARIANT" : {
"message" : [
"Variant binary is malformed. Please check the data source is valid."
],
"sqlState" : "22023"
},
"MERGE_CARDINALITY_VIOLATION" : {
"message" : [
"The ON search condition of the MERGE statement matched a single row from the target table with multiple rows of the source table.",
Expand Down Expand Up @@ -4550,6 +4556,12 @@
],
"sqlState" : "42883"
},
"VARIANT_CONSTRUCTOR_SIZE_LIMIT" : {
"message" : [
"Cannot construct a Variant larger than 16 MiB. The maximum allowed size of a Variant value is 16 MiB."
],
"sqlState" : "22023"
},
"VARIANT_SIZE_LIMIT" : {
"message" : [
"Cannot build variant bigger than <sizeLimit> in <functionName>.",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@

/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.variant;

import scala.collection.immutable.Map$;

import org.apache.spark.QueryContext;
import org.apache.spark.SparkRuntimeException;

/**
 * An exception indicating that a Variant binary is malformed. It is always raised with the
 * {@code MALFORMED_VARIANT} error class, an empty message-parameter map, a null cause, an empty
 * {@link QueryContext} array and an empty string as the final constructor argument.
 */
public class MalformedVariantException extends SparkRuntimeException {
  public MalformedVariantException() {
    super(
      "MALFORMED_VARIANT",
      Map$.MODULE$.<String, String>empty(),
      null,
      new QueryContext[0],
      "");
  }
}
91 changes: 91 additions & 0 deletions common/variant/src/main/java/org/apache/spark/variant/Variant.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,14 @@

package org.apache.spark.variant;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;

import java.io.CharArrayWriter;
import java.io.IOException;

import static org.apache.spark.variant.VariantUtil.*;

/**
* This class is structurally equivalent to {@link org.apache.spark.unsafe.types.VariantVal}. We
* define a new class to avoid depending on or modifying Spark.
Expand All @@ -28,6 +36,15 @@ public final class Variant {
/**
 * Creates a Variant over the given value and metadata buffers. The arrays are stored as-is, not
 * copied.
 *
 * @param value the variant value bytes
 * @param metadata the variant metadata bytes; its first byte carries the version
 * @throws MalformedVariantException if {@code metadata} is empty or its version bits (first byte
 *         masked with {@code VERSION_MASK}) differ from {@code VERSION}
 * @throws VariantConstructorSizeLimitException if {@code metadata} or {@code value} is longer
 *         than {@code SIZE_LIMIT}
 */
public Variant(byte[] value, byte[] metadata) {
  // There is currently only one allowed version, so anything else is treated as malformed.
  boolean versionOk = metadata.length >= 1 && (metadata[0] & VERSION_MASK) == VERSION;
  if (!versionOk) {
    throw new MalformedVariantException();
  }
  // Don't attempt to use a Variant larger than 16 MiB. We'll never produce one, and it risks
  // memory instability. Metadata is checked before value, as in the version check above.
  boolean withinLimit = metadata.length <= SIZE_LIMIT && value.length <= SIZE_LIMIT;
  if (!withinLimit) {
    throw new VariantConstructorSizeLimitException();
  }
  this.value = value;
  this.metadata = metadata;
}

public byte[] getValue() {
Expand All @@ -37,4 +54,78 @@ public byte[] getValue() {
/** Returns the metadata byte array held by this variant (the stored array itself, not a copy). */
public byte[] getMetadata() {
  return this.metadata;
}

/**
 * Stringify the variant in JSON format.
 *
 * @return the JSON text produced by rendering the value buffer starting at position 0
 * @throws MalformedVariantException if the variant is malformed
 */
public String toJson() {
  final StringBuilder json = new StringBuilder();
  toJsonImpl(value, metadata, 0, json);
  return json.toString();
}

// Shared generator factory for escapeJson. escapeJson runs once per object key and per string
// value while rendering, so building a new factory on every call is avoidable work; Jackson
// documents JsonFactory instances as thread-safe and reusable once configured.
private static final JsonFactory JSON_FACTORY = new JsonFactory();

/**
 * Escape a string so that it can be pasted into JSON structure.
 * For example, if {@code str} only contains a new-line character, then the result content is
 * "\n" (4 characters, including the surrounding double quotes).
 *
 * @param str the raw string to encode as a JSON string literal
 * @return the text written by the JSON generator for {@code str}
 * @throws RuntimeException wrapping the {@link IOException} if the generator fails to write
 */
static String escapeJson(String str) {
  try (CharArrayWriter writer = new CharArrayWriter();
       JsonGenerator gen = JSON_FACTORY.createGenerator(writer)) {
    gen.writeString(str);
    // Flush before reading the writer so the generator's buffered output is included.
    gen.flush();
    return writer.toString();
  } catch (IOException e) {
    throw new RuntimeException(e);
  }
}

/**
 * Appends the JSON text of the variant value located at {@code pos} in {@code value} to
 * {@code sb}.
 *
 * Objects are written as {@code {key:value,...}} with each key looked up in {@code metadata} by
 * its id and escaped through {@link #escapeJson}; arrays are written as {@code [v,...]}. Nested
 * elements are rendered by recursing at the element's position. Strings are escaped, decimals
 * are printed with {@code toPlainString()}, and the remaining scalars use
 * {@code StringBuilder.append}. A type with no matching case appends nothing.
 *
 * @param value the variant value buffer
 * @param metadata the variant metadata buffer used to resolve object keys
 * @param pos the position in {@code value} of the element to render
 * @param sb the builder receiving the output
 */
static void toJsonImpl(byte[] value, byte[] metadata, int pos, StringBuilder sb) {
  switch (VariantUtil.getType(value, pos)) {
    case NULL:
      sb.append("null");
      break;
    case BOOLEAN:
      sb.append(VariantUtil.getBoolean(value, pos));
      break;
    case LONG:
      sb.append(VariantUtil.getLong(value, pos));
      break;
    case DOUBLE:
      sb.append(VariantUtil.getDouble(value, pos));
      break;
    case DECIMAL:
      sb.append(VariantUtil.getDecimal(value, pos).toPlainString());
      break;
    case STRING:
      sb.append(escapeJson(VariantUtil.getString(value, pos)));
      break;
    case ARRAY:
      handleArray(value, pos, (count, offsetWidth, offsetBase, dataBase) -> {
        sb.append('[');
        for (int idx = 0; idx < count; idx++) {
          // Each offset entry is relative to the start of the array's data section.
          int elemOffset = readUnsigned(value, offsetBase + offsetWidth * idx, offsetWidth);
          if (idx > 0) {
            sb.append(',');
          }
          toJsonImpl(value, metadata, dataBase + elemOffset, sb);
        }
        sb.append(']');
        return null;
      });
      break;
    case OBJECT:
      handleObject(value, pos, (count, idWidth, offsetWidth, idBase, offsetBase, dataBase) -> {
        sb.append('{');
        for (int idx = 0; idx < count; idx++) {
          // Read both the key id and the field offset before emitting anything for this field.
          int keyId = readUnsigned(value, idBase + idWidth * idx, idWidth);
          int fieldOffset = readUnsigned(value, offsetBase + offsetWidth * idx, offsetWidth);
          if (idx > 0) {
            sb.append(',');
          }
          sb.append(escapeJson(getMetadataKey(metadata, keyId))).append(':');
          toJsonImpl(value, metadata, dataBase + fieldOffset, sb);
        }
        sb.append('}');
        return null;
      });
      break;
  }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.variant;

import scala.collection.immutable.Map$;

import org.apache.spark.QueryContext;
import org.apache.spark.SparkRuntimeException;

/**
 * Signals that the Variant constructor was called with a value or metadata buffer larger than
 * the 16MiB size limit. Spark itself never builds a Variant this large, so this exception should
 * only be reachable when reading a Variant that was produced by another tool.
 */
public class VariantConstructorSizeLimitException extends SparkRuntimeException {
  public VariantConstructorSizeLimitException() {
    super(
      "VARIANT_CONSTRUCTOR_SIZE_LIMIT",
      Map$.MODULE$.<String, String>empty(),
      null,
      new QueryContext[0],
      "");
  }
}
Loading

0 comments on commit e74886e

Please sign in to comment.