Skip to content

Commit

Permalink
new: [core] Added new async report_error function and preparing a fir…
Browse files Browse the repository at this point in the history
…st trusted release on Pypi.
  • Loading branch information
cedricbonhomme committed Feb 13, 2025
1 parent bdbadb0 commit 1fd2cb0
Show file tree
Hide file tree
Showing 7 changed files with 130 additions and 39 deletions.
27 changes: 27 additions & 0 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Build the package with Poetry and upload it to PyPI each time a GitHub
# release is published.
name: release

on:
  release:
    types: [published]

jobs:
  pypi-publish:
    name: Upload release to PyPI
    runs-on: ubuntu-latest
    environment:
      name: pypi
      url: https://pypi.org/p/BlueSkySight
    permissions:
      # IMPORTANT: this permission is mandatory for trusted publishing
      id-token: write
    steps:
      # fetch-depth 0 checks out the complete history instead of a shallow clone
      - uses: actions/checkout@v4
        with:
          fetch-depth: 0
      - name: Install Poetry
        run: python -m pip install --upgrade pip poetry
      - name: Build artifacts
        run: poetry build
      - name: Publish package distributions to PyPI
        uses: pypa/gh-action-pypi-publish@release/v1
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
# Changelog

## Release 1.0.0 (2025-02-13)

This release introduces the capability to report errors, warnings,
and heartbeats to a Valkey datastore, facilitating centralized monitoring.
Events are reported using Python coroutines, with the heartbeat integrated
into the main event loop.


## Release 0.5.0 (2025-01-06)

The initial code for connecting to BlueSky's firehose has been replaced with
Expand Down
27 changes: 9 additions & 18 deletions blueskysight/firehose.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
import asyncio
import json
import struct
import time
import typing as t
from base64 import b32encode
from io import BytesIO

import valkey
import websockets

from blueskysight import config
from blueskysight.utils import (
extract_vulnerability_ids,
get_post_url,
heartbeat,
push_sighting_to_vulnerability_lookup,
report_error,
)

BSKY_FIREHOSE = "wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos"
Expand Down Expand Up @@ -294,8 +294,12 @@ async def firehose():
await process_firehose(ws)
except websockets.ConnectionClosedError as e:
print(f"Connection closed unexpectedly: {e}. Reconnecting…")
await report_error(
"error", f"Connection closed unexpectedly: {e}. Reconnecting…"
)
except Exception as e:
print(f"Unexpected error: {e}. Reconnecting…")
await report_error("error", f"Unexpected error: {e}. Reconnecting…")
finally:
await asyncio.sleep(5) # Delay before attempting reconnection

Expand All @@ -315,9 +319,11 @@ async def process_firehose(ws):
await process_commit_frame(body)
except websockets.ConnectionClosedError:
print("WebSocket connection lost during processing.")
await report_error("error", "WebSocket connection lost during processing.")
raise
except Exception as e:
print(f"Error while processing frame: {e}")
await report_error("error", f"Error while processing frame: {e}")


async def process_commit_frame(body):
Expand Down Expand Up @@ -367,22 +373,6 @@ def extract_textual_content(blocks):
]


async def heartbeat():
    """Periodically record a liveness timestamp in the Valkey datastore.

    A client is opened on ``config.valkey_host`` / ``config.valkey_port`` and,
    every 30 seconds, the current Unix time is stored under the
    ``process_BlueskySight_heartbeat`` key with ``config.expiration_period``
    as its expiry. This coroutine never returns normally.

    Raises:
        Exception: Any error raised while writing the key is printed and then
            re-raised so that the caller can stop the process.
    """
    datastore = valkey.Valkey(config.valkey_host, config.valkey_port)
    heartbeat_key = "process_BlueskySight_heartbeat"
    while True:
        try:
            datastore.set(heartbeat_key, time.time(), ex=config.expiration_period)
        except Exception as error:
            print(f"Heartbeat error: {error}")
            # Propagate the error to stop the process
            raise
        else:
            # Heartbeat every 30 seconds
            await asyncio.sleep(30)


async def launch_with_hearbeat():
"""Launch the heartbeat within the same event loop as the firehose coroutine."""
tasks = [asyncio.create_task(heartbeat()), asyncio.create_task(firehose())]
Expand All @@ -391,6 +381,7 @@ async def launch_with_hearbeat():
await asyncio.gather(*tasks)
except Exception as e:
print(f"Error detected: {e}")
await report_error("error", f"Error detected: {e}")
# Cancel remaining tasks (like heartbeat)
for task in tasks:
task.cancel()
Expand Down
41 changes: 34 additions & 7 deletions blueskysight/jetstream.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,17 @@
import os
import platform
import typing as t
import websockets
from pathlib import Path
from urllib.parse import urlencode

import websockets
import zstandard as zstd

from blueskysight import config
from blueskysight.utils import (
extract_vulnerability_ids,
get_post_url,
heartbeat,
push_sighting_to_vulnerability_lookup,
)

Expand Down Expand Up @@ -259,6 +261,26 @@ async def process_jetstream_message(json_message):
push_sighting_to_vulnerability_lookup(url, vulnerability_ids)


