Skip to content

Commit

Permalink
Refactor, HA compatibility adjustments
Browse files Browse the repository at this point in the history
  • Loading branch information
PiotrMachowski committed Jun 10, 2024
1 parent 39748dd commit a611c5f
Show file tree
Hide file tree
Showing 13 changed files with 298 additions and 300 deletions.
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@

# Custom Templates

> [!CAUTION]
> This custom integration tampers with internal code of Home Assistant which _might_ cause some unforeseen issues (especially after HA updates).
> If you encounter any problems related to templating engine or translations try uninstalling this integration before raising an issue for Home Assistant.

This integration adds possibility to use new functions in Home Assistant Jinja2 templating engine:
- `ct_state_translated` - returns translated state of an entity
- `ct_state_attr_translated` - returns translated value of an attribute of an entity
Expand Down
323 changes: 24 additions & 299 deletions custom_components/custom_templates/__init__.py
Original file line number Diff line number Diff line change
@@ -1,322 +1,49 @@
import asyncio
from collections import ChainMap
import logging
from typing import Any, Callable

from homeassistant.exceptions import TemplateError
from homeassistant.const import EVENT_COMPONENT_LOADED, STATE_UNAVAILABLE, STATE_UNKNOWN
from homeassistant.core import Event, HomeAssistant, valid_entity_id
from homeassistant.helpers.entity_registry import async_get
from homeassistant.helpers.template import _get_state_if_valid, _RESERVED_NAMES, Template, TemplateEnvironment
from homeassistant.const import EVENT_COMPONENT_LOADED
from homeassistant.core import Event, HomeAssistant
from homeassistant.helpers.template import Template, TemplateEnvironment
from homeassistant.helpers.translation import _TranslationCache, TRANSLATION_FLATTEN_CACHE
from homeassistant.loader import bind_hass

from .const import (DOMAIN, CUSTOM_TEMPLATES_SCHEMA, CONF_PRELOAD_TRANSLATIONS, CONST_EVAL_FUNCTION_NAME,
CONST_STATE_TRANSLATED_FUNCTION_NAME, CONST_STATE_ATTR_TRANSLATED_FUNCTION_NAME,
CONST_TRANSLATED_FUNCTION_NAME, CONST_ALL_TRANSLATIONS_FUNCTION_NAME,
DEFAULT_UNAVAILABLE_STATES, CONST_IS_AVAILABLE_FUNCTION_NAME,
CONST_IS_AVAILABLE_FUNCTION_NAME,
CONST_DICT_MERGE_FUNCTION_NAME)
from .templates.all_translations import AllTranslations
from .templates.dict_merge import DictMerge
from .templates.eval_template import EvalTemplate
from .templates.is_available import IsAvailable
from .templates.state_attr_translated import StateAttrTranslated
from .templates.state_translated import StateTranslated
from .templates.translated import Translated

_LOGGER = logging.getLogger(__name__)

CONFIG_SCHEMA = CUSTOM_TEMPLATES_SCHEMA
ConfigType = dict[str, Any]


class TranslatableTemplate:

def __init__(self, hass: HomeAssistant, available_languages):
self._hass = hass
self._available_languages = available_languages

def validate_language(self, language):
if language not in self._available_languages:
raise TemplateError(f"Language {language} is not loaded") # type: ignore[arg-type]


class DictMerge:

def __init__(self, hass: HomeAssistant):
self._hass = hass

def __call__(self, *args):
result = {k:v for d in args for k, v in d.items()}
return result

def __repr__(self):
return f"<template CT_DictMerge 0.01>"

class IsAvailable:

def __init__(self, hass: HomeAssistant):
self._hass = hass

def __call__(self, entity_id: str, unavailable_states=DEFAULT_UNAVAILABLE_STATES):
unavailable_states = [s.lower() if type(s) is str else s for s in unavailable_states]
state = None
if "." in entity_id:
state = _get_state_if_valid(self._hass, entity_id)
else:
if entity_id in _RESERVED_NAMES:
return None
if not valid_entity_id(f"{entity_id}.entity"):
raise TemplateError(f"Invalid domain name '{entity_id}'")

if state is not None:
state = state.state
if state is str:
state = state.lower()
result = state not in unavailable_states
return result

def __repr__(self):
return f"<template CT_IsAvailable>"


class StateTranslated(TranslatableTemplate):

def __init__(self, hass: HomeAssistant, available_languages):
super().__init__(hass, available_languages)

def __call__(self, entity_id: str, language: str):
self.validate_language(language)
state = None
if "." in entity_id:
state = _get_state_if_valid(self._hass, entity_id)

else:
if entity_id in _RESERVED_NAMES:
return None

