Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implemented Apple Keychain support with migration procedure #44

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file modified Google Authenticator.alfredworkflow
Binary file not shown.
12 changes: 4 additions & 8 deletions src/otp.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,7 @@ def is_otp_secret_valid(secret):
return True


def get_hotp_key(key=None, secret=None, hexkey=None):
if hexkey:
key = hexkey.decode('hex')
if secret:
secret = secret.replace(' ', '')
secret = pad_base32_str(secret, '=')
key = base64.b32decode(secret, casefold=True)
return key
Comment on lines -50 to -57
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed branches of logic that aren't used:

  • only the hexkey argument given;
  • only the key argument given.

def get_hotp_key(secret):
secret = secret.replace(' ', '')
secret = pad_base32_str(secret, '=')
return base64.b32decode(secret, casefold=True)
123 changes: 123 additions & 0 deletions src/storage/apple_keychain_storage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
import os
import subprocess
import re

from storage.storage_interface import StorageInterface

class AppleKeychainStorage(StorageInterface):
def __init__(self, name = 'alfred-totp'):
self._name = name
self._file = name + ".keychain"
self._data = {}

if not self.keychain_exists():
self.create_storage()

self._data = self._parse(self._dump())

def get_file(self):
return "~/Library/Keychains/" + self._file + "-db"

def validate_storage(self):
pass

def is_empty(self):
return not self._data

def keychain_exists(self):
# The "security list-keychains -d user" only shows keychains from "Keychain Access" app.
return os.path.isfile(self._get_resolved_file())

def _get_resolved_file(self):
return os.path.expanduser(self.get_file())

def create_storage(self):
if self.keychain_exists():
return

self._run_command(["security", "create-keychain", "-P", self._file])
self._add_to_keychain_access()

def _add_to_keychain_access(self):
user_keychains = self.get_user_keychains()
user_keychains.append(self._get_resolved_file())

# Removes duplicates.
user_keychains = list(set(user_keychains))

self._run_command(["security", "list-keychains", "-d", "user", "-s"] + user_keychains)

def get_user_keychains(self):
results = []
lines = self._run_command(["security", "list-keychains", "-d", "user"])

for line in lines:
line = line.strip('" ')
results.append(line)

return results

def get_secret(self, account):
if account in self._data:
return self._data[account]
else:
raise Exception("Service not found")

def _dump(self):
return self._run_command(["security", "dump-keychain", "-d", self._file])

def _parse(self, raw_dump_data):
account = None
capture_secret = False
results = {}

if not raw_dump_data:
return {}

for line in raw_dump_data:
line = line.strip()

regs = re.search(r'"svce"<blob>="([^"]+)"', line)
if regs:
account = regs.group(1)
continue

if line == "data:" and account:
capture_secret = True
continue

if capture_secret:
results[account] = line.strip('"')

# Reset for the next entry
account = None
capture_secret = False

return results

def get_accounts(self):
return self._data.keys()

def add_account(self, account, secret):
if account in self._data:
return False

try:
subprocess.run(
["security", "add-generic-password", "-a", self._name, "-s", account, "-w", secret, self._file],
check=True
)

self._data[account] = secret

return True
except subprocess.CalledProcessError as e:
return False

def _run_command(self, command):
try:
result = subprocess.run(command, capture_output=True, text=True, check=True)
return result.stdout.splitlines()
except subprocess.CalledProcessError as e:
print(f"Error executing security command: {e}")
return []
3 changes: 3 additions & 0 deletions src/storage/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
class StorageException(Exception):
def __init__(self, file):
self.file = file
67 changes: 67 additions & 0 deletions src/storage/plain_text_storage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import os
import configparser
from storage.exceptions import StorageException
from storage.storage_interface import StorageInterface


class PlainTextStorage(StorageInterface):
_config_file_initial_content = """
#Examples of valid configurations:
#[google - [email protected]]
#secret=xxxxxxxxxxxxxxxxxx
#
#[evernote - robert]
#secret=yyyyyyyyyyyyyyyyyy
"""

def __init__(self, config_file='~/.gauth'):
self._config_file = config_file
self.config_file = os.path.expanduser(self._config_file)

self.config = configparser.RawConfigParser()

# If the configuration file doesn't exist, create an empty one
if not os.path.isfile(self.config_file):
self.create_storage()

def get_file(self):
return self._config_file

def create_storage(self):
if os.path.isfile(self.config_file):
return

with open(self.config_file, 'w') as f:
f.write(self._config_file_initial_content)
f.close()

def validate_storage(self):
try:
self.config.read(self.config_file)
except Exception as e:
raise StorageException(self._config_file) from e

def is_empty(self):
return not self.config.sections()

def get_secret(self, account):
try:
return self.config.get(account, 'secret')
except:
raise Exception("Service not found")

def get_accounts(self):
return self.config.sections()

def add_account(self, account, secret):
config_file = open(self.config_file, 'r+')
try:
self.config.add_section(account)
self.config.set(account, "secret", secret)
self.config.write(config_file)
except configparser.DuplicateSectionError:
return False
finally:
config_file.close()

return True
21 changes: 21 additions & 0 deletions src/storage/storage_interface.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
class StorageInterface:
def get_file(self):
pass

def create_storage(self):
pass

def validate_storage(self):
pass

def is_empty(self):
pass

def get_secret(self, account):
pass

def get_accounts(self):
pass

def add_account(self, account, secret):
pass
Loading