Coverage for custom_components/autoarm/calendar.py: 93%

206 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2026-02-17 01:14 +0000

1import datetime as dt 

2import logging 

3import re 

4from collections.abc import Callable 

5from typing import TYPE_CHECKING, cast 

6 

7import homeassistant.util.dt as dt_util 

8from homeassistant.auth import HomeAssistant 

9from homeassistant.components.alarm_control_panel.const import AlarmControlPanelState 

10from homeassistant.components.calendar import CalendarEntity, CalendarEvent 

11from homeassistant.const import CONF_ALIAS, CONF_ENTITY_ID 

12from homeassistant.helpers import entity_platform 

13from homeassistant.helpers.event import ( 

14 async_track_point_in_time, 

15 async_track_utc_time_change, 

16) 

17from homeassistant.helpers.typing import ConfigType 

18 

19from custom_components.autoarm.helpers import AppHealthTracker, alarm_state_as_enum 

20 

21from .const import ( 

22 ALARM_STATES, 

23 CONF_CALENDAR_EVENT_STATES, 

24 CONF_CALENDAR_POLL_INTERVAL, 

25 DOMAIN, 

26 NO_CAL_EVENT_MODE_AUTO, 

27 ChangeSource, 

28) 

29 

30if TYPE_CHECKING: 

31 from homeassistant.core import CALLBACK_TYPE 

32 

33_LOGGER = logging.getLogger(__name__) 

34 

35 

36def unlisten(listener: Callable[[], None] | None) -> None: 

37 if listener: 

38 try: 

39 listener() 

40 except Exception as e: 

41 _LOGGER.debug("AUTOARM Failure closing calendar listener %s: %s", listener, e) 

42 

43 

44class TrackedCalendar: 

45 """Listener for a Home Assistant Calendar""" 

46 

47 def __init__( 

48 self, 

49 hass: HomeAssistant, 

50 calendar_config: ConfigType, 

51 no_event_mode: str | None, 

52 armer: "AlarmArmer", # type: ignore # noqa: F821 

53 app_health_tracker: AppHealthTracker, 

54 ) -> None: 

55 self.enabled = False 

56 self.armer = armer 

57 self.app_health_tracker: AppHealthTracker = app_health_tracker 

58 self.hass: HomeAssistant = hass 

59 self.no_event_mode: str | None = no_event_mode 

60 self.alias: str = cast("str", calendar_config.get(CONF_ALIAS, "")) 

61 self.entity_id: str = cast("str", calendar_config.get(CONF_ENTITY_ID)) 

62 self.poll_interval: int = calendar_config.get(CONF_CALENDAR_POLL_INTERVAL, 30) 

63 self.state_mappings: dict[str, list[str]] = cast("dict", calendar_config.get(CONF_CALENDAR_EVENT_STATES)) 

64 # self.notify_on_change: str = calendar_config.get(CONF_CALENDAR_ENTRY_NOTIFICATIONS, ENTRY_NOTIFICATION_MATCHED) 

65 self.tracked_events: dict[str, TrackedCalendarEvent] = {} 

66 self.poller_listener: CALLBACK_TYPE | None = None 

67 

68 async def initialize(self, calendar_platform: entity_platform.EntityPlatform) -> None: 

69 try: 

70 calendar_entity: CalendarEntity | None = cast( 

71 "CalendarEntity|None", calendar_platform.domain_entities.get(self.entity_id) 

72 ) 

73 if calendar_entity is None: 

74 self.app_health_tracker.record_initialization_error("calendar_setup") 

75 _LOGGER.warning("AUTOARM Unable to access calendar %s", self.entity_id) 

76 else: 

77 self.calendar_entity = calendar_entity 

78 _LOGGER.info( 

79 "AUTOARM Configured calendar %s from %s, polling every %s minutes", 

80 self.entity_id, 

81 calendar_platform.platform_name, 

82 self.poll_interval, 

83 ) 

84 self.poller_listener = async_track_utc_time_change( 

85 self.hass, 

86 self.on_timed_poll, 

87 "*", 

88 minute=f"/{self.poll_interval}", 

89 second=0, 

90 local=True, 

91 ) 

