diff --git a/bimmer_connected/account.py b/bimmer_connected/account.py index a00a543f..33d8bdeb 100644 --- a/bimmer_connected/account.py +++ b/bimmer_connected/account.py @@ -1,6 +1,5 @@ """Access to a MyBMW account and all vehicles therein.""" -import asyncio import datetime import logging import pathlib @@ -20,7 +19,6 @@ VALID_UNTIL_OFFSET = datetime.timedelta(seconds=10) _LOGGER = logging.getLogger(__name__) -# lock = asyncio.Lock() @dataclass @@ -64,18 +62,14 @@ async def get_vehicles(self) -> None: "appDateTime": int(datetime.datetime.now().timestamp() * 1000), "tireGuardMode": "ENABLED", } - vehicles_tasks: List[asyncio.Task] = [] - for brand in CarBrands: - vehicles_tasks.append( - asyncio.ensure_future( - client.get( - VEHICLES_URL, - params=vehicles_request_params, - headers=client.generate_default_header(brand), - ) - ) + vehicles_responses: List[httpx.Response] = [ + await client.get( + VEHICLES_URL, + params=vehicles_request_params, + headers=client.generate_default_header(brand), ) - vehicles_responses: List[httpx.Response] = await asyncio.gather(*vehicles_tasks) + for brand in CarBrands + ] for response in vehicles_responses: for vehicle_dict in response.json(): diff --git a/bimmer_connected/api/authentication.py b/bimmer_connected/api/authentication.py index 738eac0e..14505485 100644 --- a/bimmer_connected/api/authentication.py +++ b/bimmer_connected/api/authentication.py @@ -4,8 +4,10 @@ import base64 import datetime import logging -from dataclasses import dataclass, field -from typing import Optional +import math +from collections import defaultdict +from typing import AsyncGenerator, Generator, Optional +from uuid import uuid4 import httpx import jwt @@ -17,108 +19,115 @@ from bimmer_connected.api.utils import ( create_s256_code_challenge, generate_token, + get_correlation_id, handle_http_status_error, - raise_for_status_event_handler, ) from bimmer_connected.const import ( AUTH_CHINA_LOGIN_URL, AUTH_CHINA_PUBLIC_KEY_URL, AUTH_CHINA_TOKEN_URL, + HTTPX_TIMEOUT, OAUTH_CONFIG_URL, USER_AGENT, X_USER_AGENT, ) -EXPIRES_AT_OFFSET = datetime.timedelta(seconds=10) +EXPIRES_AT_OFFSET = datetime.timedelta(seconds=HTTPX_TIMEOUT * 2) _LOGGER = logging.getLogger(__name__) -@dataclass -class Authentication: - """Base class for Authentication.""" - - username: str - password: str - region: Regions - token: Optional[str] = None - expires_at: Optional[datetime.datetime] = None - refresh_token: Optional[str] = None - - async def login(self) -> None: - """Get a valid OAuth token.""" - raise NotImplementedError("Not implemented in Authentication base class.") - - async def get_authentication(self) -> str: - """Returns a valid Bearer token.""" - if not self.is_token_valid: - await self.login() - return f"Bearer {self.token}" - - @property - def is_token_valid(self) -> bool: - """Check if current token is still valid.""" - if self.token and self.expires_at and datetime.datetime.utcnow() < self.expires_at: - _LOGGER.debug("Old token is still valid. Not getting a new one.") - return True - return False - - -@dataclass -class MyBMWAuthentication(Authentication): +class MyBMWAuthentication(httpx.Auth): """Authentication for MyBMW API.""" - _lock: asyncio.Lock = field(default_factory=asyncio.Lock) + # pylint: disable=too-many-arguments,too-many-instance-attributes + def __init__( + self, + username: str, + password: str, + region: Regions, + access_token: Optional[str] = None, + expires_at: Optional[datetime.datetime] = None, + refresh_token: Optional[str] = None, + ): + self.username: str = username + self.password: str = password + self.region: Regions = region + self.access_token: Optional[str] = access_token + self.expires_at: Optional[datetime.datetime] = expires_at + self.refresh_token: Optional[str] = refresh_token + self.session_id: str = str(uuid4()) + self._lock: Optional[asyncio.Lock] = None - def _create_or_update_lock(self): + @property + def login_lock(self) -> asyncio.Lock: """Makes sure that there is a lock in the current event loop.""" - loop: asyncio.BaseEventLoop = self._lock._loop # pylint: disable=protected-access - if loop != asyncio.get_event_loop(): + if not self._lock: self._lock = asyncio.Lock() + return self._lock + + def sync_auth_flow(self, request: httpx.Request) -> Generator[httpx.Request, httpx.Response, None]: + raise RuntimeError("Cannot use a async authentication class with httpx.Client") + + async def async_auth_flow(self, request: httpx.Request) -> AsyncGenerator[httpx.Request, httpx.Response]: + # Get an access token on first call + async with self.login_lock: + if not self.access_token: + await self.login() + request.headers["authorization"] = f"Bearer {self.access_token}" + request.headers["bmw-session-id"] = self.session_id + + # Try getting a response + response: httpx.Response = (yield request) + + if response.status_code == 401: + async with self.login_lock: + _LOGGER.debug("Received unauthorized response, refreshing token.") + await self.login() + request.headers["authorization"] = f"Bearer {self.access_token}" + request.headers["bmw-session-id"] = self.session_id + yield request async def login(self) -> None: """Get a valid OAuth token.""" - self._create_or_update_lock() - async with self._lock: - if self.is_token_valid: - return - - token_data = {} - if self.region in [Regions.NORTH_AMERICA, Regions.REST_OF_WORLD]: - # Try logging in with refresh token first - if self.refresh_token: - token_data = await self._refresh_token_row_na() - if not token_data: - token_data = await self._login_row_na() - - elif self.region in [Regions.CHINA]: - # Try logging in with refresh token first - if self.refresh_token: - token_data = await self._refresh_token_china() - if not token_data: - token_data = await self._login_china() - - self.token = token_data["access_token"] - self.expires_at = token_data["expires_at"] - EXPIRES_AT_OFFSET - self.refresh_token = token_data["refresh_token"] + token_data = {} + if self.region in [Regions.NORTH_AMERICA, Regions.REST_OF_WORLD]: + # Try logging in with refresh token first + if self.refresh_token: + token_data = await self._refresh_token_row_na() + if not token_data: + # clear refresh token as precaution + self.refresh_token = None + token_data = await self._login_row_na() + token_data["expires_at"] = token_data["expires_at"] - EXPIRES_AT_OFFSET + + elif self.region in [Regions.CHINA]: + # Try logging in with refresh token first + if self.refresh_token: + token_data = await self._refresh_token_china() + if not token_data: + # clear refresh token as precaution + self.refresh_token = None + token_data = await self._login_china() + token_data["expires_at"] = token_data["expires_at"] - EXPIRES_AT_OFFSET + + self.access_token = token_data["access_token"] + self.expires_at = token_data["expires_at"] + self.refresh_token = token_data["refresh_token"] async def _login_row_na(self): # pylint: disable=too-many-locals """Login to Rest of World and North America.""" try: - async with httpx.AsyncClient( - base_url=get_server_url(self.region), headers={"user-agent": USER_AGENT} - ) as client: + async with MyBMWLoginClient(region=self.region) as client: _LOGGER.debug("Authenticating with MyBMW flow for North America & Rest of World.") - # Attach raise_for_status event hook - client.event_hooks["response"].append(raise_for_status_event_handler) - # Get OAuth2 settings from BMW API r_oauth_settings = await client.get( OAUTH_CONFIG_URL, headers={ "ocp-apim-subscription-key": get_ocp_apim_key(self.region), - "x-user-agent": X_USER_AGENT.format("bmw"), + "bmw-session-id": self.session_id, + **get_correlation_id(), }, ) oauth_settings = r_oauth_settings.json() @@ -192,20 +201,16 @@ async def _login_row_na(self): # pylint: disable=too-many-locals async def _refresh_token_row_na(self): """Login to Rest of World and North America using existing refresh_token.""" try: - async with httpx.AsyncClient( - base_url=get_server_url(self.region), headers={"user-agent": USER_AGENT} - ) as client: - _LOGGER.debug("Authenticating with refresh_token flow for North America & Rest of World.") - - # Attach raise_for_status event hook - client.event_hooks["response"].append(raise_for_status_event_handler) + async with MyBMWLoginClient(region=self.region) as client: + _LOGGER.debug("Authenticating with refresh token for North America & Rest of World.") # Get OAuth2 settings from BMW API r_oauth_settings = await client.get( OAUTH_CONFIG_URL, headers={ "ocp-apim-subscription-key": get_ocp_apim_key(self.region), - "x-user-agent": X_USER_AGENT.format("bmw"), + "bmw-session-id": self.session_id, + **get_correlation_id(), }, ) oauth_settings = r_oauth_settings.json() @@ -227,8 +232,9 @@ async def _refresh_token_row_na(self): expiration_time = int(response_json["expires_in"]) expires_at = current_utc_time + datetime.timedelta(seconds=expiration_time) - except httpx.HTTPStatusError: + except httpx.HTTPStatusError as ex: _LOGGER.debug("Unable to get access token using refresh token.") + handle_http_status_error(ex, "Authentication", _LOGGER, debug=True) return {} return { @@ -239,20 +245,12 @@ async def _refresh_token_row_na(self): async def _login_china(self): try: - async with httpx.AsyncClient( - base_url=get_server_url(self.region), headers={"user-agent": USER_AGENT} - ) as client: + async with MyBMWLoginClient(region=self.region) as client: _LOGGER.debug("Authenticating with MyBMW flow for China.") - # Attach raise_for_status event hook - client.event_hooks["response"].append(raise_for_status_event_handler) - - login_header = {"x-user-agent": X_USER_AGENT.format("bmw")} - # Get current RSA public certificate & use it to encrypt password response = await client.get( AUTH_CHINA_PUBLIC_KEY_URL, - headers=login_header, ) pem_public_key = response.json()["data"]["value"] @@ -264,11 +262,11 @@ async def _login_china(self): cipher_aes = AES.new(**get_aes_keys(self.region), mode=AES.MODE_CBC) nonce = f"{self.username}|{datetime.datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S.%fZ')}".encode() - login_header["x-login-nonce"] = base64.b64encode(cipher_aes.encrypt(pad(nonce, 16))).decode() - # Get token response = await client.post( - AUTH_CHINA_LOGIN_URL, headers=login_header, json={"mobile": self.username, "password": pw_encrypted} + AUTH_CHINA_LOGIN_URL, + headers={"x-login-nonce": base64.b64encode(cipher_aes.encrypt(pad(nonce, 16))).decode()}, + json={"mobile": self.username, "password": pw_encrypted}, ) response_json = response.json()["data"] @@ -287,21 +285,14 @@ async def _login_china(self): async def _refresh_token_china(self): try: - async with httpx.AsyncClient( - base_url=get_server_url(self.region), headers={"user-agent": USER_AGENT} - ) as client: + async with MyBMWLoginClient(region=self.region) as client: _LOGGER.debug("Authenticating with refresh token for China.") - # Attach raise_for_status event hook - client.event_hooks["response"].append(raise_for_status_event_handler) - - login_header = {"x-user-agent": X_USER_AGENT.format("bmw")} current_utc_time = datetime.datetime.utcnow() # Try logging in using refresh_token response = await client.post( AUTH_CHINA_TOKEN_URL, - headers=login_header, data={ "refresh_token": self.refresh_token, "grant_type": "refresh_token", @@ -312,8 +303,9 @@ async def _refresh_token_china(self): expiration_time = int(response_json["expires_in"]) expires_at = current_utc_time + datetime.timedelta(seconds=expiration_time) - except httpx.HTTPStatusError: + except httpx.HTTPStatusError as ex: _LOGGER.debug("Unable to get access token using refresh token.") + handle_http_status_error(ex, "Authentication", _LOGGER, debug=True) return {} return { @@ -321,3 +313,58 @@ async def _refresh_token_china(self): "expires_at": expires_at, "refresh_token": response_json["refresh_token"], } + + +class MyBMWLoginClient(httpx.AsyncClient): + """Async HTTP client based on `httpx.AsyncClient` with automated OAuth token refresh.""" + + def __init__(self, *args, **kwargs): + # Increase timeout + kwargs["timeout"] = httpx.Timeout(HTTPX_TIMEOUT) + + kwargs["auth"] = MyBMWLoginRetry() + + # Set default values + kwargs["base_url"] = get_server_url(kwargs.pop("region")) + kwargs["headers"] = {"user-agent": USER_AGENT, "x-user-agent": X_USER_AGENT.format("bmw")} + + # Register event hooks + kwargs["event_hooks"] = defaultdict(list, **kwargs.get("event_hooks", {})) + + # Event hook which calls raise_for_status on all requests + async def raise_for_status_event_handler(response: httpx.Response): + """Event handler that automatically raises HTTPStatusErrors when attached. + + Will only raise on 4xx/5xx errors (but not on 429) and not raise on 3xx. + """ + if response.is_error and not response.status_code == 429: + await response.aread() + response.raise_for_status() + + kwargs["event_hooks"]["response"].append(raise_for_status_event_handler) + + super().__init__(*args, **kwargs) + + +class MyBMWLoginRetry(httpx.Auth): + """httpx.Auth used as workaround to retry & sleep on 429 Too Many Requests.""" + + def sync_auth_flow(self, request: httpx.Request) -> Generator[httpx.Request, httpx.Response, None]: + raise RuntimeError("Cannot use a async authentication class with httpx.Client") + + async def async_auth_flow(self, request: httpx.Request) -> AsyncGenerator[httpx.Request, httpx.Response]: + # Try getting a response + response: httpx.Response = (yield request) + + for _ in range(5): + if response.status_code == 429: + await response.aread() + wait_time = math.ceil( + next(iter([int(i) for i in response.json().get("message", "") if i.isdigit()]), 2) * 1.25 + ) + _LOGGER.debug("Sleeping %s seconds due to 429 Too Many Requests", wait_time) + await asyncio.sleep(wait_time) + response = yield request + if response.status_code == 429: + await response.aread() + response.raise_for_status() diff --git a/bimmer_connected/api/client.py b/bimmer_connected/api/client.py index a35b9906..dad93c55 100644 --- a/bimmer_connected/api/client.py +++ b/bimmer_connected/api/client.py @@ -9,8 +9,8 @@ from bimmer_connected.api.authentication import MyBMWAuthentication from bimmer_connected.api.regions import get_server_url -from bimmer_connected.api.utils import log_to_to_file, raise_for_status_event_handler -from bimmer_connected.const import USER_AGENT, X_USER_AGENT, CarBrands +from bimmer_connected.api.utils import get_correlation_id, log_to_to_file +from bimmer_connected.const import HTTPX_TIMEOUT, USER_AGENT, X_USER_AGENT, CarBrands @dataclass @@ -27,8 +27,11 @@ class MyBMWClient(httpx.AsyncClient): def __init__(self, config: MyBMWClientConfiguration, *args, brand: CarBrands = None, **kwargs): self.config = config + # Add authentication + kwargs["auth"] = self.config.authentication + # Increase timeout - kwargs["timeout"] = httpx.Timeout(10.0) + kwargs["timeout"] = httpx.Timeout(HTTPX_TIMEOUT) # Set default values kwargs["base_url"] = kwargs.get("base_url") or get_server_url(config.authentication.region) @@ -37,12 +40,6 @@ def __init__(self, config: MyBMWClientConfiguration, *args, brand: CarBrands = N # Register event hooks kwargs["event_hooks"] = defaultdict(list, **kwargs.get("event_hooks", {})) - # Event hook which checks and updates token if required before request is sent - async def update_request_header(request: httpx.Request): - request.headers["authorization"] = await self.config.authentication.get_authentication() - - kwargs["event_hooks"]["request"].append(update_request_header) - # Event hook for logging content to file async def log_response(response: httpx.Response): content = await response.aread() @@ -54,6 +51,15 @@ async def log_response(response: httpx.Response): kwargs["event_hooks"]["response"].append(log_response) # Event hook which calls raise_for_status on all requests + async def raise_for_status_event_handler(response: httpx.Response): + """Event handler that automatically raises HTTPStatusErrors when attached. + + Will only raise on 4xx/5xx errors (but not 401!) and not raise on 3xx. + """ + if response.is_error and response.status_code != 401: + await response.aread() + response.raise_for_status() + kwargs["event_hooks"]["response"].append(raise_for_status_event_handler) super().__init__(*args, **kwargs) @@ -66,4 +72,5 @@ def generate_default_header(brand: CarBrands = None) -> Dict[str, str]: "accept-language": "en", "user-agent": USER_AGENT, "x-user-agent": X_USER_AGENT.format(brand or CarBrands.BMW), + **get_correlation_id(), } diff --git a/bimmer_connected/api/utils.py b/bimmer_connected/api/utils.py index b2b99fe7..bf95c11b 100644 --- a/bimmer_connected/api/utils.py +++ b/bimmer_connected/api/utils.py @@ -8,6 +8,7 @@ import random import string from typing import Dict, List, Union +from uuid import uuid4 import httpx @@ -26,27 +27,29 @@ def create_s256_code_challenge(code_verifier: str) -> str: return base64.urlsafe_b64encode(data).rstrip(b"=").decode("UTF-8") -async def raise_for_status_event_handler(response: httpx.Response): - """Event handler that automatically raises HTTPStatusErrors when attached. - - Will only raise on 4xx/5xx errors and not raise on 3xx. - """ - if response.is_error: - await response.aread() - response.raise_for_status() +def get_correlation_id() -> Dict[str, str]: + """Generate corrlation headers.""" + correlation_id = str(uuid4()) + return { + "x-identity-provider": "gcdm", + "x-correlation-id": correlation_id, + "bmw-correlation-id": correlation_id, + } def handle_http_status_error( - ex: httpx.HTTPStatusError, module: str = "MyBMW API", log_handler: logging.Logger = None + ex: httpx.HTTPStatusError, module: str = "MyBMW API", log_handler: logging.Logger = None, debug: bool = False ) -> None: """Try to extract information from response and re-raise Exception.""" _logger = log_handler or logging.getLogger(__name__) + _level = logging.DEBUG if debug else logging.ERROR try: err = ex.response.json() - _logger.error("%s error (%s): %s", module, err["error"], err["error_description"]) + _logger.log(_level, "%s error (%s): %s", module, err["error"], err["error_description"]) except (json.JSONDecodeError, KeyError): - _logger.error("%s error: %s", module, ex.response.text) - raise ex + _logger.log(_level, "%s error: %s", module, ex.response.text) + if not debug: + raise ex def anonymize_data(json_data: Union[List, Dict]) -> Union[List, Dict]: diff --git a/bimmer_connected/const.py b/bimmer_connected/const.py index 186a50c0..58b958ca 100644 --- a/bimmer_connected/const.py +++ b/bimmer_connected/const.py @@ -42,6 +42,8 @@ class Regions(Enum): } } +HTTPX_TIMEOUT = 30.0 + USER_AGENT = "Dart/2.13 (dart:io)" X_USER_AGENT = "android(v1.07_20200330);{};2.3.0(13603)" diff --git a/bimmer_connected/vehicle/fuel_and_battery.py b/bimmer_connected/vehicle/fuel_and_battery.py index c5a5b08d..065fc5a5 100644 --- a/bimmer_connected/vehicle/fuel_and_battery.py +++ b/bimmer_connected/vehicle/fuel_and_battery.py @@ -84,10 +84,11 @@ def _parse_vehicle_data(cls, vehicle_data: Dict) -> Optional[Dict]: properties = vehicle_data.get("properties", {}) fuel_level = properties.get("fuelLevel", {}) - retval["remaining_fuel"] = ValueWithUnit( - fuel_level.get("value"), - fuel_level.get("units"), - ) + if fuel_level: + retval["remaining_fuel"] = ValueWithUnit( + fuel_level.get("value"), + fuel_level.get("units"), + ) if properties.get("fuelPercentage", {}).get("value"): retval["remaining_fuel_percent"] = int(properties.get("fuelPercentage", {}).get("value")) @@ -97,7 +98,7 @@ def _parse_vehicle_data(cls, vehicle_data: Dict) -> Optional[Dict]: retval["is_charger_connected"] = properties["chargingState"].get("isChargerConnected", False) # Only parse ranges if vehicle has enabled LSC - if vehicle_data["capabilities"]["lastStateCall"]["lscState"] == "ACTIVATED": + if "capabilities" in vehicle_data and vehicle_data["capabilities"]["lastStateCall"]["lscState"] == "ACTIVATED": fuel_indicators = vehicle_data.get("status", {}).get("fuelIndicators", []) for indicator in fuel_indicators: if (indicator.get("rangeIconId") or indicator.get("infoIconId")) == 59691: # Combined diff --git a/bimmer_connected/vehicle/location.py b/bimmer_connected/vehicle/location.py index e848ad0e..3802cfb1 100644 --- a/bimmer_connected/vehicle/location.py +++ b/bimmer_connected/vehicle/location.py @@ -32,7 +32,7 @@ class VehicleLocation(VehicleDataBase): def from_vehicle_data(cls, vehicle_data: Dict): """Creates the class based on vehicle data from API.""" parsed = cls._parse_vehicle_data(vehicle_data) or {} - if len(parsed) > 0: + if len(parsed) > 1: # must be greater than 1 due to timestamp dummy return cls(**parsed) return None diff --git a/bimmer_connected/vehicle/models.py b/bimmer_connected/vehicle/models.py index e0a7d8af..62f504c6 100644 --- a/bimmer_connected/vehicle/models.py +++ b/bimmer_connected/vehicle/models.py @@ -78,6 +78,8 @@ def __eq__(self, other): return tuple(self.__iter__()) == other if hasattr(self, "__dict__") and hasattr(other, "__dict__"): return self.__dict__ == other.__dict__ + if hasattr(self, "__dict__") and isinstance(other, Dict): + return self.__dict__ == other return False @@ -113,19 +115,6 @@ def __post_init__(self, lat, lon, street, postal_code, city, country): # pylint self.locationAddress = PointOfInterestAddress(street, postal_code, city, country) -def check_strict_types(cls): - """Checks a dataclass for strict typing. Use in __post_init__.""" - for field_name, field_def in cls.__dataclass_fields__.items(): # pylint: disable=no-member - try: - original_type = field_def.type.__args__ - field_type = original_type or field_def.type - except AttributeError: - field_type = field_def.type - - if not isinstance(getattr(cls, field_name), field_type): - raise TypeError(f"'{field_name}' not of type '{field_def.type}'") - - class ValueWithUnit(NamedTuple): """A value with a corresponding unit.""" diff --git a/test/test_account.py b/test/test_account.py index 401b4c6b..f48bf297 100644 --- a/test/test_account.py +++ b/test/test_account.py @@ -1,16 +1,24 @@ """Tests for MyBMWAccount.""" import datetime +import logging from typing import Dict, List -from unittest import mock + +try: + from unittest import mock + + if not hasattr(mock, "AsyncMock"): + # AsyncMock was only introduced with Python3.8, so we have to use the backported module + raise ImportError() +except ImportError: + import mock # type: ignore[import,no-redef] import httpx import pytest import respx from bimmer_connected.account import ConnectedDriveAccount, MyBMWAccount -from bimmer_connected.api.authentication import Authentication -from bimmer_connected.api.client import MyBMWClientConfiguration +from bimmer_connected.api.authentication import MyBMWAuthentication, MyBMWLoginRetry from bimmer_connected.api.regions import get_region_from_name from bimmer_connected.vehicle.models import GPSPosition @@ -107,7 +115,7 @@ async def test_login_row_na(): @account_mock() @pytest.mark.asyncio -async def test_login_refresh_token_row_na(): +async def test_login_refresh_token_row_na_expired(): """Test the login flow using refresh_token.""" with mock.patch("bimmer_connected.api.authentication.EXPIRES_AT_OFFSET", datetime.timedelta(seconds=30000)): account = MyBMWAccount(TEST_USERNAME, TEST_PASSWORD, get_region_from_name(TEST_REGION_STRING)) @@ -120,10 +128,55 @@ async def test_login_refresh_token_row_na(): mock_listener.reset_mock() await account.get_vehicles() - assert mock_listener.call_count == 2 + # Should not be called at all, as expiry date is not checked anymore + assert mock_listener.call_count == 0 + assert account.mybmw_client_config.authentication.refresh_token is not None + + +@pytest.mark.asyncio +async def test_login_refresh_token_row_na_401(): + """Test the login flow using refresh_token.""" + with account_mock() as mock_api: + account = MyBMWAccount(TEST_USERNAME, TEST_PASSWORD, get_region_from_name(TEST_REGION_STRING)) + await account.get_vehicles() + + with mock.patch( + "bimmer_connected.api.authentication.MyBMWAuthentication._refresh_token_row_na", + wraps=account.mybmw_client_config.authentication._refresh_token_row_na, # pylint: disable=protected-access + ) as mock_listener: + mock_api.get("/eadrax-vcs/v1/vehicles").mock( + side_effect=[httpx.Response(401), *([httpx.Response(200, json=[])] * 10)] + ) + mock_listener.reset_mock() + await account.get_vehicles() + + assert mock_listener.call_count == 1 assert account.mybmw_client_config.authentication.refresh_token is not None +@pytest.mark.asyncio +async def test_login_refresh_token_row_na_invalid(caplog): + """Test the login flow using refresh_token.""" + with account_mock() as mock_api: + mock_api.post("/gcdm/oauth/token").mock( + side_effect=[ + httpx.Response(400), + httpx.Response(200, json=load_response(RESPONSE_DIR / "auth" / "auth_token.json")), + ] + ) + + account = MyBMWAccount(TEST_USERNAME, TEST_PASSWORD, get_region_from_name(TEST_REGION_STRING)) + account.set_refresh_token("INVALID") + + caplog.set_level(logging.DEBUG) + await account.get_vehicles() + + debug_messages = [r.message for r in caplog.records if r.name.startswith("bimmer_connected")] + assert "Authenticating with refresh token for North America & Rest of World." in debug_messages + assert "Unable to get access token using refresh token." in debug_messages + assert "Authenticating with MyBMW flow for North America & Rest of World." in debug_messages + + @account_mock() @pytest.mark.asyncio async def test_login_china(): @@ -135,7 +188,7 @@ async def test_login_china(): @account_mock() @pytest.mark.asyncio -async def test_login_refresh_token_china(): +async def test_login_refresh_token_china_expired(): """Test the login flow using refresh_token for region `china`.""" with mock.patch("bimmer_connected.api.authentication.EXPIRES_AT_OFFSET", datetime.timedelta(seconds=30000)): account = MyBMWAccount(TEST_USERNAME, TEST_PASSWORD, get_region_from_name("china")) @@ -148,10 +201,55 @@ async def test_login_refresh_token_china(): mock_listener.reset_mock() await account.get_vehicles() - assert mock_listener.call_count == 2 + # Should not be called at all, as expiry date is not checked anymore + assert mock_listener.call_count == 0 assert account.mybmw_client_config.authentication.refresh_token is not None +@pytest.mark.asyncio +async def test_login_refresh_token_china_401(): + """Test the login flow using refresh_token.""" + with account_mock() as mock_api: + account = MyBMWAccount(TEST_USERNAME, TEST_PASSWORD, get_region_from_name("china")) + await account.get_vehicles() + + with mock.patch( + "bimmer_connected.api.authentication.MyBMWAuthentication._refresh_token_china", + wraps=account.mybmw_client_config.authentication._refresh_token_china, # pylint: disable=protected-access + ) as mock_listener: + mock_api.get("/eadrax-vcs/v1/vehicles").mock( + side_effect=[httpx.Response(401), *([httpx.Response(200, json=[])] * 10)] + ) + mock_listener.reset_mock() + await account.get_vehicles() + + assert mock_listener.call_count == 1 + assert account.mybmw_client_config.authentication.refresh_token is not None + + +@pytest.mark.asyncio +async def test_login_refresh_token_china_invalid(caplog): + """Test the login flow using refresh_token.""" + with account_mock() as mock_api: + mock_api.post("/eadrax-coas/v1/oauth/token").mock( + side_effect=[ + httpx.Response(400), + httpx.Response(200, json=load_response(RESPONSE_DIR / "auth" / "auth_token.json")), + ] + ) + + account = MyBMWAccount(TEST_USERNAME, TEST_PASSWORD, get_region_from_name("china")) + account.set_refresh_token("INVALID") + + caplog.set_level(logging.DEBUG) + await account.get_vehicles() + + debug_messages = [r.message for r in caplog.records if r.name.startswith("bimmer_connected")] + assert "Authenticating with refresh token for China." in debug_messages + assert "Unable to get access token using refresh token." in debug_messages + assert "Authenticating with MyBMW flow for China." in debug_messages + + @account_mock() @pytest.mark.asyncio async def test_vehicles(): @@ -159,7 +257,7 @@ async def test_vehicles(): account = MyBMWAccount(TEST_USERNAME, TEST_PASSWORD, get_region_from_name("china")) await account.get_vehicles() - assert account.mybmw_client_config.authentication.token is not None + assert account.mybmw_client_config.authentication.access_token is not None assert get_fingerprint_count() == len(account.vehicles) vehicle = account.get_vehicle(VIN_G21) @@ -233,7 +331,7 @@ async def test_storing_fingerprints(tmp_path): txt_files = [f for f in files if f.suffix == ".txt"] assert len(json_files) == 2 - assert len(txt_files) == 2 + assert len(txt_files) == 1 @pytest.mark.asyncio @@ -273,16 +371,6 @@ async def test_set_observer_invalid_values(): account.set_observer_position(1.0, "16.0") -@account_mock() -@pytest.mark.asyncio -async def test_base_authentication(): - """Test logging in with the Authentication base clas.""" - account = get_account() - account.mybmw_client_config = MyBMWClientConfiguration(Authentication(TEST_USERNAME, TEST_PASSWORD, TEST_REGION)) - with pytest.raises(NotImplementedError): - await account.get_vehicles() - - @account_mock() @pytest.mark.asyncio async def test_deprecated_account(caplog): @@ -305,3 +393,65 @@ async def test_refresh_token_getset(): account.set_refresh_token("new_refresh_token") assert account.refresh_token == "new_refresh_token" + + +@pytest.mark.asyncio +async def test_429_retry_ok(caplog): + """Test the login flow using refresh_token.""" + with account_mock() as mock_api: + account = MyBMWAccount(TEST_USERNAME, TEST_PASSWORD, TEST_REGION) + + json_429 = {"statusCode": 429, "message": "Rate limit is exceeded. Try again in 2 seconds."} + + mock_api.get("/eadrax-ucs/v1/presentation/oauth/config").mock( + side_effect=[ + httpx.Response(429, json=json_429), + httpx.Response(429, json=json_429), + httpx.Response(200, json=load_response(RESPONSE_DIR / "auth" / "oauth_config.json")), + ] + ) + caplog.set_level(logging.DEBUG) + + with mock.patch("asyncio.sleep", new_callable=mock.AsyncMock): + await account.get_vehicles() + + log_429 = [ + r + for r in caplog.records + if r.module == "authentication" and "seconds due to 429 Too Many Requests" in r.message + ] + assert len(log_429) == 2 + + +@pytest.mark.asyncio +async def test_429_retry_raise(caplog): + """Test the login flow using refresh_token.""" + with account_mock() as mock_api: + account = MyBMWAccount(TEST_USERNAME, TEST_PASSWORD, TEST_REGION) + + json_429 = {"statusCode": 429, "message": "Rate limit is exceeded. Try again in 2 seconds."} + + mock_api.get("/eadrax-ucs/v1/presentation/oauth/config").mock( + side_effect=[ + *[httpx.Response(429, json=json_429)] * 6, + ] + ) + caplog.set_level(logging.DEBUG) + + with mock.patch("asyncio.sleep", new_callable=mock.AsyncMock): + with pytest.raises(httpx.HTTPStatusError): + await account.get_vehicles() + + +@account_mock() +@pytest.mark.asyncio +async def test_client_async_only(): + """Test that the Authentication providers only work async.""" + + with httpx.Client(auth=MyBMWAuthentication(TEST_USERNAME, TEST_PASSWORD, TEST_REGION)) as client: + with pytest.raises(RuntimeError): + client.get("/eadrax-ucs/v1/presentation/oauth/config") + + with httpx.Client(auth=MyBMWLoginRetry()) as client: + with pytest.raises(RuntimeError): + client.get("/eadrax-ucs/v1/presentation/oauth/config") diff --git a/test/test_api.py b/test/test_api.py index 7abefde1..bbef5cc8 100644 --- a/test/test_api.py +++ b/test/test_api.py @@ -1,17 +1,11 @@ """Tests for API that are not covered by other tests.""" -import datetime import json -import sys -from unittest import mock import pytest -from black import asyncio from bimmer_connected.api.regions import get_region_from_name, valid_regions from bimmer_connected.api.utils import anonymize_data, log_to_to_file -from .test_account import account_mock, get_account - def test_valid_regions(): """Test valid regions.""" @@ -56,28 +50,3 @@ def test_log_to_file_without_file_name(tmp_path): """Test not logging to file if no file name is given.""" assert log_to_to_file(content=[], logfile_path=tmp_path, logfile_name=None) is None assert len(list(tmp_path.iterdir())) == 0 - - -@account_mock() -def test_asyncio_run_lock(): - """Test calling asyncio.run() multiple times.""" - - with mock.patch("bimmer_connected.api.authentication.EXPIRES_AT_OFFSET", datetime.timedelta(seconds=30000)): - account = get_account() - with mock.patch( - "bimmer_connected.api.authentication.MyBMWAuthentication._create_or_update_lock", - wraps=account.mybmw_client_config.authentication._create_or_update_lock, # pylint: disable=protected-access - ) as mock_listener: - mock_listener.reset_mock() - - # Python 3.6 doesn't provide asyncio.run() - if sys.version_info < (3, 7): - for _ in range(2): - loop = asyncio.new_event_loop() - loop.run_until_complete(account.get_vehicles()) - loop.close() - else: - asyncio.run(account.get_vehicles()) - asyncio.run(account.get_vehicles()) - - assert mock_listener.call_count == 4 diff --git a/test/test_vehicle.py b/test/test_vehicle.py index b9759f7a..c5c85f0f 100644 --- a/test/test_vehicle.py +++ b/test/test_vehicle.py @@ -3,6 +3,7 @@ from bimmer_connected.const import CarBrands from bimmer_connected.vehicle import DriveTrainType, VehicleViewDirection +from bimmer_connected.vehicle.models import GPSPosition, StrEnum, VehicleDataBase from . import ( VIN_F11, @@ -209,3 +210,43 @@ async def test_vehicle_image(caplog): assert b"png_image" == await vehicle.get_vehicle_image(VehicleViewDirection.FRONT) assert len(get_deprecation_warning_count(caplog)) == 0 + + +@pytest.mark.asyncio +async def test_no_timestamp(): + """Test no timestamp available.""" + vehicle = (await get_mocked_account()).get_vehicle(VIN_F31) + vehicle._properties.pop("lastUpdatedAt") # pylint: disable=protected-access + vehicle._status.pop("lastUpdatedAt") # pylint: disable=protected-access + + assert vehicle.timestamp is None + + +def test_strenum(): + """Tests StrEnum.""" + + class TestEnum(StrEnum): + """Test StrEnum.""" + + HELLO = "HELLO" + + assert TestEnum("hello") == TestEnum.HELLO + assert TestEnum("HELLO") == TestEnum.HELLO + + with pytest.raises(ValueError): + TestEnum("WORLD") + + +def test_vehiclebasedata(): + """Tests VehicleBaseData.""" + with pytest.raises(NotImplementedError): + VehicleDataBase._parse_vehicle_data({}) # pylint: disable=protected-access + + +def test_gpsposition(): + """Tests around GPSPosition.""" + pos = GPSPosition(1.0, 2.0) + assert pos == GPSPosition(1, 2) + assert pos == {"latitude": 1.0, "longitude": 2.0} + assert pos == (1, 2) + assert pos != "(1, 2)" diff --git a/test/test_vehicle_status.py b/test/test_vehicle_status.py index 7eb02422..20afb9c0 100644 --- a/test/test_vehicle_status.py +++ b/test/test_vehicle_status.py @@ -7,7 +7,8 @@ from bimmer_connected.api.regions import get_region_from_name from bimmer_connected.vehicle.doors_windows import LidState, LockState -from bimmer_connected.vehicle.fuel_and_battery import ChargingState +from bimmer_connected.vehicle.fuel_and_battery import ChargingState, FuelAndBattery +from bimmer_connected.vehicle.location import VehicleLocation from bimmer_connected.vehicle.reports import CheckControlStatus, ConditionBasedServiceStatus from . import VIN_F11, VIN_F31, VIN_F48, VIN_G01, VIN_G08, VIN_G23, VIN_G30, VIN_I01_REX, get_deprecation_warning_count @@ -25,16 +26,14 @@ async def test_generic(caplog): assert 7991 == status.mileage[0] assert "km" == status.mileage[1] - assert (12.3456, 34.5678) == status.vehicle_location.location - assert 123 == status.vehicle_location.heading - assert len(get_deprecation_warning_count(caplog)) == 0 @pytest.mark.asyncio async def test_range_combustion_no_info(caplog): """Test if the parsing of mileage and range is working""" - status = (await get_mocked_account()).get_vehicle(VIN_F31).fuel_and_battery + vehicle = (await get_mocked_account()).get_vehicle(VIN_F31) + status = vehicle.fuel_and_battery assert (32, "LITERS") == status.remaining_fuel assert status.remaining_range_fuel == (None, None) @@ -45,6 +44,14 @@ async def test_range_combustion_no_info(caplog): assert status.remaining_range_total == (None, None) + status_from_vehicle_data = FuelAndBattery.from_vehicle_data(vehicle.data) + status_from_vehicle_data.account_timezone = status.account_timezone + assert status_from_vehicle_data == status + assert FuelAndBattery.from_vehicle_data({}) is None + + # pylint: disable=protected-access + assert FuelAndBattery._parse_to_tuple({"rangeValue": "seventeen", "rangeUnit": "mi"}) == (None, None) + assert len(get_deprecation_warning_count(caplog)) == 0 @@ -217,6 +224,21 @@ async def test_condition_based_services(caplog): assert len(get_deprecation_warning_count(caplog)) == 0 +@pytest.mark.asyncio +async def test_position_generic(caplog): + """Test generic attributes.""" + status = (await get_mocked_account()).get_vehicle(VIN_G30) + + assert (12.3456, 34.5678) == status.vehicle_location.location + assert 123 == status.vehicle_location.heading + + assert VehicleLocation.from_vehicle_data(status.data).location == status.vehicle_location.location + + assert VehicleLocation.from_vehicle_data({}) is None + + assert len(get_deprecation_warning_count(caplog)) == 0 + + @pytest.mark.asyncio async def test_parse_f31_no_position(caplog): """Test parsing of F31 data with position tracking disabled in the vehicle."""