Skip to content

Commit

Permalink
Dont overwrite cert
Browse files Browse the repository at this point in the history
  • Loading branch information
cryptobench committed Nov 27, 2024
1 parent fecee81 commit b7a6c36
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 93 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "golem-cert-companion"
version = "0.2.1"
version = "0.2.2"
description = "Certificate companion for Golem"
authors = [{name = "Phillip", email = "[email protected]"}]
dependencies = [
Expand Down
188 changes: 96 additions & 92 deletions src/golem_cert_companion/certificate_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ def run_command(self, args, shell=False):
f"{Fore.RED}Error: Command not found: {command[0]}. Is it installed and in your PATH?{Style.RESET_ALL}")
sys.exit(1)

def update_json_file(self, filepath, update_function):
def update_json_file(self, filepath, update_function, overwrite_existing=True):
"""Reads a JSON file, applies an update function, and writes it back."""
try:
# If file doesn't exist, use default template
Expand All @@ -124,6 +124,9 @@ def update_json_file(self, filepath, update_function):
else:
with open(filepath, 'r') as f:
data = json.load(f)
if not overwrite_existing:
# Do not overwrite existing file
return

updated_data = update_function(data)
with open(filepath, 'w') as f:
Expand Down Expand Up @@ -155,23 +158,13 @@ def get_current_value(self, filepath, key_path):

def update_certificate_template(self, data):
# Get user input for name and email
current_name = self.get_current_value(
"root-cert-template.json", ["certificate", "subject", "displayName"])
current_email = self.get_current_value(
"root-cert-template.json", ["certificate", "subject", "contact", "email"])

print(f"\n{Fore.YELLOW}=== Certificate Holder Information ==={Style.RESET_ALL}")
self.display_name = input(
f"{Fore.CYAN}Enter your name{' (press Enter for ' + current_name + ')' if current_name else ''}: {Style.RESET_ALL}").strip()
if not self.display_name and current_name:
self.display_name = current_name
f"{Fore.CYAN}Enter your name: {Style.RESET_ALL}").strip()

while True:
self.email = input(
f"{Fore.CYAN}Enter your email{' (press Enter for ' + current_email + ')' if current_email else ''}: {Style.RESET_ALL}").strip()
if not self.email and current_email:
self.email = current_email
break
f"{Fore.CYAN}Enter your email: {Style.RESET_ALL}").strip()
if '@' in self.email and '.' in self.email: # Basic email validation
break
print(f"{Fore.RED}Invalid email format. Please enter a valid email address.{Style.RESET_ALL}")
Expand Down Expand Up @@ -256,93 +249,103 @@ def update_node_descriptor(self, data):
return data

def main(self):
# Generate Root Key Pair
self.run_command(["create-key-pair", "root"])

# Add $schema and Update Dates in root-cert-template.json
self.update_json_file(
"root-cert-template.json", self.update_certificate_template)

# Configure Outbound Permissions
print(f"\n{Fore.YELLOW}=== Outbound Access Configuration ==={Style.RESET_ALL}")
print("You can either:")
print(
f"{Fore.GREEN}1. Request unrestricted access to all URLs{Style.RESET_ALL} (requires more trust from providers)")
print(
f"{Fore.GREEN}2. Specify a whitelist of allowed URLs{Style.RESET_ALL} (more restrictive, easier to gain provider trust)")

current_outbound = self.get_current_value(
"root-cert-template.json", ["certificate", "permissions", "outbound"])
if current_outbound == "unrestricted":
current_mode = "y"
elif isinstance(current_outbound, dict):
current_mode = "n"
# Generate Root Key Pair if it doesn't exist
if not (os.path.exists("root.key.json") and os.path.exists("root.pub.json")):
self.run_command(["create-key-pair", "root"])
else:
current_mode = None

whitelist_choice = input(
f"\n{Fore.CYAN}Do you want to request unrestricted outbound access to all URLs? (y/n){' (press Enter for ' + ('y' if current_mode == 'y' else 'n') + ')' if current_mode else ''}: {Style.RESET_ALL}").lower()
if not whitelist_choice and current_mode:
whitelist_choice = current_mode

if whitelist_choice == "y":
self.outbound_mode = "unrestricted"

def update_outbound_unrestricted(data):
data["certificate"]["permissions"]["outbound"] = "unrestricted"
return data
print(f"\n{Fore.GREEN}Root key pair already exists, skipping key pair creation.{Style.RESET_ALL}")

# Update or use existing root-cert-template.json
if not os.path.exists("root-cert-template.json"):
# Add $schema and Update Dates in root-cert-template.json
self.update_json_file(
"root-cert-template.json", update_outbound_unrestricted)
else:
self.outbound_mode = "whitelist"
current_urls = self.get_current_value("root-cert-template.json", [
"certificate", "permissions", "outbound", "urls"]) or []
self.urls = []
"root-cert-template.json", self.update_certificate_template)

# Configure Outbound Permissions
print(f"\n{Fore.YELLOW}=== Outbound Access Configuration ==={Style.RESET_ALL}")
print("You can either:")
print(
f"\n{Fore.YELLOW}Enter URLs to whitelist (must include https:// or http://){Style.RESET_ALL}")
print("You must enter at least one URL")

