Coverage for custom_components/autoarm/helpers.py: 100%

83 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2026-04-27 16:32 +0000

1import datetime as dt 

2import logging 

3import re 

4from typing import TYPE_CHECKING, Any 

5 

6import homeassistant.util.dt as dt_util 

7from homeassistant.auth import HomeAssistant 

8from homeassistant.components.alarm_control_panel.const import AlarmControlPanelState 

9from homeassistant.core import State 

10from homeassistant.helpers.json import ExtendedJSONEncoder 

11 

12from .const import DOMAIN, ChangeSource 

13 

14if TYPE_CHECKING: 

15 from collections.abc import Callable 

16 

17_LOGGER = logging.getLogger(__name__) 

18 

19 

def alarm_state_as_enum(state_str: str | None) -> AlarmControlPanelState | None:
    """Convert a raw state string into an ``AlarmControlPanelState``.

    Args:
        state_str: State value to convert, or ``None``.

    Returns:
        The result of ``AlarmControlPanelState(state_str)``. ``None`` is
        returned when the input is ``None`` or when the conversion raises
        ``ValueError``; the latter case is logged as a warning.
    """
    if state_str is not None:
        try:
            return AlarmControlPanelState(state_str)
        except ValueError as err:
            # Unknown values are reported but never propagated to the caller.
            _LOGGER.warning("AUTOARM Invalid alarm state: %s", err)
    return None

28 

29 

def change_source_as_enum(source_str: str | None) -> ChangeSource | None:
    """Convert a raw change-source string into a ``ChangeSource``.

    Args:
        source_str: Change-source value to convert, or ``None``.

    Returns:
        The result of ``ChangeSource(source_str)``. ``None`` is returned when
        the input is ``None`` or when the conversion raises ``ValueError``;
        the latter case is logged as a warning.
    """
    if source_str is None:
        return None
    try:
        source = ChangeSource(source_str)
    except ValueError as err:
        # Unknown values are reported but never propagated to the caller.
        _LOGGER.warning("AUTOARM Invalid change source: %s", err)
        return None
    return source

38 

39 

def safe_state(state: State | None) -> str | None:
    """Read ``state.state`` without letting any exception escape.

    Args:
        state: The state object to read, or ``None``.

    Returns:
        ``state.state`` when it can be read; ``None`` when ``state`` is
        ``None`` or when reading the attribute raises (logged at debug level).
    """
    if state is None:
        return None
    try:
        return state.state
    except Exception as err:
        # Best-effort accessor: any failure is logged and mapped to None.
        _LOGGER.debug("AUTOARM Failed to load state %s: %s", state, err)
    return None

46 

47 

class Limiter:
    """Rate limiting tracker"""

    def __init__(self, window: dt.timedelta, max_calls: int = 4) -> None:
        """Create a limiter.

        Args:
            window: Length of the look-back period in which calls are counted.
            max_calls: Number of calls tolerated inside the window before
                ``triggered`` starts returning ``True``.
        """
        self.max_calls: int = max_calls
        self.window: dt.timedelta = window
        # Timestamps of the calls registered so far; pruned by ``triggered``.
        self.calls: list[dt.datetime] = []
        _LOGGER.debug(
            "AUTOARM Rate limiter initialized with window %s and max_calls %s",
            window,
            max_calls,
        )

    def triggered(self) -> bool:
        """Register a call and check if window based rate limit triggered

        Returns:
            ``True`` when the number of registered calls still inside the
            window, including this one, is greater than ``max_calls``.
        """
        oldest_allowed: dt.datetime = dt_util.now() - self.window
        self.calls.append(dt_util.now())

        # Drop timestamps that fell out of the window. Slice assignment keeps
        # the same list object, so the pruning stays an in-place mutation.
        self.calls[:] = [stamp for stamp in self.calls if stamp >= oldest_allowed]

        return len(self.calls) > self.max_calls

74 

75 

76def deobjectify(obj: object) -> dict[Any, Any] | str | int | float | bool | None: 

