Merge pull request #50 from mraspaud/expand-file-mda
Expand file metadata scope
mraspaud authored Jan 28, 2025
2 parents b18fa13 + 61dd838 commit 47bf61c
Showing 4 changed files with 89 additions and 14 deletions.
8 changes: 8 additions & 0 deletions docs/source/backends.rst
@@ -25,3 +25,11 @@ DHuS watcher
------------
.. automodule:: pytroll_watchers.dhus_watcher
:members:

Adding a new backend
--------------------
The core concept of the pytroll watchers library is simply the publishing of file events. Adding a new backend requires two things:

1. Implement a generator that iterates over filesystem events and yields pairs of (file item, file metadata). The file item is the (U)Path object pointing to the file (see universal_pathlib). The file metadata is a dictionary containing the metadata of the file. It can be formatted in the same fashion as the message config. If it does not contain a `data` key, it is taken to be the contents of the `data` key itself (i.e. `subject` and `atype` will not be used as message parameters, but rather as `data` items).

2. Add an entry point to the new backend module. This module is expected to implement a `file_publisher` that just takes the config dictionary as argument.
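
The two steps above can be sketched as follows. This is only an illustration under assumptions: the generator name `file_generator`, the `directory` key in `fs_config` and the metadata values are hypothetical, and a plain `pathlib.Path` stands in for a UPath object. `file_publisher_from_generator` is the existing helper from `pytroll_watchers.publisher`; it is imported lazily so that the generator can be tried on its own.

```python
from pathlib import Path


def file_generator(directory):
    """Yield (file item, file metadata) pairs for the files in *directory*.

    Hypothetical example: a real backend would react to filesystem events
    instead of listing a directory once.
    """
    for path in sorted(Path(directory).iterdir()):
        if path.is_file():
            # No `data` key here, so the whole dictionary is treated as the
            # contents of the message's `data` section.
            yield path, {"sensor": "viirs", "mime": path.suffix.lstrip(".")}


def file_publisher(config):
    """Entry point of the hypothetical backend; takes the config dictionary."""
    # Imported here so that the generator above can be used without posttroll.
    from pytroll_watchers.publisher import file_publisher_from_generator

    # `directory` is an assumed, backend-specific `fs_config` parameter.
    generator = file_generator(config["fs_config"]["directory"])
    return file_publisher_from_generator(generator, config)
```

To set message parameters such as `subject` per file, the generator would instead yield a dictionary with an explicit `data` key, for example `{"subject": "/hey/jude", "data": {"mime": "txt"}}`.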
16 changes: 16 additions & 0 deletions docs/source/cli.rst
@@ -40,3 +40,19 @@ or for the case of a directory::
unpack:
format: directory
include_dir_in_uid: true

Configuration
_____________

The pytroll watchers CLI is configured through a YAML file
representing a dictionary (key-value pairs) of options. Similarly, several functions in the library take that same dictionary (as a Python object in that case) as input parameter.
This section applies to both forms, unless otherwise specified.

The configuration is expected to contain the following four items:

- `fs_config` for the configuration of the filesystem to watch. The parameters to provide here depend on the backend being used, so please refer to the description of the corresponding backend.
- `publisher_config` for the configuration of the publishing part. These parameters are passed directly to posttroll’s `create_publisher_from_dict_config`, so here again, please refer to the corresponding documentation.
- `message_config` for the configuration of the message to send when a new filesystem object is to be published. Most parameters here are passed to posttroll’s `Message` constructor (see its documentation), with one exception: `aliases` is removed from the items passed to the constructor and is used instead to replace values in the file/object metadata. Say, for example, that the metadata parsed from a file on a local file system contains the `platform_name` field, set to the satellite short name `npp`. Providing an alias of the form `{platform_name: {npp: "Suomi-NPP"}}` will set the platform name to "Suomi-NPP". Each value of the aliases dictionary is itself a dictionary, so multiple key-value pairs can be provided to support multiple cases.
- `data_config` (optional) for the configuration of the "processing" to be done on the incoming filesystem events. At the moment, there are two possibilities: `fetch` for fetching remote data locally before advertising the event, and `unpack` for advertising the components of the event (e.g. when the user wants to advertise the contents of a received zip file). More information on how to format these options is shown in the examples above.

Finally, the CLI configuration file takes one more element: the top-level `backend` parameter, which tells pytroll-watchers which backend to use. The available backends are listed as entry points in the pyproject.toml file.
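
As an illustration, the dictionary form of such a configuration could look like the sketch below. The values (directory, port, subject) are made-up placeholders, `"local"` is an assumed backend name, and `apply_aliases_sketch` is a hypothetical stand-in written for this example to show the alias replacement described above; it is not the library's own implementation.

```python
config = {
    "backend": "local",  # assumed backend name, only needed for the CLI
    "fs_config": {"directory": "/tmp/incoming"},  # backend-specific, hypothetical
    "publisher_config": {"nameservers": False, "port": 1979},
    "message_config": {
        "subject": "/segment/viirs/l1b/",
        "atype": "file",
        "data": {"sensor": "viirs"},
        "aliases": {"platform_name": {"npp": "Suomi-NPP"}},
    },
    "data_config": {"fetch": {"destination": "/tmp/downloaded"}},  # optional
}


def apply_aliases_sketch(aliases, metadata):
    """Replace metadata values in place using the aliases dictionary."""
    for key, replacements in aliases.items():
        if key in metadata:
            # Keep the original value when no alias is defined for it.
            metadata[key] = replacements.get(metadata[key], metadata[key])


file_metadata = {"platform_name": "npp", "sensor": "viirs"}
apply_aliases_sketch(config["message_config"]["aliases"], file_metadata)
print(file_metadata["platform_name"])  # Suomi-NPP
```

Values without a matching alias are left untouched, so the same `aliases` section can be shared by files from several platforms.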
46 changes: 32 additions & 14 deletions src/pytroll_watchers/publisher.py
@@ -57,30 +57,48 @@ def file_publisher_from_generator(generator, config):
- filesystem: `{"cls": "s3fs.core.S3FileSystem", "protocol": "s3", "args": [], "profile": "my_profile"}`
- path: `/eodata/Sentinel-3/OLCI/OL_1_EFR___/2024/04/15/S3B_OL_1_EFR____20240415T074029_20240415T074329_20240415T094236_0179_092_035_1620_PS2_O_NR_003.SEN3/Oa02_radiance.nc`
""" # noqa
-    publisher_config = config["publisher_config"]
+    publisher_config = config.pop("publisher_config")
     publisher = create_publisher_from_dict_config(publisher_config)
     publisher.start()

-    message_config = config["message_config"]
-    unpack = message_config.pop("unpack", None)
-    if unpack is not None:
-        warn("The `unpack` option should be passed inside the `data_config` section", DeprecationWarning, stacklevel=1)
-
-    data_config = config.get("data_config", dict())

     with closing(publisher):
         for file_item, file_metadata in generator:
-            amended_message_config = deepcopy(message_config)
-            amended_message_config.setdefault("data", {})
-            amended_message_config["data"].update(prepare_data(file_item, data_config))
-            aliases = amended_message_config.pop("aliases", {})
-            apply_aliases(aliases, file_metadata)
-            amended_message_config["data"].update(file_metadata)
-            msg = Message(**amended_message_config)
+            msg = _create_message(file_item, file_metadata, config)
             logger.info(f"Sending {str(msg)}")
             publisher.send(str(msg))


+def _create_message(file_item, file_metadata, config):
+    config = deepcopy(config)
+    message_config = config.pop("message_config", dict())
+    unpack = message_config.pop("unpack", None)
+    if unpack is not None:
+        warn("The `unpack` option should be passed inside the `data_config` section", DeprecationWarning, stacklevel=1)
+
+    data_config = config.pop("data_config", dict())
+
+    if file_metadata and ("data" in file_metadata):
+        file_mda = deepcopy(file_metadata)
+        message_data = file_mda.pop("data")
+        message_parameters = file_mda
+    else:
+        message_data = file_metadata or dict()
+        message_parameters = dict()
+    message_parameters.update(message_config)
+    message_parameters.setdefault("data", {})
+
+    file_location_info = prepare_data(file_item, data_config)
+    message_parameters["data"].update(file_location_info)
+
+    aliases = message_parameters.pop("aliases", {})
+    apply_aliases(aliases, message_data)
+
+    message_parameters["data"].update(message_data)
+
+    return Message(**message_parameters)
+
+
 def prepare_data(file_item, data_config):
     """Prepare the data for further processing."""
     fetch = data_config.pop("fetch", {})
33 changes: 33 additions & 0 deletions tests/test_publisher.py
@@ -93,3 +93,36 @@ def test_unpacking_directory(tmp_path):
     assert "my_dir/file2" in messages[0]
     msg = Message.decode(messages[0])
     assert msg.data["dataset"][0]["uid"].startswith("my_dir")
+
+
+def test_publish_paths_with_fetching(tmp_path):
+    """Test publishing paths."""
+    basename = "foo+bar,baz_.txt"
+    filename = os.fspath(tmp_path / basename)
+    with open(filename, "w"):
+        pass
+
+    destination = tmp_path / "downloaded"
+    destination.mkdir()
+
+    publisher_settings = dict(nameservers=False, port=1979)
+    subject = "/hey/jude"
+    atype = "atomic"
+    message_settings = dict(atype="file", data=dict(sensor="viirs"))
+    data_config = dict(fetch=dict(destination=destination))
+
+    items = [(filename, dict(subject=subject, atype=atype,
+                             data=dict(mime="txt")))]
+
+    with patched_publisher() as messages:
+        config = dict(publisher_config=publisher_settings,
+                      message_config=message_settings,
+                      data_config=data_config)
+        file_publisher_from_generator(items, config)
+
+    assert "uri" not in message_settings["data"]
+    assert len(messages) == 1
+    message = Message(rawstr=messages[0])
+    assert message.subject == subject
+    assert message.type == "file"
+    assert message.data["sensor"] == "viirs"
