diff --git a/my-dagster-project/my_dagster_project/__init__.py b/my-dagster-project/my_dagster_project/__init__.py index 54a0446..45d0da0 100644 --- a/my-dagster-project/my_dagster_project/__init__.py +++ b/my-dagster-project/my_dagster_project/__init__.py @@ -1 +1,20 @@ -from .repository import my_dagster_project +from dagster import ( + load_assets_from_package_module, + define_asset_job, + ScheduleDefinition, +) +from my_dagster_project import assets +from my_dagster_project.dagster_quickstart import quickstart +from github import Github +import os + +quickstart( + assets=load_assets_from_package_module(assets), + schedules=[ + ScheduleDefinition( + job=define_asset_job(name="daily_refresh", selection="*"), + cron_schedule="@daily", + ) + ], + resources={"github_api": Github(os.getenv("GITHUB_ACCESS_TOKEN"))}, +) diff --git a/my-dagster-project/my_dagster_project/dagster_quickstart.py b/my-dagster-project/my_dagster_project/dagster_quickstart.py new file mode 100644 index 0000000..f678b1a --- /dev/null +++ b/my-dagster-project/my_dagster_project/dagster_quickstart.py @@ -0,0 +1,88 @@ +from typing import List, Union, Any, Mapping +from dagster import ( + AssetsDefinition, + SourceAsset, + ScheduleDefinition, + SensorDefinition, + ResourceDefinition, + repository, + with_resources, +) +import inspect + +# This is a bandaid library to paper over core API usability issues until we sort +# them out. The idea is that this will be a library separate from Dagster but +# referenced in our tutorials. It will be used for simple examples where 80% +# solutions suffice. If you need more flexibility beyond this, you can +# learn more about the internals of Dagster. + +# This is inspired by cherrypy.quickstart(): +# https://cherrypydocrework.readthedocs.io/basics.html#hosting-one-or-more-applications + +# There are three goals of this package: +# 1. Hide the word "repository" from code snippets. It's still present in the UI +# but will be less front-and-center when someone is learning Dagster +# 2. Hide the complexities of the resources system. You can just provide plain +# old values to the `resources` kwarg. This lets you write your business logic +# with resources in mind without having to learn how the whole resource and +# config system works to wire them up. This is better than hardcoding +# the resources directly in your business logic, since you won't have to +# rewrite all your business logic to support resources later, just the code +# that injects resources here. +# 3. Make secrets easy and obvious. Users of this library will be discouraged +# from using Dagster config. Instead, we will instruct users to instantiate +# resources directly and read secrets using os.getenv(). When +# https://github.com/dagster-io/dagster/pull/10491 lands users will be able +# to add secrets to a `.env.local` file during development, and Cloud users +# will get a nice UI to add secrets for prod (just like Vercel) + + +_quickstart_called = False + + +def quickstart( + assets: List[Union[AssetsDefinition, SourceAsset]] = [], + schedules: List[ScheduleDefinition] = [], + sensors: List[SensorDefinition] = [], + resources: Mapping[str, Any] = {}, + resource_config: Mapping[str, Any] = {}, +): + global _quickstart_called + if _quickstart_called: + raise RuntimeError("you may only call quickstart() once in your app") + _quickstart_called = True + + @repository + def quickstart_repo(): + resource_defs = {} + for resource_key, resource_def in resources.items(): + if not isinstance(resource_def, ResourceDefinition): + resource_def = ResourceDefinition.hardcoded_resource(resource_def) + if resource_key in resource_config: + resource_def = resource_def.configured( + bundle.resource_config[resource_key] + ) + resource_defs[resource_key] = resource_def + + if len(resource_defs) > 0: + asset_defs = with_resources(assets, resource_defs) + + return [ + *asset_defs, + *sensors, + *schedules, + ] + + frame = inspect.stack()[1].frame + if not frame.f_code.co_filename.endswith("__init__.py"): + raise RuntimeError("quickstart() must be called from __init__.py") + + f_locals = frame.f_locals + + if "quickstart_repo" in f_locals: + raise RuntimeError( + "you cannot define a symbol called quickstart_repo in your module" + ) + f_locals["quickstart_repo"] = quickstart_repo + + return quickstart_repo diff --git a/my-dagster-project/my_dagster_project/repository.py b/my-dagster-project/my_dagster_project/repository.py deleted file mode 100644 index f4c4f4c..0000000 --- a/my-dagster-project/my_dagster_project/repository.py +++ /dev/null @@ -1,28 +0,0 @@ -from dagster import ( - load_assets_from_package_module, - repository, - with_resources, - define_asset_job, - ScheduleDefinition, -) -from my_dagster_project import assets -from my_dagster_project.resources import github_api - -daily_job = define_asset_job(name="daily_refresh", selection="*") -daily_schedule = ScheduleDefinition( - job=daily_job, - cron_schedule="@daily", -) - - -@repository -def my_dagster_project(): - return [ - daily_job, - daily_schedule, - with_resources( - load_assets_from_package_module(assets), - {"github_api": github_api.configured( - {"access_token": {"env": "GITHUB_ACCESS_TOKEN"}})}, - ), - ] diff --git a/my-dagster-project/my_dagster_project/resources.py b/my-dagster-project/my_dagster_project/resources.py deleted file mode 100644 index c885bea..0000000 --- a/my-dagster-project/my_dagster_project/resources.py +++ /dev/null @@ -1,7 +0,0 @@ -from dagster import StringSource, resource -from github import Github - - -@resource(config_schema={"access_token": StringSource}) -def github_api(init_context): - return Github(init_context.resource_config["access_token"])