Coverage for custom_components/autoarm/calendar.py: 91%

137 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-08 20:27 +0000

1import datetime as dt 

2import logging 

3import re 

4from collections.abc import Callable 

5from functools import partial 

6from typing import TYPE_CHECKING, cast 

7 

8import homeassistant.util.dt as dt_util 

9from homeassistant.components.alarm_control_panel.const import AlarmControlPanelState 

10from homeassistant.components.calendar import CalendarEntity, CalendarEvent 

11from homeassistant.const import CONF_ALIAS, CONF_ENTITY_ID 

12from homeassistant.helpers import entity_platform 

13from homeassistant.helpers.event import ( 

14 async_track_point_in_time, 

15 async_track_utc_time_change, 

16) 

17from homeassistant.helpers.typing import ConfigType 

18 

19from custom_components.autoarm.helpers import alarm_state_as_enum 

20 

21from .const import ( 

22 CONF_CALENDAR_EVENT_STATES, 

23 CONF_CALENDAR_POLL_INTERVAL, 

24) 

25 

26if TYPE_CHECKING: 

27 from homeassistant.core import CALLBACK_TYPE 

28 

29_LOGGER = logging.getLogger(__name__) 

30 

31 

32def unlisten(listener: Callable[[], None] | None) -> None: 

33 if listener: 

34 try: 

35 listener() 

36 except Exception as e: 

37 _LOGGER.debug("AUTOARM Failure closing calendar listener %s: %s", listener, e) 

38 

39 

40class TrackedCalendar: 

41 """Listener for a Home Assistant Calendar""" 

42 

43 def __init__(self, calendar_config: ConfigType, armer: "AlarmArmer") -> None: # type: ignore # noqa: F821 

44 self.enabled = False 

45 self.armer = armer 

46 self.alias: str = cast("str", calendar_config.get(CONF_ALIAS, "")) 

47 self.entity_id: str = cast("str", calendar_config.get(CONF_ENTITY_ID)) 

48 self.poll_interval: int = calendar_config.get(CONF_CALENDAR_POLL_INTERVAL, 30) 

49 self.state_mappings: dict[str, list[str]] = cast("dict", calendar_config.get(CONF_CALENDAR_EVENT_STATES)) 

50 self.tracked_events: dict[str, TrackedCalendarEvent] = {} 

51 self.poller_listener: CALLBACK_TYPE | None = None 

52 

53 async def initialize(self, calendar_platform: entity_platform.EntityPlatform) -> None: 

54 try: 

55 calendar_entity: CalendarEntity = cast("CalendarEntity", calendar_platform.domain_entities[self.entity_id]) 

56 if calendar_entity is None: 

57 _LOGGER.warning("AUTOARM Unable to access calendar %s", self.entity_id) 

58 else: 

59 self.calendar_entity = calendar_entity 

60 _LOGGER.info("AUTOARM Configured calendar %s from %s", self.entity_id, calendar_platform.platform_name) 

61 self.poller_listener = async_track_utc_time_change( 

62 self.armer.hass, 

63 self.on_timed_poll, 

64 "*", 

65 minute=f"/{self.poll_interval}", 

66 second=0, 

67 local=True, 

68 ) 

69 self.enabled = True 

70 # force an initial poll 

71 await self.match_events() 

72 

73 except Exception as _e: 

74 _LOGGER.exception("AUTOARM Failed to initialize calendar entity %s", self.entity_id) 

75 

76 def shutdown(self) -> None: 

77 unlisten(self.poller_listener) 

78 self.poller_listener = None 

79 for tracked_event in self.tracked_events.values(): 

80 tracked_event.shutdown() 

81 self.enabled = False 

82 self.tracked_events.clear() 

83 

84 async def on_timed_poll(self, _called_time: dt.datetime) -> None: 

85 """Check for new and dead events, entry point for the timed calendar tracker listener""" 

86 _LOGGER.debug("AUTOARM Calendar Poll") 

87 await self.match_events() 

88 await self.prune_events() 

89 

90 def has_active_event(self) -> bool: 

91 """Is there any event matching a state pattern that is currently open""" 

92 return any(tevent.is_current() for tevent in self.tracked_events.values()) 

93 

94 def active_events(self) -> list[CalendarEvent]: 

95 """List all the events matching a state pattern that are currently open""" 

96 return [v.event for v in self.tracked_events.values() if v.is_current()] 

97 

98 async def match_events(self) -> None: 

99 """Query the calendar for events that match state patterns""" 

100 now_local = dt_util.now() 

101 start_dt = now_local - dt.timedelta(minutes=15) 

102 end_dt = now_local + dt.timedelta(minutes=self.poll_interval + 5) 

103 

104 events: list[CalendarEvent] = await self.calendar_entity.async_get_events(self.armer.hass, start_dt, end_dt) 

105 

106 for event in events: 

107 # presume the events are sorted by start time 

108 event_id = TrackedCalendarEvent.event_id(self.calendar_entity.entity_id, event) 

109 _LOGGER.debug("AUTOARM Calendar Event: %s", event_id) 