77 if obj is None or isinstance(obj, (str, int, float, bool)): 

78 return obj 

79 if isinstance(obj, (dt.datetime, dt.time, dt.date)): 

80 return obj.isoformat() 

81 as_dict: Callable[[], dict[Any, Any]] | None = getattr(obj, "as_dict", None) 

82 if as_dict is None: 

83 return str(obj) 

84 return as_dict() 

85 

86 

class AppHealthTracker:
    """Count initialization and runtime errors and publish them as states.

    Two entities are written through ``hass.states.async_set``:

    * ``binary_sensor.{DOMAIN}_initialized`` - ``"valid"`` or ``"invalid"``,
      with the per-stage error counts as attributes.
    * ``sensor.{DOMAIN}_failures`` - the total error count as a string, with
      the per-stage counts under the ``initialization_errors`` attribute
      whenever any initialization error has been recorded.
    """

    def __init__(self, hass: HomeAssistant) -> None:
        """Start tracking with zero recorded errors.

        Args:
            hass: Object whose ``states.async_set`` is used to publish state.
        """
        self.hass = hass
        # Number of errors recorded per initialization stage name.
        self.initialization_errors: dict[str, int] = {}
        # Total errors recorded, initialization and runtime combined.
        self.failures = 0

    def app_initialized(self) -> None:
        """Publish the initialization verdict and the current failure count."""
        self.hass.states.async_set(
            f"binary_sensor.{DOMAIN}_initialized",
            "valid" if not self.initialization_errors else "invalid",
            attributes=self.initialization_errors,
        )
        self._publish_failures()

    def record_initialization_error(self, stage: str) -> None:
        """Count one initialization error for ``stage`` and publish the totals.

        Args:
            stage: Name of the initialization stage that failed.
        """
        self.initialization_errors[stage] = self.initialization_errors.get(stage, 0) + 1
        self.failures += 1
        self._publish_failures()

    def record_runtime_error(self) -> None:
        """Count one runtime error and publish the totals."""
        self.failures += 1
        self._publish_failures()

    def _publish_failures(self) -> None:
        """Write the failure count, keeping recorded initialization errors attached.

        Every writer of the failures sensor goes through here so that a later
        update cannot drop the ``initialization_errors`` attribute published by
        an earlier one. While no initialization error has been recorded, no
        attributes are sent.
        """
        attributes = {"initialization_errors": self.initialization_errors} if self.initialization_errors else None
        self.hass.states.async_set(f"sensor.{DOMAIN}_failures", str(self.failures), attributes=attributes)

112 

113 

class ExtendedExtendedJSONEncoder(ExtendedJSONEncoder):
    """``ExtendedJSONEncoder`` with extra handling for times, durations and regex patterns."""

    def default(self, o: Any) -> Any:
        """Return a stand-in value for an object the encoder cannot serialize.

        Args:
            o: The object being encoded.

        Returns:
            * ``o.isoformat()`` for ``datetime.time`` values;
            * ``"<n> seconds"`` for ``datetime.timedelta`` values, where ``n``
              is the whole duration in seconds, rounded down;
            * a one-element set holding the pattern string for compiled
              regular expressions;
            * whatever the parent class's ``default`` returns otherwise.
        """
        if isinstance(o, dt.time):
            return o.isoformat()
        if isinstance(o, dt.timedelta):
            # ``timedelta.seconds`` is only the sub-day component (0..86399): it
            # drops whole days and misreports negative durations. Floor-dividing
            # by one second yields the full duration, and is identical to
            # ``.seconds`` for any duration between zero and one day.
            whole_seconds = o // dt.timedelta(seconds=1)
            return f"{whole_seconds} seconds"
        if isinstance(o, re.Pattern):
            # NOTE(review): this is a set literal, not a string. Presumably the
            # parent encoder turns sets into something serializable - confirm
            # whether a bare ``o.pattern`` string was intended.
            return {o.pattern}
        return super().default(o)