
Commit

Merge pull request #3 from Sanketika-Obsrv/main
Obsrv Python SDK - 1.1.0-Beta
manjudr authored Sep 3, 2024
2 parents dab7570 + 8de4a82 commit babc1b6
Showing 33 changed files with 2,327 additions and 0 deletions.
18 changes: 18 additions & 0 deletions .gitignore
@@ -0,0 +1,18 @@
__pycache__
*.pyc

# Packages
/dist/*

# Unit test / coverage reports
.coverage
.pytest_cache

.DS_Store
.python-version
.vscode/*

/docs/site/*

.venv
/poetry.toml
Empty file added obsrv/__init__.py
Empty file.
2 changes: 2 additions & 0 deletions obsrv/common/__init__.py
@@ -0,0 +1,2 @@
# autoflake: skip_file
from .exception import ObsrvException
18 changes: 18 additions & 0 deletions obsrv/common/exception.py
@@ -0,0 +1,18 @@
# from obsrv.models import ErrorData
from obsrv.utils import LoggerController

logger = LoggerController(__name__)


class ObsrvException(Exception):
    def __init__(self, error):
        self.error = error
        super().__init__(self.error.error_msg)
        logger.exception(
            f"exception called from {self.__class__.__name__} with error {self.error.error_code} - {self.error.error_msg}"
        )


# class UnsupportedDataFormatException(ObsrvException):
#     def __init__(self, data_format):
#         super().__init__(ErrorData("DATA_FORMAT_ERR", f"Unsupported data format {data_format}"))
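For context, a minimal usage sketch of ObsrvException follows. The ErrorData container below mirrors the commented-out obsrv.models import and is an assumption for illustration only; it is not part of this commit.

# Hypothetical usage sketch (not part of this commit). ErrorData is a stand-in
# for the commented-out obsrv.models import; ObsrvException only needs an
# object exposing error_code and error_msg.
from dataclasses import dataclass

from obsrv.common import ObsrvException


@dataclass
class ErrorData:
    error_code: str
    error_msg: str


def decode_payload(payload: bytes) -> str:
    try:
        return payload.decode("utf-8")
    except UnicodeDecodeError:
        # Raising ObsrvException logs the error via LoggerController and sets
        # the exception message to error_msg.
        raise ObsrvException(ErrorData("DECODE_ERR", "payload is not valid UTF-8"))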
3 changes: 3 additions & 0 deletions obsrv/connector/__init__.py
@@ -0,0 +1,3 @@
# autoflake: skip_file
from .metrics_collector import MetricsCollector
from .registry import ConnectorContext, ConnectorInstance
3 changes: 3 additions & 0 deletions obsrv/connector/batch/__init__.py
@@ -0,0 +1,3 @@
# autoflake: skip_file
from .obsrv_dataset import ObsrvDataset
from .source import ISourceConnector, SourceConnector
84 changes: 84 additions & 0 deletions obsrv/connector/batch/obsrv_dataset.py
@@ -0,0 +1,84 @@
import json
import time

from pyspark.sql import DataFrame
from pyspark.sql.functions import from_json, length, lit, struct, to_json
from pyspark.sql.types import StringType, StructField, StructType, LongType

from obsrv.utils import LoggerController

logger = LoggerController(__name__)


class ObsrvDataset:
    def __init__(self, ds: DataFrame):
        self.ds = ds
        self.invalid_events = None
        self.valid_events = None

    def filter_events(self, ctx, config):
        max_event_size = config.find("kafka.producer.max-request-size", 1000000)
        self.ds = self.ds.withColumn("_obsrv_tmp_size", length(to_json(struct("*"))))
        self.invalid_events = self.ds.filter(
            self.ds._obsrv_tmp_size > max_event_size
        ).drop("_obsrv_tmp_size")
        self.valid_events = self.ds.filter(
            self.ds._obsrv_tmp_size <= max_event_size
        ).drop("_obsrv_tmp_size")

    def append_obsrv_meta(self, ctx):
        addn_meta = False

        source_meta = [
            StructField("connector", StringType(), True),
            StructField("connectorInstance", StringType(), True),
        ]
        if "_addn_source_meta" in self.ds.columns:
            addn_meta = True
            source_meta.append(StructField("_addn_source_meta", StringType(), True))
            addn_meta_data = (
                self.ds.select("_addn_source_meta").collect()[0][0].replace('"', "'")
            )
            self.ds = self.ds.drop("_addn_source_meta")

        obsrv_meta_schema = StructType(
            [
                StructField("syncts", LongType(), True),
                StructField("flags", StructType(), True),
                StructField("timespans", StructType(), True),
                StructField("error", StructType(), True),
                StructField("source", StructType(source_meta), True),
            ]
        )

        syncts = int(time.time() * 1000)
        obsrv_meta = {
            "syncts": syncts,
            "flags": {},
            "timespans": {},
            "error": {},
            "source": {
                "connector": ctx.connector_id,
                "connectorInstance": ctx.connector_instance_id,
            },
        }

        if addn_meta:
            obsrv_meta["source"]["_addn_source_meta"] = addn_meta_data

        obsrv_meta_struct = from_json(lit(json.dumps(obsrv_meta)), obsrv_meta_schema)
        self.ds = self.ds.withColumn("obsrv_meta", obsrv_meta_struct)

    def save_to_kafka(self, config, topic):
        kafka_servers = config.find("kafka.broker-servers", "localhost:9092")
        compression_type = config.find("kafka.producer.compression", "snappy")

        self.valid_events.selectExpr("to_json(struct(*)) AS value").write.format(
            "kafka"
        ).option("kafka.bootstrap.servers", kafka_servers).option(
            "kafka.compression.type", compression_type
        ).option(
            "topic", topic
        ).save()

# TODO: Handle invalid events - send to dead letter queue
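As a rough orientation for how these pieces fit together, here is a hedged wiring sketch. The ctx and config stand-ins only mimic the attributes and find() lookups ObsrvDataset touches above; in the SDK they would come from the connector runtime, which is not constructed in this file, and the input path and topic name are placeholders.

# Hypothetical wiring sketch (not part of this commit). ctx and config are
# stand-ins for the connector context and config; they only provide the
# members ObsrvDataset actually uses.
from types import SimpleNamespace

from pyspark.sql import SparkSession

from obsrv.connector.batch import ObsrvDataset


class DictConfig:
    def __init__(self, values):
        self.values = values

    def find(self, key, default=None):
        return self.values.get(key, default)


ctx = SimpleNamespace(connector_id="sample-connector", connector_instance_id="instance-01")
config = DictConfig({"kafka.broker-servers": "localhost:9092"})

spark = SparkSession.builder.appName("obsrv-dataset-sketch").getOrCreate()
df = spark.read.json("events.json")  # placeholder input

dataset = ObsrvDataset(df)
dataset.filter_events(ctx, config)             # split events larger than max-request-size
dataset.append_obsrv_meta(ctx)                 # attach obsrv_meta (syncts, source, ...)
dataset.save_to_kafka(config, "ingest.topic")  # write valid events to Kafka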