Black format
edurdevic committed Jan 9, 2024
1 parent 8ff270d commit 511f7c9
Showing 3 changed files with 53 additions and 52 deletions.
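This commit is a pure formatting pass over the example notebooks. A minimal sketch of how such a pass is typically produced, assuming the Black formatter is installed (the exact Black version used is not recorded in this view):

# Sketch only: assumes Black is installed (pip install black).
import subprocess
import sys

# Rewrites the example notebooks in place. Black changes layout only
# (indentation, spacing, line wrapping); it never changes runtime behavior.
subprocess.run([sys.executable, "-m", "black", "examples/"], check=True)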
examples/deep_clone_schema.py (53 changes: 27 additions & 26 deletions)
@@ -2,14 +2,14 @@
# MAGIC %md
# MAGIC # Deep Clone a Schema
# MAGIC
# MAGIC Databricks' Deep Clone functionality enables the effortless creation of a data replica with minimal coding and maintenance overhead. Using the `CLONE` command, you can efficiently generate a duplicate of an existing Delta Lake table on Databricks at a designated version. The cloning process is incremental, ensuring that only new changes since the last clone are applied to the table.
# MAGIC
# MAGIC
# MAGIC Deep cloning is applied on a per-table basis, requiring a separate invocation for each table within your schema. In scenarios where automation is desirable, such as with schemas shared through Delta Sharing, you can replicate the entire schema using DiscoverX. This approach eliminates the need to manually inspect and modify your code each time the provider adds a new table to the schema.
# MAGIC
# MAGIC This notebook is an example of using DiscoverX to automate the replication of a schema with Delta Deep Clone.
# MAGIC
# MAGIC We recommend scheduling this notebook as a job on the recipient side.
# MAGIC

# COMMAND ----------
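For reference, the per-table statement this notebook builds boils down to the sketch below, with hypothetical catalog, schema, and table names (on Databricks, `CLONE` without a SHALLOW/DEEP qualifier performs a deep clone):

# Hypothetical names for illustration; the notebook derives the real ones
# from DiscoverX table metadata.
spark.sql(
    """CREATE OR REPLACE TABLE replica_catalog.sales.orders
    CLONE source_catalog.sales.orders"""
)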
@@ -19,7 +19,7 @@

# COMMAND ----------

dbutils.widgets.text("1.source_catalog", "_discoverx_deep_clone")
dbutils.widgets.text("2.destination_catalog", "_discoverx_deep_clone_replica")

source_catalog = dbutils.widgets.get("1.source_catalog")
@@ -52,31 +52,33 @@

# COMMAND ----------


def clone_tables(table_info):
    spark.sql(f"CREATE SCHEMA IF NOT EXISTS {destination_catalog}.{table_info.schema}")
    try:
        spark.sql(
            f"""CREATE OR REPLACE TABLE
            {destination_catalog}.{table_info.schema}.{table_info.table}
            CLONE {table_info.catalog}.{table_info.schema}.{table_info.table}
            """
        )
-        result={
+        result = {
            "source": f"{table_info.catalog}.{table_info.schema}.{table_info.table}",
            "destination": f"{destination_catalog}.{table_info.schema}.{table_info.table}",
-            "success":True,
+            "success": True,
            "info": None,
        }
    # Cloning Views is not supported
    except Exception as error:
-        result={
+        result = {
            "source": f"{table_info.catalog}.{table_info.schema}.{table_info.table}",
            "destination": f"{destination_catalog}.{table_info.schema}.{table_info.table}",
-            "success":False,
+            "success": False,
            "info": error,
        }
    return result

# COMMAND ----------

@@ -86,5 +88,4 @@ def clone_tables(table_info):

# COMMAND ----------

-res = dx.from_tables(f"{source_catalog}.*.*")\
-    .map(clone_tables)
+res = dx.from_tables(f"{source_catalog}.*.*").map(clone_tables)
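Taken together, the example runs as below; a minimal usage sketch, assuming a Databricks notebook with discoverx installed and that `map` returns one result dictionary per matched table:

from discoverx import DX

dx = DX()

# "_discoverx_deep_clone" is the widget default from above; any
# catalog.schema.table wildcard pattern works here.
res = dx.from_tables("_discoverx_deep_clone.*.*").map(clone_tables)

# Each entry is the dict built by clone_tables; views show up as failures
# because cloning views is not supported.
for r in res:
    print(r["source"], "->", r["destination"], "OK" if r["success"] else r["info"])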
examples/scan_with_user_specified_data_source_formats.py (10 changes: 3 additions & 7 deletions)
@@ -43,7 +43,7 @@

# COMMAND ----------

# MAGIC %md
# MAGIC ### DiscoverX will scan all delta tables by default

# COMMAND ----------
@@ -52,15 +52,11 @@

# COMMAND ----------

# MAGIC %md
# MAGIC ### User can specify data source formats as follows

# COMMAND ----------

-(dx.from_tables(from_table_statement)
-    .with_data_source_formats(["DELTA","JSON"])
-    .scan())
+(dx.from_tables(from_table_statement).with_data_source_formats(["DELTA", "JSON"]).scan())

# COMMAND ----------
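The collapsed chain above is the whole pattern: select tables, widen the format filter, scan. A side-by-side sketch, assuming `dx` and `from_table_statement` are defined earlier in the notebook:

# Default behavior: only Delta tables are scanned.
dx.from_tables(from_table_statement).scan()

# Explicit opt-in: scan JSON sources alongside Delta.
dx.from_tables(from_table_statement).with_data_source_formats(["DELTA", "JSON"]).scan()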
examples/update_owner_of_data_objects.py (42 changes: 23 additions & 19 deletions)

@@ -22,9 +22,9 @@
dbutils.widgets.text("catalogs", "*", "Catalogs")
dbutils.widgets.text("schemas", "*", "Schemas")
dbutils.widgets.text("tables", "*", "Tables")
dbutils.widgets.text("owner","[email protected]","owner")
dbutils.widgets.dropdown("if_update_catalog_owner", "YES", ["YES","NO"])
dbutils.widgets.dropdown("if_update_schema_owner", "YES", ["YES","NO"])
dbutils.widgets.text("owner", "[email protected]", "owner")
dbutils.widgets.dropdown("if_update_catalog_owner", "YES", ["YES", "NO"])
dbutils.widgets.dropdown("if_update_schema_owner", "YES", ["YES", "NO"])

# COMMAND ----------

@@ -54,23 +54,27 @@

# COMMAND ----------


def update_owner(table_info):
    catalog_owner_alter_sql = f""" ALTER CATALOG `{table_info.catalog}` SET OWNER TO `{owner}`"""
    schema_owner_alter_sql = f""" ALTER SCHEMA `{table_info.catalog}`.`{table_info.schema}` SET OWNER TO `{owner}`"""
-    table_owner_alter_sql = f""" ALTER TABLE `{table_info.catalog}`.`{table_info.schema}`.`{table_info.table}` SET OWNER TO `{owner}`"""
+    table_owner_alter_sql = (
+        f""" ALTER TABLE `{table_info.catalog}`.`{table_info.schema}`.`{table_info.table}` SET OWNER TO `{owner}`"""
+    )
    try:
-        if(if_update_catalog_owner == 'YES'):
+        if if_update_catalog_owner == "YES":
            print(f"Executing {catalog_owner_alter_sql}")
            spark.sql(catalog_owner_alter_sql)

-        if(if_update_schema_owner == 'YES'):
+        if if_update_schema_owner == "YES":
            print(f"Executing {schema_owner_alter_sql}")
            spark.sql(schema_owner_alter_sql)

        print(f"Executing {table_owner_alter_sql}")
        spark.sql(table_owner_alter_sql)
    except Exception as exception:
        print(f" Exception occurred while updating owner: {exception}")


# COMMAND ----------
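The truncated cells presumably drive `update_owner` the same way as the other examples; a hypothetical sketch (the wildcard pattern and widget names come from the cells above, the rest is assumption):

# Hypothetical driver cell; catalogs, schemas, and tables hold the widget
# values read earlier in the notebook.
from_table_statement = f"{catalogs}.{schemas}.{tables}"

# ALTER ... SET OWNER requires sufficient privileges on each object.
dx.from_tables(from_table_statement).map(update_owner)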