async def launch_with_hearbeat(**jetstream_options):
    """Run the Jetstream consumer and the heartbeat in the same event loop.

    Args:
        **jetstream_options: Keyword arguments forwarded unchanged to
            ``jetstream()`` (for instance ``collections``, ``geo`` and
            ``instance``). When none are given, ``jetstream()`` is called
            without arguments and therefore uses its own defaults.

    Raises:
        Exception: The first exception raised by either task, re-raised once
            the remaining task has been cancelled and awaited.
    """
    tasks = [
        asyncio.create_task(heartbeat("process_heartbeat_BlueskySight-Jetstream")),
        asyncio.create_task(jetstream(**jetstream_options)),
    ]
    try:
        # Wait for all tasks, but stop on the first exception
        await asyncio.gather(*tasks)
    except Exception as e:
        print(f"Error detected: {e}")
        # Cancel remaining tasks (like heartbeat)
        for task in tasks:
            task.cancel()
        # Wait until all tasks are properly cancelled; return_exceptions=True
        # keeps the resulting CancelledError from masking the original error
        await asyncio.gather(*tasks, return_exceptions=True)
        # Re-raise the exception to propagate it
        raise


def main():
parser = argparse.ArgumentParser(
prog="BlueSkySight-Jetstream", description="Connect to a Jetstream service."
Expand All @@ -283,13 +305,18 @@ def main():

arguments = parser.parse_args()

asyncio.run(
jetstream(
collections=arguments.collections,
geo=arguments.geo,
instance=arguments.instance,
if config.heartbeat_enabled:
# Execute the jetstream() coroutine within the same event loop as the heartbeat for monitoring
asyncio.run(launch_with_hearbeat())
else:
# No monitoring of the process
asyncio.run(
jetstream(
collections=arguments.collections,
geo=arguments.geo,
instance=arguments.instance,
)
)
)


if __name__ == "__main__":
Expand Down
38 changes: 38 additions & 0 deletions blueskysight/utils.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,52 @@
import asyncio
import base64
import hashlib
import io
import struct
import time
from enum import Enum

import httpx
import valkey
from pyvulnerabilitylookup import PyVulnerabilityLookup

from blueskysight import config

if config.heartbeat_enabled:
valkey_client = valkey.Valkey(config.valkey_host, config.valkey_port)


async def heartbeat(key="process_heartbeat_BlueskySight", interval=30) -> None:
    """Periodically record a liveness timestamp in the Valkey datastore.

    Returns immediately when ``config.heartbeat_enabled`` is false; in that
    case no Valkey client has been created at import time. Otherwise the
    coroutine loops forever and never returns normally.

    Args:
        key: Name of the Valkey key that receives the current Unix time.
        interval: Number of seconds to wait between two heartbeats. It should
            stay below ``config.expiration_period``, which is used as the
            key's expiry, otherwise the key lapses between two beats.

    Raises:
        Exception: Any error raised while writing the key is printed and then
            re-raised so that the caller can stop the process.
    """
    if not config.heartbeat_enabled:
        return
    while True:
        try:
            valkey_client.set(key, time.time(), ex=config.expiration_period)
        except Exception as e:
            print(f"Heartbeat error: {e}")
            raise  # Propagate the error to stop the process
        await asyncio.sleep(interval)


async def report_error(
    level="warning", message="", key="process_logs_BlueskySight"
) -> None:
    """Append an error or warning entry to a list in the Valkey datastore.

    Does nothing when ``config.heartbeat_enabled`` is false: the module-level
    ``valkey_client`` is only created when monitoring is enabled, so trying
    to use it would raise ``NameError`` inside the caller's own error
    handling.

    Args:
        level: Severity label stored with the entry (e.g. ``"warning"`` or
            ``"error"``).
        message: Human-readable description of the event.
        key: Name of the Valkey list that collects the entries.

    Raises:
        Exception: Any error raised by the datastore is printed and then
            re-raised.
    """
    if not config.heartbeat_enabled:
        return
    log_entry = {"timestamp": time.time(), "level": level, "message": message}
    try:
        # Add the log entry to a list, so multiple messages are preserved.
        # NOTE(review): the entry is stored as a Python dict repr, not JSON;
        # confirm what readers of this list expect before changing the format.
        valkey_client.rpush(key, str(log_entry))
        valkey_client.expire(key, 86400)  # Expire after 24 hours
    except Exception as e:
        print(f"Error reporting failure: {e}")
        raise


def push_sighting_to_vulnerability_lookup(status_uri, vulnerability_ids):
"""Create a sighting from an incoming status and push it to the Vulnerability Lookup instance."""
Expand Down
24 changes: 12 additions & 12 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,15 @@ build-backend = "poetry.core.masonry.api"

[project]
name = "BlueSkySight"
version = "0.6.0"
version = "1.0.0"
description = "A client to gather vulnerability-related information from Bluesky."
authors = [
{name = "Cédric Bonhomme", email = "[email protected]"}
]
license = "GPL-3.0-or-later"
readme = "README.md"
keywords = ["Vulnerability-Lookup", "Vulnerability", "CVE", "Bluesky"]
# classifiers is dynamic because we want to create Python classifiers automatically

dynamic = ["classifiers"]

requires-python = ">=3.10,<4.0"
Expand Down

0 comments on commit 1fd2cb0

Please sign in to comment.