92 self.enabled = True 

93 # force an initial poll 

94 await self.match_events() 

95 

96 except Exception as _e: 

97 self.app_health_tracker.record_runtime_error() 

98 _LOGGER.exception("AUTOARM Failed to initialize calendar entity %s", self.entity_id) 

99 

100 def shutdown(self) -> None: 

101 unlisten(self.poller_listener) 

102 self.poller_listener = None 

103 for tracked_event in self.tracked_events.values(): 

104 tracked_event.shutdown() 

105 self.enabled = False 

106 self.tracked_events.clear() 

107 

108 async def on_timed_poll(self, _called_time: dt.datetime) -> None: 

109 """Check for new and dead events, entry point for the timed calendar tracker listener""" 

110 _LOGGER.debug("AUTOARM Calendar Poll") 

111 await self.match_events() 

112 await self.prune_events() 

113 

114 def has_active_event(self) -> bool: 

115 """Is there any event matching a state pattern that is currently open""" 

116 return any(tevent.is_current() for tevent in self.tracked_events.values()) 

117 

118 def active_events(self) -> list[CalendarEvent]: 

119 """List all the events matching a state pattern that are currently open""" 

120 return [v.event for v in self.tracked_events.values() if v.is_current()] 

121 

122 def match_event(self, summary: str | None, description: str | None) -> str | None: 

123 for state_str in ALARM_STATES: 

124 if summary and (state_str.upper() in summary): 

125 return state_str 

126 if description and (state_str.upper() in description): 

127 return state_str 

128 for state_str, patterns in self.state_mappings.items(): 

129 if ( 

130 summary 

131 and any( 

132 re.search( 

133 patt, 

134 summary, 

135 ) 

136 for patt in patterns 

137 ) 

138 ) or ( 

139 description 

140 and any( 

141 re.search( 

142 patt, 

143 description, 

144 ) 

145 for patt in patterns 

146 ) 

147 ): 

148 return state_str 

149 return None 

150 

151 async def match_events(self) -> None: 

152 """Query the calendar for events that match state patterns""" 

153 now_local = dt_util.now() 

154 start_dt = now_local - dt.timedelta(minutes=15) 

155 end_dt = now_local + dt.timedelta(minutes=self.poll_interval + 5) 

156 

157 events: list[CalendarEvent] = await self.calendar_entity.async_get_events(self.hass, start_dt, end_dt) 

158 

159 for event in events: 

160 # presume the events are sorted by start time 

161 event_id = TrackedCalendarEvent.event_id(self.calendar_entity.entity_id, event) 

162 _LOGGER.debug("AUTOARM Calendar Event: %s [%s]", event.summary, event_id) 

163 

164 state_str: str | None = self.match_event(event.summary, event.description) 

165 if state_str is None: 

166 if event_id in self.tracked_events: 

167 existing_event: TrackedCalendarEvent = self.tracked_events[event_id] 

168 _LOGGER.info( 

169 "AUTOARM Calendar %s found updated event %s no longer matching", 

170 self.calendar_entity.entity_id, 

171 event.summary, 

172 ) 

173 await existing_event.remove() 

174 else: 

175 _LOGGER.debug("AUTOARM Ignoring untracked unmatched event") 

176 else: 

177 if event_id not in self.tracked_events: 

178 state: AlarmControlPanelState | None = alarm_state_as_enum(state_str) 

179 if state is None: 

180 _LOGGER.warning( 

181 "AUTOARM Calendar %s found event %s for invalid state %s", 

182 self.calendar_entity.entity_id, 

183 event.summary, 

184 state_str, 

185 ) 

186 else: 

187 _LOGGER.debug( 

188 "AUTOARM Calendar %s matched event %s for state %s", 

189 self.calendar_entity.entity_id, 

190 event.summary, 

191 state_str, 

192 ) 

193 

194 self.tracked_events[event_id] = TrackedCalendarEvent( 

195 self.calendar_entity.entity_id, 

196 event=event, 

197 arming_state=state, 

198 no_event_mode=self.no_event_mode, 

199 armer=self.armer, 

200 hass=self.hass, 

201 ) 

