From 477151b1fc1bcd96c85378fa1e70936c5dafcaa3 Mon Sep 17 00:00:00 2001
From: Ubuntu
Date: Wed, 12 Jun 2024 15:41:27 +0000
Subject: [PATCH] added examples for spark and pyspark testing

---
 .../scala/org/apache/graphar/TestReader.scala | 95 +++++++++++++++++++
 pyspark/tests/test_reader.py                  | 39 ++++++++
 2 files changed, 134 insertions(+)

diff --git a/maven-projects/spark/graphar/src/test/scala/org/apache/graphar/TestReader.scala b/maven-projects/spark/graphar/src/test/scala/org/apache/graphar/TestReader.scala
index 2aafb29d1..70b8f3e33 100644
--- a/maven-projects/spark/graphar/src/test/scala/org/apache/graphar/TestReader.scala
+++ b/maven-projects/spark/graphar/src/test/scala/org/apache/graphar/TestReader.scala
@@ -191,6 +191,101 @@ class ReaderSuite extends BaseTestSuite {
     )
   }
 
+  test("read vertex chunks") {
+    // construct the vertex information
+    val prefix = testData + "/ldbc_sample/json/"
+    val vertex_yaml = prefix + "Person.vertex.yml"
+    val vertex_info = VertexInfo.loadVertexInfo(vertex_yaml, spark)
+
+    // construct the vertex reader
+    val reader = new VertexReader(prefix, vertex_info, spark)
+
+    // test reading the number of vertices
+    assert(reader.readVerticesNumber() == 903)
+    val property_group = vertex_info.getPropertyGroup("gender")
+
+    // test reading a single property chunk
+    val single_chunk_df = reader.readVertexPropertyChunk(property_group, 0)
+    assert(single_chunk_df.columns.length == 4)
+    assert(single_chunk_df.count() == 100)
+    val cond = "gender = 'female'"
+    var df_pd = single_chunk_df.select("firstName", "gender").filter(cond)
+
+    /**
+     * ==Physical Plan==
+     * (1) Filter (isnotnull(gender#2) AND (gender#2 = female))
+     * +- *(1) ColumnarToRow
+     * +- BatchScan[firstName#0, gender#2] GarScan DataFilters:
+     * [isnotnull(gender#2), (gender#2 = female)], Format: gar, Location:
+     * InMemoryFileIndex(1
+     * paths)[file:/path/to/GraphAr/spark/src/test/resources/gar-test/l...,
+     * PartitionFilters: [], PushedFilters: [IsNotNull(gender),
+     * EqualTo(gender,female)], ReadSchema:
+     * struct<firstName:string,gender:string>, PushedFilters:
+     * [IsNotNull(gender), EqualTo(gender,female)] RuntimeFilters: []
+     */
+    df_pd.explain()
+    df_pd.show()
+
+    // test reading all chunks for a property group
+    val property_df =
+      reader.readVertexPropertyGroup(property_group)
+    assert(property_df.columns.length == 4)
+    assert(property_df.count() == 903)
+    df_pd = property_df.select("firstName", "gender").filter(cond)
+
+    /**
+     * ==Physical Plan==
+     * (1) Filter (isnotnull(gender#31) AND (gender#31 = female))
+     * +- *(1) ColumnarToRow
+     * +- BatchScan[firstName#29, gender#31] GarScan DataFilters:
+     * [isnotnull(gender#31), (gender#31 = female)], Format: gar, Location:
+     * InMemoryFileIndex(1
+     * paths)[file:/path/to/code/cpp/GraphAr/spark/src/test/resources/gar-test/l...,
+     * PartitionFilters: [], PushedFilters: [IsNotNull(gender),
+     * EqualTo(gender,female)], ReadSchema:
+     * struct<firstName:string,gender:string>, PushedFilters:
+     * [IsNotNull(gender), EqualTo(gender,female)] RuntimeFilters: []
+     */
+    df_pd.explain()
+    df_pd.show()
+
+    // test reading chunks for multiple property groups
+    val property_group_1 = vertex_info.getPropertyGroup("id")
+    val property_groups = new java.util.ArrayList[PropertyGroup]()
+    property_groups.add(property_group_1)
+    property_groups.add(property_group)
+    val multiple_property_df =
+      reader.readMultipleVertexPropertyGroups(property_groups)
+    assert(multiple_property_df.columns.length == 5)
+    assert(multiple_property_df.count() == 903)
+    df_pd = multiple_property_df.filter(cond)
+    df_pd.explain()
+    df_pd.show()
+    // test reading chunks for all property groups and optionally adding indices
+    val vertex_df = reader.readAllVertexPropertyGroups()
+    assert(vertex_df.columns.length == 5)
+    assert(vertex_df.count() == 903)
+    df_pd = vertex_df.filter(cond)
+    df_pd.explain()
+    df_pd.show()
+    val vertex_df_with_index = reader.readAllVertexPropertyGroups()
+    assert(vertex_df_with_index.columns.length == 5)
+    assert(vertex_df_with_index.count() == 903)
+    df_pd = vertex_df_with_index.filter(cond).select("firstName", "gender")
+    df_pd.explain()
+    df_pd.show()
+
+    // throw an exception for non-existing property groups
+    val invalid_property_group = new PropertyGroup()
+    assertThrows[IllegalArgumentException](
+      reader.readVertexPropertyChunk(invalid_property_group, 0)
+    )
+    assertThrows[IllegalArgumentException](
+      reader.readVertexPropertyGroup(invalid_property_group)
+    )
+  }
+
   test("read edge chunks") {
     // construct the edge information
     val prefix = testData + "/ldbc_sample/csv/"
diff --git a/pyspark/tests/test_reader.py b/pyspark/tests/test_reader.py
index a92a399f8..24eeac697 100644
--- a/pyspark/tests/test_reader.py
+++ b/pyspark/tests/test_reader.py
@@ -100,6 +100,45 @@ def test_edge_reader(spark):
     assert edge_reader.read_edges_number(0) == 0
     assert edge_reader.read_offset(0).count() > 0
 
+def test_vertex_reader_with_json(spark):
+    initialize(spark)
+
+    vertex_info = VertexInfo.load_vertex_info(
+        GRAPHAR_TESTS_EXAMPLES.joinpath("ldbc_sample/json/")
+        .joinpath("Person.vertex.yml")
+        .absolute()
+        .__str__()
+    )
+    vertex_reader = VertexReader.from_python(
+        GRAPHAR_TESTS_EXAMPLES.joinpath("ldbc_sample/json/").absolute().__str__(),
+        vertex_info,
+    )
+    assert VertexReader.from_scala(vertex_reader.to_scala()) is not None
+    assert vertex_reader.read_vertices_number() > 0
+    assert (
+        vertex_reader.read_vertex_property_group(
+            vertex_info.get_property_group("name")
+        ).count()
+        > 0
+    )
+    assert (
+        vertex_reader.read_vertex_property_chunk(
+            vertex_info.get_property_groups()[0], 0
+        ).count()
+        > 0
+    )
+    assert (
+        vertex_reader.read_all_vertex_property_groups().count()
+        >= vertex_reader.read_vertex_property_group(
+            vertex_info.get_property_group("age")
+        ).count()
+    )
+    assert (
+        vertex_reader.read_multiple_vertex_property_groups(
+            [vertex_info.get_property_group("name")]
+        ).count()
+        > 0
+    )
 
 def test_graph_reader(spark):
     initialize(spark)
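
Usage note: the Scala test above exercises the VertexReader API end to end. Below
is a minimal standalone sketch, not part of the patch, of how the same calls look
outside a test suite. The constructor and method signatures are taken from the
patch itself; the import paths follow the GraphAr Spark module layout, while the
object name, local master, and the /path/to/... prefix are placeholders to adjust
for your environment.

import org.apache.graphar.VertexInfo
import org.apache.graphar.reader.VertexReader
import org.apache.spark.sql.SparkSession

object ReadPersonVertices {
  def main(args: Array[String]): Unit = {
    // Local session for the sketch; any existing SparkSession works as well.
    val spark = SparkSession.builder()
      .appName("graphar-vertex-reader-example")
      .master("local[*]")
      .getOrCreate()

    // Same layout the test assumes: <prefix>/Person.vertex.yml describes the
    // vertex type, and its property chunks live under the same prefix.
    val prefix = "/path/to/ldbc_sample/json/"
    val vertexInfo = VertexInfo.loadVertexInfo(prefix + "Person.vertex.yml", spark)
    val reader = new VertexReader(prefix, vertexInfo, spark)

    // Read every property group (plus the internal vertex index column), then
    // filter and project. As the physical plans quoted in the test show, the
    // gender predicate is pushed down into the GarScan as PushedFilters.
    val df = reader.readAllVertexPropertyGroups()
    df.filter("gender = 'female'").select("firstName", "gender").show()

    spark.stop()
  }
}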