Coverage for custom_components/autoarm/hass_api.py: 83%

95 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-08 20:27 +0000

1from __future__ import annotations 

2 

3import logging 

4from functools import partial 

5from typing import TYPE_CHECKING, Any, cast 

6 

7from homeassistant.components.alarm_control_panel.const import AlarmControlPanelState 

8from homeassistant.core import HomeAssistant 

9from homeassistant.exceptions import ConditionError, ConditionErrorContainer 

10from homeassistant.helpers import condition as condition 

11from homeassistant.helpers import issue_registry as ir 

12from homeassistant.helpers.template import Template 

13from homeassistant.helpers.typing import ConfigType 

14 

15from .const import DOMAIN, ConditionVariables 

16 

17if TYPE_CHECKING: 

18 from collections.abc import Callable 

19 

20 from homeassistant.core import HomeAssistant 

21 from homeassistant.helpers.typing import ConfigType 

22 

23 

24_LOGGER = logging.getLogger(__name__) 

25 

26 

class HomeAssistantAPI:
    """Wrapper around the Home Assistant helpers used by this integration.

    Holds an optional ``HomeAssistant`` instance and offers issue reporting
    plus building and evaluating condition configurations.
    """

    def __init__(self, hass: HomeAssistant | None = None) -> None:
        """Store the Home Assistant instance, which may be ``None``."""
        self._hass = hass

    def raise_issue(
        self,
        issue_id: str,
        issue_key: str,
        issue_map: dict[str, str],
        severity: ir.IssueSeverity = ir.IssueSeverity.WARNING,
        learn_more_url: str = "https://autoarm.rhizomatics.org.uk",
        is_fixable: bool = False,
    ) -> None:
        """Create an issue under ``DOMAIN`` via ``ir.async_create_issue``.

        Silently does nothing when no Home Assistant instance is set.

        Args:
            issue_id: Identifier passed through as the issue id.
            issue_key: Passed as ``translation_key``.
            issue_map: Passed as ``translation_placeholders``.
            severity: Issue severity, a warning by default.
            learn_more_url: Link attached to the issue.
            is_fixable: Passed through as ``is_fixable``.
        """
        if not self._hass:
            return
        ir.async_create_issue(
            self._hass,
            DOMAIN,
            issue_id,
            translation_key=issue_key,
            translation_placeholders=issue_map,
            severity=severity,
            learn_more_url=learn_more_url,
            is_fixable=is_fixable,
        )

    async def build_condition(
        self, condition_config: list[ConfigType], strict: bool = False, validate: bool = False, name: str = DOMAIN
    ) -> Callable | None:
        """Build a condition checker from config and trial-run it once.

        Args:
            condition_config: The condition configurations to build from.
            strict: When true, templates in the config are wrapped for strict
                rendering while building, and any condition errors captured
                from the logger during the trial run are raised.
            validate: When true, the config is first passed through
                ``condition.async_validate_conditions_config``.
            name: Name handed to ``condition.async_conditions_from_config``.

        Returns:
            The callable produced by ``condition.async_conditions_from_config``.

        Raises:
            ValueError: If no Home Assistant instance is set, or no checker
                could be built.
            Exception: Any validation or evaluation failure is logged and
                re-raised unchanged.
        """
        if self._hass is None:
            raise ValueError("HomeAssistant not available")
        capturing_logger: ConditionErrorLoggingAdaptor = ConditionErrorLoggingAdaptor(_LOGGER)
        # Placeholder variables, used only for the trial evaluation below.
        condition_variables: ConditionVariables = ConditionVariables(False, False, AlarmControlPanelState.PENDING, {})
        cond_list: list[ConfigType]
        try:
            if validate:
                cond_list = cast(
                    "list[ConfigType]", await condition.async_validate_conditions_config(self._hass, condition_config)
                )
            else:
                cond_list = condition_config
        except Exception as e:
            _LOGGER.exception("AUTOARM Condition validation failed: %s", e)
            raise
        try:
            if strict:
                force_strict_template_mode(cond_list, undo=False)

            test: Callable = await condition.async_conditions_from_config(
                self._hass, cond_list, cast("logging.Logger", capturing_logger), name
            )
            if test is None:
                raise ValueError(f"Invalid condition {condition_config}")
            # Trial run so that evaluation errors surface at build time.
            test({DOMAIN: condition_variables.as_dict()})
            if strict and capturing_logger.condition_errors:
                for exception in capturing_logger.condition_errors:
                    _LOGGER.warning("AUTOARM Invalid condition %s:%s", condition_config, exception)
                raise capturing_logger.condition_errors[0]
            return test
        except Exception as e:
            _LOGGER.exception("AUTOARM Condition eval failed: %s", e)
            raise
        finally:
            if strict:
                # Undo on the list that was actually wrapped. With
                # validate=True, cond_list is the validator's return value and
                # may not be the same object as condition_config.
                force_strict_template_mode(cond_list, undo=True)

    def evaluate_condition(
        self,
        condition: Callable,
        condition_variables: ConditionVariables | None = None,
    ) -> bool | None:
        """Call a previously built condition checker and return its result.

        Args:
            condition: The checker callable to invoke.
            condition_variables: When given, passed to the checker as
                ``{DOMAIN: condition_variables.as_dict()}``; otherwise the
                checker is called with ``None``.

        Raises:
            ValueError: If no Home Assistant instance is set.
            Exception: Any error from the checker is logged and re-raised.
        """
        if self._hass is None:
            raise ValueError("HomeAssistant not available")
        try:
            return condition({DOMAIN: condition_variables.as_dict()} if condition_variables else None)
        except Exception as e:
            _LOGGER.error("AUTOARM Condition eval failed: %s", e)
            raise