if not valid_entity_id(f"{entity_id}.entity"):
raise TemplateError(f"Invalid domain name '{entity_id}'") # type: ignore[arg-type]

if state is None:
return STATE_UNKNOWN

state_value = state.state
domain = state.domain
device_class = state.attributes.get("device_class")
entry = async_get(self._hass).async_get(entity_id)
platform = None if entry is None else entry.platform
translation_key = None if entry is None else entry.translation_key

return async_translate_state(
self._hass, state_value, domain, platform, translation_key, device_class
)

def __repr__(self):
return "<template CT_StateTranslated>"


class StateAttrTranslated(TranslatableTemplate):

def __init__(self, hass: HomeAssistant, available_languages):
super().__init__(hass, available_languages)

def __call__(self, entity_id: str, attribute: str, language: str):
self.validate_language(language)
state = None
if "." in entity_id:
state = _get_state_if_valid(self._hass, entity_id)

else:
if entity_id in _RESERVED_NAMES:
return None

if not valid_entity_id(f"{entity_id}.entity"):
raise TemplateError(f"Invalid domain name '{entity_id}'") # type: ignore[arg-type]

if state is None:
return STATE_UNKNOWN
attribute_value = None
if attribute in state.attributes:
attribute_value = state.attributes.get(attribute)
entry = async_get(self._hass).async_get(entity_id)
domain = state.domain
device_class = "_"
if "device_class" in state.attributes:
device_class = state.attributes["device_class"]

translations = get_cached_translations(self._hass, language, "entity_component")
key = f"component.{domain}.entity_component.{device_class}.state_attributes.{attribute}.state.{attribute_value}"
if len(translations) > 0 and key in translations:
return str(translations[key])
if (entry is not None and
entry.unique_id is not None and
hasattr(entry, "translation_key") and
entry.translation_key is not None):
key = f"component.{entry.platform}.entity.{domain}.{entry.translation_key}.state_attributes.{attribute}.state.{attribute_value}"
translations = get_cached_translations(self._hass, language, "entity")
if len(translations) > 0 and key in translations:
return str(translations[key])
_LOGGER.warning(f"No translation found for entity: {entity_id} and attribute: {attribute}")
return attribute_value

def __repr__(self):
return "<template CT_StateAttrTranslated>"


class Translated(TranslatableTemplate):

def __init__(self, hass: HomeAssistant, available_languages):
super().__init__(hass, available_languages)

def __call__(self, key: str, language: str):
self.validate_language(language)
translations = get_cached_translations(self._hass, language, "state")
if len(translations) > 0 and key in translations:
return str(translations[key])
translations = get_cached_translations(self._hass, language, "entity_component")
if len(translations) > 0 and key in translations:
return str(translations[key])
translations = get_cached_translations(self._hass, language, "entity")
if len(translations) > 0 and key in translations:
return str(translations[key])
_LOGGER.warning(f"No translation found for key: {key}")
return key

def __repr__(self):
return "<template CT_Translated>"


class AllTranslations(TranslatableTemplate):

def __init__(self, hass: HomeAssistant, available_languages):
super().__init__(hass, available_languages)

def __call__(self, language: str):
self.validate_language(language)
translations = {}
translations.update(get_cached_translations(self._hass, language, "state"))
translations.update(get_cached_translations(self._hass, language, "entity"))
translations.update(get_cached_translations(self._hass, language, "entity_component"))
return translations

def __repr__(self):
return "<template CT_AllTranslations>"


class EvalTemplate:

def __init__(self, hass: HomeAssistant):
self._hass = hass

def __call__(self, content: str):
tpl = Template(content, self._hass)
return tpl.async_render()

def __repr__(self):
return "<template CT_EvalTemplate>"


def get_cached(
self,
language: str,
category: str,
components: set[str],
):
category_cache = self.cache.get(language, {}).get(category, {})
if len(components) == 1 and (component := next(iter(components))):
return category_cache.get(component, {})
result: dict[str, str] = {}
for component in components.intersection(category_cache):
result.update(category_cache[component])
return result

def async_translate_state(
hass: HomeAssistant,
state: str,
domain: str,
platform: str | None,
translation_key: str | None,
device_class: str | None,
) -> str:
"""Translate provided state using cached translations for currently selected language."""
if state in [STATE_UNAVAILABLE, STATE_UNKNOWN]:
return state
language = hass.config.language
if platform is not None and translation_key is not None:
localize_key = (
f"component.{platform}.entity.{domain}.{translation_key}.state.{state}"
)
translations = get_cached_translations(hass, language, "entity")
if localize_key in translations:
return translations[localize_key]

