diff --git a/docs/source/backends.rst b/docs/source/backends.rst index 551b316..b91e967 100644 --- a/docs/source/backends.rst +++ b/docs/source/backends.rst @@ -25,3 +25,11 @@ DHuS watcher ------------ .. automodule:: pytroll_watchers.dhus_watcher :members: + +Adding a new backend +-------------------- +The base concept of the pytroll watchers library is very simply the publishing of file events. In order to add a new backend, two things need to be done: + +1. Implement a generator that iterates over filesystem events and generates a pair of (file item, file metadata). The file item is the the (U)Path object to the file (check out universal_pathlib). The file metadata is a dictionary that contains the metadata of the file. It can be formatted in the same fashion as the message config. If it does not contain a `data` key, it is expected to be the contents of the data key itself (ie `subject` and `atype` will not be used as message parameters, but rather as `data` items). + +2. Add an entry point to the new backend module. This module is expected to implement a `file_publisher` that just takes the config dictionary as argument. diff --git a/docs/source/cli.rst b/docs/source/cli.rst index 8aaa8d3..454175b 100644 --- a/docs/source/cli.rst +++ b/docs/source/cli.rst @@ -40,3 +40,19 @@ or for the case of a directory:: unpack: format: directory include_dir_in_uid: true + +Configuration +_____________ + +The configuration of pytroll watchers’ CLI is done through a yaml configuration +file representing a dictionary (key-value pairs) of options. Similarly different functions in the library use that same dictionary (a python object in that case) as input parameter. +This section should apply to both dictionaries, unless otherwise specified. + +The configuration is expected to contain the following four items: + +- `fs_config` for the configuration of the filesystem to watch. The parameters to provide here differ depending on what backend is being used, so the curious reader is recommended to look at the description for the corresponding backend. +- `publiser_config` for the configuration of the publishing part. These parameters are passed directly to posttroll’s `create_publisher_from_dict`, so here again, please refer to the corresponding documentation. +- `message_config` for the configuration of the message to send when a new filesystem object is to be published. Most parameters here are passed to posttroll’s `Message` constructor (check the documentation too), with one exception: `aliases` is removed from the items passed to the constructor and is used instead for replacing information from the file/object metadata. Say for example that the parsed file object from a local file system contains the `platform_name` field, and is set to a satellite short name `npp`. Providing an alias of the form `{platform_name: {npp: "Suomi-NPP"}}` will effectively set the platform name to "Suomi-NPP". Each value of the aliases dictionary is itself a dictionary, so multiple key-value pairs can be provided here to support multiple cases. +- `data_config` (optional) for the configuration of the "processing" to be done to the incoming filesystem events. At the moment, there are two possibilities: `fetch` for fetching remote data locally before advertising the event, and `unpack` to send the advertising the components of the event (e.g. in the case the user wants to advertise the contents of a receive zip file event). More information on how to format these options is shown in the examples above. + +Finally, the last element for the case of the configuration file for the CLI is the top-level `backend` parameter which tells pytroll-watcher which backend to use. The list of available backends is available as entry points defined in the pyproject.toml file. diff --git a/src/pytroll_watchers/publisher.py b/src/pytroll_watchers/publisher.py index bab852a..770b707 100644 --- a/src/pytroll_watchers/publisher.py +++ b/src/pytroll_watchers/publisher.py @@ -57,30 +57,48 @@ def file_publisher_from_generator(generator, config): - filesystem: `{"cls": "s3fs.core.S3FileSystem", "protocol": "s3", "args": [], "profile": "my_profile"}` - path: `/eodata/Sentinel-3/OLCI/OL_1_EFR___/2024/04/15/S3B_OL_1_EFR____20240415T074029_20240415T074329_20240415T094236_0179_092_035_1620_PS2_O_NR_003.SEN3/Oa02_radiance.nc` """ # noqa - publisher_config = config["publisher_config"] + publisher_config = config.pop("publisher_config") publisher = create_publisher_from_dict_config(publisher_config) publisher.start() - message_config = config["message_config"] - unpack = message_config.pop("unpack", None) - if unpack is not None: - warn("The `unpack` option should be passed inside the `data_config` section", DeprecationWarning, stacklevel=1) - - data_config = config.get("data_config", dict()) with closing(publisher): for file_item, file_metadata in generator: - amended_message_config = deepcopy(message_config) - amended_message_config.setdefault("data", {}) - amended_message_config["data"].update(prepare_data(file_item, data_config)) - aliases = amended_message_config.pop("aliases", {}) - apply_aliases(aliases, file_metadata) - amended_message_config["data"].update(file_metadata) - msg = Message(**amended_message_config) + msg = _create_message(file_item, file_metadata, config) logger.info(f"Sending {str(msg)}") publisher.send(str(msg)) +def _create_message(file_item, file_metadata, config): + config = deepcopy(config) + message_config = config.pop("message_config", dict()) + unpack = message_config.pop("unpack", None) + if unpack is not None: + warn("The `unpack` option should be passed inside the `data_config` section", DeprecationWarning, stacklevel=1) + + data_config = config.pop("data_config", dict()) + + if file_metadata and ("data" in file_metadata): + file_mda = deepcopy(file_metadata) + message_data = file_mda.pop("data") + message_parameters = file_mda + else: + message_data = file_metadata or dict() + message_parameters = dict() + message_parameters.update(message_config) + message_parameters.setdefault("data", {}) + + file_location_info = prepare_data(file_item, data_config) + message_parameters["data"].update(file_location_info) + + aliases = message_parameters.pop("aliases", {}) + apply_aliases(aliases, message_data) + + message_parameters["data"].update(message_data) + + return Message(**message_parameters) + + def prepare_data(file_item, data_config): """Prepare the data for further processing.""" fetch = data_config.pop("fetch", {}) diff --git a/tests/test_publisher.py b/tests/test_publisher.py index 85c9b90..bc6910e 100644 --- a/tests/test_publisher.py +++ b/tests/test_publisher.py @@ -93,3 +93,36 @@ def test_unpacking_directory(tmp_path): assert "my_dir/file2" in messages[0] msg = Message.decode(messages[0]) assert msg.data["dataset"][0]["uid"].startswith("my_dir") + + +def test_publish_paths_with_fetching(tmp_path): + """Test publishing paths.""" + basename = "foo+bar,baz_.txt" + filename = os.fspath(tmp_path / basename) + with open(filename, "w"): + pass + + destination = tmp_path / "downloaded" + destination.mkdir() + + publisher_settings = dict(nameservers=False, port=1979) + subject = "/hey/jude" + atype = "atomic" + message_settings = dict(atype="file", data=dict(sensor="viirs")) + data_config = dict(fetch=dict(destination=destination)) + + items = [(filename, dict(subject=subject, atype=atype, + data=dict(mime="txt")))] + + with patched_publisher() as messages: + config = dict(publisher_config=publisher_settings, + message_config=message_settings, + data_config=data_config) + file_publisher_from_generator(items, config) + + assert "uri" not in message_settings["data"] + assert len(messages) == 1 + message = Message(rawstr=messages[0]) + assert message.subject == subject + assert message.type == "file" + assert message.data["sensor"] == "viirs"