Coverage for custom_components/autoarm/helpers.py: 92%

75 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2026-02-17 01:14 +0000

1import datetime as dt 

2import logging 

3import re 

4from typing import Any, cast 

5 

6import homeassistant.util.dt as dt_util 

7from homeassistant.auth import HomeAssistant 

8from homeassistant.components.alarm_control_panel.const import AlarmControlPanelState 

9from homeassistant.core import State 

10from homeassistant.helpers.json import ExtendedJSONEncoder 

11 

12from .const import DOMAIN 

13 

14_LOGGER = logging.getLogger(__name__) 

15 

16 

17def alarm_state_as_enum(state_str: str | None) -> AlarmControlPanelState | None: 

18 if state_str is None: 

19 return None 

20 try: 

21 return AlarmControlPanelState(state_str) 

22 except ValueError as e: 

23 _LOGGER.warning("AUTOARM Invalid alarm state: %s", e) 

24 return None 

25 

26 

27def safe_state(state: State | None) -> str | None: 

28 try: 

29 return state.state if state is not None else None 

30 except Exception as e: 

31 _LOGGER.debug("AUTOARM Failed to load state %s: %s", state, e) 

32 return None 

33 

34 

35class Limiter: 

36 """Rate limiting tracker""" 

37 

38 def __init__(self, window: dt.timedelta, max_calls: int = 4) -> None: 

39 self.calls: list[dt.datetime] = [] 

40 self.window: dt.timedelta = window 

41 self.max_calls: int = max_calls 

42 _LOGGER.debug( 

43 "AUTOARM Rate limiter initialized with window %s and max_calls %s", 

44 window, 

45 max_calls, 

46 ) 

47 

48 def triggered(self) -> bool: 

49 """Register a call and check if window based rate limit triggered""" 

50 cut_off: dt.datetime = dt_util.now() - self.window 

51 self.calls.append(dt_util.now()) 

52 in_scope = 0 

53 

54 for call in self.calls[:]: 

55 if call >= cut_off: 

56 in_scope += 1 

57 else: 

58 self.calls.remove(call) 

59 

60 return in_scope > self.max_calls 

61 

62 

63def deobjectify(obj: object) -> dict[Any, Any] | str | int | float | bool | None: 

64 if obj is None or isinstance(obj, (str, int, float, bool)): 

65 return obj 

66 if isinstance(obj, (dt.datetime, dt.time, dt.date)): 

67 return obj.isoformat() 

68 as_dict = getattr(obj, "as_dict", None) 

69 if as_dict is None: 

70 return str(obj) 

71 return as_dict() 

72 

73 

74class AppHealthTracker: 

75 def __init__(self, hass: HomeAssistant) -> None: 

76 self.hass = hass 

77 self.initialization_errors: dict[str, int] = {} 

78 self.failures = 0 

79 

80 def app_initialized(self) -> None: 

81 self.hass.states.async_set( 

82 f"binary_sensor.{DOMAIN}_initialized", 

83 "valid" if not self.initialization_errors else "invalid", 

84 attributes=self.initialization_errors, 

85 ) 

86 self.hass.states.async_set(f"sensor.{DOMAIN}_failures", str(self.failures)) 

87 

88 def record_initialization_error(self, stage: str) -> None: 

89 self.initialization_errors.setdefault(stage, 0) 

90 self.initialization_errors[stage] += 1 

91 self.failures += 1 

92 self.hass.states.async_set( 

93 f"sensor.{DOMAIN}_failures", str(self.failures), attributes={"initialization_errors": self.initialization_errors} 

94 ) 

95 

96 def record_runtime_error(self) -> None: 

97 self.failures += 1 

98 self.hass.states.async_set(f"sensor.{DOMAIN}_failures", str(self.failures)) 

99 

100 

101class ExtendedExtendedJSONEncoder(ExtendedJSONEncoder): 

102 def default(self, o: Any) -> Any: 

103 if isinstance(o, dt.time): 

104 return cast("dt.time", o).isoformat() 

105 if isinstance(o, dt.timedelta): 

106 td: dt.timedelta = cast("dt.timedelta", o) 

107 return f"{td.seconds} seconds" 

108 if isinstance(o, re.Pattern): 

109 return {cast("re.Pattern", o).pattern} 

110 return super().default(o)