From 511f7c998f4addb03132be9e13467a9bf49dd2bd Mon Sep 17 00:00:00 2001
From: Erni Durdevic
Date: Tue, 9 Jan 2024 08:53:03 +0400
Subject: [PATCH] Black format

---
 examples/deep_clone_schema.py                 | 53 ++++++++++---------
 ...with_user_specified_data_source_formats.py | 10 ++--
 examples/update_owner_of_data_objects.py      | 42 ++++++++-------
 3 files changed, 53 insertions(+), 52 deletions(-)

diff --git a/examples/deep_clone_schema.py b/examples/deep_clone_schema.py
index 2fa8ed8..9bacc55 100644
--- a/examples/deep_clone_schema.py
+++ b/examples/deep_clone_schema.py
@@ -2,14 +2,14 @@
 # MAGIC %md
 # MAGIC # Deep Clone a Schema
 # MAGIC
-# MAGIC Databricks' Deep Clone functionality enables the effortless creation of a data replica with minimal coding and maintenance overhead. Using the `CLONE` command, you can efficiently generate a duplicate of an existing Delta Lake table on Databricks at a designated version. The cloning process is incremental, ensuring that only new changes since the last clone are applied to the table. 
+# MAGIC Databricks' Deep Clone functionality enables the effortless creation of a data replica with minimal coding and maintenance overhead. Using the `CLONE` command, you can efficiently generate a duplicate of an existing Delta Lake table on Databricks at a designated version. The cloning process is incremental, ensuring that only new changes since the last clone are applied to the table.
 # MAGIC
 # MAGIC
 # MAGIC Deep cloning is applied on a per-table basis, requiring a separate invocation for each table within your schema. In scenarios where automation is desirable, such as when dealing with shared schemas through Delta sharing, replicating the entire schema can be achieved using DiscoverX. This approach eliminates the need to manually inspect and modify your code each time a new table is added to the schema by the provider.
 # MAGIC
 # MAGIC This notebook serves as an example of utilizing DiscoverX to automate the replication of a schema using Delta Deep Clone.
 # MAGIC
-# MAGIC Our recommendation is to schedule this notebook as a job at the recipient side. 
+# MAGIC Our recommendation is to schedule this notebook as a job at the recipient side.
 # MAGIC
 
 # COMMAND ----------
@@ -19,7 +19,7 @@
 
 # COMMAND ----------
 
-dbutils.widgets.text("1.source_catalog", "_discoverx_deep_clone") 
+dbutils.widgets.text("1.source_catalog", "_discoverx_deep_clone")
 dbutils.widgets.text("2.destination_catalog", "_discoverx_deep_clone_replica")
 
 source_catalog = dbutils.widgets.get("1.source_catalog")
@@ -52,31 +52,33 @@
 
 # COMMAND ----------
 
+
 def clone_tables(table_info):
