Coverage for custom_components/autoarm/helpers.py: 100%
83 statements
« prev ^ index » next coverage.py v7.10.6, created at 2026-04-27 16:32 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2026-04-27 16:32 +0000
1import datetime as dt
2import logging
3import re
4from typing import TYPE_CHECKING, Any
6import homeassistant.util.dt as dt_util
7from homeassistant.auth import HomeAssistant
8from homeassistant.components.alarm_control_panel.const import AlarmControlPanelState
9from homeassistant.core import State
10from homeassistant.helpers.json import ExtendedJSONEncoder
12from .const import DOMAIN, ChangeSource
14if TYPE_CHECKING:
15 from collections.abc import Callable
17_LOGGER = logging.getLogger(__name__)
20def alarm_state_as_enum(state_str: str | None) -> AlarmControlPanelState | None:
21 if state_str is None:
22 return None
23 try:
24 return AlarmControlPanelState(state_str)
25 except ValueError as e:
26 _LOGGER.warning("AUTOARM Invalid alarm state: %s", e)
27 return None
30def change_source_as_enum(source_str: str | None) -> ChangeSource | None:
31 if source_str is None:
32 return None
33 try:
34 return ChangeSource(source_str)
35 except ValueError as e:
36 _LOGGER.warning("AUTOARM Invalid change source: %s", e)
37 return None
40def safe_state(state: State | None) -> str | None:
41 try:
42 return state.state if state is not None else None
43 except Exception as e:
44 _LOGGER.debug("AUTOARM Failed to load state %s: %s", state, e)
45 return None
48class Limiter:
49 """Rate limiting tracker"""
51 def __init__(self, window: dt.timedelta, max_calls: int = 4) -> None:
52 self.calls: list[dt.datetime] = []
53 self.window: dt.timedelta = window
54 self.max_calls: int = max_calls
55 _LOGGER.debug(
56 "AUTOARM Rate limiter initialized with window %s and max_calls %s",
57 window,
58 max_calls,
59 )
61 def triggered(self) -> bool:
62 """Register a call and check if window based rate limit triggered"""
63 cut_off: dt.datetime = dt_util.now() - self.window
64 self.calls.append(dt_util.now())
65 in_scope = 0
67 for call in self.calls[:]:
68 if call >= cut_off:
69 in_scope += 1
70 else:
71 self.calls.remove(call)
73 return in_scope > self.max_calls
76def deobjectify(obj: object) -> dict[Any, Any] | str | int | float | bool | None:
77 if obj is None or isinstance(obj, (str, int, float, bool)):
78 return obj
79 if isinstance(obj, (dt.datetime, dt.time, dt.date)):
80 return obj.isoformat()
81 as_dict: Callable[[], dict[Any, Any]] | None = getattr(obj, "as_dict", None)
82 if as_dict is None:
83 return str(obj)
84 return as_dict()
87class AppHealthTracker:
88 def __init__(self, hass: HomeAssistant) -> None:
89 self.hass = hass
90 self.initialization_errors: dict[str, int] = {}
91 self.failures = 0
93 def app_initialized(self) -> None:
94 self.hass.states.async_set(
95 f"binary_sensor.{DOMAIN}_initialized",
96 "valid" if not self.initialization_errors else "invalid",
97 attributes=self.initialization_errors,
98 )
99 self.hass.states.async_set(f"sensor.{DOMAIN}_failures", str(self.failures))
101 def record_initialization_error(self, stage: str) -> None:
102 self.initialization_errors.setdefault(stage, 0)
103 self.initialization_errors[stage] += 1
104 self.failures += 1
105 self.hass.states.async_set(
106 f"sensor.{DOMAIN}_failures", str(self.failures), attributes={"initialization_errors": self.initialization_errors}
107 )
109 def record_runtime_error(self) -> None:
110 self.failures += 1
111 self.hass.states.async_set(f"sensor.{DOMAIN}_failures", str(self.failures))
114class ExtendedExtendedJSONEncoder(ExtendedJSONEncoder):
115 def default(self, o: Any) -> Any:
116 if isinstance(o, dt.time):
117 return o.isoformat()
118 if isinstance(o, dt.timedelta):
119 td: dt.timedelta = o
120 return f"{td.seconds} seconds"
121 if isinstance(o, re.Pattern):
122 return {o.pattern}
123 return super().default(o)