Skip to content

Commit

Permalink
Only extract from completed jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
ynaim94-harrys committed Sep 23, 2022
1 parent 7701b37 commit 463d7df
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 15 deletions.
15 changes: 6 additions & 9 deletions tap_gladly/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,19 @@
SCHEMAS_DIR = Path(__file__).parent / Path("./schemas")


class ExportJobsStream(gladlyStream):
class ExportCompletedJobsStream(gladlyStream):
"""List export jobs stream."""

name = "jobs"
path = "/export/jobs"
path = "/export/jobs?status=COMPLETED"
primary_keys = ["id"]
replication_key = None
# Optionally, you may also use `schema_filepath` in place of `schema`:
schema_filepath = SCHEMAS_DIR / "export_jobs.json"

def post_process(self, row, context):
"""As needed, append or transform raw data to match expected structure."""
if "start_date" not in self.config:
return row

if pendulum.parse(row["parameters"]["startAt"]) >= pendulum.parse(
"""Filter jobs that finished before start_date."""
if pendulum.parse(row["parameters"]["endAt"]) >= pendulum.parse(
self.config["start_date"]
):
return row
Expand All @@ -47,7 +44,7 @@ class ExportFileTopicsStream(gladlyStream):
path = "/export/jobs/{job_id}/files/topics.jsonl"
primary_keys = ["id"]
replication_key = None
parent_stream_type = ExportJobsStream
parent_stream_type = ExportCompletedJobsStream
ignore_parent_replication_key = True
schema_filepath = SCHEMAS_DIR / "export_topics.json"

Expand All @@ -64,7 +61,7 @@ class ExportFileConversationItemsStream(gladlyStream, abc.ABC):
path = "/export/jobs/{job_id}/files/conversation_items.jsonl"
primary_keys = ["id"]
replication_key = None
parent_stream_type = ExportJobsStream
parent_stream_type = ExportCompletedJobsStream
ignore_parent_replication_key = True

@property
Expand Down
4 changes: 2 additions & 2 deletions tap_gladly/tap.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

# TODO: Import your custom stream types here:
from tap_gladly.streams import (
ExportCompletedJobsStream,
ExportFileConversationItemsChatMessage,
ExportFileConversationItemsConversationNote,
ExportFileConversationItemsConversationStatusChange,
Expand All @@ -15,11 +16,10 @@
ExportFileConversationItemsTopicChange,
ExportFileConversationItemsVoiceMail,
ExportFileTopicsStream,
ExportJobsStream,
)

STREAM_TYPES = [
ExportJobsStream,
ExportCompletedJobsStream,
ExportFileConversationItemsChatMessage,
ExportFileConversationItemsConversationNote,
ExportFileConversationItemsTopicChange,
Expand Down
11 changes: 7 additions & 4 deletions tap_gladly/tests/test_streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@

import pendulum

from tap_gladly.streams import ExportFileConversationItemsChatMessage, ExportJobsStream
from tap_gladly.streams import (
ExportCompletedJobsStream,
ExportFileConversationItemsChatMessage,
)
from tap_gladly.tap import Tapgladly

SAMPLE_CONFIG = {
Expand All @@ -23,16 +26,16 @@ def test_started_at():
),
parse_env_config=False,
)
export_jobs_stream = ExportJobsStream(tap_gladly)
export_jobs_stream = ExportCompletedJobsStream(tap_gladly)
before_row = {
"record": "data",
"parameters": {
"startAt": (pendulum.now() - datetime.timedelta(days=3)).isoformat()
"endAt": (pendulum.now() - datetime.timedelta(days=3)).isoformat()
},
}
after_row = {
"record": "data",
"parameters": {"startAt": pendulum.now().isoformat()},
"parameters": {"endAt": pendulum.now().isoformat()},
}
assert not export_jobs_stream.post_process(before_row, None)
assert export_jobs_stream.post_process(after_row, None)
Expand Down

0 comments on commit 463d7df

Please sign in to comment.