Coverage for custom_components/autoarm/calendar_events.py: 94%

223 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2026-04-27 16:32 +0000

1import datetime as dt 

2import logging 

3import re 

4from collections.abc import Callable 

5from typing import TYPE_CHECKING, cast 

6 

7import homeassistant.util.dt as dt_util 

8from homeassistant.auth import HomeAssistant 

9from homeassistant.components.alarm_control_panel.const import AlarmControlPanelState 

10from homeassistant.components.calendar import CalendarEntity, CalendarEvent 

11from homeassistant.const import CONF_ALIAS, CONF_ENTITY_ID 

12from homeassistant.helpers import entity_platform 

13from homeassistant.helpers.event import ( 

14 async_track_point_in_time, 

15 async_track_utc_time_change, 

16) 

17from homeassistant.helpers.typing import ConfigType 

18 

19from custom_components.autoarm.helpers import AppHealthTracker, alarm_state_as_enum 

20 

21from .const import ( 

22 ALARM_STATES, 

23 CONF_CALENDAR_EVENT_STATES, 

24 CONF_CALENDAR_POLL_INTERVAL, 

25 DOMAIN, 

26 NO_CAL_EVENT_MODE_AUTO, 

27 ChangeSource, 

28) 

29 

30if TYPE_CHECKING: 

31 from homeassistant.core import CALLBACK_TYPE 

32 

33_LOGGER = logging.getLogger(__name__) 

34 

35 

36def unlisten(listener: Callable[[], None] | None) -> None: 

37 if listener: 

38 try: 

39 listener() 

40 except Exception as e: 

41 _LOGGER.debug("AUTOARM Failure closing calendar listener %s: %s", listener, e) 

42 

43 

