-
-
Notifications
You must be signed in to change notification settings - Fork 41
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Implemented Apple Keychain support with migration procedure
- Loading branch information
Showing
7 changed files
with
300 additions
and
78 deletions.
There are no files selected for viewing
Binary file not shown.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,123 @@ | ||
import os | ||
import subprocess | ||
import re | ||
|
||
from storage.storage_interface import StorageInterface | ||
|
||
class AppleKeychainStorage(StorageInterface): | ||
def __init__(self, name = 'alfred-totp'): | ||
self._name = name | ||
self._file = name + ".keychain" | ||
self._data = {} | ||
|
||
if not self.keychain_exists(): | ||
self.create_storage() | ||
|
||
self._data = self._parse(self._dump()) | ||
|
||
def get_file(self): | ||
return "~/Library/Keychains/" + self._file + "-db" | ||
|
||
def validate_storage(self): | ||
pass | ||
|
||
def is_empty(self): | ||
return not self._data | ||
|
||
def keychain_exists(self): | ||
# The "security list-keychains -d user" only shows keychains from "Keychain Access" app. | ||
return os.path.isfile(self._get_resolved_file()) | ||
|
||
def _get_resolved_file(self): | ||
return os.path.expanduser(self.get_file()) | ||
|
||
def create_storage(self): | ||
if self.keychain_exists(): | ||
return | ||
|
||
self._run_command(["security", "create-keychain", "-P", self._file]) | ||
self._add_to_keychain_access() | ||
|
||
def _add_to_keychain_access(self): | ||
user_keychains = self.get_user_keychains() | ||
user_keychains.append(self._get_resolved_file()) | ||
|
||
# Removes duplicates. | ||
user_keychains = list(set(user_keychains)) | ||
|
||
self._run_command(["security", "list-keychains", "-d", "user", "-s"] + user_keychains) | ||
|
||
def get_user_keychains(self): | ||
results = [] | ||
lines = self._run_command(["security", "list-keychains", "-d", "user"]) | ||
|
||
for line in lines: | ||
line = line.strip('" ') | ||
results.append(line) | ||
|
||
return results | ||
|
||
def get_secret(self, account): | ||
if account in self._data: | ||
return self._data[account] | ||
else: | ||
raise Exception("Service not found") | ||
|
||
def _dump(self): | ||
return self._run_command(["security", "dump-keychain", "-d", self._file]) | ||
|
||
def _parse(self, raw_dump_data): | ||
account = None | ||
capture_secret = False | ||
results = {} | ||
|
||
if not raw_dump_data: | ||
return {} | ||
|
||
for line in raw_dump_data: | ||
line = line.strip() | ||
|
||
regs = re.search(r'"svce"<blob>="([^"]+)"', line) | ||
if regs: | ||
account = regs.group(1) | ||
continue | ||
|
||
if line == "data:" and account: | ||
capture_secret = True | ||
continue | ||
|
||
if capture_secret: | ||
results[account] = line.strip('"') | ||
|
||
# Reset for the next entry | ||
account = None | ||
capture_secret = False | ||
|
||
return results | ||
|
||
def get_accounts(self): | ||
return self._data.keys() | ||
|
||
def add_account(self, account, secret): | ||
if account in self._data: | ||
return False | ||
|
||
try: | ||
subprocess.run( | ||
["security", "add-generic-password", "-a", self._name, "-s", account, "-w", secret, self._file], | ||
check=True | ||
) | ||
|
||
self._data[account] = secret | ||
|
||
return True | ||
except subprocess.CalledProcessError as e: | ||
return False | ||
|
||
def _run_command(self, command): | ||
try: | ||
result = subprocess.run(command, capture_output=True, text=True, check=True) | ||
return result.stdout.splitlines() | ||
except subprocess.CalledProcessError as e: | ||
print(f"Error executing security command: {e}") | ||
return [] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
class StorageException(Exception): | ||
def __init__(self, file): | ||
self.file = file |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,67 @@ | ||
import os | ||
import configparser | ||
from storage.exceptions import StorageException | ||
from storage.storage_interface import StorageInterface | ||
|
||
|
||
class PlainTextStorage(StorageInterface): | ||
_config_file_initial_content = """ | ||
#Examples of valid configurations: | ||
#[google - [email protected]] | ||
#secret=xxxxxxxxxxxxxxxxxx | ||
# | ||
#[evernote - robert] | ||
#secret=yyyyyyyyyyyyyyyyyy | ||
""" | ||
|
||
def __init__(self, config_file='~/.gauth'): | ||
self._config_file = config_file | ||
self.config_file = os.path.expanduser(self._config_file) | ||
|
||
self.config = configparser.RawConfigParser() | ||
|
||
# If the configuration file doesn't exist, create an empty one | ||
if not os.path.isfile(self.config_file): | ||
self.create_storage() | ||
|
||
def get_file(self): | ||
return self._config_file | ||
|
||
def create_storage(self): | ||
if os.path.isfile(self.config_file): | ||
return | ||
|
||
with open(self.config_file, 'w') as f: | ||
f.write(self._config_file_initial_content) | ||
f.close() | ||
|
||
def validate_storage(self): | ||
try: | ||
self.config.read(self.config_file) | ||
except Exception as e: | ||
raise StorageException(self._config_file) from e | ||
|
||
def is_empty(self): | ||
return not self.config.sections() | ||
|
||
def get_secret(self, account): | ||
try: | ||
return self.config.get(account, 'secret') | ||
except: | ||
raise Exception("Service not found") | ||
|
||
def get_accounts(self): | ||
return self.config.sections() | ||
|
||
def add_account(self, account, secret): | ||
config_file = open(self.config_file, 'r+') | ||
try: | ||
self.config.add_section(account) | ||
self.config.set(account, "secret", secret) | ||
self.config.write(config_file) | ||
except configparser.DuplicateSectionError: | ||
return False | ||
finally: | ||
config_file.close() | ||
|
||
return True |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,21 @@ | ||
class StorageInterface: | ||
def get_file(self): | ||
pass | ||
|
||
def create_storage(self): | ||
pass | ||
|
||
def validate_storage(self): | ||
pass | ||
|
||
def is_empty(self): | ||
pass | ||
|
||
def get_secret(self, account): | ||
pass | ||
|
||
def get_accounts(self): | ||
pass | ||
|
||
def add_account(self, account, secret): | ||
pass |
Oops, something went wrong.