diff --git a/cli.py b/cli.py
index a9069e82..00536de1 100644
--- a/cli.py
+++ b/cli.py
@@ -1,22 +1,25 @@
-# uglypy/cli.py
+"""
+uglypy - Command-line interface for running various scripts.
+"""
+
import subprocess
import os
import argparse
-from logging_setup import setup_logging, get_logger
+from logging_setup import setup_logging
# Setup logging
logger = setup_logging()
def run_command(command):
"""Run a command with subprocess and log the outcome."""
- logger.info(f"Running command: {' '.join(command)}")
+ logger.info("Running command: %s", ' '.join(command))
try:
subprocess.run(command, check=True)
- logger.info(f"Command {' '.join(command)} executed successfully.")
+ logger.info("Command %s executed successfully.", ' '.join(command))
except subprocess.CalledProcessError as e:
- logger.error(f"Command {' '.join(command)} failed: {e}")
- except Exception as e:
- logger.error(f"An unexpected error occurred: {e}")
+ logger.error("Command %s failed: %s", ' '.join(command), e)
+ except Exception as e: # pylint: disable=broad-except
+ logger.error("An unexpected error occurred: %s", e)
def run_streamlit(extra_args):
"""Run the Streamlit application."""
@@ -28,7 +31,7 @@ def run_script(script_name, extra_args):
script_path = os.path.join(os.getcwd(), script_name)
if not os.path.isfile(script_path):
- logger.error(f"Error: {script_name} not found.")
+ logger.error("Error: %s not found.", script_name)
return
command = ["python", script_path] + extra_args
diff --git a/config.py b/config.py
index aeb4784a..6b374ef9 100644
--- a/config.py
+++ b/config.py
@@ -1,31 +1,34 @@
-import yaml
+"""
+Configuration Management for UglyFeed
+"""
+
from pathlib import Path
+import yaml
-config_path = Path("config.yaml")
-feeds_path = Path("input/feeds.txt")
+CONFIG_PATH = Path("config.yaml")
+FEEDS_PATH = Path("input/feeds.txt")
def tuple_constructor(loader, node):
"""Constructor for !!python/tuple tag."""
return tuple(loader.construct_sequence(node))
-# Add the constructor to PyYAML with SafeLoader replaced by the FullLoader to handle tuples
+# Add the constructor to PyYAML with FullLoader to handle tuples
yaml.add_constructor('tag:yaml.org,2002:python/tuple', tuple_constructor, Loader=yaml.FullLoader)
-def load_config(config_file=config_path):
+def load_config(config_file=CONFIG_PATH):
"""Load the configuration from the specified YAML file."""
if isinstance(config_file, str):
config_file = Path(config_file)
try:
if config_file.exists():
- with open(config_file, "r") as f:
+ with open(config_file, "r", encoding='utf-8') as f:
return yaml.load(f, Loader=yaml.FullLoader) # Use yaml.FullLoader to support custom constructors
- else:
- return {}
+ return {}
except yaml.YAMLError as e:
- raise Exception(f"Error loading YAML configuration: {e}")
+ raise Exception(f"Error loading YAML configuration: {e}") from e
except Exception as e:
- raise Exception(f"Failed to load configuration from {config_file}: {e}")
+ raise Exception(f"Failed to load configuration from {config_file}: {e}") from e
def ensure_default_config(config_data):
"""Ensure all required keys are in the config_data with default values."""
@@ -123,12 +126,12 @@ def recursive_update(d, u):
def save_configuration(config_data, feeds):
"""Save configuration and feeds to file."""
try:
- with open(config_path, "w") as f:
+ with open(CONFIG_PATH, "w", encoding='utf-8') as f:
yaml.dump(config_data, f)
- with open(feeds_path, "w") as f:
+ with open(FEEDS_PATH, "w", encoding='utf-8') as f:
f.write(feeds)
except Exception as e:
- raise Exception(f"Failed to save configuration: {e}")
+ raise Exception(f"Failed to save configuration: {e}") from e
# Usage example
if __name__ == "__main__":
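
Reviewer note: the tuple_constructor registration above keeps `!!python/tuple` values loadable under FullLoader. A small, self-contained sketch of the pattern; the `window_size` key is invented for illustration:

```python
import yaml

def tuple_constructor(loader, node):
    """Build a Python tuple from a YAML sequence node."""
    return tuple(loader.construct_sequence(node))

yaml.add_constructor('tag:yaml.org,2002:python/tuple', tuple_constructor, Loader=yaml.FullLoader)

doc = "window_size: !!python/tuple [800, 600]"
config = yaml.load(doc, Loader=yaml.FullLoader)
print(config["window_size"])  # (800, 600) -- a tuple, not a list
```
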
diff --git a/deploy_xml.py b/deploy_xml.py
index 9a5cc63f..41cfaa48 100644
--- a/deploy_xml.py
+++ b/deploy_xml.py
@@ -1,20 +1,26 @@
+"""
+This script uploads XML files to GitHub and GitLab repositories.
+"""
+
import os
-import yaml
-import requests
import base64
import logging
+import requests
+import yaml
# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
-# Function to load configuration from YAML or environment variables
def load_config(config_path='config.yaml'):
- logging.info(f"Loading configuration from {config_path} or environment variables...")
+ """
+ Load configuration from YAML or environment variables.
+ """
+ logging.info("Loading configuration from %s or environment variables...", config_path)
if os.path.exists(config_path):
- with open(config_path, 'r') as file:
+ with open(config_path, 'r', encoding='utf-8') as file:
config = yaml.safe_load(file)
else:
- logging.warning(f"Configuration file {config_path} not found. Falling back to environment variables.")
+ logging.warning("Configuration file %s not found. Falling back to environment variables.", config_path)
config = {}
config['github_token'] = config.get('github_token', os.getenv('GITHUB_TOKEN'))
@@ -26,8 +32,10 @@ def load_config(config_path='config.yaml'):
return config
-# Function to upload file to GitHub
def upload_to_github(file_path, config):
+ """
+ Upload file to GitHub.
+ """
logging.info("Uploading to GitHub...")
repo_name = config['github_repo']
token = config['github_token']
@@ -43,7 +51,7 @@ def upload_to_github(file_path, config):
content = base64.b64encode(file.read()).decode('utf-8')
# Check if the file exists in the repository
- response = requests.get(url, headers=headers)
+ response = requests.get(url, headers=headers, timeout=10)
if response.status_code == 200:
# File exists, retrieve its SHA
sha = response.json()['sha']
@@ -54,7 +62,7 @@ def upload_to_github(file_path, config):
'branch': 'main'
}
method = requests.put
- logging.info(f"File {file_name} exists in GitHub repo, updating it.")
+ logging.info("File %s exists in GitHub repo, updating it.", file_name)
elif response.status_code == 404:
# File does not exist, create it
data = {
@@ -63,22 +71,24 @@ def upload_to_github(file_path, config):
'branch': 'main'
}
method = requests.put
- logging.info(f"File {file_name} does not exist in GitHub repo, creating it.")
+ logging.info("File %s does not exist in GitHub repo, creating it.", file_name)
else:
- logging.error(f"GitHub file check failed: {response.text}")
+ logging.error("GitHub file check failed: %s", response.text)
raise Exception(f"GitHub file check failed: {response.text}")
# Upload or update the file
- response = method(url, json=data, headers=headers)
+ response = method(url, json=data, headers=headers, timeout=10)
if response.status_code in (200, 201):
download_url = response.json()['content']['download_url']
return download_url
else:
- logging.error(f"GitHub upload failed: {response.text}")
+ logging.error("GitHub upload failed: %s", response.text)
raise Exception(f"GitHub upload failed: {response.text}")
-# Function to upload file to GitLab
def upload_to_gitlab(file_path, config):
+ """
+ Upload file to GitLab.
+ """
logging.info("Uploading to GitLab...")
repo_name = config['gitlab_repo']
token = config['gitlab_token']
@@ -88,7 +98,7 @@ def upload_to_gitlab(file_path, config):
file_name = os.path.basename(file_path)
url = f'https://gitlab.com/api/v4/projects/{repo_name}/repository/files/{file_name}'
- with open(file_path, 'r') as file:
+ with open(file_path, 'r', encoding='utf-8') as file:
content = file.read()
data = {
@@ -97,26 +107,28 @@ def upload_to_gitlab(file_path, config):
'commit_message': 'Add uglyfeed.xml'
}
- response = requests.post(url, json=data, headers=headers)
+ response = requests.post(url, json=data, headers=headers, timeout=10)
if response.status_code == 201:
download_url = f'https://gitlab.com/{repo_name}/-/raw/main/{file_name}'
return download_url
elif response.status_code == 400 and 'already exists' in response.text:
# Update file if it already exists
logging.info("File already exists on GitLab, attempting to update...")
- response = requests.put(url, json=data, headers=headers)
+ response = requests.put(url, json=data, headers=headers, timeout=10)
if response.status_code == 200:
download_url = f'https://gitlab.com/{repo_name}/-/raw/main/{file_name}'
return download_url
else:
- logging.error(f"GitLab update failed: {response.text}")
+ logging.error("GitLab update failed: %s", response.text)
raise Exception(f"GitLab update failed: {response.text}")
else:
- logging.error(f"GitLab upload failed: {response.text}")
+ logging.error("GitLab upload failed: %s", response.text)
raise Exception(f"GitLab upload failed: {response.text}")
-# Main function to deploy XML file
def deploy_xml(file_path, config):
+ """
+ Deploy XML file to GitHub and GitLab based on the configuration.
+ """
urls = {}
if config.get('enable_github', False):
@@ -124,14 +136,14 @@ def deploy_xml(file_path, config):
github_url = upload_to_github(file_path, config)
urls['github'] = github_url
except Exception as e:
- logging.error(f"GitHub upload error: {e}")
+ logging.error("GitHub upload error: %s", e)
if config.get('enable_gitlab', False):
try:
gitlab_url = upload_to_gitlab(file_path, config)
urls['gitlab'] = gitlab_url
except Exception as e:
- logging.error(f"GitLab upload error: {e}")
+ logging.error("GitLab upload error: %s", e)
return urls
@@ -140,10 +152,10 @@ def deploy_xml(file_path, config):
config = load_config()
# File to deploy
- xml_file_path = 'uglyfeeds/uglyfeed.xml'
+ XML_FILE_PATH = 'uglyfeeds/uglyfeed.xml'
# Deploy the XML file
- urls = deploy_xml(xml_file_path, config)
+ urls = deploy_xml(XML_FILE_PATH, config)
# Print the URLs
if urls:
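
Reviewer note: adding `timeout=10` to every `requests` call in deploy_xml.py means a stalled GitHub/GitLab endpoint raises instead of hanging the deploy. A standalone sketch of how that surfaces to callers (the URL is illustrative only):

```python
import logging
from typing import Optional

import requests

logging.basicConfig(level=logging.INFO)

def fetch_with_timeout(url: str) -> Optional[requests.Response]:
    """Return the response, or None on timeout or any other request error."""
    try:
        # timeout=10 applies separately to the connect and read phases;
        # without it, a stalled server can block this call indefinitely.
        response = requests.get(url, timeout=10)
        response.raise_for_status()
        return response
    except requests.Timeout:
        logging.error("Request to %s timed out", url)
    except requests.RequestException as e:
        logging.error("Request to %s failed: %s", url, e)
    return None

if __name__ == "__main__":
    fetch_with_timeout("https://api.github.com/rate_limit")
```
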
diff --git a/json2rss.py b/json2rss.py
index 7fd577af..f04f0628 100644
--- a/json2rss.py
+++ b/json2rss.py
@@ -1,12 +1,16 @@
+"""
+This script processes JSON files and generates an RSS feed.
+"""
+
import json
import os
import urllib.parse
from datetime import datetime, timedelta
from xml.etree.ElementTree import Element, SubElement, ElementTree, parse, register_namespace
-import yaml
import re
import logging
import argparse
+import yaml
# Setup logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
@@ -22,10 +26,10 @@
def load_config(config_file='config.yaml'):
"""Load configuration from a YAML file."""
try:
- with open(config_file, 'r') as file:
+ with open(config_file, 'r', encoding='utf-8') as file:
return yaml.safe_load(file)
except FileNotFoundError:
- logging.error(f"Configuration file '{config_file}' not found.")
+ logging.error("Configuration file '%s' not found.", config_file)
exit(1)
def get_config_value(config, key, default_value=None):
@@ -51,16 +55,16 @@ def read_json_files(directory):
data = json.load(file)
json_data.append(data)
except json.JSONDecodeError as e:
- logging.error(f"Error decoding JSON from file {filename}: {e}")
+ logging.error("Error decoding JSON from file %s: %s", filename, e)
return json_data
def load_moderated_words(file_path):
"""Load a list of moderated words from a file."""
try:
- with open(file_path, 'r') as file:
+ with open(file_path, 'r', encoding='utf-8') as file:
return [line.strip() for line in file if line.strip()]
except FileNotFoundError:
- logging.error(f"Moderated words file '{file_path}' not found.")
+ logging.error("Moderated words file '%s' not found.", file_path)
return []
def replace_swear_words(text, moderated_words):
@@ -87,8 +91,9 @@ def create_rss_channel(config):
language = SubElement(channel, 'language')
language.text = get_config_value(config, 'feed_language', 'en')
- atom_link = SubElement(channel, 'atom:link', {
- 'href': get_config_value(config, 'feed_self_link', 'https://raw.githubusercontent.com/fabriziosalmi/UglyFeed/main/examples/uglyfeed-source-1.xml'),
+ SubElement(channel, 'atom:link', {
+ 'href': get_config_value(config, 'feed_self_link',
+ 'https://raw.githubusercontent.com/fabriziosalmi/UglyFeed/main/examples/uglyfeed-source-1.xml'),
'rel': 'self',
'type': 'application/rss+xml'
})
@@ -103,8 +108,8 @@ def create_rss_channel(config):
category.text = get_config_value(config, 'category')
if 'copyright' in config:
- copyright = SubElement(channel, 'copyright')
- copyright.text = get_config_value(config, 'copyright')
+ copy_right = SubElement(channel, 'copyright')
+ copy_right.text = get_config_value(config, 'copyright')
return rss, channel
@@ -118,11 +123,15 @@ def process_item(item, config, moderated_words):
item_title = SubElement(item_element, 'title')
title_text = item.get('title', 'No Title')
- item_title.text = escape_xml_chars(replace_swear_words(title_text, moderated_words) if moderation_enabled else title_text)
+ item_title.text = escape_xml_chars(
+ replace_swear_words(title_text, moderated_words) if moderation_enabled else title_text
+ )
item_description = SubElement(item_element, 'description')
content = item.get('content', 'No Content')
- content = escape_xml_chars(replace_swear_words(content, moderated_words) if moderation_enabled else content)
+ content = escape_xml_chars(
+ replace_swear_words(content, moderated_words) if moderation_enabled else content
+ )
if 'links' in item:
links = item['links']
@@ -136,17 +145,25 @@ def process_item(item, config, moderated_words):
api = item.get('api', 'Unknown API')
model = item.get('model', 'Unknown Model')
-    content += f'\nGenerated by {escape_xml_chars(model)} via {escape_xml_chars(api.capitalize())}'
+    content += (
+        f'\nGenerated by {escape_xml_chars(model)} via '
+        f'{escape_xml_chars(api.capitalize())}'
+    )
item_description.text = content
+ processed_at_str = item.get('processed_at', datetime.now().isoformat())
+ if processed_at_str is None:
+ processed_at_str = datetime.now().isoformat()
try:
- processed_at = datetime.strptime(item.get('processed_at', datetime.now().isoformat()), '%Y-%m-%d %H:%M:%S')
+ processed_at = datetime.strptime(processed_at_str, '%Y-%m-%d %H:%M:%S')
except ValueError:
processed_at = datetime.now()
pub_date = SubElement(item_element, 'pubDate')
- pub_date.text = processed_at.strftime(get_config_value(config, 'datetime_format', '%a, %d %b %Y %H:%M:%S GMT'))
+ pub_date.text = processed_at.strftime(
+ get_config_value(config, 'datetime_format', '%a, %d %b %Y %H:%M:%S GMT')
+ )
guid = SubElement(item_element, 'guid')
guid.text = f"https://github.com/fabriziosalmi/UglyFeed/{urllib.parse.quote(item.get('title', 'No Title'))}"
@@ -165,8 +182,10 @@ def create_rss_feed(json_data, output_path, config):
tree = parse(output_path)
rss = tree.getroot()
channel = rss.find('channel')
- except Exception as e:
- logging.error(f"Error parsing existing RSS file: {e}")
+ if channel is None:
+ raise ValueError("Channel element not found in existing RSS file.")
+        except Exception as e:  # pylint: disable=broad-except
+            logging.error("Error parsing existing RSS file: %s", e)
return
else:
rss, channel = create_rss_channel(config)
@@ -175,33 +194,47 @@ def create_rss_feed(json_data, output_path, config):
cutoff_date = datetime.now() - timedelta(days=int(get_config_value(config, 'max_age_days', 30)))
for item in json_data:
item_element = process_item(item, config, moderated_words)
- processed_at = datetime.strptime(item_element.find('pubDate').text, get_config_value(config, 'datetime_format', '%a, %d %b %Y %H:%M:%S GMT'))
+ pub_date_element = item_element.find('pubDate')
+ if pub_date_element is not None:
+ processed_at = datetime.strptime(
+ pub_date_element.text,
+ get_config_value(config, 'datetime_format', '%a, %d %b %Y %H:%M:%S GMT')
+ )
+ else:
+ processed_at = datetime.now()
if processed_at >= cutoff_date:
new_items.append(item_element)
- existing_items = list(channel.findall('item'))
+ existing_items = list(channel.findall('item')) if channel is not None else []
all_items = existing_items + new_items
- all_items.sort(key=lambda x: datetime.strptime(x.find('pubDate').text, get_config_value(config, 'datetime_format', '%a, %d %b %Y %H:%M:%S GMT')), reverse=True)
+ all_items.sort(
+ key=lambda x: datetime.strptime(
+ x.find('pubDate').text,
+ get_config_value(config, 'datetime_format', '%a, %d %b %Y %H:%M:%S GMT')
+ ),
+ reverse=True
+ )
max_items = int(get_config_value(config, 'max_items', 50))
trimmed_items = all_items[:max_items]
- for item in channel.findall('item'):
- channel.remove(item)
- for item in trimmed_items:
- channel.append(item)
+ if channel is not None:
+ for item in channel.findall('item'):
+ channel.remove(item)
+ for item in trimmed_items:
+ channel.append(item)
try:
tree = ElementTree(rss)
tree.write(output_path, encoding='utf-8', xml_declaration=True)
item_count = len(trimmed_items)
- logging.info(f"RSS feed successfully updated at {output_path}")
- logging.info(f"Total items in feed: {item_count}")
+ logging.info("RSS feed successfully updated at %s", output_path)
+ logging.info("Total items in feed: %d", item_count)
print(f"RSS feed successfully generated at {output_path}")
print(f"Total items in feed: {item_count}")
except IOError as e:
- logging.error(f"Error saving RSS feed to file {output_path}: {e}")
+ logging.error("Error saving RSS feed to file %s: %s", output_path, e)
def main():
"""Main function to read JSON files and create/update the RSS feed."""
@@ -241,7 +274,7 @@ def main():
else:
config[key] = value
- logging.debug(f"Configuration: {json.dumps(config, indent=4)}")
+ logging.debug("Configuration: %s", json.dumps(config, indent=4))
rewritten_dir = config.get('rewritten_dir', 'rewritten')
output_path = os.path.join(config.get('output_dir', 'uglyfeeds'), 'uglyfeed.xml')
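
Reviewer note: the processed_at guard above avoids passing None to strptime when the JSON field is missing or null. A standalone sketch of the same fallback logic (field name and format follow the diff):

```python
from datetime import datetime

def parse_processed_at(item: dict) -> datetime:
    """Parse the item's 'processed_at' field, falling back to now()."""
    value = item.get('processed_at') or datetime.now().isoformat()
    try:
        return datetime.strptime(value, '%Y-%m-%d %H:%M:%S')
    except ValueError:
        # An isoformat() default (or any unexpected format) does not match
        # the pattern, so fall back to the current time, as the patch does.
        return datetime.now()

print(parse_processed_at({'processed_at': '2024-05-01 12:30:00'}))  # parsed value
print(parse_processed_at({}))                                       # current time
```
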
diff --git a/llm_processor.py b/llm_processor.py
index 9969bc98..f36ae515 100644
--- a/llm_processor.py
+++ b/llm_processor.py
@@ -1,6 +1,9 @@
+"""
+This script processes JSON files using various LLM APIs and saves the rewritten content.
+"""
+
import re
import json
-import requests
import logging
import argparse
import yaml
@@ -8,6 +11,7 @@
import time
from pathlib import Path
from datetime import datetime
+import requests
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
from openai import OpenAI
@@ -20,6 +24,7 @@
MAX_TOKENS = 32768
def requests_retry_session(retries=3, backoff_factor=0.3, status_forcelist=(500, 502, 504), session=None):
+ """Create a requests session with retry logic."""
session = session or requests.Session()
retry = Retry(
total=retries,
@@ -34,10 +39,11 @@ def requests_retry_session(retries=3, backoff_factor=0.3, status_forcelist=(500,
return session
def estimate_token_count(text):
- # Simple estimation: one token per 4 characters
+ """Estimate the number of tokens in a text."""
return len(text) / 4
def truncate_content(content, max_tokens):
+ """Truncate the content to fit within the maximum token limit."""
tokens = content.split()
truncated_content = []
current_tokens = 0
@@ -51,6 +57,7 @@ def truncate_content(content, max_tokens):
return ' '.join(truncated_content)
def call_openai_api(api_url, combined_content, model, api_key):
+ """Call the OpenAI API with the given parameters."""
client = OpenAI(api_key=api_key)
try:
response = client.chat.completions.create(
@@ -62,10 +69,11 @@ def call_openai_api(api_url, combined_content, model, api_key):
)
return response.choices[0].message.content
except Exception as e:
- logger.error(f"OpenAI API request failed: {e}")
+ logger.error("OpenAI API request failed: %s", e)
return None
def call_groq_api(api_url, combined_content, model, api_key):
+ """Call the Groq API with the given parameters."""
data = json.dumps({
"model": model,
"messages": [{"role": "user", "content": combined_content}],
@@ -75,54 +83,56 @@ def call_groq_api(api_url, combined_content, model, api_key):
'Content-Type': 'application/json',
'Authorization': f'Bearer {api_key}'
}
- logger.debug(f"Groq API request data: {data}")
+ logger.debug("Groq API request data: %s", data)
try:
response = requests_retry_session().post(api_url, data=data, headers=headers)
response.raise_for_status()
try:
response_json = response.json()
- logger.debug(f"Groq API response: {response_json}")
+ logger.debug("Groq API response: %s", response_json)
return response_json['choices'][0]['message']['content']
except json.JSONDecodeError as e:
- logger.error(f"Failed to parse JSON response from Groq API: {e}")
- logger.error(f"Response content: {response.text}")
+ logger.error("Failed to parse JSON response from Groq API: %s", e)
+ logger.error("Response content: %s", response.text)
return None
except requests.RequestException as e:
- logger.error(f"Groq API request failed: {e}")
+ logger.error("Groq API request failed: %s", e)
if response is not None:
- logger.error(f"Groq API response content: {response.text}")
+ logger.error("Groq API response content: %s", response.text)
if 'rate_limit_exceeded' in response.text:
retry_after = parse_retry_after(response.json())
- logger.info(f"Rate limit exceeded. Retrying after {retry_after} seconds.")
+ logger.info("Rate limit exceeded. Retrying after %s seconds.", retry_after)
time.sleep(retry_after)
return call_groq_api(api_url, combined_content, model, api_key)
return None
def call_ollama_api(api_url, combined_content, model):
+ """Call the Ollama API with the given parameters."""
data = json.dumps({
"model": model,
"messages": [{"role": "user", "content": combined_content}],
"stream": False
})
- logger.debug(f"Ollama API request data: {data}")
+ logger.debug("Ollama API request data: %s", data)
try:
response = requests_retry_session().post(api_url, data=data, headers={'Content-Type': 'application/json'})
response.raise_for_status()
try:
response_json = response.json()
- logger.debug(f"Ollama API response: {response_json}")
+ logger.debug("Ollama API response: %s", response_json)
return response_json['message']['content']
except json.JSONDecodeError as e:
- logger.error(f"Failed to parse JSON response from Ollama API: {e}")
- logger.error(f"Response content: {response.text}")
+ logger.error("Failed to parse JSON response from Ollama API: %s", e)
+ logger.error("Response content: %s", response.text)
return None
except requests.RequestException as e:
- logger.error(f"Ollama API request failed: {e}")
+ logger.error("Ollama API request failed: %s", e)
if response is not None:
- logger.error(f"Ollama API response content: {response.text}")
+ logger.error("Ollama API response content: %s", response.text)
return None
def call_anthropic_api(api_url, combined_content, model, api_key):
+ """Call the Anthropic API with the given parameters."""
data = json.dumps({
"model": model,
"messages": [
@@ -135,13 +145,13 @@ def call_anthropic_api(api_url, combined_content, model, api_key):
'x-api-key': api_key,
'anthropic-version': '2023-06-01'
}
- logger.debug(f"Anthropic API request data: {data}")
+ logger.debug("Anthropic API request data: %s", data)
try:
response = requests_retry_session().post(api_url, data=data, headers=headers)
response.raise_for_status()
try:
response_json = response.json()
- logger.debug(f"Anthropic API response: {response_json}")
+ logger.debug("Anthropic API response: %s", response_json)
# Print the full response for debugging purposes
print("Anthropic API response:", response_json)
@@ -153,19 +163,20 @@ def call_anthropic_api(api_url, combined_content, model, api_key):
text_content = " ".join(item['text'] for item in content_items if 'text' in item)
return text_content
else:
- logger.error(f"Expected 'content' key with list structure not found in response: {response_json}")
+ logger.error("Expected 'content' key with list structure not found in response: %s", response_json)
return None
except json.JSONDecodeError as e:
- logger.error(f"Failed to parse JSON response from Anthropic API: {e}")
- logger.error(f"Response content: {response.text}")
+ logger.error("Failed to parse JSON response from Anthropic API: %s", e)
+ logger.error("Response content: %s", response.text)
return None
except requests.RequestException as e:
- logger.error(f"Anthropic API request failed: {e}")
+ logger.error("Anthropic API request failed: %s", e)
if response is not None:
- logger.error(f"Anthropic API response content: {response.text}")
+ logger.error("Anthropic API response content: %s", response.text)
return None
def parse_retry_after(response_json):
+ """Parse the retry-after duration from the response."""
try:
message = response_json['error']['message']
retry_after = float(re.search(r"try again in (\d+\.?\d*)s", message).group(1))
@@ -174,6 +185,7 @@ def parse_retry_after(response_json):
return 60 # Default retry after 60 seconds if parsing fails
def ensure_proper_punctuation(text):
+ """Ensure proper punctuation in the text."""
     sentences = re.split(r'(?<=[.!?]) +', text)

     if estimate_token_count(combined_content) > MAX_TOKENS:
- logger.info(f"Truncating content to fit within {MAX_TOKENS} tokens.")
+ logger.info("Truncating content to fit within %s tokens.", MAX_TOKENS)
combined_content = truncate_content(combined_content, MAX_TOKENS)
if api_type == "openai":
@@ -257,20 +269,15 @@ def process_json_file(filepath, api_url, model, api_key, content_prefix, rewritt
with open(new_filename, 'w', encoding='utf-8') as outfile:
json.dump(new_data, outfile, ensure_ascii=False, indent=4)
print(f"Rewritten file saved to {new_filename}")
- logger.info(f"Rewritten file saved to {new_filename}")
+ logger.info("Rewritten file saved to %s", new_filename)
except IOError as e:
- logger.error(f"Error writing to {new_filename}: {e}")
+ logger.error("Error writing to %s: %s", new_filename, e)
else:
logger.error("Failed to get rewritten content from LLM API.")
- logger.debug(f"Rewritten content: {rewritten_content}")
-
-
-
-
-
-
+ logger.debug("Rewritten content: %s", rewritten_content)
def validate_config(api_config):
+ """Validate the configuration for the selected API."""
selected_api = api_config.get('selected_api')
if selected_api == "OpenAI":
@@ -289,11 +296,12 @@ def validate_config(api_config):
raise ValueError(f"The selected API configuration is incomplete. Missing keys: {', '.join(missing_keys)}")
def main(config_path, prompt_path=None, api=None, api_key=None, model=None, api_url=None, output_folder=None, rewritten_folder=None):
+ """Main function to process JSON files with LLM API."""
try:
with open(config_path, 'r', encoding='utf-8') as file:
config = yaml.safe_load(file)
except (yaml.YAMLError, IOError) as e:
- logger.error(f"Error reading config file {config_path}: {e}")
+ logger.error("Error reading config file %s: %s", config_path, e)
return
api_config = config.get('api_config', {})
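
Reviewer note: all HTTP-based providers above go through requests_retry_session(), which mounts an urllib3 Retry policy on the session. A standalone sketch of that pattern; it imports Retry from urllib3 directly rather than the deprecated requests.packages path used in the file:

```python
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

def requests_retry_session(retries=3, backoff_factor=0.3,
                           status_forcelist=(500, 502, 504), session=None):
    """Return a Session that retries transient server errors with exponential backoff."""
    session = session or requests.Session()
    retry = Retry(
        total=retries,
        read=retries,
        connect=retries,
        backoff_factor=backoff_factor,
        status_forcelist=status_forcelist,
    )
    adapter = HTTPAdapter(max_retries=retry)
    session.mount('http://', adapter)
    session.mount('https://', adapter)
    return session

# Usage mirrors the helpers above, e.g.:
# response = requests_retry_session().post(api_url, data=data, headers=headers, timeout=30)
```
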
diff --git a/logging_setup.py b/logging_setup.py
index 0fd02d2f..275b8012 100644
--- a/logging_setup.py
+++ b/logging_setup.py
@@ -1,4 +1,6 @@
-# logging_setup.py
+"""
+Logging setup for UglyFeed
+"""
import logging
import logging.config
@@ -48,6 +50,7 @@
}
def setup_logging() -> logging.Logger:
+ """Set up logging configuration and return the root logger."""
try:
logging.config.dictConfig(LOGGING_CONFIG)
diff --git a/main.py b/main.py
index bd12d73f..48fe98e4 100644
--- a/main.py
+++ b/main.py
@@ -1,22 +1,27 @@
+"""
+This script processes RSS feeds and groups similar articles based on a similarity threshold.
+"""
+
import os
import argparse
import time
-import yaml
-import logging
import sys
-import feedparser
import json
import re
+
+
+from typing import List, Dict, Any, Optional, Tuple
+import yaml
+import feedparser
import numpy as np
+import nltk
+from langdetect import detect
+
from sklearn.feature_extraction.text import TfidfVectorizer, CountVectorizer, HashingVectorizer
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.cluster import AgglomerativeClustering, DBSCAN, KMeans
-from tqdm import tqdm
-import nltk
-from langdetect import detect
from nltk.stem import WordNetLemmatizer, SnowballStemmer
from nltk.corpus import stopwords
-from typing import List, Dict, Any, Optional, Tuple
from logging_setup import setup_logging
# Setup logging
@@ -26,34 +31,39 @@
nltk.download('wordnet', quiet=True)
nltk.download('stopwords', quiet=True)
+
def load_config(config_path: str) -> Dict[str, Any]:
"""Load configuration from a YAML file."""
try:
- with open(config_path, 'r') as file:
- logger.info(f"Loading configuration from {config_path}")
+ with open(config_path, 'r', encoding='utf-8') as file:
+ logger.info("Loading configuration from %s", config_path)
return yaml.safe_load(file)
except yaml.YAMLError as e:
- logger.error(f"YAML error loading configuration from {config_path}: {e}")
+ logger.error("YAML error loading configuration from %s: %s", config_path, e)
sys.exit(1)
except Exception as e:
- logger.error(f"Error loading configuration from {config_path}: {e}")
+ logger.error("Error loading configuration from %s: %s", config_path, e)
sys.exit(1)
+
def ensure_directory_exists(directory: str) -> None:
"""Ensure that a directory exists; if not, create it."""
if not os.path.exists(directory):
- logger.info(f"Creating missing directory: {directory}")
+ logger.info("Creating missing directory: %s", directory)
os.makedirs(directory)
+
def get_env_variable(key: str, default: Optional[str] = None) -> Optional[str]:
"""Retrieve environment variable or use default if not set."""
value = os.getenv(key.upper(), default)
if value is None:
- logger.info(f"Environment variable {key.upper()} is not set; using default value.")
+ logger.info("Environment variable %s is not set; using default value.", key.upper())
return value
-def merge_configs(yaml_config: Dict[str, Any], env_config: Dict[str, Any], cli_config: Dict[str, Any]) -> Dict[str, Any]:
+
+def merge_configs(yaml_cfg: Dict[str, Any], env_cfg: Dict[str, Any], cli_cfg: Dict[str, Any]) -> Dict[str, Any]:
"""Merge configurations with priority: CLI > ENV > YAML."""
+
def update_recursive(d: Dict[str, Any], u: Dict[str, Any]) -> Dict[str, Any]:
for k, v in u.items():
if isinstance(v, dict):
@@ -62,12 +72,13 @@ def update_recursive(d: Dict[str, Any], u: Dict[str, Any]) -> Dict[str, Any]:
d[k] = v
return d
- final_config = yaml_config.copy()
- final_config = update_recursive(final_config, env_config)
- final_config = update_recursive(final_config, cli_config)
+ final_config = yaml_cfg.copy()
+ final_config = update_recursive(final_config, env_cfg)
+ final_config = update_recursive(final_config, cli_cfg)
return final_config
+
def fetch_feeds_from_file(file_path: str) -> List[Dict[str, str]]:
"""Fetch and parse RSS feeds from a file containing URLs."""
articles = []
@@ -76,7 +87,7 @@ def fetch_feeds_from_file(file_path: str) -> List[Dict[str, str]]:
urls = [url.strip() for url in file.readlines()]
for url in urls:
- logger.info(f"Fetching feed from {url}")
+ logger.info("Fetching feed from %s", url)
feed = feedparser.parse(url)
articles.extend([{
'title': entry.title,
@@ -84,22 +95,24 @@ def fetch_feeds_from_file(file_path: str) -> List[Dict[str, str]]:
'link': entry.link
} for entry in feed.entries])
- logger.info(f"Total articles fetched and parsed: {len(articles)}")
+ logger.info("Total articles fetched and parsed: %d", len(articles))
except FileNotFoundError as e:
- logger.error(f"File not found: {e}")
+ logger.error("File not found: %s", e)
except Exception as e:
- logger.error(f"Error fetching feeds: {e}")
+ logger.error("Error fetching feeds: %s", e)
return articles
+
def detect_language(text: str) -> str:
"""Detect the language of a given text."""
try:
return detect(text)
except Exception as e:
- logger.warning(f"Language detection failed: {e}")
+ logger.warning("Language detection failed: %s", e)
return 'unknown'
+
def preprocess_text(text: str, language: str, config: Dict[str, Any]) -> str:
"""Preprocess the text based on the configuration settings and language."""
lemmatizer = WordNetLemmatizer()
@@ -125,6 +138,7 @@ def preprocess_text(text: str, language: str, config: Dict[str, Any]) -> str:
preprocessed_text = " ".join(tokens)
return preprocessed_text
+
def vectorize_texts(texts: List[str], config: Dict[str, Any]) -> Any:
"""Vectorize texts based on the specified method in the configuration."""
vectorizer_params = {
@@ -147,6 +161,7 @@ def vectorize_texts(texts: List[str], config: Dict[str, Any]) -> Any:
vectors = vectorizer.fit_transform(texts)
return vectors
+
def cluster_texts(vectors: Any, config: Dict[str, Any]) -> np.ndarray:
"""Cluster texts using the specified clustering method in the configuration."""
method = config.get('method', 'dbscan').lower()
@@ -176,6 +191,7 @@ def cluster_texts(vectors: Any, config: Dict[str, Any]) -> np.ndarray:
return labels
+
def aggregate_similar_articles(articles: List[Dict[str, str]], similarity_matrix: np.ndarray, threshold: float) -> List[Tuple[List[Dict[str, str]], float]]:
"""Aggregate articles into groups based on similarity matrix and threshold."""
clustering = AgglomerativeClustering(
@@ -195,6 +211,7 @@ def aggregate_similar_articles(articles: List[Dict[str, str]], similarity_matrix
return grouped_articles_with_scores
+
def save_grouped_articles(grouped_articles_with_scores: List[Tuple[List[Dict[str, str]], float]], output_dir: str) -> int:
"""Save grouped articles to JSON files and return the number of saved files."""
ensure_directory_exists(output_dir)
@@ -206,12 +223,13 @@ def save_grouped_articles(grouped_articles_with_scores: List[Tuple[List[Dict[str
try:
with open(file_path, 'w', encoding='utf-8') as file:
json.dump({'articles': group, 'average_similarity': avg_similarity}, file, ensure_ascii=False, indent=4)
- logger.info(f"Group {i}: Saved {len(group)} articles to {file_path}, Avg Similarity: {avg_similarity:.2f}")
+ logger.info("Group %d: Saved %d articles to %s, Avg Similarity: %.2f", i, len(group), file_path, avg_similarity)
saved_files_count += 1
except Exception as e:
- logger.error(f"Error saving group {i} to JSON: {e}")
+ logger.error("Error saving group %d to JSON: %s", i, e)
return saved_files_count
+
def deduplicate_articles(articles: List[Dict[str, str]]) -> List[Dict[str, str]]:
"""Remove duplicate articles based on content and link."""
seen = set()
@@ -221,9 +239,10 @@ def deduplicate_articles(articles: List[Dict[str, str]]) -> List[Dict[str, str]]
if identifier not in seen:
seen.add(identifier)
unique_articles.append(article)
- logger.info(f"Total unique articles after deduplication: {len(unique_articles)}")
+ logger.info("Total unique articles after deduplication: %d", len(unique_articles))
return unique_articles
+
def main(config: Dict[str, Any]) -> None:
"""Main function to process RSS feeds and group similar articles."""
logger.info("Starting RSS feed processing...")
@@ -237,12 +256,15 @@ def main(config: Dict[str, Any]) -> None:
try:
logger.info("Fetching and parsing RSS feeds...")
articles = fetch_feeds_from_file(input_feeds_path)
- logger.info(f"Total articles fetched and parsed: {len(articles)}")
+ logger.info("Total articles fetched and parsed: %d", len(articles))
logger.info("Deduplicating articles...")
articles = deduplicate_articles(articles)
+ except FileNotFoundError as e:
+ logger.error("File not found: %s", e)
+ return
except Exception as e:
- logger.error(f"Error fetching or parsing RSS feeds: {e}")
+ logger.error("Error fetching or parsing RSS feeds: %s", e)
return
logger.info("Preprocessing texts...")
@@ -263,15 +285,16 @@ def main(config: Dict[str, Any]) -> None:
logger.info("Saving grouped articles to JSON files...")
saved_files_count = save_grouped_articles(grouped_articles_with_scores, output_directory)
- logger.info(f"Total number of JSON files generated: {saved_files_count}")
+ logger.info("Total number of JSON files generated: %d", saved_files_count)
elapsed_time = time.time() - start_time
- logger.info(f"RSS feed processing complete in {elapsed_time:.2f} seconds")
+ logger.info("RSS feed processing complete in %.2f seconds", elapsed_time)
-def build_env_config(yaml_config: Dict[str, Any]) -> Dict[str, Any]:
+
+def build_env_config(yaml_cfg: Dict[str, Any]) -> Dict[str, Any]:
"""Build configuration from environment variables."""
env_config = {}
- for key, value in yaml_config.items():
+ for key, value in yaml_cfg.items():
if isinstance(value, dict):
env_config[key] = build_env_config(value)
else:
@@ -280,6 +303,7 @@ def build_env_config(yaml_config: Dict[str, Any]) -> Dict[str, Any]:
env_config[key] = type(value)(env_value) if env_value is not None else value
return env_config
+
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description='Process RSS feeds and group similar articles based on a similarity threshold.'
@@ -306,13 +330,13 @@ def build_env_config(yaml_config: Dict[str, Any]) -> Dict[str, Any]:
args = parser.parse_args()
# Load default configuration from the YAML file
- yaml_config = load_config(args.config)
+ yaml_cfg = load_config(args.config)
# Build environment configuration based on environment variables
- env_config = build_env_config(yaml_config)
+ env_cfg = build_env_config(yaml_cfg)
# Override with command-line arguments if provided
- cli_config = {
+ cli_cfg = {
'similarity_threshold': args.similarity_threshold,
'min_samples': args.min_samples,
'eps': args.eps,
@@ -321,7 +345,7 @@ def build_env_config(yaml_config: Dict[str, Any]) -> Dict[str, Any]:
}
# Merge all configurations with priority: CLI > ENV > YAML
- final_config = merge_configs(yaml_config, env_config, cli_config)
+ final_cfg = merge_configs(yaml_cfg, env_cfg, cli_cfg)
# Run the main function with the final merged configuration
- main(final_config)
+ main(final_cfg)
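
Reviewer note: merge_configs() applies CLI > ENV > YAML precedence through a recursive dict overlay. A small illustration of how nested keys are overridden; the keys and values below are invented for the example:

```python
from typing import Any, Dict

def update_recursive(d: Dict[str, Any], u: Dict[str, Any]) -> Dict[str, Any]:
    """Recursively overlay u onto d, descending into nested dicts."""
    for k, v in u.items():
        if isinstance(v, dict):
            d[k] = update_recursive(d.get(k, {}), v)
        else:
            d[k] = v
    return d

yaml_cfg = {'similarity_threshold': 0.5, 'vectorizer': {'method': 'tfidf', 'max_features': 5000}}
env_cfg = {'vectorizer': {'max_features': 10000}}
cli_cfg = {'similarity_threshold': 0.66}

final_cfg = update_recursive(update_recursive(dict(yaml_cfg), env_cfg), cli_cfg)
print(final_cfg)
# {'similarity_threshold': 0.66, 'vectorizer': {'method': 'tfidf', 'max_features': 10000}}
```
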
diff --git a/process_multiple_metrics.py b/process_multiple_metrics.py
index 859c4952..0a474b4e 100644
--- a/process_multiple_metrics.py
+++ b/process_multiple_metrics.py
@@ -4,7 +4,6 @@
import subprocess
import logging
import re
-import sys
from pathlib import Path
# Suppress NLTK log messages
@@ -34,13 +33,18 @@
'evaluate_structural_metrics.py'
]
+
def run_evaluation_scripts(input_file, all_aggregated_scores):
- """Run evaluation scripts on the given input file and extract aggregated scores."""
+ """
+ Run evaluation scripts on the given input file and extract aggregated scores.
+ """
base_name = os.path.basename(input_file).replace('.json', '')
for script in EVALUATION_SCRIPTS:
script_path = TOOLS_DIR / script
logger.info("Running %s on %s", script_path, input_file)
- result = subprocess.run(['python', str(script_path), input_file], capture_output=True, text=True)
+ result = subprocess.run(
+ ['python', str(script_path), input_file], capture_output=True, text=True, check=False
+ )
if result.returncode != 0:
logger.error("Error running %s on %s", script_path, input_file)
logger.error(result.stderr)
@@ -59,13 +63,18 @@ def run_evaluation_scripts(input_file, all_aggregated_scores):
with open(metric_file, 'r') as file:
data = json.load(file)
extracted_scores = extract_aggregated_scores(data)
- logger.info("Extracted aggregated scores from %s: %s", metric_file, extracted_scores)
+ logger.info(
+ "Extracted aggregated scores from %s: %s", metric_file, extracted_scores
+ )
all_aggregated_scores.extend(extracted_scores)
else:
logger.warning("Metric file %s does not exist", metric_file)
+
def extract_aggregated_scores(data):
- """Extract aggregated scores from the given JSON data."""
+ """
+ Extract aggregated scores from the given JSON data.
+ """
aggregated_scores = []
if isinstance(data, dict):
for key, value in data.items():
@@ -79,8 +88,11 @@ def extract_aggregated_scores(data):
aggregated_scores.extend(extract_aggregated_scores(item))
return aggregated_scores
+
def calculate_average_aggregated_score(aggregated_scores):
- """Calculate the average of the aggregated scores."""
+ """
+ Calculate the average of the aggregated scores.
+ """
if aggregated_scores:
scores = [score for _, score in aggregated_scores]
logger.debug("Calculating average of aggregated scores: %s", scores)
@@ -88,8 +100,11 @@ def calculate_average_aggregated_score(aggregated_scores):
logger.debug("No aggregated scores found")
return None
+
def merge_metrics_files(input_file, all_aggregated_scores):
- """Merge metrics files for the given input JSON file."""
+ """
+ Merge metrics files for the given input JSON file.
+ """
base_name = os.path.basename(input_file).replace('.json', '')
pattern = REWRITTEN_DIR / f'{base_name}_metrics_*.json'
@@ -130,8 +145,11 @@ def merge_metrics_files(input_file, all_aggregated_scores):
logger.info("Merged metrics written to %s", output_file_path)
+
def main():
- """Main script execution."""
+ """
+ Main script execution.
+ """
input_files = glob.glob(str(REWRITTEN_DIR / '*_rewritten.json'))
if not input_files:
@@ -144,5 +162,6 @@ def main():
run_evaluation_scripts(input_file, all_aggregated_scores)
merge_metrics_files(input_file, all_aggregated_scores)
+
if __name__ == '__main__':
main()
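
Reviewer note: passing check=False keeps the explicit return-code handling above as the single source of truth for failures and satisfies pylint's subprocess-run-check (W1510). A standalone sketch of the capture pattern; the command is a placeholder:

```python
import logging
import subprocess

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

result = subprocess.run(
    ['python', '--version'],  # placeholder command
    capture_output=True, text=True, check=False
)
if result.returncode != 0:
    logger.error("Command failed:\n%s", result.stderr)
else:
    logger.info("Command output: %s", result.stdout.strip())
```
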
diff --git a/scheduling.py b/scheduling.py
index 0d791874..92cd95a2 100644
--- a/scheduling.py
+++ b/scheduling.py
@@ -1,15 +1,20 @@
-import schedule
+"""
+Scheduling script for UglyFeed
+"""
+
import time
from datetime import datetime
import threading
import logging
+import schedule
from script_runner import run_script # Import run_script to use for script execution
# Initialize the logger
logger = logging.getLogger(__name__)
-# Using a class to encapsulate the scheduling logic
class UglyFeedScheduler:
+ """Class to encapsulate scheduling logic for UglyFeed."""
+
def __init__(self):
self.job_stats = []
@@ -28,14 +33,14 @@ def run_scripts_sequentially(self, get_new_item_count, get_xml_item_count, st):
else:
output, errors = run_script(script)
- logger.info(f"Output of {script}:\n{output}")
+ logger.info("Output of %s:\n%s", script, output)
if errors.strip() and errors != "No errors":
- logger.error(f"Errors or logs of {script}:\n{errors}")
+ logger.error("Errors or logs of %s:\n%s", script, errors)
if st:
st.text_area(f"Errors of {script}", errors, height=200)
except Exception as e:
- logger.error(f"Failed to execute {script}: {e}")
+ logger.error("Failed to execute %s: %s", script, e)
self.job_stats.append({
'script': script,
'time': datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
@@ -68,7 +73,7 @@ def job():
'time': datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
'status': f'Failed with error: {e}'
})
- logger.error(f"Scheduled job failed with error: {e}")
+ logger.error("Scheduled job failed with error: %s", e)
# Scheduling based on the specified period
if period == 'minutes':
@@ -78,7 +83,7 @@ def job():
elif period == 'days':
schedule.every(interval).days.do(job)
else:
- logger.error(f"Unsupported period: {period}")
+ logger.error("Unsupported period: %s", period)
return
while True:
@@ -101,7 +106,7 @@ def start_scheduling(self, interval, period, session_state, get_new_item_count=N
daemon=True
)
scheduling_thread.start()
- logger.info(f"Scheduling started with interval: {interval} {period}")
+ logger.info("Scheduling started with interval: %d %s", interval, period)
else:
logger.info("Scheduling is disabled in the configuration.")
diff --git a/script_runner.py b/script_runner.py
index 4b4c38da..dc895ab7 100644
--- a/script_runner.py
+++ b/script_runner.py
@@ -1,8 +1,20 @@
+"""
+Streamlit script runner for executing Python scripts and capturing their output and errors.
+"""
+
import subprocess
import streamlit as st
-def run_script(script_name):
- """Execute a script and capture its output and errors."""
+def run_script(script_name: str) -> tuple[str, str]:
+ """
+ Execute a script and capture its output and errors.
+
+ Args:
+ script_name (str): The name of the script to execute.
+
+ Returns:
+ tuple: A tuple containing the script output and errors.
+ """
st.write(f"Running {script_name}...")
try:
process = subprocess.run(
@@ -13,10 +25,17 @@ def run_script(script_name):
errors = process.stderr.strip() if process.stderr else "No errors"
return output, errors
except subprocess.CalledProcessError as e:
- return f"Script {script_name} execution failed.\n\n" \
- f"Status: {e.returncode}, Output: {e.stdout}, Errors: {e.stderr}"
+ return (f"Script {script_name} execution failed.\n\n"
+ f"Status: {e.returncode}, Output: {e.stdout}, Errors: {e.stderr}"), "Errors"
+ except subprocess.TimeoutExpired as e:
+ return f"Script {script_name} execution timed out.\n\nErrors: {e.stderr}", "Errors"
+    except Exception as e:  # pylint: disable=broad-except
+ return f"An unexpected error occurred while running {script_name}: {e}", "Errors"
def main():
+ """
+ Main function to run the specified script and display its output and errors.
+ """
script_name = "script_runner.py" # Replace with your script name
output, errors = run_script(script_name)
st.code(f"```\n{output}\n```")
diff --git a/server.py b/server.py
index 8f262d5c..7384ebe6 100644
--- a/server.py
+++ b/server.py
@@ -1,3 +1,7 @@
+"""
+This script starts an HTTP server to serve XML files with the correct content type and cache headers.
+"""
+
from http.server import HTTPServer, SimpleHTTPRequestHandler
import threading
import shutil
@@ -10,9 +14,10 @@
server_logger = get_logger('server')
# Define directory paths and filenames
-static_dir = Path(".streamlit") / "static" / "uglyfeeds"
-uglyfeeds_dir = Path("uglyfeeds")
-uglyfeed_file = "uglyfeed.xml" # Retain the original variable name for compatibility
+UGLYFEED_FILE = "uglyfeed.xml" # Define this at the top with other constants
+uglyfeed_file = UGLYFEED_FILE # Alias for UGLYFEED_FILE
+UGLYFEEDS_DIR = Path("uglyfeeds")
+STATIC_DIR = Path(".streamlit") / "static" / "uglyfeeds"
class CustomXMLHandler(SimpleHTTPRequestHandler):
"""Custom HTTP handler to serve XML files with correct content type and cache headers."""
@@ -26,7 +31,7 @@ def do_GET(self):
def _serve_xml_file(self):
"""Serve an XML file with appropriate headers."""
- file_path = static_dir / self.path.lstrip('/')
+ file_path = STATIC_DIR / self.path.lstrip('/')
if file_path.exists() and file_path.is_file():
self.send_response(200)
@@ -36,20 +41,20 @@ def _serve_xml_file(self):
with open(file_path, 'rb') as file:
self.wfile.write(file.read())
- server_logger.info(f"Served XML file: {file_path}")
+ server_logger.info("Served XML file: %s", file_path)
else:
self.send_error(404, "File not found")
- server_logger.warning(f"XML file not found: {file_path}")
+ server_logger.warning("XML file not found: %s", file_path)
def start_http_server(port):
"""Start the HTTP server to serve XML files."""
try:
server_address = ('', port)
httpd = HTTPServer(server_address, CustomXMLHandler)
- server_logger.info(f"Starting server on port {port}")
+ server_logger.info("Starting server on port %d", port)
httpd.serve_forever()
except Exception as e:
- server_logger.error(f"Failed to start server on port {port}: {e}")
+ server_logger.error("Failed to start server on port %d: %s", port, e)
raise
def toggle_server(start, port, session_state):
@@ -58,7 +63,7 @@ def toggle_server(start, port, session_state):
if not session_state.get('server_thread') or not session_state['server_thread'].is_alive():
session_state['server_thread'] = threading.Thread(target=start_http_server, args=(port,), daemon=True)
session_state['server_thread'].start()
- server_logger.info(f"Server started on port {port}.")
+ server_logger.info("Server started on port %d.", port)
else:
server_logger.info("Server is already running.")
else:
@@ -70,14 +75,14 @@ def toggle_server(start, port, session_state):
def copy_xml_to_static():
"""Copy the XML file to the Streamlit static directory if it exists."""
- source_file = uglyfeeds_dir / uglyfeed_file
- destination_file = static_dir / uglyfeed_file
+ source_file = UGLYFEEDS_DIR / UGLYFEED_FILE
+ destination_file = STATIC_DIR / UGLYFEED_FILE
if source_file.exists() and source_file.is_file():
- os.makedirs(static_dir, exist_ok=True)
+ os.makedirs(STATIC_DIR, exist_ok=True)
shutil.copy(source_file, destination_file)
- server_logger.info(f"Copied {uglyfeed_file} to {static_dir}.")
+ server_logger.info("Copied %s to %s.", UGLYFEED_FILE, STATIC_DIR)
return destination_file
else:
- server_logger.warning(f"Source file {uglyfeed_file} does not exist in {uglyfeeds_dir}.")
+ server_logger.warning("Source file %s does not exist in %s.", UGLYFEED_FILE, UGLYFEEDS_DIR)
return None
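
Reviewer note: the renamed STATIC_DIR/UGLYFEEDS_DIR constants feed a SimpleHTTPRequestHandler subclass that serves the generated feed. A trimmed, standalone sketch of such a handler; the exact headers the project sends are not shown in this hunk, so application/rss+xml and no-cache are assumptions:

```python
from http.server import HTTPServer, SimpleHTTPRequestHandler
from pathlib import Path

STATIC_DIR = Path(".streamlit") / "static" / "uglyfeeds"  # same layout as the patch

class XMLHandler(SimpleHTTPRequestHandler):
    """Serve .xml files from STATIC_DIR with RSS headers; 404 everything else."""

    def do_GET(self):
        file_path = STATIC_DIR / self.path.lstrip('/')
        if self.path.endswith('.xml') and file_path.is_file():
            self.send_response(200)
            self.send_header('Content-Type', 'application/rss+xml')
            self.send_header('Cache-Control', 'no-cache')
            self.end_headers()
            self.wfile.write(file_path.read_bytes())
        else:
            self.send_error(404, "File not found")

if __name__ == "__main__":
    HTTPServer(('', 8001), XMLHandler).serve_forever()
```
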
diff --git a/setup.py b/setup.py
index 4d3035b5..b51dd05b 100644
--- a/setup.py
+++ b/setup.py
@@ -1,3 +1,7 @@
+"""
+Setup script for the uglypy package.
+"""
+
from setuptools import setup, find_packages
# Reading the long description from the README file
@@ -6,7 +10,7 @@
setup(
name='uglypy', # The package name on PyPI
- version='0.0.47', # Initial version, update manually for major changes
+ version='0.0.48', # Version number, update manually for major changes
author='Fabrizio Salmi',
author_email='fabrizio.salmi@gmail.com', # Replace with your email
description='A Python package for aggregating and processing RSS feeds with LLM-enhanced content rewriting.',
@@ -56,5 +60,5 @@
'uglypy=uglypy.cli:main',
],
},
- license='AGPL-3.0',
+ license='AGPL-3.0', # License for the package
)
diff --git a/utils.py b/utils.py
index a1ce4b10..59f0f86d 100644
--- a/utils.py
+++ b/utils.py
@@ -1,10 +1,15 @@
+"""
+This script provides utility functions for handling socket operations and XML file statistics.
+"""
+
import socket
from pathlib import Path
import xml.etree.ElementTree as ET
from datetime import datetime
-uglyfeeds_dir = Path("uglyfeeds")
-uglyfeed_file = "uglyfeed.xml"
+# Define directory paths and filenames
+UGLYFEEDS_DIR = Path("uglyfeeds")
+UGLYFEED_FILE = "uglyfeed.xml"
def get_local_ip():
"""Get the local IP address."""
@@ -29,9 +34,9 @@ def find_available_port(base_port):
def get_xml_item_count():
"""Get the current count of items in the XML."""
- if not (uglyfeeds_dir / uglyfeed_file).exists():
+ if not (UGLYFEEDS_DIR / UGLYFEED_FILE).exists():
return 0
- tree = ET.parse(uglyfeeds_dir / uglyfeed_file)
+ tree = ET.parse(UGLYFEEDS_DIR / UGLYFEED_FILE)
root = tree.getroot()
items = root.findall(".//item")
return len(items)
@@ -45,11 +50,11 @@ def get_new_item_count(old_count):
def get_xml_stats():
"""Get quick stats from the XML file."""
- if not (uglyfeeds_dir / uglyfeed_file).exists():
+ if not (UGLYFEEDS_DIR / UGLYFEED_FILE).exists():
return None, None, None
- tree = ET.parse(uglyfeeds_dir / uglyfeed_file)
+ tree = ET.parse(UGLYFEEDS_DIR / UGLYFEED_FILE)
root = tree.getroot()
items = root.findall(".//item")
item_count = len(items)
last_updated = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
- return item_count, last_updated, uglyfeeds_dir / uglyfeed_file
+ return item_count, last_updated, UGLYFEEDS_DIR / UGLYFEED_FILE