Skip to content

Commit

Permalink
Merge pull request #195 from FBoissadier/feature/add-core-user
Browse files Browse the repository at this point in the history
Add some Core.User api calls and encryption
  • Loading branch information
N4S4 authored Jan 20, 2025
2 parents 0fc1749 + b0f6ea6 commit d4b1df2
Show file tree
Hide file tree
Showing 2 changed files with 689 additions and 5 deletions.
133 changes: 128 additions & 5 deletions synology_api/auth.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from __future__ import annotations
from random import randint
from typing import Optional
import requests
import json
Expand All @@ -11,6 +12,17 @@
from .exceptions import CertificateError, CloudSyncError, DHCPServerError, DirectoryServerError, DockerError, DriveAdminError
from .exceptions import LogCenterError, NoteStationError, OAUTHError, PhotosError, SecurityAdvisorError, TaskSchedulerError, EventSchedulerError
from .exceptions import UniversalSearchError, USBCopyError, VPNError, CoreSysInfoError, UndefinedError
import hashlib
from os import urandom
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.ciphers import algorithms, Cipher, modes
from cryptography.hazmat.primitives.asymmetric import padding, rsa
import base64
import hashlib
import urllib




USE_EXCEPTIONS: bool = True

Expand Down Expand Up @@ -59,9 +71,16 @@ def verify_cert_enabled(self) -> bool:
return self._verify

def login(self, application: str) -> None:
login_api = 'auth.cgi?api=SYNO.API.Auth'
params = {'version': self._version, 'method': 'login', 'account': self._username,
'passwd': self._password, 'session': application, 'format': 'cookie', 'enable_syno_token':'yes'}
login_api = 'auth.cgi'
params = {'api': "SYNO.API.Auth", 'version': self._version, 'method': 'login', 'enable_syno_token':'yes'}

params_enc = {'account': self._username, 'passwd': self._password, 'session': application, 'format': 'cookie'}
if self._secure:
params.update(params_enc)
else:
encrypted_params = self.encrypt_params(params_enc)
params.update(encrypted_params)

if self._otp_code:
params['otp_code'] = self._otp_code
if self._device_id is not None and self._device_name is not None:
Expand All @@ -79,7 +98,7 @@ def login(self, application: str) -> None:
session_request_json: dict[str, object] = {}
if USE_EXCEPTIONS:
try:
session_request = requests.get(self._base_url + login_api, params, verify=self._verify)
session_request = requests.post(self._base_url + login_api, data=params, verify=self._verify)
session_request.raise_for_status()
session_request_json = session_request.json()
except requests.exceptions.ConnectionError as e:
Expand All @@ -90,7 +109,7 @@ def login(self, application: str) -> None:
raise JSONDecodeError(error_message=str(e.args))
else:
# Will raise its own errors:
session_request = requests.get(self._base_url + login_api, params, verify=self._verify)
session_request = requests.post(self._base_url + login_api, data=params, verify=self._verify)
session_request_json = session_request.json()

# Check dsm response for error:
Expand Down Expand Up @@ -195,6 +214,71 @@ def search_by_app(self, app: str) -> None:
if print_check == 0:
print('Not Found')
return

def _random_AES_passphrase(self, length):
available = ('0123456789'
'abcdefghijklmnopqrstuvwxyz'
'ABCDEFGHIJKLMNOPQRSTUVWXYZ'
'~!@#$%^&*()_+-/')
key = b''

while length > 0:
key += available[randint(0, len(available) - 1)].encode('utf-8')
length -= 1

return key

def _get_enc_info(self):
api_name = 'SYNO.API.Encryption'
req_params = {
"method": "getinfo",
"version": 1,
"format": "module"
}
response = self.request_data(api_name, "encryption.cgi", req_params)
return response["data"]

def _encrypt_RSA(self, modulus, passphrase, text):
public_numbers = rsa.RSAPublicNumbers(passphrase, modulus)
public_key = public_numbers.public_key(default_backend())

if isinstance(text, str):
text = text.encode('utf-8')

ciphertext = public_key.encrypt(
text,
padding.PKCS1v15()
)
return ciphertext

def _encrypt_AES(self, passphrase, text):
cipher = AESCipher(passphrase)

return cipher.encrypt(text)

def encrypt_params(self, params):
enc_info = self._get_enc_info()
public_key = enc_info["public_key"]
cipher_key = enc_info["cipherkey"]
cipher_token = enc_info["ciphertoken"]
server_time = enc_info["server_time"]
random_passphrase = self._random_AES_passphrase(501)

params[cipher_token] = server_time

encrypted_passphrase = self._encrypt_RSA(int(public_key, 16),
int("10001", 16),
random_passphrase)

encrypted_params = self._encrypt_AES(random_passphrase,
urllib.parse.urlencode(params))

enc_params = {
"rsa": base64.b64encode(encrypted_passphrase).decode("utf-8"),
"aes": base64.b64encode(encrypted_params).decode("utf-8")
}

return {cipher_key: json.dumps(enc_params)}

def request_multi_datas(self,
compound: dict[object] = None,
Expand Down Expand Up @@ -424,3 +508,42 @@ def sid(self) -> Optional[str]:
@property
def base_url(self) -> str:
return self._base_url



class AESCipher(object):
"""Encrypt with OpenSSL-compatible way"""

SALT_MAGIC = b'Salted__'

def __init__(self, password, key_length=32):
self._bs = 16
self._salt = urandom(self._bs - len(self.SALT_MAGIC))

self._key, self._iv = self._derive_key_and_iv(password,
self._salt,
key_length,
self._bs)

def _pad(self, s):
bs = self._bs
return (s + (bs - len(s) % bs) * chr(bs - len(s) % bs)).encode('utf-8')

def _derive_key_and_iv(self, password, salt, key_length, iv_length):
d = d_i = b''
while len(d) < key_length + iv_length:
md5_str = d_i + password + salt
d_i = hashlib.md5(md5_str).digest()
d += d_i
return d[:key_length], d[key_length:key_length + iv_length]

def encrypt(self, text):
cipher = Cipher(
algorithms.AES(self._key),
modes.CBC(self._iv),
backend=default_backend()
)
encryptor = cipher.encryptor()
ciphertext = encryptor.update(self._pad(text)) + encryptor.finalize()

return self.SALT_MAGIC + self._salt + ciphertext
Loading

0 comments on commit d4b1df2

Please sign in to comment.