Skip to content

Commit

Permalink
use catalog where possible instead of the default SF schema
Browse files Browse the repository at this point in the history
  • Loading branch information
agatav committed Jul 3, 2024
1 parent 974ac76 commit 2885183
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 115 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,119 +2,68 @@
"streams": [
{
"stream": {
"name": "Account",
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"],
"name": "Lead",
"json_schema": {
"$schema": "http://json-schema.org/draft-07/schema#",
"additionalProperties": true,
"properties": {
"Country": {
"type": [
"string",
"null"
]
},
"country_iso_code__c": {
"type": [
"string",
"null"
]
},
"Email": {
"type": [
"string",
"null"
]
},
"Id": {
"type": [
"string",
"null"
]
},
"LastModifiedDate": {
"format": "date-time",
"type": [
"string",
"null"
]
},
"SystemModstamp": {
"format": "date-time",
"type": [
"string",
"null"
]
}
},
"type": "object"
},
"supported_sync_modes": [
"full_refresh",
"incremental"
],
"source_defined_cursor": true,
"default_cursor_field": ["SystemModstamp"],
"source_defined_primary_key": [["Id"]]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "append"
},
{
"stream": {
"name": "ActiveFeatureLicenseMetric",
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["SystemModstamp"],
"source_defined_primary_key": [["Id"]]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "append"
},
{
"stream": {
"name": "ActivePermSetLicenseMetric",
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["SystemModstamp"],
"source_defined_primary_key": [["Id"]]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "append"
},
{
"stream": {
"name": "ActiveProfileMetric",
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["SystemModstamp"],
"source_defined_primary_key": [["Id"]]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "append"
},
{
"stream": {
"name": "AppDefinition",
"json_schema": {},
"supported_sync_modes": ["full_refresh"],
"source_defined_primary_key": [["Id"]]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
},
{
"stream": {
"name": "Asset",
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["SystemModstamp"],
"source_defined_primary_key": [["Id"]]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "append"
},
{
"stream": {
"name": "FormulaFunctionAllowedType",
"json_schema": {},
"supported_sync_modes": ["full_refresh"],
"source_defined_primary_key": [["Id"]]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
},
{
"stream": {
"name": "ObjectPermissions",
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["SystemModstamp"],
"source_defined_primary_key": [["Id"]]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "append"
},
{
"stream": {
"name": "PermissionSetTabSetting",
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["SystemModstamp"],
"source_defined_primary_key": [["Id"]]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "append"
},
{
"stream": {
"name": "LeadHistory",
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["CreatedDate"],
"source_defined_primary_key": [["Id"]]
"default_cursor_field": [
"CreatedDate"
],
"source_defined_primary_key": [
[
"Id"
]
]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "append"
}
]
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,12 @@ def generate_streams(
for stream_name, sobject_options in stream_objects.items():
json_schema = schemas.get(stream_name, {})

if self.catalog:
for catalog_stream in self.catalog.streams:
if stream_name == catalog_stream.stream.name and catalog_stream.stream.json_schema.get("properties", {}):
json_schema['properties'] = catalog_stream.stream.json_schema.get("properties", {})

logger.warning(f"JSON schema used for the stream {stream_name}: {json_schema}")
stream_class, kwargs = self.prepare_stream(stream_name, json_schema, sobject_options, *default_args)

parent_name = PARENT_SALESFORCE_OBJECTS.get(stream_name, {}).get("parent_name")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,12 +210,10 @@ def request_params(
if self.primary_key and self.name not in UNSUPPORTED_FILTERING_STREAMS:
query += f"ORDER BY {self.primary_key} ASC"

self.logger.warning(f"Used SOQL query {query}")
return {"q": query}

def chunk_properties(self) -> Iterable[Mapping[str, Any]]:
selected_properties = self.get_json_schema().get("properties", {})

def empty_props_with_pk_if_present():
return {self.primary_key: selected_properties[self.primary_key]} if self.primary_key else {}

Expand Down Expand Up @@ -617,7 +615,6 @@ def request_params(
if where_conditions:
query += f" WHERE {' AND '.join(where_conditions)}"

self.logger.warning(f"Used SOQL query {query}")
return {"q": query}

def read_records(
Expand Down Expand Up @@ -781,7 +778,6 @@ def request_params(

where_clause = f"WHERE {' AND '.join(where_conditions)}"
query = f"SELECT {select_fields} FROM {table_name} {where_clause}"
self.logger.warning(f"Used SOQL query {query}")

return {"q": query}

Expand Down Expand Up @@ -831,7 +827,6 @@ def request_params(
where_clause = f"WHERE {' AND '.join(where_conditions)}"
query = f"SELECT {select_fields} FROM {table_name} {where_clause}"

self.logger.warning(f"Used SOQL query {query}")
return {"q": query}


Expand Down

0 comments on commit 2885183

Please sign in to comment.