class TrackedCalendarEvent:
    """Generate alarm state changes for a Home Assistant Calendar event

    Wraps one calendar event, schedules callbacks for its start and end times,
    and asks the armer to change alarm state when those moments are reached.
    ``track_status`` is "pending" until the start handler has run, "started"
    while the event is being applied, and "ended" once it has finished, been
    removed, or was already in the past when first seen.
    """

    def __init__(
        self,
        calendar_id: str,
        event: CalendarEvent,
        arming_state: AlarmControlPanelState,
        no_event_mode: str | None,
        armer: "AlarmArmer",  # type: ignore # noqa: F821
        hass: HomeAssistant,
    ) -> None:
        """Record the event and the state to apply; no listeners are created until `initialize`"""
        self.tracked_at: dt.datetime = dt_util.now()
        self.calendar_id: str = calendar_id
        self.id: str = TrackedCalendarEvent.event_id(calendar_id, event)
        self.event: CalendarEvent = event
        self.no_event_mode: str | None = no_event_mode
        self.arming_state: AlarmControlPanelState = arming_state
        self.start_listener: CALLBACK_TYPE | None = None
        self.end_listener: CALLBACK_TYPE | None = None
        self.armer: AlarmArmer = armer  # type: ignore # noqa: F821
        self.hass: HomeAssistant = hass
        # alarm state at the time tracking began, reinstated at event end in manual mode
        self.previous_state: AlarmControlPanelState | None = armer.armed_state()
        self.track_status: str = "pending"

    async def initialize(self) -> None:
        """Start tracking: ignore past events, start current ones now, schedule future ones"""
        if self.event.end_datetime_local < self.tracked_at:
            _LOGGER.debug("AUTOARM Ignoring past event")
            self.track_status = "ended"
            return
        await self._schedule_listeners(self.tracked_at)
        _LOGGER.debug("AUTOARM Now tracking %s event %s, %s", self.calendar_id, self.event.uid, self.event.summary)

    async def _schedule_listeners(self, reference_time: dt.datetime) -> None:
        """Register start/end listeners for the current event times, relative to `reference_time`

        An event that has not yet started is either scheduled (start in the future)
        or started immediately (start already reached). The end listener is only
        registered when the end is still ahead of `reference_time`.
        """
        if self.track_status == "pending":
            if self.event.start_datetime_local > reference_time:
                self.start_listener = async_track_point_in_time(
                    self.hass,
                    self.on_calendar_event_start,
                    self.event.start_datetime_local,
                )
            else:
                await self.on_calendar_event_start(dt_util.now())
        if self.event.end_datetime_local > reference_time:
            self.end_listener = async_track_point_in_time(
                self.hass,
                self.end,
                self.event.end_datetime_local,
            )

    async def end(self, event_time: dt.datetime) -> None:
        """Handle an event that has reached its finish date and time"""
        _LOGGER.debug("AUTOARM Calendar event %s ended, event_time: %s", self.id, event_time)
        # mark ended first so this event is not counted as current by the end handler
        self.track_status = "ended"
        try:
            await self.on_calendar_event_end(dt_util.now())
        finally:
            # always cancel listeners, even if the end handler failed
            self.shutdown()

    async def update(self, new_event: CalendarEvent) -> None:
        """Replace the underlying calendar event and realign tracking with its new times

        If the event was current and the new version no longer is, it is ended
        immediately. Otherwise the start/end listeners are cancelled and
        re-registered so they fire at the updated times rather than the old ones.
        """
        _LOGGER.debug("AUTOARM Calendar event updated for %s: %s", self.id, self.event.summary)
        was_current = self.is_current()
        self.event = new_event
        if was_current and not self.is_current():
            await self.end(dt_util.now())
            return
        if self.track_status == "ended":
            return

        # listeners were registered for the old times, drop them before rescheduling
        self.shutdown()
        now_local: dt.datetime = dt_util.now()
        if self.event.end_datetime_local < now_local:
            if self.track_status == "pending":
                # moved wholly into the past without ever starting, so nothing to apply or undo
                self.track_status = "ended"
            return
        await self._schedule_listeners(now_local)

    async def remove(self) -> None:
        """Handle deletion of the calendar event, or it no longer matching any state

        A started event is ended straight away. Otherwise the event is marked
        ended and its listeners cancelled so that it can never fire later.
        """
        _LOGGER.debug("AUTOARM Calendar event deletion for %s: %s", self.id, self.event.summary)
        if self.track_status == "started":
            await self.end(dt_util.now())
        else:
            self.track_status = "ended"
            self.shutdown()

    async def on_calendar_event_start(self, triggered_at: dt.datetime) -> None:
        """Apply the event's arming state, unless an occupancy override applies, and publish a sensor state

        Sets ``track_status`` to "started" whether called directly or from the
        scheduled start listener.
        """
        _LOGGER.debug("AUTOARM on_calendar_event_start(%s,%s)", self.id, triggered_at)
        self.track_status = "started"
        target_state: AlarmControlPanelState = self.arming_state
        new_state: AlarmControlPanelState | None = None
        overridden: bool = False
        at_home_states = (
            AlarmControlPanelState.DISARMED,
            AlarmControlPanelState.ARMED_HOME,
            AlarmControlPanelState.ARMED_NIGHT,
        )

        # optionally allow occupancy overrides of recurring events
        overridable_event: bool = str(self.arming_state) in self.armer.calendar_occupancy_override_states
        if self.is_recurring() and overridable_event:
            occupied = self.armer.is_occupied()
            current_state: AlarmControlPanelState | None = self.armer.armed_state()
            # equality rather than `in (X)`: a parenthesised single value is not a tuple,
            # and membership against it fails when the current state is None
            if (
                occupied is False
                and target_state in at_home_states
                and current_state == AlarmControlPanelState.ARMED_AWAY
            ):
                _LOGGER.debug(
                    "AUTOARM Calendar event %s: empty occupancy override cancelling calendar change to %s",
                    self.id,
                    self.arming_state,
                )
                overridden = True
            elif (
                occupied is True
                and target_state == AlarmControlPanelState.ARMED_AWAY
                and current_state in at_home_states
            ):
                _LOGGER.debug(
                    "AUTOARM Calendar event %s: present occupancy override cancelling calendar change to %s",
                    self.id,
                    self.arming_state,
                )
                overridden = True

        if overridden:
            _LOGGER.info("AUTOARM Calendar arming to %s overridden by occupancy", target_state)
        else:
            new_state = await self.armer.arm(
                arming_state=target_state,
                source=ChangeSource.CALENDAR,
                change_context={
                    "caller": "calendar.on_calendar_event_start",
                    "calendar_id": self.calendar_id,
                    "event_id": self.id,
                    "recurring": self.is_recurring(),
                    "overridable_event": overridable_event,
                },
            )
        self.hass.states.async_set(
            f"sensor.{DOMAIN}_last_calendar_event",
            new_state=self.event.summary or str(self.id),
            attributes={
                "calendar": self.calendar_id,
                "start": self.event.start_datetime_local,
                "end": self.event.end_datetime_local,
                "summary": self.event.summary,
                "description": self.event.description,
                "uid": self.event.uid,
                "new_state": new_state,
                "overridden": overridden,
            },
        )

    async def on_calendar_event_end(self, ended_at: dt.datetime) -> None:
        """Decide the alarm state once the event is over

        Nothing is done while the armer reports another active calendar event.
        Otherwise the action depends on ``no_event_mode``: auto mode resets the
        armed state via a pending state, a fixed alarm state is armed directly,
        and any other value reinstates the state captured when tracking began.
        """
        _LOGGER.debug("AUTOARM on_calendar_event_end(%s,%s)", self.id, ended_at)
        if self.armer.has_active_calendar_event():
            _LOGGER.debug("AUTOARM No action on event end since other cal event active")
            return
        if self.no_event_mode == NO_CAL_EVENT_MODE_AUTO:
            _LOGGER.info("AUTOARM Calendar event %s ended, and arming state", self.id)
            # avoid having state locked in vacation by state calculator by moving via 'Pending'
            await self.armer.pending_state(
                source=ChangeSource.CALENDAR,
                change_context={
                    "caller": "calendar.on_calendar_event_end",
                    "calendar_id": self.calendar_id,
                    "event_id": self.id,
                    "no_event_mode": self.no_event_mode,
                },
            )
            await self.armer.reset_armed_state(source=ChangeSource.CALENDAR)
        elif self.no_event_mode in AlarmControlPanelState:
            _LOGGER.info("AUTOARM Calendar event %s ended, and returning to fixed state %s", self.id, self.no_event_mode)
            await self.armer.arm(
                alarm_state_as_enum(self.no_event_mode),
                source=ChangeSource.CALENDAR,
                change_context={
                    "caller": "calendar.on_calendar_event_end",
                    "calendar_id": self.calendar_id,
                    "event_id": self.id,
                    "no_event_mode": self.no_event_mode,
                },
            )
        else:
            _LOGGER.debug("AUTOARM Reinstate previous state on calendar event end in manual mode")
            await self.armer.arm(
                self.previous_state,
                source=ChangeSource.CALENDAR,
                change_context={
                    "caller": "calendar.on_calendar_event_end",
                    "calendar_id": self.calendar_id,
                    "event_id": self.id,
                    "no_event_mode": self.no_event_mode,
                },
            )

    @classmethod
    def event_id(cls, calendar_id: str, event: CalendarEvent) -> str:
        """Generate an ID for the calendar even if it doesn't natively support `uid`"""
        uid = event.uid or str(hash((event.summary, event.description, event.start.isoformat(), event.end.isoformat())))
        return f"{calendar_id}:{uid}"

    def is_current(self) -> bool:
        """True when the event has not ended and the local time is within its start and end, inclusive"""
        if self.track_status == "ended":
            return False
        now_local: dt.datetime = dt_util.now()
        return now_local >= self.event.start_datetime_local and now_local <= self.event.end_datetime_local

    def is_recurring(self) -> bool:
        """True when the calendar event carries a recurrence id"""
        return self.event.recurrence_id is not None

    def is_future(self) -> bool:
        """True when the event has not ended and its start is later than the current local time"""
        if self.track_status == "ended":
            return False
        now_local: dt.datetime = dt_util.now()
        return self.event.start_datetime_local > now_local

    def shutdown(self) -> None:
        """Cancel and forget the start and end listeners"""
        unlisten(self.start_listener)
        self.start_listener = None
        unlisten(self.end_listener)
        self.end_listener = None

    def __eq__(self, other: object) -> bool:
        """Compare two events based on underlying calendar event"""
        if not isinstance(other, TrackedCalendarEvent):
            return False
        return self.event.uid == other.event.uid