translations = get_cached_translations(hass, language, "entity_component")
if device_class is not None:
localize_key = (
f"component.{domain}.entity_component.{device_class}.state.{state}"
)
if localize_key in translations:
return translations[localize_key]
localize_key = f"component.{domain}.entity_component._.state.{state}"
if localize_key in translations:
return translations[localize_key]

translations = get_cached_translations(hass, language, "state", domain)
if device_class is not None:
localize_key = f"component.{domain}.state.{device_class}.{state}"
if localize_key in translations:
return translations[localize_key]
localize_key = f"component.{domain}.state._.{state}"
if localize_key in translations:
return translations[localize_key]

return state



@bind_hass
async def load_translations_to_cache(
hass: HomeAssistant,
language: str,
):
components_entities = {
component for component in hass.config.components if "." not in component
}
components_state = set(hass.config.components)
cache = hass.data.setdefault(TRANSLATION_FLATTEN_CACHE, _TranslationCache(hass))
await cache.async_fetch(language, "entity", components_entities)
await cache.async_fetch(language, "states", components_state)
await cache.async_fetch(language, "entity_component", components_state)


@bind_hass
def get_cached_translations(
hass: HomeAssistant,
language: str,
category: str,
integration=None,
):
if integration is not None:
components = {integration}
elif category == "state":
components = set(hass.config.components)
else:
components = {
component for component in hass.config.components if "." not in component
}

cache = hass.data.setdefault(TRANSLATION_FLATTEN_CACHE, _TranslationCache(hass))
# noinspection PyUnresolvedReferences
return cache.ct_patched_get_cached(language, category, components)


# noinspection PyProtectedMember
def setup(hass: HomeAssistant, config: ConfigType):
def setup(hass: HomeAssistant, config: ConfigType) -> bool:
if DOMAIN not in config:
return True
languages = []
if CONF_PRELOAD_TRANSLATIONS in config[DOMAIN]:
languages = config[DOMAIN][CONF_PRELOAD_TRANSLATIONS]
cache: _TranslationCache = hass.data[TRANSLATION_FLATTEN_CACHE]

async def load_translations(_event: Event):
async def _async_load_translations(_: Event) -> None:
for language in languages:
await load_translations_to_cache(hass, language)
_LOGGER.debug("Loading translations for language: %s", language)
_
await cache.async_load(language, hass.config.components)

hass.bus.async_listen(EVENT_COMPONENT_LOADED, load_translations)
hass.bus.async_listen(
EVENT_COMPONENT_LOADED,
_async_load_translations
)

state_translated_template = StateTranslated(hass, languages)
state_attr_translated_template = StateAttrTranslated(hass, languages)
Expand All @@ -326,15 +53,13 @@ async def load_translations(_event: Event):
is_available_template = IsAvailable(hass)
dict_merge_template = DictMerge(hass)

_TranslationCache.ct_patched_get_cached = get_cached

def is_safe_callable(self: TemplateEnvironment, obj):
def is_safe_callable(self: TemplateEnvironment, obj) -> bool:
# noinspection PyUnresolvedReferences
return (isinstance(obj, (
StateTranslated, StateAttrTranslated, EvalTemplate, Translated, AllTranslations, IsAvailable, DictMerge))
or self.ct_original_is_safe_callable(obj))

def patch_environment(env: TemplateEnvironment):
def patch_environment(env: TemplateEnvironment) -> None:
env.globals[CONST_STATE_TRANSLATED_FUNCTION_NAME] = state_translated_template
env.globals[CONST_STATE_ATTR_TRANSLATED_FUNCTION_NAME] = state_attr_translated_template
env.globals[CONST_TRANSLATED_FUNCTION_NAME] = translated_template
Expand All @@ -355,7 +80,7 @@ def patched_init(
limited: bool | None = False,
strict: bool | None = False,
log_fn: Callable[[int, str], None] | None = None,
):
) -> None:
# noinspection PyUnresolvedReferences
self.ct_original__init__(hass_param, limited, strict, log_fn)
patch_environment(self)
Expand Down
2 changes: 1 addition & 1 deletion custom_components/custom_templates/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,4 @@
CONST_TRANSLATED_FUNCTION_NAME = "ct_translated"
CONST_ALL_TRANSLATIONS_FUNCTION_NAME = "ct_all_translations"
CONST_IS_AVAILABLE_FUNCTION_NAME = "ct_is_available"
CONST_DICT_MERGE_FUNCTION_NAME = 'ct_dict_merge'
CONST_DICT_MERGE_FUNCTION_NAME = 'ct_dict_merge'
1 change: 1 addition & 0 deletions custom_components/custom_templates/templates/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# Templates available in custom_templates
Loading

0 comments on commit a611c5f

Please sign in to comment.