Coverage for custom_components/autoarm/helpers.py: 92%
75 statements
« prev ^ index » next coverage.py v7.10.6, created at 2026-02-17 01:14 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2026-02-17 01:14 +0000
1import datetime as dt
2import logging
3import re
4from typing import Any, cast
6import homeassistant.util.dt as dt_util
7from homeassistant.auth import HomeAssistant
8from homeassistant.components.alarm_control_panel.const import AlarmControlPanelState
9from homeassistant.core import State
10from homeassistant.helpers.json import ExtendedJSONEncoder
12from .const import DOMAIN
14_LOGGER = logging.getLogger(__name__)
def alarm_state_as_enum(state_str: str | None) -> AlarmControlPanelState | None:
    """Convert a raw alarm state string into an `AlarmControlPanelState`.

    Args:
        state_str: The state value to convert, or None.

    Returns:
        The matching enum member, or None when no string was supplied or the
        string is not a valid enum value (a warning is logged in that case).
    """
    if state_str is not None:
        try:
            return AlarmControlPanelState(state_str)
        except ValueError as err:
            _LOGGER.warning("AUTOARM Invalid alarm state: %s", err)
    return None
def safe_state(state: State | None) -> str | None:
    """Read the state string from *state* without ever raising.

    Args:
        state: The state object to read, or None.

    Returns:
        The value of ``state.state``, or None when *state* is None or reading
        the attribute raised (the failure is logged at debug level).
    """
    if state is None:
        return None
    try:
        return state.state
    except Exception as err:
        # Best-effort accessor: any failure is reported and treated as "no state".
        _LOGGER.debug("AUTOARM Failed to load state %s: %s", state, err)
        return None
class Limiter:
    """Sliding-window counter that reports when calls arrive too frequently."""

    def __init__(self, window: dt.timedelta, max_calls: int = 4) -> None:
        """Set up an empty call history.

        Args:
            window: How far back from "now" a call still counts.
            max_calls: Number of calls inside the window that is still acceptable.
        """
        self.calls: list[dt.datetime] = []
        self.window: dt.timedelta = window
        self.max_calls: int = max_calls
        _LOGGER.debug(
            "AUTOARM Rate limiter initialized with window %s and max_calls %s",
            window,
            max_calls,
        )

    def triggered(self) -> bool:
        """Record a call now and report whether the limit has been exceeded.

        Returns:
            True when the number of recorded calls inside the window, including
            this one, is greater than ``max_calls``.
        """
        oldest_allowed: dt.datetime = dt_util.now() - self.window
        self.calls.append(dt_util.now())

        # Drop expired timestamps in place so the history list keeps its identity.
        self.calls[:] = [stamp for stamp in self.calls if stamp >= oldest_allowed]

        return len(self.calls) > self.max_calls
63def deobjectify(obj: object) -> dict[Any, Any] | str | int | float | bool | None:
64 if obj is None or isinstance(obj, (str, int, float, bool)):
65 return obj
66 if isinstance(obj, (dt.datetime, dt.time, dt.date)):
67 return obj.isoformat()
68 as_dict = getattr(obj, "as_dict", None)
69 if as_dict is None:
70 return str(obj)
71 return as_dict()
class AppHealthTracker:
    """Count initialization and runtime failures and publish them as states.

    Two entities are written through ``hass.states.async_set``:

    - ``binary_sensor.<DOMAIN>_initialized``: "valid" or "invalid", with the
      per-stage error counts as attributes.
    - ``sensor.<DOMAIN>_failures``: the running failure total, with the
      per-stage error counts under the ``initialization_errors`` attribute.
    """

    def __init__(self, hass: HomeAssistant) -> None:
        """Start with no recorded errors.

        Args:
            hass: The Home Assistant instance whose state machine is written to.
        """
        self.hass = hass
        # Number of errors recorded per initialization stage name.
        self.initialization_errors: dict[str, int] = {}
        # Running total of initialization and runtime errors.
        self.failures = 0

    def _publish_failures(self) -> None:
        """Write the failure total together with the per-stage breakdown.

        Every update of the failures sensor goes through here so the
        ``initialization_errors`` attribute is always present. Writing the
        state without attributes would replace, and thereby drop, the
        breakdown published by an earlier update.
        """
        self.hass.states.async_set(
            f"sensor.{DOMAIN}_failures",
            str(self.failures),
            # Pass a snapshot so later counter updates cannot alter the
            # attributes of a state that has already been written.
            attributes={"initialization_errors": dict(self.initialization_errors)},
        )

    def app_initialized(self) -> None:
        """Publish the initialization outcome and the current failure total."""
        self.hass.states.async_set(
            f"binary_sensor.{DOMAIN}_initialized",
            "valid" if not self.initialization_errors else "invalid",
            attributes=self.initialization_errors,
        )
        self._publish_failures()

    def record_initialization_error(self, stage: str) -> None:
        """Count one error against *stage* and republish the failure total.

        Args:
            stage: Name of the initialization stage that failed.
        """
        self.initialization_errors[stage] = self.initialization_errors.get(stage, 0) + 1
        self.failures += 1
        self._publish_failures()

    def record_runtime_error(self) -> None:
        """Count one runtime error and republish the failure total."""
        self.failures += 1
        self._publish_failures()
class ExtendedExtendedJSONEncoder(ExtendedJSONEncoder):
    """JSON encoder that also handles times, timedeltas and compiled regexes.

    Any other object is delegated to ``ExtendedJSONEncoder.default``.
    """

    def default(self, o: Any) -> Any:
        """Return a JSON-serializable stand-in for *o*.

        Args:
            o: The object the JSON encoder could not serialize natively.

        Returns:
            - the ISO-8601 string for a ``datetime.time``;
            - ``"<n> seconds"`` for a ``datetime.timedelta``, where ``n`` is
              the full duration in whole seconds (floored);
            - the pattern source for a compiled ``re.Pattern``;
            - otherwise whatever the parent encoder returns.
        """
        if isinstance(o, dt.time):
            return cast("dt.time", o).isoformat()
        if isinstance(o, dt.timedelta):
            td: dt.timedelta = cast("dt.timedelta", o)
            # `td.seconds` is only the within-day remainder: it ignores whole
            # days and is misleading for negative durations. Floor division by
            # one second yields the full duration as an int.
            whole_seconds = td // dt.timedelta(seconds=1)
            return f"{whole_seconds} seconds"
        if isinstance(o, re.Pattern):
            # Return the pattern source itself, not a one-element set wrapping it.
            return cast("re.Pattern", o).pattern
        return super().default(o)