252 

253 

class TrackedCalendar:
    """Listener for a Home Assistant Calendar

    Polls one calendar entity on a timer, matches its events against alarm
    state names and configured patterns, and keeps a `TrackedCalendarEvent`
    for each matching event, keyed by `TrackedCalendarEvent.event_id`.
    """

    def __init__(
        self,
        hass: HomeAssistant,
        calendar_config: ConfigType,
        no_event_mode: str | None,
        armer: "AlarmArmer",  # type: ignore # noqa: F821
        app_health_tracker: AppHealthTracker,
    ) -> None:
        """Read the calendar configuration; polling does not begin until `initialize`"""
        self.enabled = False
        self.armer = armer
        self.app_health_tracker: AppHealthTracker = app_health_tracker
        self.hass: HomeAssistant = hass
        self.no_event_mode: str | None = no_event_mode
        self.alias: str = cast("str", calendar_config.get(CONF_ALIAS, ""))
        self.entity_id: str = cast("str", calendar_config.get(CONF_ENTITY_ID))
        self.poll_interval: int = calendar_config.get(CONF_CALENDAR_POLL_INTERVAL, 30)
        # fall back to no custom patterns when none are configured, so matching never sees None
        self.state_mappings: dict[str, list[str]] = cast(
            "dict[str, list[str]]", calendar_config.get(CONF_CALENDAR_EVENT_STATES) or {}
        )
        # self.notify_on_change: str = calendar_config.get(CONF_CALENDAR_ENTRY_NOTIFICATIONS, ENTRY_NOTIFICATION_MATCHED)
        self.tracked_events: dict[str, TrackedCalendarEvent] = {}
        self.poller_listener: CALLBACK_TYPE | None = None
        # set by `initialize` once the entity has been found on the calendar platform
        self.calendar_entity: CalendarEntity | None = None

    async def initialize(self, calendar_platform: entity_platform.EntityPlatform) -> None:
        """Locate the calendar entity, start the poll timer and run a first match

        A missing entity is recorded as an initialization error and leaves the
        calendar disabled. Any exception is logged and recorded as a runtime
        error rather than raised.
        """
        try:
            calendar_entity: CalendarEntity | None = cast(
                "CalendarEntity|None", calendar_platform.domain_entities.get(self.entity_id)
            )
            if calendar_entity is None:
                self.app_health_tracker.record_initialization_error("calendar_setup")
                _LOGGER.warning("AUTOARM Unable to access calendar %s", self.entity_id)
            else:
                self.calendar_entity = calendar_entity
                _LOGGER.info(
                    "AUTOARM Configured calendar %s from %s, polling every %s minutes",
                    self.entity_id,
                    calendar_platform.platform_name,
                    self.poll_interval,
                )
                self.poller_listener = async_track_utc_time_change(
                    self.hass,
                    self.on_timed_poll,
                    "*",
                    minute=f"/{self.poll_interval}",
                    second=0,
                    local=True,
                )
                self.enabled = True
                # force an initial poll
                await self.match_events()

        except Exception:
            self.app_health_tracker.record_runtime_error()
            _LOGGER.exception("AUTOARM Failed to initialize calendar entity %s", self.entity_id)

    def shutdown(self) -> None:
        """Stop polling, cancel every tracked event's listeners and forget the events"""
        unlisten(self.poller_listener)
        self.poller_listener = None
        for tracked_event in self.tracked_events.values():
            tracked_event.shutdown()
        self.enabled = False
        self.tracked_events.clear()

    async def on_timed_poll(self, _called_time: dt.datetime) -> None:
        """Check for new and dead events, entry point for the timed calendar tracker listener"""
        _LOGGER.debug("AUTOARM Calendar Poll")
        await self.match_events()
        await self.prune_events()

    def has_active_event(self) -> bool:
        """Is there any event matching a state pattern that is currently open"""
        return any(tevent.is_current() for tevent in self.tracked_events.values())

    def active_events(self) -> list[TrackedCalendarEvent]:
        """List all the events matching a state pattern that are currently open"""
        return [v for v in self.tracked_events.values() if v.is_current()]

    def match_event(self, summary: str | None, description: str | None) -> str | None:
        """Find the alarm state name an event refers to, or None if it matches nothing

        Upper-cased alarm state names found in the summary or description take
        priority; after that each configured state's regex patterns are searched
        against the summary and then the description.
        """
        texts: list[str] = [text for text in (summary, description) if text]
        for state_str in ALARM_STATES:
            if any(state_str.upper() in text for text in texts):
                return state_str
        for state_str, patterns in self.state_mappings.items():
            if any(re.search(patt, text) for text in texts for patt in patterns):
                return state_str
        return None

    async def match_events(self) -> None:
        """Query the calendar for events that match state patterns

        The query window runs from 15 minutes ago to 5 minutes past the next
        poll. New matching events are tracked and initialized, changed events
        are updated, and tracked events that no longer match are removed.
        """
        calendar_entity = self.calendar_entity
        if calendar_entity is None:
            _LOGGER.debug("AUTOARM Calendar %s not initialized, skipping event match", self.entity_id)
            return

        now_local = dt_util.now()
        start_dt = now_local - dt.timedelta(minutes=15)
        end_dt = now_local + dt.timedelta(minutes=self.poll_interval + 5)

        events: list[CalendarEvent] = await calendar_entity.async_get_events(self.hass, start_dt, end_dt)

        for event in events:
            # presume the events are sorted by start time
            event_id = TrackedCalendarEvent.event_id(calendar_entity.entity_id, event)
            _LOGGER.debug("AUTOARM Calendar Event: %s [%s]", event.summary, event_id)

            state_str: str | None = self.match_event(event.summary, event.description)
            existing_event: TrackedCalendarEvent | None = self.tracked_events.get(event_id)

            if state_str is None:
                if existing_event is not None:
                    _LOGGER.info(
                        "AUTOARM Calendar %s found updated event %s no longer matching",
                        calendar_entity.entity_id,
                        event.summary,
                    )
                    await existing_event.remove()
                else:
                    _LOGGER.debug("AUTOARM Ignoring untracked unmatched event")
                continue

            if existing_event is not None:
                if existing_event.event != event:
                    _LOGGER.info(
                        "AUTOARM Calendar %s found updated event %s for state %s",
                        calendar_entity.entity_id,
                        event.summary,
                        state_str,
                    )
                    await existing_event.update(event)
                else:
                    _LOGGER.debug("AUTOARM No change to previously tracked event")
                continue

            state: AlarmControlPanelState | None = alarm_state_as_enum(state_str)
            if state is None:
                _LOGGER.warning(
                    "AUTOARM Calendar %s found event %s for invalid state %s",
                    calendar_entity.entity_id,
                    event.summary,
                    state_str,
                )
                continue

            _LOGGER.debug(
                "AUTOARM Calendar %s matched event %s for state %s",
                calendar_entity.entity_id,
                event.summary,
                state_str,
            )
            tracked_event = TrackedCalendarEvent(
                calendar_entity.entity_id,
                event=event,
                arming_state=state,
                no_event_mode=self.no_event_mode,
                armer=self.armer,
                hass=self.hass,
            )
            self.tracked_events[event_id] = tracked_event
            await tracked_event.initialize()

    async def prune_events(self) -> None:
        """Remove past events, and events that no longer exist in the calendar

        Expired events that have not yet been ended are ended here; events
        already marked ended only have their listeners cancelled, so their
        end-of-event handling is not run a second time. Liveness is compared by
        tracked event id, so calendars without a native `uid` are handled too.
        """
        to_remove: set[str] = set()
        min_start: dt.datetime | None = None
        max_end: dt.datetime | None = None
        # iterate over a snapshot, since awaiting below may let other tasks change the dict
        for event_id, tevent in list(self.tracked_events.items()):
            if min_start is None or min_start > tevent.event.start_datetime_local:
                min_start = tevent.event.start_datetime_local
            if max_end is None or max_end < tevent.event.end_datetime_local:
                max_end = tevent.event.end_datetime_local
            if not tevent.is_current() and not tevent.is_future():
                _LOGGER.debug("AUTOARM Pruning expire calendar event: %s", tevent.event.uid)
                to_remove.add(event_id)
                if tevent.track_status == "ended":
                    tevent.shutdown()
                else:
                    await tevent.end(dt_util.now())

        calendar_entity = self.calendar_entity
        if min_start and max_end and calendar_entity is not None:
            live_events: list[CalendarEvent] = await calendar_entity.async_get_events(self.hass, min_start, max_end)
            live_event_ids: set[str] = {
                TrackedCalendarEvent.event_id(calendar_entity.entity_id, live_event) for live_event in live_events
            }
            for event_id, tevent in list(self.tracked_events.items()):
                if tevent.id not in live_event_ids:
                    _LOGGER.debug("AUTOARM Pruning dead calendar event: %s", tevent.event.uid)
                    await tevent.remove()
                    to_remove.add(event_id)
        for event_id in to_remove:
            # an event can be both expired and dead; pop tolerates it already being gone
            self.tracked_events.pop(event_id, None)