202 await self.tracked_events[event_id].initialize() 

203 else: 

204 existing_event = self.tracked_events[event_id] 

205 if existing_event.event != event: 

206 _LOGGER.info( 

207 "AUTOARM Calendar %s found updated event %s for state %s", 

208 self.calendar_entity.entity_id, 

209 event.summary, 

210 state_str, 

211 ) 

212 await existing_event.update(event) 

213 else: 

214 _LOGGER.debug("AUTOARM No change to previously tracked event") 

215 

216 async def prune_events(self) -> None: 

217 """Remove past events""" 

218 to_remove: list[str] = [] 

219 min_start: dt.datetime | None = None 

220 max_end: dt.datetime | None = None 

221 for event_id, tevent in self.tracked_events.items(): 

222 if min_start is None or min_start > tevent.event.start_datetime_local: 

223 min_start = tevent.event.start_datetime_local 

224 if max_end is None or max_end < tevent.event.end_datetime_local: 

225 max_end = tevent.event.end_datetime_local 

226 if not tevent.is_current() and not tevent.is_future(): 

227 _LOGGER.debug("AUTOARM Pruning expire calendar event: %s", tevent.event.uid) 

228 to_remove.append(event_id) 

229 await tevent.end(dt_util.now()) 

230 

231 if min_start and max_end: 

232 live_event_ids: list[str] = [ 

233 e.uid for e in await self.calendar_entity.async_get_events(self.hass, min_start, max_end) if e.uid is not None 

234 ] 

235 for tevent in self.tracked_events.values(): 

236 if tevent.event.uid not in live_event_ids: 

237 _LOGGER.debug("AUTOARM Pruning dead calendar event: %s", tevent.event.uid) 

238 await tevent.remove() 

239 to_remove.append(tevent.id) 

240 for event_id in to_remove: 

241 del self.tracked_events[event_id] 

242 

243 

244class TrackedCalendarEvent: 

245 """Generate alarm state changes for a Home Assistant Calendar event""" 

246 

247 def __init__( 

248 self, 

249 calendar_id: str, 

250 event: CalendarEvent, 

251 arming_state: AlarmControlPanelState, 

252 no_event_mode: str | None, 

253 armer: "AlarmArmer", # type: ignore # noqa: F821 

254 hass: HomeAssistant, 

255 ) -> None: 

256 self.tracked_at: dt.datetime = dt_util.now() 

257 self.calendar_id: str = calendar_id 

258 self.id: str = TrackedCalendarEvent.event_id(calendar_id, event) 

259 self.event: CalendarEvent = event 

260 self.no_event_mode: str | None = no_event_mode 

261 self.arming_state: AlarmControlPanelState = arming_state 

262 self.start_listener: Callable | None = None 

263 self.end_listener: Callable | None = None 

264 self.armer: AlarmArmer = armer # type: ignore # noqa: F821 

265 self.hass: HomeAssistant = hass 

266 self.previous_state: AlarmControlPanelState | None = armer.armed_state() 

267 self.track_status: str = "pending" 

268 

269 async def initialize(self) -> None: 

270 if self.event.end_datetime_local < self.tracked_at: 

271 _LOGGER.debug("AUTOARM Ignoring past event") 

272 self.track_status = "ended" 

273 return 

274 if self.event.start_datetime_local > self.tracked_at: 

275 self.start_listener = async_track_point_in_time( 

276 self.hass, 

277 self.on_calendar_event_start, 

278 self.event.start_datetime_local, 

279 ) 

280 else: 

281 await self.on_calendar_event_start(dt_util.now()) 

282 self.track_status = "started" 

283 if self.event.end_datetime_local > self.tracked_at: 

284 self.end_listener = async_track_point_in_time( 

285 self.hass, 

286 self.end, 

287 self.event.end_datetime_local, 

288 ) 

289 _LOGGER.debug("AUTOARM Now tracking %s event %s, %s", self.calendar_id, self.event.uid, self.event.summary) 

290 

291 async def end(self, event_time: dt.datetime) -> None: 

292 """Handle an event that has reached its finish date and time""" 

293 _LOGGER.debug("AUTOARM Calendar event %s ended, event_time: %s", self.id, event_time) 

