Commit
added examples for spark and pyspark testing
lepto2014 committed Jun 12, 2024
1 parent 6289efc commit 477151b
Showing 2 changed files with 134 additions and 0 deletions.
@@ -191,6 +191,101 @@ class ReaderSuite extends BaseTestSuite {
    )
  }

test("read vertex chunks") {
// construct the vertex information
val prefix = testData + "/ldbc_sample/json/"
val vertex_yaml = prefix + "Person.vertex.yml"
val vertex_info = VertexInfo.loadVertexInfo(vertex_yaml, spark)

// construct the vertex reader
val reader = new VertexReader(prefix, vertex_info, spark)

// test reading the number of vertices
assert(reader.readVerticesNumber() == 903)
val property_group = vertex_info.getPropertyGroup("gender")

// test reading a single property chunk
val single_chunk_df = reader.readVertexPropertyChunk(property_group, 0)
assert(single_chunk_df.columns.length == 4)
assert(single_chunk_df.count() == 100)
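
    // Chunk 0 holds 100 of the 903 vertices, so the vertex chunk size here is
    // 100 and the data spans ceil(903 / 100) = 10 chunks in total.
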
    val cond = "gender = 'female'"
    var df_pd = single_chunk_df.select("firstName", "gender").filter(cond)

    /**
     * ==Physical Plan==
     * (1) Filter (isnotnull(gender#2) AND (gender#2 = female))
     * +- *(1) ColumnarToRow
     * +- BatchScan[firstName#0, gender#2] GarScan DataFilters:
     * [isnotnull(gender#2), (gender#2 = female)], Format: gar, Location:
     * InMemoryFileIndex(1
     * paths)[file:/path/to/GraphAr/spark/src/test/resources/gar-test/l...,
     * PartitionFilters: [], PushedFilters: [IsNotNull(gender),
     * EqualTo(gender,female)], ReadSchema:
     * struct<firstName:string,gender:string>, PushedFilters:
     * [IsNotNull(gender), EqualTo(gender,female)] RuntimeFilters: []
     */
    df_pd.explain()
    df_pd.show()
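
    // A hedged sketch beyond the original test: the filter pushdown shown in
    // the plan above could also be checked programmatically, assuming the
    // rendered plan string keeps its "PushedFilters" section:
    // assert(
    //   df_pd.queryExecution.executedPlan.toString.contains("PushedFilters")
    // )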

    // test reading all chunks for a property group
    val property_df =
      reader.readVertexPropertyGroup(property_group)
    assert(property_df.columns.length == 4)
    assert(property_df.count() == 903)
    df_pd = property_df.select("firstName", "gender").filter(cond)

    /**
     * ==Physical Plan==
     * (1) Filter (isnotnull(gender#31) AND (gender#31 = female))
     * +- *(1) ColumnarToRow
     * +- BatchScan[firstName#29, gender#31] GarScan DataFilters:
     * [isnotnull(gender#31), (gender#31 = female)], Format: gar, Location:
     * InMemoryFileIndex(1
     * paths)[file:/path/to/code/cpp/GraphAr/spark/src/test/resources/gar-test/l...,
     * PartitionFilters: [], PushedFilters: [IsNotNull(gender),
     * EqualTo(gender,female)], ReadSchema:
     * struct<firstName:string,gender:string>, PushedFilters:
     * [IsNotNull(gender), EqualTo(gender,female)] RuntimeFilters: []
     */
    df_pd.explain()
    df_pd.show()
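
    // Reading the whole property group concatenates every chunk of that
    // group, which is why the count goes from 100 rows (a single chunk)
    // back up to all 903.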

    // test reading chunks for multiple property groups
    val property_group_1 = vertex_info.getPropertyGroup("id")
    val property_groups = new java.util.ArrayList[PropertyGroup]()
    property_groups.add(property_group_1)
    property_groups.add(property_group)
    val multiple_property_df =
      reader.readMultipleVertexPropertyGroups(property_groups)
    assert(multiple_property_df.columns.length == 5)
    assert(multiple_property_df.count() == 903)
    df_pd = multiple_property_df.filter(cond)
    df_pd.explain()
    df_pd.show()
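
    // The column counts above presumably include one internal vertex index
    // column in addition to the group's own properties (an assumption about
    // the reader's output layout): three properties plus the index give the
    // 4 columns of a single group, and adding the one-property "id" group
    // gives 5.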

    // test reading chunks for all property groups and optionally adding indices
    val vertex_df = reader.readAllVertexPropertyGroups()
    assert(vertex_df.columns.length == 5)
    assert(vertex_df.count() == 903)
    df_pd = vertex_df.filter(cond)
    df_pd.explain()
    df_pd.show()

    val vertex_df_with_index = reader.readAllVertexPropertyGroups()
    assert(vertex_df_with_index.columns.length == 5)
    assert(vertex_df_with_index.count() == 903)
    df_pd = vertex_df_with_index.filter(cond).select("firstName", "gender")
    df_pd.explain()
    df_pd.show()
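
    // Note: vertex_df_with_index above comes from the same
    // readAllVertexPropertyGroups() call as vertex_df, so both blocks check
    // identical output; whether the index column is attached appears to be
    // the reader's default behavior rather than a separate option here.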

    // expect an exception when reading a property group that does not exist
    // in the vertex info
    val invalid_property_group = new PropertyGroup()
    assertThrows[IllegalArgumentException](
      reader.readVertexPropertyChunk(invalid_property_group, 0)
    )
    assertThrows[IllegalArgumentException](
      reader.readVertexPropertyGroup(invalid_property_group)
    )
  }

test("read edge chunks") {
// construct the edge information
val prefix = testData + "/ldbc_sample/csv/"
pyspark/tests/test_reader.py: 39 additions & 0 deletions
@@ -100,6 +100,45 @@ def test_edge_reader(spark):
    assert edge_reader.read_edges_number(0) == 0
    assert edge_reader.read_offset(0).count() > 0

def test_vertex_reader_with_json(spark):
    initialize(spark)

    vertex_info = VertexInfo.load_vertex_info(
        GRAPHAR_TESTS_EXAMPLES.joinpath("ldbc_sample/json/")
        .joinpath("Person.vertex.yml")
        .absolute()
        .__str__()
    )
    vertex_reader = VertexReader.from_python(
        GRAPHAR_TESTS_EXAMPLES.joinpath("ldbc_sample/json/").absolute().__str__(),
        vertex_info,
    )
    assert VertexReader.from_scala(vertex_reader.to_scala()) is not None
    assert vertex_reader.read_vertices_number() > 0
    assert (
        vertex_reader.read_vertex_property_group(
            vertex_info.get_property_group("name")
        ).count()
        > 0
    )
    assert (
        vertex_reader.read_vertex_property_chunk(
            vertex_info.get_property_groups()[0], 0
        ).count()
        > 0
    )
    assert (
        vertex_reader.read_all_vertex_property_groups().count()
        >= vertex_reader.read_vertex_property_group(
            vertex_info.get_property_group("age")
        ).count()
    )
    assert (
        vertex_reader.read_multiple_vertex_property_groups(
            [vertex_info.get_property_group("name")]
        ).count()
        > 0
    )
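
    # A hedged sketch beyond the original assertions: the readers return plain
    # PySpark DataFrames (as the .count() calls above suggest), so ordinary
    # DataFrame operations should compose with them, e.g. (the column name
    # "name" is an assumption about the sample data):
    # df = vertex_reader.read_all_vertex_property_groups()
    # df.select("name").show()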

def test_graph_reader(spark):
    initialize(spark)