105 

106 

class ConditionErrorLoggingAdaptor(logging.LoggerAdapter):
    """Logger adapter that records condition errors passed as log arguments.

    ``error`` and ``warning`` calls are forwarded to the wrapped logger; any
    ``ConditionError`` found among their positional arguments (directly or
    inside a ``ConditionErrorContainer``) is appended to ``condition_errors``.
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """Initialise the adapter and start with an empty error list."""
        super().__init__(*args, **kwargs)
        self.condition_errors: list[ConditionError] = []

    def capture(self, args: Any) -> None:
        """Collect condition errors from a list or tuple of log arguments."""
        if args and isinstance(args, (list, tuple)):
            for arg in args:
                if isinstance(arg, ConditionErrorContainer):
                    self.condition_errors.extend(arg.errors)
                elif isinstance(arg, ConditionError):
                    self.condition_errors.append(arg)

    def error(self, msg: Any, *args: object, **kwargs: Any) -> None:
        """Capture condition errors from ``args``, then log at ERROR level."""
        self.capture(args)
        # Unpack so the wrapped logger receives the caller's format arguments
        # and keywords, not one tuple and one dict as two format arguments.
        self.logger.error(msg, *args, **kwargs)

    def warning(self, msg: Any, *args: Any, **kwargs: Any) -> None:
        """Capture condition errors from ``args``, then log at WARNING level."""
        self.capture(args)
        self.logger.warning(msg, *args, **kwargs)

127 

128 

class TemplateWrapper:
    """Proxy for a template that forces ``strict=True`` on ``async_render_to_info``.

    Defined at module level so that the wrapping call and the later undo call
    of ``force_strict_template_mode`` test against the same class. A class
    created inside the function would be a new type on every call, and
    ``isinstance`` would never recognise wrappers made by an earlier call.
    """

    def __init__(self, obj: Template) -> None:
        """Remember the wrapped template object."""
        self._obj = obj

    def __getattr__(self, name: str) -> Any:
        """Delegate to the wrapped object, making ``async_render_to_info`` strict."""
        if name == "_obj":
            # Only reached when ``_obj`` was never set on this instance;
            # fail cleanly so the lookup does not recurse forever.
            raise AttributeError(name)
        if name == "async_render_to_info":
            return partial(self._obj.async_render_to_info, strict=True)
        return getattr(self._obj, name)


def force_strict_template_mode(conditions: list[ConfigType], undo: bool = False) -> None:
    """Wrap, or unwrap, the templates held in condition configs, in place.

    With ``undo=False`` every ``Template`` value that has an ``_env``
    attribute is replaced by a ``TemplateWrapper``. With ``undo=True`` every
    ``TemplateWrapper`` value is replaced by the object it wraps. Nested dicts
    and lists are searched as well, so templates inside a nested list of
    conditions are covered.

    Args:
        conditions: The condition configs to modify; ``None`` is ignored.
        undo: Unwrap when true, wrap when false.
    """

    def swap(container: Any) -> None:
        # Snapshot the entries so values can be reassigned while looping.
        if isinstance(container, dict):
            entries = list(container.items())
        elif isinstance(container, list):
            entries = list(enumerate(container))
        else:
            return
        for key, val in entries:
            if undo and isinstance(val, TemplateWrapper):
                container[key] = val._obj
            elif not undo and isinstance(val, Template) and hasattr(val, "_env"):
                container[key] = TemplateWrapper(val)
            else:
                swap(val)

    if conditions is not None:
        swap(conditions)

151 

152 if conditions is not None: 

153 conditions = [wrap_template(condition, undo) for condition in conditions]