-  
-  spark.sql(f"CREATE SCHEMA IF NOT EXISTS {destination_catalog}.{table_info.schema}")
-  try:
-    spark.sql(
-      f"""CREATE OR REPLACE TABLE
+
+    spark.sql(f"CREATE SCHEMA IF NOT EXISTS {destination_catalog}.{table_info.schema}")
+    try:
+        spark.sql(
+            f"""CREATE OR REPLACE TABLE
       {destination_catalog}.{table_info.schema}.{table_info.table}
     CLONE {table_info.catalog}.{table_info.schema}.{table_info.table}
     """
-    )
-    result={
-      "source": f"{table_info.catalog}.{table_info.schema}.{table_info.table}",
-      "destination": f"{destination_catalog}.{table_info.schema}.{table_info.table}",
-      "success":True,
-      "info": None,
-    }
-  # Cloning Views is not supported
-  except Exception as error:
-    result={
-      "source": f"{table_info.catalog}.{table_info.schema}.{table_info.table}",
-      "destination": f"{destination_catalog}.{table_info.schema}.{table_info.table}",
-      "success":False,
-      "info": error,
-    }
-  return result
+        )
+        result = {
+            "source": f"{table_info.catalog}.{table_info.schema}.{table_info.table}",
+            "destination": f"{destination_catalog}.{table_info.schema}.{table_info.table}",
+            "success": True,
+            "info": None,
+        }
+    # Cloning Views is not supported
+    except Exception as error:
+        result = {
+            "source": f"{table_info.catalog}.{table_info.schema}.{table_info.table}",
+            "destination": f"{destination_catalog}.{table_info.schema}.{table_info.table}",
+            "success": False,
+            "info": error,
+        }
+    return result
+
 
 # COMMAND ----------
 
@@ -86,5 +88,4 @@ def clone_tables(table_info):
 
 # COMMAND ----------
 
-res = dx.from_tables(f"{source_catalog}.*.*")\
-    .map(clone_tables)
+res = dx.from_tables(f"{source_catalog}.*.*").map(clone_tables)
diff --git a/examples/scan_with_user_specified_data_source_formats.py b/examples/scan_with_user_specified_data_source_formats.py
index f94f7f7..34c84d1 100644
--- a/examples/scan_with_user_specified_data_source_formats.py
+++ b/examples/scan_with_user_specified_data_source_formats.py
@@ -43,7 +43,7 @@
 
 # COMMAND ----------
 
-# MAGIC %md 
+# MAGIC %md
 # MAGIC ### DiscoverX will scan all delta tables by default
 
 # COMMAND ----------
@@ -52,15 +52,11 @@
 
 # COMMAND ----------
 
-# MAGIC %md 
+# MAGIC %md
 # MAGIC ### User can specify data source formats as follows
 
 # COMMAND ----------
 
-(dx.from_tables(from_table_statement)
-.with_data_source_formats(["DELTA","JSON"])
-.scan())
+(dx.from_tables(from_table_statement).with_data_source_formats(["DELTA", "JSON"]).scan())
 
 # COMMAND ----------
-
-
diff --git a/examples/update_owner_of_data_objects.py b/examples/update_owner_of_data_objects.py
index 4c7e1a9..dc23bf6 100644
--- a/examples/update_owner_of_data_objects.py
+++ b/examples/update_owner_of_data_objects.py
@@ -22,9 +22,9 @@
 dbutils.widgets.text("catalogs", "*", "Catalogs")
 dbutils.widgets.text("schemas", "*", "Schemas")
 dbutils.widgets.text("tables", "*", "Tables")
-dbutils.widgets.text("owner","sourav.gulati@databricks.com","owner")
-dbutils.widgets.dropdown("if_update_catalog_owner", "YES", ["YES","NO"])
-dbutils.widgets.dropdown("if_update_schema_owner", "YES", ["YES","NO"])
+dbutils.widgets.text("owner", "sourav.gulati@databricks.com", "owner")
+dbutils.widgets.dropdown("if_update_catalog_owner", "YES", ["YES", "NO"])
+dbutils.widgets.dropdown("if_update_schema_owner", "YES", ["YES", "NO"])
 
 # COMMAND ----------
 
@@ -54,23 +54,27 @@
 
 # COMMAND ----------
 
+
 def update_owner(table_info):
-  catalog_owner_alter_sql = f""" ALTER CATALOG `{table_info.catalog}` SET OWNER TO `{owner}`"""
-  schema_owner_alter_sql = f""" ALTER SCHEMA `{table_info.catalog}`.`{table_info.schema}` SET OWNER TO `{owner}`"""
-  table_owner_alter_sql = f""" ALTER TABLE `{table_info.catalog}`.`{table_info.schema}`.`{table_info.table}` SET OWNER TO `{owner}`"""
-  try:
-    if(if_update_catalog_owner == 'YES'):
-      print(f"Executing {catalog_owner_alter_sql}")
-      spark.sql(catalog_owner_alter_sql)
-
-    if(if_update_schema_owner == 'YES'):
-      print(f"Executing {schema_owner_alter_sql}")
-      spark.sql(schema_owner_alter_sql)
-
-    print(f"Executing {table_owner_alter_sql}")
-    spark.sql(table_owner_alter_sql)
-  except Exception as exception:
-    print(f" Exception occurred while updating owner: {exception}")
+    catalog_owner_alter_sql = f""" ALTER CATALOG `{table_info.catalog}` SET OWNER TO `{owner}`"""
+    schema_owner_alter_sql = f""" ALTER SCHEMA `{table_info.catalog}`.`{table_info.schema}` SET OWNER TO `{owner}`"""
+    table_owner_alter_sql = (
+        f""" ALTER TABLE `{table_info.catalog}`.`{table_info.schema}`.`{table_info.table}` SET OWNER TO `{owner}`"""
+    )
+    try:
+        if if_update_catalog_owner == "YES":
+            print(f"Executing {catalog_owner_alter_sql}")
+            spark.sql(catalog_owner_alter_sql)
+
+        if if_update_schema_owner == "YES":
+            print(f"Executing {schema_owner_alter_sql}")
+            spark.sql(schema_owner_alter_sql)
+
+        print(f"Executing {table_owner_alter_sql}")
+        spark.sql(table_owner_alter_sql)
+    except Exception as exception:
+        print(f" Exception occurred while updating owner: {exception}")
+
 
 # COMMAND ----------