if current_urls:
print(f"\nCurrent URLs:")
for url in current_urls:
print(f" - {url}")
use_current = input(f"{Fore.CYAN}Use current URLs? (y/n): {Style.RESET_ALL}").lower()
if use_current == 'y':
self.urls = current_urls

while not self.urls: # Keep asking until at least one valid URL is entered
url = input(f"{Fore.CYAN}Enter URL (e.g., https://example.com/api): {Style.RESET_ALL}").strip()
if self.validate_url(url):
self.urls.append(url)
else:
print(
f"{Fore.RED}Invalid URL format. Please use format like 'https://example.com'{Style.RESET_ALL}")

# Allow adding additional URLs
print(f"{Fore.YELLOW}Enter additional URLs (or press Enter when done){Style.RESET_ALL}")
while True:
url = input(f"{Fore.CYAN}Enter URL: {Style.RESET_ALL}").strip()
if not url:
break
if self.validate_url(url):
self.urls.append(url)
else:
print(
f"{Fore.RED}Invalid URL format. Please use format like 'https://example.com'{Style.RESET_ALL}")
f"{Fore.GREEN}1. Request unrestricted access to all URLs{Style.RESET_ALL} (requires more trust from providers)")
print(
f"{Fore.GREEN}2. Specify a whitelist of allowed URLs{Style.RESET_ALL} (more restrictive, easier to gain provider trust)")

def update_outbound_urls(data):
data["certificate"]["permissions"]["outbound"] = {
"urls": self.urls}
return data
whitelist_choice = input(
f"\n{Fore.CYAN}Do you want to request unrestricted outbound access to all URLs? (y/n): {Style.RESET_ALL}").lower()

self.update_json_file(
"root-cert-template.json", update_outbound_urls)
if whitelist_choice == "y":
self.outbound_mode = "unrestricted"

# Self-Sign Certificate
self.run_command(
["self-sign-certificate", "root-cert-template.json", "root.key.json"])
def update_outbound_unrestricted(data):
data["certificate"]["permissions"]["outbound"] = "unrestricted"
return data

self.update_json_file(
"root-cert-template.json", update_outbound_unrestricted, overwrite_existing=True)
else:
self.outbound_mode = "whitelist"
self.urls = []

print(
f"\n{Fore.YELLOW}Enter URLs to whitelist (must include https:// or http://){Style.RESET_ALL}")
print("You must enter at least one URL")

while not self.urls: # Keep asking until at least one valid URL is entered
url = input(f"{Fore.CYAN}Enter URL (e.g., https://example.com/api): {Style.RESET_ALL}").strip()
if self.validate_url(url):
self.urls.append(url)
else:
print(
f"{Fore.RED}Invalid URL format. Please use format like 'https://example.com'{Style.RESET_ALL}")

# Allow adding additional URLs
print(f"{Fore.YELLOW}Enter additional URLs (or press Enter when done){Style.RESET_ALL}")
while True:
url = input(f"{Fore.CYAN}Enter URL: {Style.RESET_ALL}").strip()
if not url:
break
if self.validate_url(url):
self.urls.append(url)
else:
print(
f"{Fore.RED}Invalid URL format. Please use format like 'https://example.com'{Style.RESET_ALL}")

def update_outbound_urls(data):
data["certificate"]["permissions"]["outbound"] = {
"urls": self.urls}
return data

self.update_json_file(
"root-cert-template.json", update_outbound_urls, overwrite_existing=True)
else:
print(f"\n{Fore.GREEN}root-cert-template.json already exists, using existing template.{Style.RESET_ALL}")
# Determine outbound mode from existing root-cert-template.json
current_outbound = self.get_current_value(
"root-cert-template.json", ["certificate", "permissions", "outbound"])
if current_outbound == "unrestricted":
self.outbound_mode = "unrestricted"
elif isinstance(current_outbound, dict):
self.outbound_mode = "whitelist"
self.urls = current_outbound.get("urls", [])
else:
print(
f"{Fore.RED}Error determining outbound mode from existing root-cert-template.json.{Style.RESET_ALL}")
sys.exit(1)

# Read the public key data from root.pub.json
try:
with open("root.pub.json", 'r') as f:
self.public_key_data = json.load(f)
except (FileNotFoundError, json.JSONDecodeError) as e:
print(
f"{Fore.RED}Error reading root.pub.json: {str(e)}{Style.RESET_ALL}")
sys.exit(1)

# Self-Sign Certificate if it doesn't exist
if not os.path.exists("root-cert-template.signed.json"):
self.run_command(
["self-sign-certificate", "root-cert-template.json", "root.key.json"])
else:
print(f"\n{Fore.GREEN}root-cert-template.signed.json already exists, skipping self-signing certificate.{Style.RESET_ALL}")

# Update Node Descriptor
print(f"\n{Fore.YELLOW}=== Node Descriptor Configuration ==={Style.RESET_ALL}")
Expand All @@ -365,6 +368,7 @@ def update_outbound_urls(data):
print(
f"{Fore.RED}Invalid Node ID format. Must be a 42-character Ethereum address starting with '0x'{Style.RESET_ALL}")

# Update node-descriptor.json
self.update_json_file(
"node-descriptor.json", self.update_node_descriptor)

Expand Down

0 comments on commit b7a6c36

Please sign in to comment.