From 1fd2cb0cb933f93594840d763d8ac99e9f4713d5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?C=C3=A9dric=20Bonhomme?= Date: Thu, 13 Feb 2025 08:48:06 +0100 Subject: [PATCH] new: [core] Added a new async report_error function and prepared a first trusted release on PyPI. --- .github/workflows/release.yml | 27 +++++++++++++++++++++++ CHANGELOG.md | 8 +++++++ blueskysight/firehose.py | 27 ++++++++--------------- blueskysight/jetstream.py | 41 +++++++++++++++++++++++++++++------ blueskysight/utils.py | 38 ++++++++++++++++++++++++++++++++ poetry.lock | 24 ++++++++++---------- pyproject.toml | 4 ++-- 7 files changed, 130 insertions(+), 39 deletions(-) create mode 100644 .github/workflows/release.yml diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml new file mode 100644 index 0000000..06540a5 --- /dev/null +++ b/.github/workflows/release.yml @@ -0,0 +1,27 @@ +on: + release: + types: + - published + +name: release + +jobs: + pypi-publish: + name: Upload release to PyPI + runs-on: ubuntu-latest + environment: + name: pypi + url: https://pypi.org/p/BlueSkySight + + permissions: + id-token: write # IMPORTANT: this permission is mandatory for trusted publishing + steps: + - uses: actions/checkout@v4 + with: + fetch-depth: 0 + - name: Install Poetry + run: python -m pip install --upgrade pip poetry + - name: Build artifacts + run: poetry build + - name: Publish package distributions to PyPI + uses: pypa/gh-action-pypi-publish@release/v1 diff --git a/CHANGELOG.md b/CHANGELOG.md index 9f4c959..a583175 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,13 @@ # Changelog +## Release 1.0.0 (2025-02-13) + +This release introduces the capability to report errors, warnings, +and heartbeats to a Valkey datastore, facilitating centralized monitoring. +Events are reported using Python coroutines, with the heartbeat integrated +into the main event loop. 
+ + ## Release 0.5.0 (2025-01-06) The initial code for connecting to BlueSky's firehose has been replaced with diff --git a/blueskysight/firehose.py b/blueskysight/firehose.py index a420a92..1d8a438 100644 --- a/blueskysight/firehose.py +++ b/blueskysight/firehose.py @@ -1,19 +1,19 @@ import asyncio import json import struct -import time import typing as t from base64 import b32encode from io import BytesIO -import valkey import websockets from blueskysight import config from blueskysight.utils import ( extract_vulnerability_ids, get_post_url, + heartbeat, push_sighting_to_vulnerability_lookup, + report_error, ) BSKY_FIREHOSE = "wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos" @@ -294,8 +294,12 @@ async def firehose(): await process_firehose(ws) except websockets.ConnectionClosedError as e: print(f"Connection closed unexpectedly: {e}. Reconnecting…") + await report_error( + "error", f"Connection closed unexpectedly: {e}. Reconnecting…" + ) except Exception as e: print(f"Unexpected error: {e}. Reconnecting…") + await report_error("error", f"Unexpected error: {e}. 
Reconnecting…") finally: await asyncio.sleep(5) # Delay before attempting reconnection @@ -315,9 +319,11 @@ async def process_firehose(ws): await process_commit_frame(body) except websockets.ConnectionClosedError: print("WebSocket connection lost during processing.") + await report_error("error", "WebSocket connection lost during processing.") raise except Exception as e: print(f"Error while processing frame: {e}") + await report_error("error", f"Error while processing frame: {e}") async def process_commit_frame(body): @@ -367,22 +373,6 @@ def extract_textual_content(blocks): ] -async def heartbeat(): - """Sends a heartbeat in the Valkey datastore.""" - client = valkey.Valkey(config.valkey_host, config.valkey_port) - while True: - try: - client.set( - "process_BlueskySight_heartbeat", - time.time(), - ex=config.expiration_period, - ) - except Exception as e: - print(f"Heartbeat error: {e}") - raise # Propagate the error to stop the process - await asyncio.sleep(30) # Heartbeat every 30 seconds - - async def launch_with_hearbeat(): """Launch the heartbeat within the same event loop as the firehose coroutine.""" tasks = [asyncio.create_task(heartbeat()), asyncio.create_task(firehose())] @@ -391,6 +381,7 @@ async def launch_with_hearbeat(): await asyncio.gather(*tasks) except Exception as e: print(f"Error detected: {e}") + await report_error("error", f"Error detected: {e}") # Cancel remaining tasks (like heartbeat) for task in tasks: task.cancel() diff --git a/blueskysight/jetstream.py b/blueskysight/jetstream.py index b8e2088..e2e2c94 100644 --- a/blueskysight/jetstream.py +++ b/blueskysight/jetstream.py @@ -4,15 +4,17 @@ import os import platform import typing as t -import websockets from pathlib import Path from urllib.parse import urlencode +import websockets import zstandard as zstd +from blueskysight import config from blueskysight.utils import ( extract_vulnerability_ids, get_post_url, + heartbeat, push_sighting_to_vulnerability_lookup, ) @@ -259,6 +261,26 @@ 
async def process_jetstream_message(json_message): push_sighting_to_vulnerability_lookup(url, vulnerability_ids) +async def launch_with_hearbeat(): + """Launch the heartbeat within the same event loop as the firehose coroutine.""" + tasks = [ + asyncio.create_task(heartbeat("process_heartbeat_BlueskySight-Jetstream")), + asyncio.create_task(jetstream()), + ] + try: + # Wait for all tasks, but stop on the first exception + await asyncio.gather(*tasks) + except Exception as e: + print(f"Error detected: {e}") + # Cancel remaining tasks (like heartbeat) + for task in tasks: + task.cancel() + # Wait until all tasks are properly cancelled + await asyncio.gather(*tasks, return_exceptions=True) + # Re-raise the exception to propagate it + raise + + def main(): parser = argparse.ArgumentParser( prog="BlueSkySight-Jetstream", description="Connect to a Jetstream service." @@ -283,13 +305,18 @@ def main(): arguments = parser.parse_args() - asyncio.run( - jetstream( - collections=arguments.collections, - geo=arguments.geo, - instance=arguments.instance, + if config.heartbeat_enabled: + # Execute the firehose() coroutine within the same event loop as the heartbeat for monitoring + asyncio.run(launch_with_hearbeat()) + else: + # No monitoring of the process + asyncio.run( + jetstream( + collections=arguments.collections, + geo=arguments.geo, + instance=arguments.instance, + ) ) - ) if __name__ == "__main__": diff --git a/blueskysight/utils.py b/blueskysight/utils.py index 23855ec..28132ea 100644 --- a/blueskysight/utils.py +++ b/blueskysight/utils.py @@ -1,14 +1,52 @@ +import asyncio import base64 import hashlib import io import struct +import time from enum import Enum import httpx +import valkey from pyvulnerabilitylookup import PyVulnerabilityLookup from blueskysight import config +if config.heartbeat_enabled: + valkey_client = valkey.Valkey(config.valkey_host, config.valkey_port) + + +async def heartbeat(key="process_heartbeat_BlueskySight") -> None: + """Sends a heartbeat in 
the Valkey datastore.""" + if not config.heartbeat_enabled: + return + while True: + try: + valkey_client.set( + key, + time.time(), + ex=config.expiration_period, + ) + except Exception as e: + print(f"Heartbeat error: {e}") + raise # Propagate the error to stop the process + await asyncio.sleep(30) # Heartbeat every 30 seconds + + +async def report_error( + level="warning", message="", key="process_logs_BlueskySight" +) -> None: + """Reports an error or warning in the Valkey datastore.""" + timestamp = time.time() + log_entry = {"timestamp": timestamp, "level": level, "message": message} + try: + # Add the log entry to a list, so multiple messages are preserved + valkey_client.rpush(key, str(log_entry)) + valkey_client.expire(key, 86400) # Expire after 24 hours + except Exception as e: + print(f"Error reporting failure: {e}") + raise + def push_sighting_to_vulnerability_lookup(status_uri, vulnerability_ids): """Create a sighting from an incoming status and push it to the Vulnerability Lookup instance.""" diff --git a/poetry.lock b/poetry.lock index c40e6b6..abb6262 100644 --- a/poetry.lock +++ b/poetry.lock @@ -425,14 +425,14 @@ zstd = ["zstandard (>=0.18.0)"] [[package]] name = "identify" -version = "2.6.6" +version = "2.6.7" description = "File identification library for Python" optional = false python-versions = ">=3.9" groups = ["dev"] files = [ - {file = "identify-2.6.6-py2.py3-none-any.whl", hash = "sha256:cbd1810bce79f8b671ecb20f53ee0ae8e86ae84b557de31d89709dc2a48ba881"}, - {file = "identify-2.6.6.tar.gz", hash = "sha256:7bec12768ed44ea4761efb47806f0a41f86e7c0a5fdf5950d4648c90eca7e251"}, + {file = "identify-2.6.7-py2.py3-none-any.whl", hash = "sha256:155931cb617a401807b09ecec6635d6c692d180090a1cedca8ef7d58ba5b6aa0"}, + {file = "identify-2.6.7.tar.gz", hash = "sha256:3fa266b42eba321ee0b2bb0936a6a6b9e36a1351cbb69055b3082f4193035684"}, ] [package.extras] @@ -618,21 +618,21 @@ files = [ [[package]] name = "pyvulnerabilitylookup" -version = "2.2.0" +version = 
"2.5.0" description = "Python CLI and module for Vulnerability Lookup" optional = false -python-versions = "<4.0,>=3.10" +python-versions = ">=3.9" groups = ["main"] files = [ - {file = "pyvulnerabilitylookup-2.2.0-py3-none-any.whl", hash = "sha256:27acef0e4fb1fad301c1a4c403de0fe6f06dcd2d12234eb2f63d2f737a7475a7"}, - {file = "pyvulnerabilitylookup-2.2.0.tar.gz", hash = "sha256:cdf56157cf9df013e2131ed3c03029e9e9be9211d4cf4097b2999448b6d6fec3"}, + {file = "pyvulnerabilitylookup-2.5.0-py3-none-any.whl", hash = "sha256:abbb0fdc0ffd66f1debb2d1e4f70ad6747a65d92f6b3550bea2f0495fc258032"}, + {file = "pyvulnerabilitylookup-2.5.0.tar.gz", hash = "sha256:45cd9182c5c93c869ea5a9a85b5f39aa565c5862703dbeeb8239eaa08b71ba75"}, ] [package.dependencies] -requests = ">=2.32.3,<3.0.0" +requests = ">=2.32.3" [package.extras] -docs = ["Sphinx (>=8.1.3,<9.0.0)"] +docs = ["Sphinx (>=8.1.3)"] [[package]] name = "pyyaml" @@ -841,14 +841,14 @@ ocsp = ["cryptography (>=36.0.1)", "pyopenssl (==23.2.1)", "requests (>=2.31.0)" [[package]] name = "virtualenv" -version = "20.29.1" +version = "20.29.2" description = "Virtual Python Environment builder" optional = false python-versions = ">=3.8" groups = ["dev"] files = [ - {file = "virtualenv-20.29.1-py3-none-any.whl", hash = "sha256:4e4cb403c0b0da39e13b46b1b2476e505cb0046b25f242bee80f62bf990b2779"}, - {file = "virtualenv-20.29.1.tar.gz", hash = "sha256:b8b8970138d32fb606192cb97f6cd4bb644fa486be9308fb9b63f81091b5dc35"}, + {file = "virtualenv-20.29.2-py3-none-any.whl", hash = "sha256:febddfc3d1ea571bdb1dc0f98d7b45d24def7428214d4fb73cc486c9568cce6a"}, + {file = "virtualenv-20.29.2.tar.gz", hash = "sha256:fdaabebf6d03b5ba83ae0a02cfe96f48a716f4fae556461d180825866f75b728"}, ] [package.dependencies] diff --git a/pyproject.toml b/pyproject.toml index 3180850..4f8ce5e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -5,7 +5,7 @@ build-backend = "poetry.core.masonry.api" [project] name = "BlueSkySight" -version = "0.6.0" +version = "1.0.0" description = 
"A client to gather vulnerability-related information from Bluesky." authors = [ {name = "Cédric Bonhomme", email = "cedric.bonhomme@circl.lu"} @@ -13,7 +13,7 @@ authors = [ license = "GPL-3.0-or-later" readme = "README.md" keywords = ["Vulnerability-Lookup", "Vulnerability", "CVE", "Bluesky"] -# classifieres is dynamic because we want to create Python classifiers automatically + dynamic = ["classifiers"] requires-python = ">=3.10,<4.0"