Coverage for custom_components/autoarm/calendar_events.py: 94%
223 statements
« prev ^ index » next coverage.py v7.10.6, created at 2026-04-27 16:32 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2026-04-27 16:32 +0000
1import datetime as dt
2import logging
3import re
4from collections.abc import Callable
5from typing import TYPE_CHECKING, cast
7import homeassistant.util.dt as dt_util
8from homeassistant.auth import HomeAssistant
9from homeassistant.components.alarm_control_panel.const import AlarmControlPanelState
10from homeassistant.components.calendar import CalendarEntity, CalendarEvent
11from homeassistant.const import CONF_ALIAS, CONF_ENTITY_ID
12from homeassistant.helpers import entity_platform
13from homeassistant.helpers.event import (
14 async_track_point_in_time,
15 async_track_utc_time_change,
16)
17from homeassistant.helpers.typing import ConfigType
19from custom_components.autoarm.helpers import AppHealthTracker, alarm_state_as_enum
21from .const import (
22 ALARM_STATES,
23 CONF_CALENDAR_EVENT_STATES,
24 CONF_CALENDAR_POLL_INTERVAL,
25 DOMAIN,
26 NO_CAL_EVENT_MODE_AUTO,
27 ChangeSource,
28)
30if TYPE_CHECKING:
31 from homeassistant.core import CALLBACK_TYPE
33_LOGGER = logging.getLogger(__name__)
36def unlisten(listener: Callable[[], None] | None) -> None:
37 if listener:
38 try:
39 listener()
40 except Exception as e:
41 _LOGGER.debug("AUTOARM Failure closing calendar listener %s: %s", listener, e)
44class TrackedCalendarEvent:
45 """Generate alarm state changes for a Home Assistant Calendar event"""
47 def __init__(
48 self,
49 calendar_id: str,
50 event: CalendarEvent,
51 arming_state: AlarmControlPanelState,
52 no_event_mode: str | None,
53 armer: "AlarmArmer", # type: ignore # noqa: F821
54 hass: HomeAssistant,
55 ) -> None:
56 self.tracked_at: dt.datetime = dt_util.now()
57 self.calendar_id: str = calendar_id
58 self.id: str = TrackedCalendarEvent.event_id(calendar_id, event)
59 self.event: CalendarEvent = event
60 self.no_event_mode: str | None = no_event_mode
61 self.arming_state: AlarmControlPanelState = arming_state
62 self.start_listener: CALLBACK_TYPE | None = None
63 self.end_listener: CALLBACK_TYPE | None = None
64 self.armer: AlarmArmer = armer # type: ignore # noqa: F821
65 self.hass: HomeAssistant = hass
66 self.previous_state: AlarmControlPanelState | None = armer.armed_state()
67 self.track_status: str = "pending"
69 async def initialize(self) -> None:
70 if self.event.end_datetime_local < self.tracked_at:
71 _LOGGER.debug("AUTOARM Ignoring past event")
72 self.track_status = "ended"
73 return
74 if self.event.start_datetime_local > self.tracked_at:
75 self.start_listener = async_track_point_in_time(
76 self.hass,
77 self.on_calendar_event_start,
78 self.event.start_datetime_local,
79 )
80 else:
81 await self.on_calendar_event_start(dt_util.now())
82 self.track_status = "started"
83 if self.event.end_datetime_local > self.tracked_at:
84 self.end_listener = async_track_point_in_time(
85 self.hass,
86 self.end,
87 self.event.end_datetime_local,
88 )
89 _LOGGER.debug("AUTOARM Now tracking %s event %s, %s", self.calendar_id, self.event.uid, self.event.summary)
91 async def end(self, event_time: dt.datetime) -> None:
92 """Handle an event that has reached its finish date and time"""
93 _LOGGER.debug("AUTOARM Calendar event %s ended, event_time: %s", self.id, event_time)
94 self.track_status = "ended"
95 await self.on_calendar_event_end(dt_util.now())
96 self.shutdown()
98 async def update(self, new_event: CalendarEvent) -> None:
99 _LOGGER.debug("AUTOARM Calendar event updated for %s: %s", self.id, self.event.summary)
100 was_current = self.is_current()
101 self.event = new_event
102 if not self.is_current() and was_current:
103 await self.end(dt_util.now())
105 async def remove(self) -> None:
106 _LOGGER.debug("AUTOARM Calendar event deletion for %s: %s", self.id, self.event.summary)
107 if self.track_status == "started":
108 await self.end(dt_util.now())
109 else:
110 self.track_status = "ended"
112 async def on_calendar_event_start(self, triggered_at: dt.datetime) -> None:
113 _LOGGER.debug("AUTOARM on_calendar_event_start(%s,%s)", self.id, triggered_at)
114 target_state: AlarmControlPanelState = self.arming_state
115 new_state: AlarmControlPanelState | None = None
116 overridden: bool = False
118 # optionally allow occupancy overrides of recurring events
119 overridable_event: bool = str(self.arming_state) in self.armer.calendar_occupancy_override_states
120 if self.is_recurring() and overridable_event:
121 occupied = self.armer.is_occupied()
122 current_state: AlarmControlPanelState = self.armer.armed_state()
123 if (
124 occupied is False
125 and target_state
126 in (AlarmControlPanelState.DISARMED, AlarmControlPanelState.ARMED_HOME, AlarmControlPanelState.ARMED_NIGHT)
127 and current_state in (AlarmControlPanelState.ARMED_AWAY)
128 ):
129 _LOGGER.debug(
130 "AUTOARM Calendar event %s: empty occupancy override cancelling calendar change to %s",
131 self.id,
132 self.arming_state,
133 )
134 overridden = True
135 elif (
136 occupied is True
137 and target_state in (AlarmControlPanelState.ARMED_AWAY)
138 and current_state
139 in (AlarmControlPanelState.DISARMED, AlarmControlPanelState.ARMED_HOME, AlarmControlPanelState.ARMED_NIGHT)
140 ):
141 _LOGGER.debug(
142 "AUTOARM Calendar event %s: present occupancy override cancelling calendar change to %s",
143 self.id,
144 self.arming_state,
145 )
146 overridden = True
148 if overridden:
149 _LOGGER.info("AUTOARM Calendar arming to %s overridden by occupancy", target_state)
150 else:
151 new_state = await self.armer.arm(
152 arming_state=target_state,
153 source=ChangeSource.CALENDAR,
154 change_context={
155 "caller": "calendar.on_calendar_event_start",
156 "calendar_id": self.calendar_id,
157 "event_id": self.id,
158 "recurring": self.is_recurring(),
159 "overridable_event": overridable_event,
160 },
161 )
162 self.hass.states.async_set(
163 f"sensor.{DOMAIN}_last_calendar_event",
164 new_state=self.event.summary or str(self.id),
165 attributes={
166 "calendar": self.calendar_id,
167 "start": self.event.start_datetime_local,
168 "end": self.event.end_datetime_local,
169 "summary": self.event.summary,
170 "description": self.event.description,
171 "uid": self.event.uid,
172 "new_state": new_state,
173 "overridden": overridden,
174 },
175 )
177 async def on_calendar_event_end(self, ended_at: dt.datetime) -> None:
178 _LOGGER.debug("AUTOARM on_calendar_event_end(%s,%s)", self.id, ended_at)
179 if self.armer.has_active_calendar_event():
180 _LOGGER.debug("AUTOARM No action on event end since other cal event active")
181 return
182 if self.no_event_mode == NO_CAL_EVENT_MODE_AUTO:
183 _LOGGER.info("AUTOARM Calendar event %s ended, and arming state", self.id)
184 # avoid having state locked in vacation by state calculator by moving via 'Pending'
185 await self.armer.pending_state(
186 source=ChangeSource.CALENDAR,
187 change_context={
188 "caller": "calendar.on_calendar_event_end",
189 "calendar_id": self.calendar_id,
190 "event_id": self.id,
191 "no_event_mode": self.no_event_mode,
192 },
193 )
194 await self.armer.reset_armed_state(source=ChangeSource.CALENDAR)
195 elif self.no_event_mode in AlarmControlPanelState:
196 _LOGGER.info("AUTOARM Calendar event %s ended, and returning to fixed state %s", self.id, self.no_event_mode)
197 await self.armer.arm(
198 alarm_state_as_enum(self.no_event_mode),
199 source=ChangeSource.CALENDAR,
200 change_context={
201 "caller": "calendar.on_calendar_event_end",
202 "calendar_id": self.calendar_id,
203 "event_id": self.id,
204 "no_event_mode": self.no_event_mode,
205 },
206 )
207 else:
208 _LOGGER.debug("AUTOARM Reinstate previous state on calendar event end in manual mode")
209 await self.armer.arm(
210 self.previous_state,
211 source=ChangeSource.CALENDAR,
212 change_context={
213 "caller": "calendar.on_calendar_event_end",
214 "calendar_id": self.calendar_id,
215 "event_id": self.id,
216 "no_event_mode": self.no_event_mode,
217 },
218 )
220 @classmethod
221 def event_id(cls, calendar_id: str, event: CalendarEvent) -> str:
222 """Generate an ID for the calendar even if it doesn't natively support `uid`"""
223 uid = event.uid or str(hash((event.summary, event.description, event.start.isoformat(), event.end.isoformat())))
224 return f"{calendar_id}:{uid}"
226 def is_current(self) -> bool:
227 if self.track_status == "ended":
228 return False
229 now_local: dt.datetime = dt_util.now()
230 return now_local >= self.event.start_datetime_local and now_local <= self.event.end_datetime_local
232 def is_recurring(self) -> bool:
233 return self.event.recurrence_id is not None
235 def is_future(self) -> bool:
236 if self.track_status == "ended":
237 return False
238 now_local: dt.datetime = dt_util.now()
239 return self.event.start_datetime_local > now_local
241 def shutdown(self) -> None:
242 unlisten(self.start_listener)
243 self.start_listener = None
244 unlisten(self.end_listener)
245 self.end_listener = None
247 def __eq__(self, other: object) -> bool:
248 """Compare two events based on underlying calendar event"""
249 if not isinstance(other, TrackedCalendarEvent):
250 return False
251 return self.event.uid == other.event.uid
254class TrackedCalendar:
255 """Listener for a Home Assistant Calendar"""
257 def __init__(
258 self,
259 hass: HomeAssistant,
260 calendar_config: ConfigType,
261 no_event_mode: str | None,
262 armer: "AlarmArmer", # type: ignore # noqa: F821
263 app_health_tracker: AppHealthTracker,
264 ) -> None:
265 self.enabled = False
266 self.armer = armer
267 self.app_health_tracker: AppHealthTracker = app_health_tracker
268 self.hass: HomeAssistant = hass
269 self.no_event_mode: str | None = no_event_mode
270 self.alias: str = cast("str", calendar_config.get(CONF_ALIAS, ""))
271 self.entity_id: str = cast("str", calendar_config.get(CONF_ENTITY_ID))
272 self.poll_interval: int = calendar_config.get(CONF_CALENDAR_POLL_INTERVAL, 30)
273 self.state_mappings: dict[str, list[str]] = cast(
274 "dict[str, list[str]]", calendar_config.get(CONF_CALENDAR_EVENT_STATES)
275 )
276 # self.notify_on_change: str = calendar_config.get(CONF_CALENDAR_ENTRY_NOTIFICATIONS, ENTRY_NOTIFICATION_MATCHED)
277 self.tracked_events: dict[str, TrackedCalendarEvent] = {}
278 self.poller_listener: CALLBACK_TYPE | None = None
280 async def initialize(self, calendar_platform: entity_platform.EntityPlatform) -> None:
281 try:
282 calendar_entity: CalendarEntity | None = cast(
283 "CalendarEntity|None", calendar_platform.domain_entities.get(self.entity_id)
284 )
285 if calendar_entity is None:
286 self.app_health_tracker.record_initialization_error("calendar_setup")
287 _LOGGER.warning("AUTOARM Unable to access calendar %s", self.entity_id)
288 else:
289 self.calendar_entity = calendar_entity
290 _LOGGER.info(
291 "AUTOARM Configured calendar %s from %s, polling every %s minutes",
292 self.entity_id,
293 calendar_platform.platform_name,
294 self.poll_interval,
295 )
296 self.poller_listener = async_track_utc_time_change(
297 self.hass,
298 self.on_timed_poll,
299 "*",
300 minute=f"/{self.poll_interval}",
301 second=0,
302 local=True,
303 )
304 self.enabled = True
305 # force an initial poll
306 await self.match_events()
308 except Exception as _e:
309 self.app_health_tracker.record_runtime_error()
310 _LOGGER.exception("AUTOARM Failed to initialize calendar entity %s", self.entity_id)
312 def shutdown(self) -> None:
313 unlisten(self.poller_listener)
314 self.poller_listener = None
315 for tracked_event in self.tracked_events.values():
316 tracked_event.shutdown()
317 self.enabled = False
318 self.tracked_events.clear()
320 async def on_timed_poll(self, _called_time: dt.datetime) -> None:
321 """Check for new and dead events, entry point for the timed calendar tracker listener"""
322 _LOGGER.debug("AUTOARM Calendar Poll")
323 await self.match_events()
324 await self.prune_events()
326 def has_active_event(self) -> bool:
327 """Is there any event matching a state pattern that is currently open"""
328 return any(tevent.is_current() for tevent in self.tracked_events.values())
330 def active_events(self) -> list[TrackedCalendarEvent]:
331 """List all the events matching a state pattern that are currently open"""
332 return [v for v in self.tracked_events.values() if v.is_current()]
334 def match_event(self, summary: str | None, description: str | None) -> str | None:
335 for state_str in ALARM_STATES:
336 if summary and (state_str.upper() in summary):
337 return state_str
338 if description and (state_str.upper() in description):
339 return state_str
340 for state_str, patterns in self.state_mappings.items():
341 if (
342 summary
343 and any(
344 re.search(
345 patt,
346 summary,
347 )
348 for patt in patterns
349 )
350 ) or (
351 description
352 and any(
353 re.search(
354 patt,
355 description,
356 )
357 for patt in patterns
358 )
359 ):
360 return state_str
361 return None
363 async def match_events(self) -> None:
364 """Query the calendar for events that match state patterns"""
365 now_local = dt_util.now()
366 start_dt = now_local - dt.timedelta(minutes=15)
367 end_dt = now_local + dt.timedelta(minutes=self.poll_interval + 5)
369 events: list[CalendarEvent] = await self.calendar_entity.async_get_events(self.hass, start_dt, end_dt)
371 for event in events:
372 # presume the events are sorted by start time
373 event_id = TrackedCalendarEvent.event_id(self.calendar_entity.entity_id, event)
374 _LOGGER.debug("AUTOARM Calendar Event: %s [%s]", event.summary, event_id)
376 state_str: str | None = self.match_event(event.summary, event.description)
377 if state_str is None:
378 if event_id in self.tracked_events:
379 existing_event: TrackedCalendarEvent = self.tracked_events[event_id]
380 _LOGGER.info(
381 "AUTOARM Calendar %s found updated event %s no longer matching",
382 self.calendar_entity.entity_id,
383 event.summary,
384 )
385 await existing_event.remove()
386 else:
387 _LOGGER.debug("AUTOARM Ignoring untracked unmatched event")
388 else:
389 if event_id not in self.tracked_events:
390 state: AlarmControlPanelState | None = alarm_state_as_enum(state_str)
391 if state is None:
392 _LOGGER.warning(
393 "AUTOARM Calendar %s found event %s for invalid state %s",
394 self.calendar_entity.entity_id,
395 event.summary,
396 state_str,
397 )
398 else:
399 _LOGGER.debug(
400 "AUTOARM Calendar %s matched event %s for state %s",
401 self.calendar_entity.entity_id,
402 event.summary,
403 state_str,
404 )
406 self.tracked_events[event_id] = TrackedCalendarEvent(
407 self.calendar_entity.entity_id,
408 event=event,
409 arming_state=state,
410 no_event_mode=self.no_event_mode,
411 armer=self.armer,
412 hass=self.hass,
413 )
414 await self.tracked_events[event_id].initialize()
415 else:
416 existing_event = self.tracked_events[event_id]
417 if existing_event.event != event:
418 _LOGGER.info(
419 "AUTOARM Calendar %s found updated event %s for state %s",
420 self.calendar_entity.entity_id,
421 event.summary,
422 state_str,
423 )
424 await existing_event.update(event)
425 else:
426 _LOGGER.debug("AUTOARM No change to previously tracked event")
428 async def prune_events(self) -> None:
429 """Remove past events"""
430 to_remove: list[str] = []
431 min_start: dt.datetime | None = None
432 max_end: dt.datetime | None = None
433 for event_id, tevent in self.tracked_events.items():
434 if min_start is None or min_start > tevent.event.start_datetime_local:
435 min_start = tevent.event.start_datetime_local
436 if max_end is None or max_end < tevent.event.end_datetime_local:
437 max_end = tevent.event.end_datetime_local
438 if not tevent.is_current() and not tevent.is_future():
439 _LOGGER.debug("AUTOARM Pruning expire calendar event: %s", tevent.event.uid)
440 to_remove.append(event_id)
441 await tevent.end(dt_util.now())
443 if min_start and max_end:
444 live_event_ids: list[str] = [
445 e.uid for e in await self.calendar_entity.async_get_events(self.hass, min_start, max_end) if e.uid is not None
446 ]
447 for tevent in self.tracked_events.values():
448 if tevent.event.uid not in live_event_ids:
449 _LOGGER.debug("AUTOARM Pruning dead calendar event: %s", tevent.event.uid)
450 await tevent.remove()
451 to_remove.append(tevent.id)
452 for event_id in to_remove:
453 del self.tracked_events[event_id]