294 self.track_status = "ended" 

295 await self.on_calendar_event_end(dt_util.now()) 

296 self.shutdown() 

297 

298 async def update(self, new_event: CalendarEvent) -> None: 

299 _LOGGER.debug("AUTOARM Calendar event updated for %s: %s", self.id, self.event.summary) 

300 was_current = self.is_current() 

301 self.event = new_event 

302 if not self.is_current() and was_current: 

303 await self.end(dt_util.now()) 

304 

305 async def remove(self) -> None: 

306 _LOGGER.debug("AUTOARM Calendar event deletion for %s: %s", self.id, self.event.summary) 

307 if self.track_status == "started": 

308 await self.end(dt_util.now()) 

309 else: 

310 self.track_status = "ended" 

311 

312 async def on_calendar_event_start(self, triggered_at: dt.datetime) -> None: 

313 _LOGGER.debug("AUTOARM on_calendar_event_start(%s,%s)", self.id, triggered_at) 

314 new_state = await self.armer.arm(arming_state=self.arming_state, source=ChangeSource.CALENDAR) 

315 self.hass.states.async_set( 

316 f"sensor.{DOMAIN}_last_calendar_event", 

317 new_state=self.event.summary or str(self.id), 

318 attributes={ 

319 "calendar": self.calendar_id, 

320 "start": self.event.start_datetime_local, 

321 "end": self.event.end_datetime_local, 

322 "summary": self.event.summary, 

323 "description": self.event.description, 

324 "uid": self.event.uid, 

325 "new_state": new_state, 

326 }, 

327 ) 

328 

329 async def on_calendar_event_end(self, ended_at: dt.datetime) -> None: 

330 _LOGGER.debug("AUTOARM on_calendar_event_end(%s,%s)", self.id, ended_at) 

331 if self.armer.has_active_calendar_event(): 

332 _LOGGER.debug("AUTOARM No action on event end since other cal event active") 

333 return 

334 if self.no_event_mode == NO_CAL_EVENT_MODE_AUTO: 

335 _LOGGER.info("AUTOARM Calendar event %s ended, and arming state", self.id) 

336 # avoid having state locked in vacation by state calculator 

337 await self.armer.pending_state(source=ChangeSource.CALENDAR) 

338 await self.armer.reset_armed_state(source=ChangeSource.CALENDAR) 

339 elif self.no_event_mode in AlarmControlPanelState: 

340 _LOGGER.info("AUTOARM Calendar event %s ended, and returning to fixed state %s", self.id, self.no_event_mode) 

341 await self.armer.arm(alarm_state_as_enum(self.no_event_mode), source=ChangeSource.CALENDAR) 

342 else: 

343 _LOGGER.debug("AUTOARM Reinstate previous state on calendar event end in manual mode") 

344 await self.armer.arm(self.previous_state, source=ChangeSource.CALENDAR) 

345 

346 @classmethod 

347 def event_id(cls, calendar_id: str, event: CalendarEvent) -> str: 

348 """Generate an ID for the calendar even if it doesn't natively support `uid`""" 

349 uid = event.uid or str(hash((event.summary, event.description, event.start.isoformat(), event.end.isoformat()))) 

350 return f"{calendar_id}:{uid}" 

351 

352 def is_current(self) -> bool: 

353 if self.track_status == "ended": 

354 return False 

355 now_local: dt.datetime = dt_util.now() 

356 return now_local >= self.event.start_datetime_local and now_local <= self.event.end_datetime_local 

357 

358 def is_future(self) -> bool: 

359 if self.track_status == "ended": 

360 return False 

361 now_local: dt.datetime = dt_util.now() 

362 return self.event.start_datetime_local > now_local 

363 

364 def shutdown(self) -> None: 

365 unlisten(self.start_listener) 

366 self.start_listener = None 

367 unlisten(self.end_listener) 

368 self.end_listener = None 

369 

370 def __eq__(self, other: object) -> bool: 

371 """Compare two events based on underlying calendar event""" 

372 if not isinstance(other, TrackedCalendarEvent): 

373 return False 

374 return self.event.uid == other.event.uid