diff --git a/tsumugi-python/tsumugi/proto/repository_pb2.py b/tsumugi-python/tsumugi/proto/repository_pb2.py new file mode 100644 index 0000000..6ae7a7e --- /dev/null +++ b/tsumugi-python/tsumugi/proto/repository_pb2.py @@ -0,0 +1,46 @@ +# -*- coding: utf-8 -*- +# Generated by the protocol buffer compiler. DO NOT EDIT! +# NO CHECKED-IN PROTOBUF GENCODE +# source: repository.proto +# Protobuf Python Version: 5.27.1 +"""Generated protocol buffer code.""" + +from google.protobuf import descriptor as _descriptor +from google.protobuf import descriptor_pool as _descriptor_pool +from google.protobuf import runtime_version as _runtime_version +from google.protobuf import symbol_database as _symbol_database +from google.protobuf.internal import builder as _builder + +_runtime_version.ValidateProtobufRuntimeVersion( + _runtime_version.Domain.PUBLIC, 5, 27, 1, "", "repository.proto" +) +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile( + b'\n\x10repository.proto\x12\x1c\x63om.ssinchenko.tsumugi.proto"\xc8\x01\n\nRepository\x12U\n\x0b\x66ile_system\x18\x01 \x01(\x0b\x32\x32.com.ssinchenko.tsumugi.proto.FileSystemRepositoryH\x00R\nfileSystem\x12U\n\x0bspark_table\x18\x02 \x01(\x0b\x32\x32.com.ssinchenko.tsumugi.proto.SparkTableRepositoryH\x00R\nsparkTableB\x0c\n\nrepository"*\n\x14\x46ileSystemRepository\x12\x12\n\x04path\x18\x01 \x01(\tR\x04path"5\n\x14SparkTableRepository\x12\x1d\n\ntable_name\x18\x01 \x01(\tR\ttableName"\xae\x01\n\tResultKey\x12!\n\x0c\x64\x61taset_date\x18\x01 \x01(\x03R\x0b\x64\x61tasetDate\x12\x45\n\x04tags\x18\x02 \x03(\x0b\x32\x31.com.ssinchenko.tsumugi.proto.ResultKey.TagsEntryR\x04tags\x1a\x37\n\tTagsEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 \x01(\tR\x05value:\x02\x38\x01\x42\xdb\x01\n com.com.ssinchenko.tsumugi.protoB\x0fRepositoryProtoH\x01P\x01Z\rtsumugi/proto\xa0\x01\x01\xa2\x02\x04\x43STP\xaa\x02\x1c\x43om.Ssinchenko.Tsumugi.Proto\xca\x02\x1c\x43om\\Ssinchenko\\Tsumugi\\Proto\xe2\x02(Com\\Ssinchenko\\Tsumugi\\Proto\\GPBMetadata\xea\x02\x1f\x43om::Ssinchenko::Tsumugi::Protob\x06proto3' +) + +_globals = globals() +_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) +_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, "repository_pb2", _globals) +if not _descriptor._USE_C_DESCRIPTORS: + _globals["DESCRIPTOR"]._loaded_options = None + _globals[ + "DESCRIPTOR" + ]._serialized_options = b"\n com.com.ssinchenko.tsumugi.protoB\017RepositoryProtoH\001P\001Z\rtsumugi/proto\240\001\001\242\002\004CSTP\252\002\034Com.Ssinchenko.Tsumugi.Proto\312\002\034Com\\Ssinchenko\\Tsumugi\\Proto\342\002(Com\\Ssinchenko\\Tsumugi\\Proto\\GPBMetadata\352\002\037Com::Ssinchenko::Tsumugi::Proto" + _globals["_RESULTKEY_TAGSENTRY"]._loaded_options = None + _globals["_RESULTKEY_TAGSENTRY"]._serialized_options = b"8\001" + _globals["_REPOSITORY"]._serialized_start = 51 + _globals["_REPOSITORY"]._serialized_end = 251 + _globals["_FILESYSTEMREPOSITORY"]._serialized_start = 253 + _globals["_FILESYSTEMREPOSITORY"]._serialized_end = 295 + _globals["_SPARKTABLEREPOSITORY"]._serialized_start = 297 + _globals["_SPARKTABLEREPOSITORY"]._serialized_end = 350 + _globals["_RESULTKEY"]._serialized_start = 353 + _globals["_RESULTKEY"]._serialized_end = 527 + _globals["_RESULTKEY_TAGSENTRY"]._serialized_start = 472 + _globals["_RESULTKEY_TAGSENTRY"]._serialized_end = 527 +# @@protoc_insertion_point(module_scope) diff --git a/tsumugi-python/tsumugi/proto/repository_pb2.pyi 
b/tsumugi-python/tsumugi/proto/repository_pb2.pyi new file mode 100644 index 0000000..feded1f --- /dev/null +++ b/tsumugi-python/tsumugi/proto/repository_pb2.pyi @@ -0,0 +1,57 @@ +from google.protobuf.internal import containers as _containers +from google.protobuf import descriptor as _descriptor +from google.protobuf import message as _message +from typing import ( + ClassVar as _ClassVar, + Mapping as _Mapping, + Optional as _Optional, + Union as _Union, +) + +DESCRIPTOR: _descriptor.FileDescriptor + +class Repository(_message.Message): + __slots__ = ("file_system", "spark_table") + FILE_SYSTEM_FIELD_NUMBER: _ClassVar[int] + SPARK_TABLE_FIELD_NUMBER: _ClassVar[int] + file_system: FileSystemRepository + spark_table: SparkTableRepository + def __init__( + self, + file_system: _Optional[_Union[FileSystemRepository, _Mapping]] = ..., + spark_table: _Optional[_Union[SparkTableRepository, _Mapping]] = ..., + ) -> None: ... + +class FileSystemRepository(_message.Message): + __slots__ = ("path",) + PATH_FIELD_NUMBER: _ClassVar[int] + path: str + def __init__(self, path: _Optional[str] = ...) -> None: ... + +class SparkTableRepository(_message.Message): + __slots__ = ("table_name",) + TABLE_NAME_FIELD_NUMBER: _ClassVar[int] + table_name: str + def __init__(self, table_name: _Optional[str] = ...) -> None: ... + +class ResultKey(_message.Message): + __slots__ = ("dataset_date", "tags") + class TagsEntry(_message.Message): + __slots__ = ("key", "value") + KEY_FIELD_NUMBER: _ClassVar[int] + VALUE_FIELD_NUMBER: _ClassVar[int] + key: str + value: str + def __init__( + self, key: _Optional[str] = ..., value: _Optional[str] = ... + ) -> None: ... + + DATASET_DATE_FIELD_NUMBER: _ClassVar[int] + TAGS_FIELD_NUMBER: _ClassVar[int] + dataset_date: int + tags: _containers.ScalarMap[str, str] + def __init__( + self, + dataset_date: _Optional[int] = ..., + tags: _Optional[_Mapping[str, str]] = ..., + ) -> None: ... diff --git a/tsumugi-python/tsumugi/proto/repository_pb2_grpc.py b/tsumugi-python/tsumugi/proto/repository_pb2_grpc.py new file mode 100644 index 0000000..8a4eb2a --- /dev/null +++ b/tsumugi-python/tsumugi/proto/repository_pb2_grpc.py @@ -0,0 +1,3 @@ +# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT! 
+"""Client and server classes corresponding to protobuf-defined services.""" + diff --git a/tsumugi-python/tsumugi/proto/suite_pb2.py b/tsumugi-python/tsumugi/proto/suite_pb2.py index ab72236..318e493 100644 --- a/tsumugi-python/tsumugi/proto/suite_pb2.py +++ b/tsumugi-python/tsumugi/proto/suite_pb2.py @@ -22,7 +22,7 @@ DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile( - b'\n\x0bsuite.proto\x12\x1c\x63om.ssinchenko.tsumugi.proto\x1a\x0f\x61nalyzers.proto\x1a\x10strategies.proto"\xcd\x04\n\x05\x43heck\x12H\n\ncheckLevel\x18\x01 \x01(\x0e\x32(.com.ssinchenko.tsumugi.proto.CheckLevelR\ncheckLevel\x12 \n\x0b\x64\x65scription\x18\x02 \x01(\tR\x0b\x64\x65scription\x12P\n\x0b\x63onstraints\x18\x03 \x03(\x0b\x32..com.ssinchenko.tsumugi.proto.Check.ConstraintR\x0b\x63onstraints\x1a\xc9\x02\n\nConstraint\x12\x42\n\x08\x61nalyzer\x18\x01 \x01(\x0b\x32&.com.ssinchenko.tsumugi.proto.AnalyzerR\x08\x61nalyzer\x12+\n\x10long_expectation\x18\x02 \x01(\x03H\x00R\x0flongExpectation\x12/\n\x12\x64ouble_expectation\x18\x03 \x01(\x01H\x00R\x11\x64oubleExpectation\x12\x46\n\x04sign\x18\x04 \x01(\x0e\x32\x32.com.ssinchenko.tsumugi.proto.Check.ComparisonSignR\x04sign\x12\x17\n\x04hint\x18\x05 \x01(\tH\x01R\x04hint\x88\x01\x01\x12\x17\n\x04name\x18\x06 \x01(\tH\x02R\x04name\x88\x01\x01\x42\r\n\x0b\x65xpectationB\x07\n\x05_hintB\x07\n\x05_name":\n\x0e\x43omparisonSign\x12\x06\n\x02GT\x10\x00\x12\x07\n\x03GET\x10\x01\x12\x06\n\x02\x45Q\x10\x02\x12\x06\n\x02LT\x10\x03\x12\x07\n\x03LET\x10\x04"\xd9\x05\n\x10\x41nomalyDetection\x12t\n\x1a\x61nomaly_detection_strategy\x18\x01 \x01(\x0b\x32\x36.com.ssinchenko.tsumugi.proto.AnomalyDetectionStrategyR\x18\x61nomalyDetectionStrategy\x12\x42\n\x08\x61nalyzer\x18\x02 \x01(\x0b\x32&.com.ssinchenko.tsumugi.proto.AnalyzerR\x08\x61nalyzer\x12^\n\x06\x63onfig\x18\x03 \x01(\x0b\x32\x41.com.ssinchenko.tsumugi.proto.AnomalyDetection.AnomalyCheckConfigH\x00R\x06\x63onfig\x88\x01\x01\x1a\x9f\x03\n\x12\x41nomalyCheckConfig\x12>\n\x05level\x18\x01 \x01(\x0e\x32(.com.ssinchenko.tsumugi.proto.CheckLevelR\x05level\x12 \n\x0b\x64\x65scription\x18\x02 \x01(\tR\x0b\x64\x65scription\x12|\n\x0fwith_tag_values\x18\x03 \x03(\x0b\x32T.com.ssinchenko.tsumugi.proto.AnomalyDetection.AnomalyCheckConfig.WithTagValuesEntryR\rwithTagValues\x12"\n\nafter_date\x18\x04 \x01(\x03H\x00R\tafterDate\x88\x01\x01\x12$\n\x0b\x62\x65\x66ore_date\x18\x05 \x01(\x03H\x01R\nbeforeDate\x88\x01\x01\x1a@\n\x12WithTagValuesEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 \x01(\tR\x05value:\x02\x38\x01\x42\r\n\x0b_after_dateB\x0e\n\x0c_before_dateB\t\n\x07_config"\x81\x08\n\x11VerificationSuite\x12\x17\n\x04\x64\x61ta\x18\x01 \x01(\x0cH\x01R\x04\x64\x61ta\x88\x01\x01\x12;\n\x06\x63hecks\x18\x02 \x03(\x0b\x32#.com.ssinchenko.tsumugi.proto.CheckR\x06\x63hecks\x12U\n\x12required_analyzers\x18\x03 \x03(\x0b\x32&.com.ssinchenko.tsumugi.proto.AnalyzerR\x11requiredAnalyzers\x12|\n\x16\x66ile_system_repository\x18\x04 \x01(\x0b\x32\x44.com.ssinchenko.tsumugi.proto.VerificationSuite.FileSystemRepositoryH\x00R\x14\x66ileSystemRepository\x12|\n\x16spark_table_repository\x18\x05 \x01(\x0b\x32\x44.com.ssinchenko.tsumugi.proto.VerificationSuite.SparkTableRepositoryH\x00R\x14sparkTableRepository\x12]\n\nresult_key\x18\x06 \x01(\x0b\x32\x39.com.ssinchenko.tsumugi.proto.VerificationSuite.ResultKeyH\x02R\tresultKey\x88\x01\x01\x12]\n\x12\x61nomaly_detections\x18\x07 \x03(\x0b\x32..com.ssinchenko.tsumugi.proto.AnomalyDetectionR\x11\x61nomalyDetections\x12\x39\n\x19\x63ompute_row_level_results\x18\x08 
\x01(\x08R\x16\x63omputeRowLevelResults\x1a*\n\x14\x46ileSystemRepository\x12\x12\n\x04path\x18\x01 \x01(\tR\x04path\x1a\x35\n\x14SparkTableRepository\x12\x1d\n\ntable_name\x18\x01 \x01(\tR\ttableName\x1a\xc0\x01\n\tResultKey\x12!\n\x0c\x64\x61taset_date\x18\x01 \x01(\x03R\x0b\x64\x61tasetDate\x12W\n\x04tags\x18\x02 \x03(\x0b\x32\x43.com.ssinchenko.tsumugi.proto.VerificationSuite.ResultKey.TagsEntryR\x04tags\x1a\x37\n\tTagsEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 \x01(\tR\x05value:\x02\x38\x01\x42\x0c\n\nrepositoryB\x07\n\x05_dataB\r\n\x0b_result_key*$\n\nCheckLevel\x12\t\n\x05\x45rror\x10\x00\x12\x0b\n\x07Warning\x10\x01\x42\xd6\x01\n com.com.ssinchenko.tsumugi.protoB\nSuiteProtoH\x01P\x01Z\rtsumugi/proto\xa0\x01\x01\xa2\x02\x04\x43STP\xaa\x02\x1c\x43om.Ssinchenko.Tsumugi.Proto\xca\x02\x1c\x43om\\Ssinchenko\\Tsumugi\\Proto\xe2\x02(Com\\Ssinchenko\\Tsumugi\\Proto\\GPBMetadata\xea\x02\x1f\x43om::Ssinchenko::Tsumugi::Protob\x06proto3' + b'\n\x0bsuite.proto\x12\x1c\x63om.ssinchenko.tsumugi.proto\x1a\x0f\x61nalyzers.proto\x1a\x10strategies.proto\x1a\x10repository.proto"\xcd\x04\n\x05\x43heck\x12H\n\ncheckLevel\x18\x01 \x01(\x0e\x32(.com.ssinchenko.tsumugi.proto.CheckLevelR\ncheckLevel\x12 \n\x0b\x64\x65scription\x18\x02 \x01(\tR\x0b\x64\x65scription\x12P\n\x0b\x63onstraints\x18\x03 \x03(\x0b\x32..com.ssinchenko.tsumugi.proto.Check.ConstraintR\x0b\x63onstraints\x1a\xc9\x02\n\nConstraint\x12\x42\n\x08\x61nalyzer\x18\x01 \x01(\x0b\x32&.com.ssinchenko.tsumugi.proto.AnalyzerR\x08\x61nalyzer\x12+\n\x10long_expectation\x18\x02 \x01(\x03H\x00R\x0flongExpectation\x12/\n\x12\x64ouble_expectation\x18\x03 \x01(\x01H\x00R\x11\x64oubleExpectation\x12\x46\n\x04sign\x18\x04 \x01(\x0e\x32\x32.com.ssinchenko.tsumugi.proto.Check.ComparisonSignR\x04sign\x12\x17\n\x04hint\x18\x05 \x01(\tH\x01R\x04hint\x88\x01\x01\x12\x17\n\x04name\x18\x06 \x01(\tH\x02R\x04name\x88\x01\x01\x42\r\n\x0b\x65xpectationB\x07\n\x05_hintB\x07\n\x05_name":\n\x0e\x43omparisonSign\x12\x06\n\x02GT\x10\x00\x12\x07\n\x03GET\x10\x01\x12\x06\n\x02\x45Q\x10\x02\x12\x06\n\x02LT\x10\x03\x12\x07\n\x03LET\x10\x04"\xd9\x05\n\x10\x41nomalyDetection\x12t\n\x1a\x61nomaly_detection_strategy\x18\x01 \x01(\x0b\x32\x36.com.ssinchenko.tsumugi.proto.AnomalyDetectionStrategyR\x18\x61nomalyDetectionStrategy\x12\x42\n\x08\x61nalyzer\x18\x02 \x01(\x0b\x32&.com.ssinchenko.tsumugi.proto.AnalyzerR\x08\x61nalyzer\x12^\n\x06\x63onfig\x18\x03 \x01(\x0b\x32\x41.com.ssinchenko.tsumugi.proto.AnomalyDetection.AnomalyCheckConfigH\x00R\x06\x63onfig\x88\x01\x01\x1a\x9f\x03\n\x12\x41nomalyCheckConfig\x12>\n\x05level\x18\x01 \x01(\x0e\x32(.com.ssinchenko.tsumugi.proto.CheckLevelR\x05level\x12 \n\x0b\x64\x65scription\x18\x02 \x01(\tR\x0b\x64\x65scription\x12|\n\x0fwith_tag_values\x18\x03 \x03(\x0b\x32T.com.ssinchenko.tsumugi.proto.AnomalyDetection.AnomalyCheckConfig.WithTagValuesEntryR\rwithTagValues\x12"\n\nafter_date\x18\x04 \x01(\x03H\x00R\tafterDate\x88\x01\x01\x12$\n\x0b\x62\x65\x66ore_date\x18\x05 \x01(\x03H\x01R\nbeforeDate\x88\x01\x01\x1a@\n\x12WithTagValuesEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 \x01(\tR\x05value:\x02\x38\x01\x42\r\n\x0b_after_dateB\x0e\n\x0c_before_dateB\t\n\x07_config"\x9d\x04\n\x11VerificationSuite\x12\x17\n\x04\x64\x61ta\x18\x01 \x01(\x0cH\x00R\x04\x64\x61ta\x88\x01\x01\x12;\n\x06\x63hecks\x18\x02 \x03(\x0b\x32#.com.ssinchenko.tsumugi.proto.CheckR\x06\x63hecks\x12U\n\x12required_analyzers\x18\x03 
\x03(\x0b\x32&.com.ssinchenko.tsumugi.proto.AnalyzerR\x11requiredAnalyzers\x12M\n\nrepository\x18\x04 \x01(\x0b\x32(.com.ssinchenko.tsumugi.proto.RepositoryH\x01R\nrepository\x88\x01\x01\x12K\n\nresult_key\x18\x05 \x01(\x0b\x32\'.com.ssinchenko.tsumugi.proto.ResultKeyH\x02R\tresultKey\x88\x01\x01\x12]\n\x12\x61nomaly_detections\x18\x06 \x03(\x0b\x32..com.ssinchenko.tsumugi.proto.AnomalyDetectionR\x11\x61nomalyDetections\x12\x39\n\x19\x63ompute_row_level_results\x18\x07 \x01(\x08R\x16\x63omputeRowLevelResultsB\x07\n\x05_dataB\r\n\x0b_repositoryB\r\n\x0b_result_key*$\n\nCheckLevel\x12\t\n\x05\x45rror\x10\x00\x12\x0b\n\x07Warning\x10\x01\x42\xd6\x01\n com.com.ssinchenko.tsumugi.protoB\nSuiteProtoH\x01P\x01Z\rtsumugi/proto\xa0\x01\x01\xa2\x02\x04\x43STP\xaa\x02\x1c\x43om.Ssinchenko.Tsumugi.Proto\xca\x02\x1c\x43om\\Ssinchenko\\Tsumugi\\Proto\xe2\x02(Com\\Ssinchenko\\Tsumugi\\Proto\\GPBMetadata\xea\x02\x1f\x43om::Ssinchenko::Tsumugi::Protob\x06proto3' ) _globals = globals() @@ -39,34 +39,24 @@ _globals[ "_ANOMALYDETECTION_ANOMALYCHECKCONFIG_WITHTAGVALUESENTRY" ]._serialized_options = b"8\001" - _globals["_VERIFICATIONSUITE_RESULTKEY_TAGSENTRY"]._loaded_options = None - _globals["_VERIFICATIONSUITE_RESULTKEY_TAGSENTRY"]._serialized_options = b"8\001" - _globals["_CHECKLEVEL"]._serialized_start = 2432 - _globals["_CHECKLEVEL"]._serialized_end = 2468 - _globals["_CHECK"]._serialized_start = 81 - _globals["_CHECK"]._serialized_end = 670 - _globals["_CHECK_CONSTRAINT"]._serialized_start = 281 - _globals["_CHECK_CONSTRAINT"]._serialized_end = 610 - _globals["_CHECK_COMPARISONSIGN"]._serialized_start = 612 - _globals["_CHECK_COMPARISONSIGN"]._serialized_end = 670 - _globals["_ANOMALYDETECTION"]._serialized_start = 673 - _globals["_ANOMALYDETECTION"]._serialized_end = 1402 - _globals["_ANOMALYDETECTION_ANOMALYCHECKCONFIG"]._serialized_start = 976 - _globals["_ANOMALYDETECTION_ANOMALYCHECKCONFIG"]._serialized_end = 1391 + _globals["_CHECKLEVEL"]._serialized_start = 1966 + _globals["_CHECKLEVEL"]._serialized_end = 2002 + _globals["_CHECK"]._serialized_start = 99 + _globals["_CHECK"]._serialized_end = 688 + _globals["_CHECK_CONSTRAINT"]._serialized_start = 299 + _globals["_CHECK_CONSTRAINT"]._serialized_end = 628 + _globals["_CHECK_COMPARISONSIGN"]._serialized_start = 630 + _globals["_CHECK_COMPARISONSIGN"]._serialized_end = 688 + _globals["_ANOMALYDETECTION"]._serialized_start = 691 + _globals["_ANOMALYDETECTION"]._serialized_end = 1420 + _globals["_ANOMALYDETECTION_ANOMALYCHECKCONFIG"]._serialized_start = 994 + _globals["_ANOMALYDETECTION_ANOMALYCHECKCONFIG"]._serialized_end = 1409 _globals[ "_ANOMALYDETECTION_ANOMALYCHECKCONFIG_WITHTAGVALUESENTRY" - ]._serialized_start = 1296 + ]._serialized_start = 1314 _globals[ "_ANOMALYDETECTION_ANOMALYCHECKCONFIG_WITHTAGVALUESENTRY" - ]._serialized_end = 1360 - _globals["_VERIFICATIONSUITE"]._serialized_start = 1405 - _globals["_VERIFICATIONSUITE"]._serialized_end = 2430 - _globals["_VERIFICATIONSUITE_FILESYSTEMREPOSITORY"]._serialized_start = 2100 - _globals["_VERIFICATIONSUITE_FILESYSTEMREPOSITORY"]._serialized_end = 2142 - _globals["_VERIFICATIONSUITE_SPARKTABLEREPOSITORY"]._serialized_start = 2144 - _globals["_VERIFICATIONSUITE_SPARKTABLEREPOSITORY"]._serialized_end = 2197 - _globals["_VERIFICATIONSUITE_RESULTKEY"]._serialized_start = 2200 - _globals["_VERIFICATIONSUITE_RESULTKEY"]._serialized_end = 2392 - _globals["_VERIFICATIONSUITE_RESULTKEY_TAGSENTRY"]._serialized_start = 2337 - _globals["_VERIFICATIONSUITE_RESULTKEY_TAGSENTRY"]._serialized_end = 2392 + 
]._serialized_end = 1378 + _globals["_VERIFICATIONSUITE"]._serialized_start = 1423 + _globals["_VERIFICATIONSUITE"]._serialized_end = 1964 # @@protoc_insertion_point(module_scope) diff --git a/tsumugi-python/tsumugi/proto/suite_pb2.pyi b/tsumugi-python/tsumugi/proto/suite_pb2.pyi index 377cc1e..350c166 100644 --- a/tsumugi-python/tsumugi/proto/suite_pb2.pyi +++ b/tsumugi-python/tsumugi/proto/suite_pb2.pyi @@ -1,5 +1,6 @@ import analyzers_pb2 as _analyzers_pb2 import strategies_pb2 as _strategies_pb2 +import repository_pb2 as _repository_pb2 from google.protobuf.internal import containers as _containers from google.protobuf.internal import enum_type_wrapper as _enum_type_wrapper from google.protobuf import descriptor as _descriptor @@ -140,51 +141,15 @@ class VerificationSuite(_message.Message): "data", "checks", "required_analyzers", - "file_system_repository", - "spark_table_repository", + "repository", "result_key", "anomaly_detections", "compute_row_level_results", ) - class FileSystemRepository(_message.Message): - __slots__ = ("path",) - PATH_FIELD_NUMBER: _ClassVar[int] - path: str - def __init__(self, path: _Optional[str] = ...) -> None: ... - - class SparkTableRepository(_message.Message): - __slots__ = ("table_name",) - TABLE_NAME_FIELD_NUMBER: _ClassVar[int] - table_name: str - def __init__(self, table_name: _Optional[str] = ...) -> None: ... - - class ResultKey(_message.Message): - __slots__ = ("dataset_date", "tags") - class TagsEntry(_message.Message): - __slots__ = ("key", "value") - KEY_FIELD_NUMBER: _ClassVar[int] - VALUE_FIELD_NUMBER: _ClassVar[int] - key: str - value: str - def __init__( - self, key: _Optional[str] = ..., value: _Optional[str] = ... - ) -> None: ... - - DATASET_DATE_FIELD_NUMBER: _ClassVar[int] - TAGS_FIELD_NUMBER: _ClassVar[int] - dataset_date: int - tags: _containers.ScalarMap[str, str] - def __init__( - self, - dataset_date: _Optional[int] = ..., - tags: _Optional[_Mapping[str, str]] = ..., - ) -> None: ... 
-
     DATA_FIELD_NUMBER: _ClassVar[int]
     CHECKS_FIELD_NUMBER: _ClassVar[int]
     REQUIRED_ANALYZERS_FIELD_NUMBER: _ClassVar[int]
-    FILE_SYSTEM_REPOSITORY_FIELD_NUMBER: _ClassVar[int]
-    SPARK_TABLE_REPOSITORY_FIELD_NUMBER: _ClassVar[int]
+    REPOSITORY_FIELD_NUMBER: _ClassVar[int]
     RESULT_KEY_FIELD_NUMBER: _ClassVar[int]
     ANOMALY_DETECTIONS_FIELD_NUMBER: _ClassVar[int]
     COMPUTE_ROW_LEVEL_RESULTS_FIELD_NUMBER: _ClassVar[int]
@@ -193,9 +158,8 @@ class VerificationSuite(_message.Message):
     required_analyzers: _containers.RepeatedCompositeFieldContainer[
         _analyzers_pb2.Analyzer
     ]
-    file_system_repository: VerificationSuite.FileSystemRepository
-    spark_table_repository: VerificationSuite.SparkTableRepository
-    result_key: VerificationSuite.ResultKey
+    repository: _repository_pb2.Repository
+    result_key: _repository_pb2.ResultKey
     anomaly_detections: _containers.RepeatedCompositeFieldContainer[AnomalyDetection]
     compute_row_level_results: bool
     def __init__(
@@ -205,13 +169,8 @@ class VerificationSuite(_message.Message):
         required_analyzers: _Optional[
            _Iterable[_Union[_analyzers_pb2.Analyzer, _Mapping]]
         ] = ...,
-        file_system_repository: _Optional[
-            _Union[VerificationSuite.FileSystemRepository, _Mapping]
-        ] = ...,
-        spark_table_repository: _Optional[
-            _Union[VerificationSuite.SparkTableRepository, _Mapping]
-        ] = ...,
-        result_key: _Optional[_Union[VerificationSuite.ResultKey, _Mapping]] = ...,
+        repository: _Optional[_Union[_repository_pb2.Repository, _Mapping]] = ...,
+        result_key: _Optional[_Union[_repository_pb2.ResultKey, _Mapping]] = ...,
         anomaly_detections: _Optional[
             _Iterable[_Union[AnomalyDetection, _Mapping]]
         ] = ...,
diff --git a/tsumugi-python/tsumugi/repository.py b/tsumugi-python/tsumugi/repository.py
new file mode 100644
index 0000000..df1ca64
--- /dev/null
+++ b/tsumugi-python/tsumugi/repository.py
@@ -0,0 +1,40 @@
+from abc import ABC, abstractmethod
+from dataclasses import dataclass
+
+
+from .proto import repository_pb2 as proto
+
+
+class MetricRepository(ABC):
+    """Abstract base class for all metric repositories in tsumugi."""
+
+    @abstractmethod
+    def _to_proto(self) -> proto.Repository: ...
+
+
+@dataclass
+class FileSystemRepository(MetricRepository):
+    """Represents a FileSystem metric repository in tsumugi."""
+
+    path: str
+
+    def _to_proto(self) -> proto.Repository:
+        return proto.Repository(
+            file_system=proto.FileSystemRepository(
+                path=self.path,
+            )
+        )
+
+
+@dataclass
+class SparkTableRepository(MetricRepository):
+    """Represents a Spark table metric repository in tsumugi."""
+
+    table_name: str
+
+    def _to_proto(self) -> proto.Repository:
+        return proto.Repository(
+            spark_table=proto.SparkTableRepository(
+                table_name=self.table_name,
+            )
+        )
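Note: the two dataclasses above are thin wrappers over the `Repository` oneof defined in repository.proto further down. A minimal usage sketch (illustrative only, not part of the patch; paths and table names are invented):

    from tsumugi.repository import FileSystemRepository, SparkTableRepository

    fs_repo = FileSystemRepository(path="/tmp/metrics.json")
    msg = fs_repo._to_proto()
    assert msg.WhichOneof("repository") == "file_system"  # oneof branch chosen by the subclass
    assert msg.file_system.path == "/tmp/metrics.json"

    table_repo = SparkTableRepository(table_name="metrics_db.deequ_metrics")
    msg = table_repo._to_proto()
    assert msg.WhichOneof("repository") == "spark_table"
    assert msg.spark_table.table_name == "metrics_db.deequ_metrics"

Adding a new repository backend now only needs a new dataclass plus a matching branch in the oneof, without touching `VerificationSuite`.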
diff --git a/tsumugi-python/tsumugi/verification.py b/tsumugi-python/tsumugi/verification.py
index b74582b..ab9d37b 100644
--- a/tsumugi-python/tsumugi/verification.py
+++ b/tsumugi-python/tsumugi/verification.py
@@ -13,6 +13,9 @@
 from tsumugi.analyzers import (
     AbstractAnalyzer,
 )
+
+from tsumugi.repository import MetricRepository
+
 from tsumugi.utils import (
     CHECK_RESULTS_SUB_DF,
     CHECKS_SUB_DF,
@@ -24,6 +27,7 @@
 )
 from .proto import suite_pb2 as suite
+from .proto import repository_pb2 as repository
 
 
 @dataclass
@@ -168,8 +172,7 @@ def __init__(self, df: DataFrame | ConnectDataFrame) -> None:
         self._data = df
         self._checks: list[suite.Check] = list()
         self._required_analyzers: list[AbstractAnalyzer] = list()
-        self._path: str | None = None
-        self._table_name: str | None = None
+        self._repository: MetricRepository | None = None
         self._dataset_date: int | None = None
         self._dataset_tags: dict[str, str] = dict()
         self._anomaly_detectons: list[suite.AnomalyDetection] = list()
@@ -256,24 +259,14 @@ def _build(self) -> suite.VerificationSuite:
             compute_row_level_results=self._compute_row_results,
         )
 
-        if self._path:
-            pb_suite.file_system_repository = (
-                suite.VerificationSuite.FileSystemRepository(path=self._path)
-            )
-
-        if self._table_name:
-            pb_suite.spark_table_repository = (
-                suite.VerificationSuite.SparkTableRepository(
-                    table_name=self._table_name
-                )
-            )
-
-        if self._path or self._table_name:
-            pb_suite.result_key = suite.VerificationSuite.ResultKey(
-                dataset_date=self._dataset_date, tags=self._dataset_tags
-            )
+        if self._repository:
+            pb_suite.repository.CopyFrom(self._repository._to_proto())
+            pb_suite.result_key.CopyFrom(
+                repository.ResultKey(
+                    dataset_date=self._dataset_date, tags=self._dataset_tags
+                )
+            )
+
         for ad in self._anomaly_detectons:
-            _ad = pb_suite.anomaly_detections.append(ad)
+            pb_suite.anomaly_detections.append(ad)
 
         return pb_suite
diff --git a/tsumugi-server/src/main/protobuf/repository.proto b/tsumugi-server/src/main/protobuf/repository.proto
new file mode 100644
index 0000000..268443d
--- /dev/null
+++ b/tsumugi-server/src/main/protobuf/repository.proto
@@ -0,0 +1,29 @@
+syntax = 'proto3';
+
+package com.ssinchenko.tsumugi.proto;
+
+option java_multiple_files = true;
+option java_package = "com.ssinchenko.tsumugi.proto";
+option java_generate_equals_and_hash = true;
+option optimize_for=SPEED;
+option go_package = "tsumugi/proto";
+
+message Repository {
+  oneof repository {
+    FileSystemRepository file_system = 1;
+    SparkTableRepository spark_table = 2;
+  }
+}
+
+message FileSystemRepository {
+  string path = 1;
+}
+
+message SparkTableRepository {
+  string table_name = 1;
+}
+
+message ResultKey {
+  int64 dataset_date = 1;
+  map<string, string> tags = 2;
+}
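Note on the `_build` hunk above: protobuf-python forbids assigning a message object directly to an embedded message field (it raises an `AttributeError` about assignment to a composite field), which is why the builder copies the submessages into place with `CopyFrom`. A short sketch of the semantics (values are made up):

    from tsumugi.proto import repository_pb2, suite_pb2

    pb_suite = suite_pb2.VerificationSuite()
    repo = repository_pb2.Repository(
        file_system=repository_pb2.FileSystemRepository(path="/tmp/metrics.json")
    )
    # pb_suite.repository = repo  # AttributeError: assignment to composite field
    pb_suite.repository.CopyFrom(repo)  # copy the message into the suite instead
    pb_suite.result_key.CopyFrom(
        repository_pb2.ResultKey(dataset_date=20240701, tags={"env": "test"})
    )
    assert pb_suite.repository.WhichOneof("repository") == "file_system"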
diff --git a/tsumugi-server/src/main/protobuf/suite.proto b/tsumugi-server/src/main/protobuf/suite.proto
index 2c1d236..c9145fc 100644
--- a/tsumugi-server/src/main/protobuf/suite.proto
+++ b/tsumugi-server/src/main/protobuf/suite.proto
@@ -10,6 +10,7 @@ option go_package = "tsumugi/proto";
 
 import "analyzers.proto";
 import "strategies.proto";
+import "repository.proto";
 
 // TODO: For an unknown reason proto-dependencies from spark are not working
 // import "spark/connect/base.proto"; // Spark base.proto
@@ -78,25 +79,9 @@ message VerificationSuite {
   repeated Analyzer required_analyzers = 3; // Analysis in Deequ that is just a wrapper of sequence of Analyzers
 
   // Anomaly detection part
-  oneof repository {
-    FileSystemRepository file_system_repository = 4;
-    SparkTableRepository spark_table_repository = 5;
-  }
-  optional ResultKey result_key = 6;
-  repeated AnomalyDetection anomaly_detections = 7;
-
-  message FileSystemRepository {
-    string path = 1;
-  }
-
-  message SparkTableRepository {
-    string table_name = 1;
-  }
-
-  message ResultKey {
-    int64 dataset_date = 1;
-    map<string, string> tags = 2;
-  }
+  optional Repository repository = 4;
+  optional ResultKey result_key = 5;
+  repeated AnomalyDetection anomaly_detections = 6;
 
-  bool compute_row_level_results = 8;
+  bool compute_row_level_results = 7;
 }
\ No newline at end of file
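Since `VerificationSuite` fields 4-8 are renumbered to 4-7 and the nested repository messages move to top level, old and new stubs are not wire-compatible; the Python and JVM sides of this patch must be regenerated together. A round-trip sketch of the new shape (values invented), mirroring the oneof dispatch that `parseMetricRepository` performs below:

    from tsumugi.proto import suite_pb2

    suite = suite_pb2.VerificationSuite()
    suite.repository.spark_table.table_name = "metrics_db.deequ_metrics"
    payload = suite.SerializeToString()  # bytes as carried over Spark Connect

    decoded = suite_pb2.VerificationSuite.FromString(payload)
    case = decoded.repository.WhichOneof("repository")  # "spark_table" here
    if case == "spark_table":
        print(decoded.repository.spark_table.table_name)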
diff --git a/tsumugi-server/src/main/scala/com/ssinchenko/tsumugi/DeequSuiteBuilder.scala b/tsumugi-server/src/main/scala/com/ssinchenko/tsumugi/DeequSuiteBuilder.scala
index 02d5a3b..511b4ed 100644
--- a/tsumugi-server/src/main/scala/com/ssinchenko/tsumugi/DeequSuiteBuilder.scala
+++ b/tsumugi-server/src/main/scala/com/ssinchenko/tsumugi/DeequSuiteBuilder.scala
@@ -4,18 +4,16 @@ import com.amazon.deequ.analyzers._
 import com.amazon.deequ.anomalydetection._
 import com.amazon.deequ.checks.{Check, CheckLevel}
 import com.amazon.deequ.constraints.Constraint
-import com.amazon.deequ.repository.ResultKey
+import com.amazon.deequ.repository.{MetricsRepository, ResultKey}
 import com.amazon.deequ.repository.fs.FileSystemMetricsRepository
 import com.amazon.deequ.repository.sparktable.SparkTableMetricsRepository
-import com.amazon.deequ.{
-  AnomalyCheckConfig,
-  VerificationRunBuilder,
-  VerificationRunBuilderWithRepository,
-  VerificationSuite
-}
-import org.apache.spark.sql.DataFrame
+import com.amazon.deequ.{AnomalyCheckConfig, VerificationRunBuilder, VerificationRunBuilderWithRepository, VerificationSuite}
+import com.ssinchenko.tsumugi.proto.AnomalyDetection
+import org.apache.spark.sql.{DataFrame, SparkSession}
 
+import scala.collection.convert.ImplicitConversions.`collection AsScalaIterable`
 import scala.jdk.CollectionConverters._
+import scala.util.{Failure, Success, Try}
 
 object DeequSuiteBuilder {
   private[ssinchenko] def parseCheckLevel(checkLevel: proto.CheckLevel): CheckLevel.Value = {
@@ -430,44 +428,50 @@ object DeequSuiteBuilder {
       case _ => throw new RuntimeException(s"Unsupported Strategy ${strategy.getStrategyCase.name}")
     }
   }
+
+  private[ssinchenko] def parseMetricRepository(spark: SparkSession, verificationSuite: proto.VerificationSuite): Try[MetricsRepository] = {
+    if (!verificationSuite.hasRepository) {
+      Failure(new RuntimeException("For anomaly detection one of FS repository or Table repository should be provided!"))
+    } else {
+      verificationSuite.getRepository.getRepositoryCase match {
+        case proto.Repository.RepositoryCase.FILE_SYSTEM =>
+          Success(FileSystemMetricsRepository(spark, verificationSuite.getRepository.getFileSystem.getPath))
+        case proto.Repository.RepositoryCase.SPARK_TABLE =>
+          Success(new SparkTableMetricsRepository(spark, verificationSuite.getRepository.getSparkTable.getTableName))
+        case _ => Failure(new RuntimeException("For anomaly detection one of FS repository or Table repository should be provided!"))
+      }
+    }
+  }
 
-  def protoToVerificationSuite(data: DataFrame, verificationSuite: proto.VerificationSuite): VerificationRunBuilder = {
+  def protoToVerificationSuite(data: DataFrame, verificationSuite: proto.VerificationSuite): Try[VerificationRunBuilder] = {
     val spark = data.sparkSession
-    var builder = new VerificationSuite().onData(data)
-
-    builder = builder.addChecks(
-      verificationSuite.getChecksList.asScala.map(parseCheck).toSeq
-    )
-
-    builder = builder.addRequiredAnalyzers(
-      verificationSuite.getRequiredAnalyzersList.asScala.map(parseAnalyzer).toSeq
-    )
+    val builder = new VerificationSuite()
+      .onData(data)
+      .addChecks(verificationSuite.getChecksList.asScala.map(parseCheck))
+      .addRequiredAnalyzers(verificationSuite.getRequiredAnalyzersList.asScala.map(parseAnalyzer))
 
     // Anomaly detection branch
-    if (verificationSuite.hasResultKey) {
-      if (!(verificationSuite.hasFileSystemRepository || verificationSuite.hasSparkTableRepository)) {
-        throw new RuntimeException("For anomaly detection one of FS repository or Table repository should be provided!")
-      }
-      var adBuilder: VerificationRunBuilderWithRepository = null
-      if (verificationSuite.hasFileSystemRepository) {
-        adBuilder =
-          builder.useRepository(FileSystemMetricsRepository(spark, verificationSuite.getFileSystemRepository.getPath))
-      } else {
-        adBuilder = builder.useRepository(
-          new SparkTableMetricsRepository(spark, verificationSuite.getSparkTableRepository.getTableName)
-        )
-      }
+    if (verificationSuite.hasResultKey) {
+      for {
+        repository <- parseMetricRepository(spark, verificationSuite)
+        builderWithRepo <- verificationSuiteToRunBuilderWithRepo(builder, repository, verificationSuite)
+      } yield builderWithRepo
+    } else {
+      Success(builder)
+    }
+  }
 
-      adBuilder = adBuilder.saveOrAppendResult(
+  private def verificationSuiteToRunBuilderWithRepo(builder: VerificationRunBuilder, repository: MetricsRepository, verificationSuite: proto.VerificationSuite): Try[VerificationRunBuilderWithRepository] = Try {
+    val adBuilder = builder
+      .useRepository(repository)
+      .saveOrAppendResult(
         ResultKey(
           verificationSuite.getResultKey.getDatasetDate,
          verificationSuite.getResultKey.getTagsMap.asScala.toMap
        )
      )
-
-      for (i <- 0 to verificationSuite.getAnomalyDetectionsCount) {
-        val ad = verificationSuite.getAnomalyDetections(i)
+    Range(0, verificationSuite.getAnomalyDetectionsCount).foldLeft(adBuilder)((builder, adIdx) => {
+      val ad = verificationSuite.getAnomalyDetections(adIdx)
       val analyzer = parseAnalyzer(ad.getAnalyzer)
       val strategy = parseAnomalyDetectionStrategy(ad.getAnomalyDetectionStrategy)
       val options =
@@ -477,45 +481,41 @@
           parseCheckLevel(ad.getConfig.getLevel),
           ad.getConfig.getDescription,
           ad.getConfig.getWithTagValuesMap.asScala.toMap,
-          if (ad.getConfig.hasAfterDate) Some(ad.getConfig.getAfterDate) else Option.empty,
-          if (ad.getConfig.hasBeforeDate) Some(ad.getConfig.getBeforeDate) else Option.empty
+          if (ad.getConfig.hasAfterDate) Some(ad.getConfig.getAfterDate) else None,
+          if (ad.getConfig.hasBeforeDate) Some(ad.getConfig.getBeforeDate) else None
         )
       )
-      else Option.empty
+      else None
       // TODO: How to filter only Analyzer[S, Metric[Double]] instead of this ugly code? Good first issue
       analyzer match {
-        case al: ApproxCountDistinct => adBuilder = adBuilder.addAnomalyCheck(strategy, al, options)
-        case al: ApproxQuantile => adBuilder = adBuilder.addAnomalyCheck(strategy, al, options)
-        case al: ColumnCount => adBuilder = adBuilder.addAnomalyCheck(strategy, al, options)
-        case al: Completeness => adBuilder = adBuilder.addAnomalyCheck(strategy, al, options)
-        case al: Compliance => adBuilder = adBuilder.addAnomalyCheck(strategy, al, options)
-        case al: Correlation => adBuilder = adBuilder.addAnomalyCheck(strategy, al, options)
-        case al: CountDistinct => adBuilder = adBuilder.addAnomalyCheck(strategy, al, options)
-        case al: Distinctness => adBuilder = adBuilder.addAnomalyCheck(strategy, al, options)
-        case al: Entropy => adBuilder = adBuilder.addAnomalyCheck(strategy, al, options)
-        case al: ExactQuantile => adBuilder = adBuilder.addAnomalyCheck(strategy, al, options)
-        case al: MaxLength => adBuilder = adBuilder.addAnomalyCheck(strategy, al, options)
-        case al: Maximum => adBuilder = adBuilder.addAnomalyCheck(strategy, al, options)
-        case al: Mean => adBuilder = adBuilder.addAnomalyCheck(strategy, al, options)
-        case al: MinLength => adBuilder = adBuilder.addAnomalyCheck(strategy, al, options)
-        case al: Minimum => adBuilder = adBuilder.addAnomalyCheck(strategy, al, options)
-        case al: MutualInformation => adBuilder = adBuilder.addAnomalyCheck(strategy, al, options)
-        case al: PatternMatch => adBuilder = adBuilder.addAnomalyCheck(strategy, al, options)
-        case al: RatioOfSums => adBuilder = adBuilder.addAnomalyCheck(strategy, al, options)
-        case al: Size => adBuilder = adBuilder.addAnomalyCheck(strategy, al, options)
-        case al: StandardDeviation => adBuilder = adBuilder.addAnomalyCheck(strategy, al, options)
-        case al: Sum => adBuilder = adBuilder.addAnomalyCheck(strategy, al, options)
-        case al: UniqueValueRatio => adBuilder = adBuilder.addAnomalyCheck(strategy, al, options)
-        case al: Uniqueness => adBuilder = adBuilder.addAnomalyCheck(strategy, al, options)
+        case al: ApproxCountDistinct => builder.addAnomalyCheck(strategy, al, options)
+        case al: ApproxQuantile => builder.addAnomalyCheck(strategy, al, options)
+        case al: ColumnCount => builder.addAnomalyCheck(strategy, al, options)
+        case al: Completeness => builder.addAnomalyCheck(strategy, al, options)
+        case al: Compliance => builder.addAnomalyCheck(strategy, al, options)
+        case al: Correlation => builder.addAnomalyCheck(strategy, al, options)
+        case al: CountDistinct => builder.addAnomalyCheck(strategy, al, options)
+        case al: Distinctness => builder.addAnomalyCheck(strategy, al, options)
+        case al: Entropy => builder.addAnomalyCheck(strategy, al, options)
+        case al: ExactQuantile => builder.addAnomalyCheck(strategy, al, options)
+        case al: MaxLength => builder.addAnomalyCheck(strategy, al, options)
+        case al: Maximum => builder.addAnomalyCheck(strategy, al, options)
+        case al: Mean => builder.addAnomalyCheck(strategy, al, options)
+        case al: MinLength => builder.addAnomalyCheck(strategy, al, options)
+        case al: Minimum => builder.addAnomalyCheck(strategy, al, options)
+        case al: MutualInformation => builder.addAnomalyCheck(strategy, al, options)
+        case al: PatternMatch => builder.addAnomalyCheck(strategy, al, options)
+        case al: RatioOfSums => builder.addAnomalyCheck(strategy, al, options)
+        case al: Size => builder.addAnomalyCheck(strategy, al, options)
+        case al: StandardDeviation => builder.addAnomalyCheck(strategy, al, options)
+        case al: Sum => builder.addAnomalyCheck(strategy, al, options)
+        case al: UniqueValueRatio => builder.addAnomalyCheck(strategy, al, options)
+        case al: Uniqueness => builder.addAnomalyCheck(strategy, al, options)
        case _ =>
          throw new RuntimeException(
            s"AD is supported only for Analyzers with Double metric! Got ${analyzer.getClass.getSimpleName}"
          )
      }
-      }
-      adBuilder
-    } else {
-      builder
-    }
+    })
   }
 }
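Two behavioral notes on the hunk above, beyond the restructuring into `Try`: the removed loop `for (i <- 0 to verificationSuite.getAnomalyDetectionsCount)` was off by one (`to` is inclusive, so it read one element past the end), which the exclusive `Range(0, ...)` fixes; and a missing repository now surfaces as a `Failure` instead of a thrown exception. For reference, a sketch of the `AnomalyDetection` message that the fold consumes (field names follow suite.proto; the analyzer and strategy submessages are elided):

    from tsumugi.proto import suite_pb2

    ad = suite_pb2.AnomalyDetection()
    ad.config.level = suite_pb2.Error             # top-level CheckLevel enum value
    ad.config.description = "size should not jump"
    ad.config.with_tag_values["env"] = "test"
    ad.config.after_date = 20240101               # optional bound; hasAfterDate on the JVM side

    suite = suite_pb2.VerificationSuite()
    suite.anomaly_detections.append(ad)
    assert suite.anomaly_detections[0].config.HasField("after_date")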
diff --git a/tsumugi-server/src/main/scala/org/apache/spark/sql/tsumugi/DeequConnectPlugin.scala b/tsumugi-server/src/main/scala/org/apache/spark/sql/tsumugi/DeequConnectPlugin.scala
index ce9d13e..f42345c 100644
--- a/tsumugi-server/src/main/scala/org/apache/spark/sql/tsumugi/DeequConnectPlugin.scala
+++ b/tsumugi-server/src/main/scala/org/apache/spark/sql/tsumugi/DeequConnectPlugin.scala
@@ -27,14 +27,14 @@ class DeequConnectPlugin extends RelationPlugin {
       // TODO: pass returnRows here
       val resultDf = DeequUtils.runAndCollectResults(
-        verificationSuiteBuilder,
+        verificationSuiteBuilder.get,
         spark,
         returnRows = protoSuite.getComputeRowLevelResults,
         dataFrame = data
       )
-      Option(resultDf.logicalPlan)
+      Some(resultDf.logicalPlan)
     } else {
-      Option.empty
+      None
     }
   }
 }
diff --git a/tsumugi-server/src/test/scala/com/ssinchenko/tsumugi/DeequSuiteBuilderTest.scala b/tsumugi-server/src/test/scala/com/ssinchenko/tsumugi/DeequSuiteBuilderTest.scala
index 992ee7c..fef36f7 100644
--- a/tsumugi-server/src/test/scala/com/ssinchenko/tsumugi/DeequSuiteBuilderTest.scala
+++ b/tsumugi-server/src/test/scala/com/ssinchenko/tsumugi/DeequSuiteBuilderTest.scala
@@ -280,11 +280,14 @@ class DeequSuiteBuilderTest extends ConfTest {
           .setDoubleExpectation(1.0)
       )
     )
-    protoSuiteBuilder.setFileSystemRepository(
-      proto.VerificationSuite.FileSystemRepository
-        .newBuilder()
-        .setPath("test-file.json")
-    )
+    val metricRepo = proto.Repository
+      .newBuilder()
+      .setFileSystem(
+        proto.FileSystemRepository
+          .newBuilder()
+          .setPath("test-file.json")
+      )
+    protoSuiteBuilder.setRepository(metricRepo)
     protoSuiteBuilder.addAnomalyDetections(
       proto.AnomalyDetection
         .newBuilder()
@@ -312,7 +315,7 @@
       )
     )
 
-    val deequSuite = DeequSuiteBuilder.protoToVerificationSuite(data, protoSuiteBuilder.build())
+    val deequSuite = DeequSuiteBuilder.protoToVerificationSuite(data, protoSuiteBuilder.build()).get
     val checkResults = deequSuite.run().checkResults
     assert(checkResults.forall(_._2.status == CheckStatus.Success))
   }
diff --git a/tsumugi-server/src/test/scala/com/ssinchenko/tsumugi/DeequUtilsTest.scala b/tsumugi-server/src/test/scala/com/ssinchenko/tsumugi/DeequUtilsTest.scala
index 6ff85fd..ea5e6e2 100644
--- a/tsumugi-server/src/test/scala/com/ssinchenko/tsumugi/DeequUtilsTest.scala
+++ b/tsumugi-server/src/test/scala/com/ssinchenko/tsumugi/DeequUtilsTest.scala
@@ -27,11 +27,15 @@ class DeequUtilsTest extends ConfTest {
           .setDoubleExpectation(1.0)
       )
     )
-    protoSuiteBuilder.setFileSystemRepository(
-      proto.VerificationSuite.FileSystemRepository
-        .newBuilder()
-        .setPath("test-file.json")
-    )
+
+    val metricRepo = proto.Repository
+      .newBuilder()
+      .setFileSystem(
+        proto.FileSystemRepository
+          .newBuilder()
+          .setPath("test-file.json")
+      )
+    protoSuiteBuilder.setRepository(metricRepo)
     protoSuiteBuilder.addAnomalyDetections(
       proto.AnomalyDetection
        .newBuilder()