110 for state_str, patterns in self.state_mappings.items(): 

111 if any( 

112 re.match( 

113 patt, 

114 event.summary, 

115 ) 

116 for patt in patterns 

117 ): 

118 if event_id not in self.tracked_events: 

119 state: AlarmControlPanelState | None = alarm_state_as_enum(state_str) 

120 if state is None: 

121 _LOGGER.warning( 

122 "AUTOARM Calendar %s found event %s for invalid state %s", 

123 self.calendar_entity.entity_id, 

124 event.summary, 

125 state_str, 

126 ) 

127 else: 

128 _LOGGER.info( 

129 "AUTOARM Calendar %s matched event %s for state %s", 

130 self.calendar_entity.entity_id, 

131 event.summary, 

132 state_str, 

133 ) 

134 

135 self.tracked_events[event_id] = TrackedCalendarEvent( 

136 self.calendar_entity.entity_id, event, state, self.armer 

137 ) 

138 await self.tracked_events[event_id].initialize() 

139 

140 async def prune_events(self) -> None: 

141 """Remove past events""" 

142 to_remove: list[str] = [] 

143 for event_id, tevent in self.tracked_events.items(): 

144 if not tevent.is_current() and not tevent.is_future(): 

145 _LOGGER.debug("AUTOARM Pruning expire calendar event: %s", tevent.event.uid) 

146 to_remove.append(event_id) 

147 await tevent.end(dt_util.now()) 

148 for event_id in to_remove: 

149 del self.tracked_events[event_id] 

150 

151 

152class TrackedCalendarEvent: 

153 """Generate alarm state changes for a Home Assistant Calendar event""" 

154 

155 def __init__( 

156 self, 

157 calendar_id: str, 

158 event: CalendarEvent, 

159 arming_state: AlarmControlPanelState, 

160 armer: "AlarmArmer", # type: ignore # noqa: F821 

161 ) -> None: 

162 self.tracked_at = dt_util.now() 

163 self.calendar_id = calendar_id 

164 self.id = TrackedCalendarEvent.event_id(calendar_id, event) 

165 self.event: CalendarEvent = event 

166 self.arming_state: AlarmControlPanelState = arming_state 

167 self.start_listener: Callable | None = None 

168 self.end_listener: Callable | None = None 

169 self.armer = armer 

170 self.previous_state: AlarmControlPanelState | None = armer.armed_state() 

171 self.track_status: str = "pending" 

172 

173 async def initialize(self) -> None: 

174 

175 if self.event.end_datetime_local < self.tracked_at: 

176 _LOGGER.debug("AUTOARM Ignoring past event") 

177 self.track_status = "ended" 

178 return 

179 if self.event.start_datetime_local > self.tracked_at: 

180 self.start_listener = async_track_point_in_time( 

181 self.armer.hass, 

182 partial(self.armer.on_calendar_event_start, self), 

183 self.event.start_datetime_local, 

184 ) 

185 else: 

186 await self.armer.on_calendar_event_start(self, dt_util.now()) 

187 self.track_status = "started" 

188 if self.event.end_datetime_local > self.tracked_at: 

189 self.end_listener = async_track_point_in_time( 

190 self.armer.hass, 

191 self.end, 

192 self.event.end_datetime_local, 

193 ) 

194 _LOGGER.debug("AUTOARM Now tracking %s event %s, %s", self.calendar_id, self.event.uid, self.event.summary) 

195 

196 async def end(self, event_time: dt.datetime) -> None: 

197 """Handle an event that has reached its finish date and time""" 

198 _LOGGER.debug("AUTOARM Calendar event %s ended, event_time: %s", self.id, event_time) 

199 self.track_status = "ended" 

200 await self.armer.on_calendar_event_end(self, dt_util.now()) 

201 self.shutdown() 

202 

203 @classmethod 

204 def event_id(cls, calendar_id: str, event: CalendarEvent) -> str: 

205 """Generate an ID for the calendar even if it doesn't natively support `uid`""" 

206 uid = event.uid or str(hash((event.summary, event.description, event.start.isoformat(), event.end.isoformat()))) 

207 return f"{calendar_id}:{uid}" 

208 

209 def is_current(self) -> bool: 

210 if self.track_status == "ended": 

211 return False 

212 now_local: dt.datetime = dt_util.now() 

213 return now_local >= self.event.start_datetime_local and now_local <= self.event.end_datetime_local 

214 

215 def is_future(self) -> bool: 

216 if self.track_status == "ended": 

217 return False 

218 now_local: dt.datetime = dt_util.now() 

219 return self.event.start_datetime_local > now_local 

220 

221 def shutdown(self) -> None: 

222 unlisten(self.start_listener) 

223 self.start_listener = None 

224 unlisten(self.end_listener) 

225 self.end_listener = None 

226 

227 def __eq__(self, other: object) -> bool: 

228 """Compare two events based on underlying calendar event""" 

229 if not isinstance(other, TrackedCalendarEvent): 

230 return False 

231 return self.event.uid == other.event.uid