Coverage for custom_components/autoarm/calendar.py: 93%
206 statements
« prev ^ index » next coverage.py v7.10.6, created at 2026-02-17 01:14 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2026-02-17 01:14 +0000
1import datetime as dt
2import logging
3import re
4from collections.abc import Callable
5from typing import TYPE_CHECKING, cast
7import homeassistant.util.dt as dt_util
8from homeassistant.auth import HomeAssistant
9from homeassistant.components.alarm_control_panel.const import AlarmControlPanelState
10from homeassistant.components.calendar import CalendarEntity, CalendarEvent
11from homeassistant.const import CONF_ALIAS, CONF_ENTITY_ID
12from homeassistant.helpers import entity_platform
13from homeassistant.helpers.event import (
14 async_track_point_in_time,
15 async_track_utc_time_change,
16)
17from homeassistant.helpers.typing import ConfigType
19from custom_components.autoarm.helpers import AppHealthTracker, alarm_state_as_enum
21from .const import (
22 ALARM_STATES,
23 CONF_CALENDAR_EVENT_STATES,
24 CONF_CALENDAR_POLL_INTERVAL,
25 DOMAIN,
26 NO_CAL_EVENT_MODE_AUTO,
27 ChangeSource,
28)
30if TYPE_CHECKING:
31 from homeassistant.core import CALLBACK_TYPE
33_LOGGER = logging.getLogger(__name__)
36def unlisten(listener: Callable[[], None] | None) -> None:
37 if listener:
38 try:
39 listener()
40 except Exception as e:
41 _LOGGER.debug("AUTOARM Failure closing calendar listener %s: %s", listener, e)
44class TrackedCalendar:
45 """Listener for a Home Assistant Calendar"""
47 def __init__(
48 self,
49 hass: HomeAssistant,
50 calendar_config: ConfigType,
51 no_event_mode: str | None,
52 armer: "AlarmArmer", # type: ignore # noqa: F821
53 app_health_tracker: AppHealthTracker,
54 ) -> None:
55 self.enabled = False
56 self.armer = armer
57 self.app_health_tracker: AppHealthTracker = app_health_tracker
58 self.hass: HomeAssistant = hass
59 self.no_event_mode: str | None = no_event_mode
60 self.alias: str = cast("str", calendar_config.get(CONF_ALIAS, ""))
61 self.entity_id: str = cast("str", calendar_config.get(CONF_ENTITY_ID))
62 self.poll_interval: int = calendar_config.get(CONF_CALENDAR_POLL_INTERVAL, 30)
63 self.state_mappings: dict[str, list[str]] = cast("dict", calendar_config.get(CONF_CALENDAR_EVENT_STATES))
64 # self.notify_on_change: str = calendar_config.get(CONF_CALENDAR_ENTRY_NOTIFICATIONS, ENTRY_NOTIFICATION_MATCHED)
65 self.tracked_events: dict[str, TrackedCalendarEvent] = {}
66 self.poller_listener: CALLBACK_TYPE | None = None
68 async def initialize(self, calendar_platform: entity_platform.EntityPlatform) -> None:
69 try:
70 calendar_entity: CalendarEntity | None = cast(
71 "CalendarEntity|None", calendar_platform.domain_entities.get(self.entity_id)
72 )
73 if calendar_entity is None:
74 self.app_health_tracker.record_initialization_error("calendar_setup")
75 _LOGGER.warning("AUTOARM Unable to access calendar %s", self.entity_id)
76 else:
77 self.calendar_entity = calendar_entity
78 _LOGGER.info(
79 "AUTOARM Configured calendar %s from %s, polling every %s minutes",
80 self.entity_id,
81 calendar_platform.platform_name,
82 self.poll_interval,
83 )
84 self.poller_listener = async_track_utc_time_change(
85 self.hass,
86 self.on_timed_poll,
87 "*",
88 minute=f"/{self.poll_interval}",
89 second=0,
90 local=True,
91 )
92 self.enabled = True
93 # force an initial poll
94 await self.match_events()
96 except Exception as _e:
97 self.app_health_tracker.record_runtime_error()
98 _LOGGER.exception("AUTOARM Failed to initialize calendar entity %s", self.entity_id)
100 def shutdown(self) -> None:
101 unlisten(self.poller_listener)
102 self.poller_listener = None
103 for tracked_event in self.tracked_events.values():
104 tracked_event.shutdown()
105 self.enabled = False
106 self.tracked_events.clear()
108 async def on_timed_poll(self, _called_time: dt.datetime) -> None:
109 """Check for new and dead events, entry point for the timed calendar tracker listener"""
110 _LOGGER.debug("AUTOARM Calendar Poll")
111 await self.match_events()
112 await self.prune_events()
114 def has_active_event(self) -> bool:
115 """Is there any event matching a state pattern that is currently open"""
116 return any(tevent.is_current() for tevent in self.tracked_events.values())
118 def active_events(self) -> list[CalendarEvent]:
119 """List all the events matching a state pattern that are currently open"""
120 return [v.event for v in self.tracked_events.values() if v.is_current()]
122 def match_event(self, summary: str | None, description: str | None) -> str | None:
123 for state_str in ALARM_STATES:
124 if summary and (state_str.upper() in summary):
125 return state_str
126 if description and (state_str.upper() in description):
127 return state_str
128 for state_str, patterns in self.state_mappings.items():
129 if (
130 summary
131 and any(
132 re.search(
133 patt,
134 summary,
135 )
136 for patt in patterns
137 )
138 ) or (
139 description
140 and any(
141 re.search(
142 patt,
143 description,
144 )
145 for patt in patterns
146 )
147 ):
148 return state_str
149 return None
151 async def match_events(self) -> None:
152 """Query the calendar for events that match state patterns"""
153 now_local = dt_util.now()
154 start_dt = now_local - dt.timedelta(minutes=15)
155 end_dt = now_local + dt.timedelta(minutes=self.poll_interval + 5)
157 events: list[CalendarEvent] = await self.calendar_entity.async_get_events(self.hass, start_dt, end_dt)
159 for event in events:
160 # presume the events are sorted by start time
161 event_id = TrackedCalendarEvent.event_id(self.calendar_entity.entity_id, event)
162 _LOGGER.debug("AUTOARM Calendar Event: %s [%s]", event.summary, event_id)
164 state_str: str | None = self.match_event(event.summary, event.description)
165 if state_str is None:
166 if event_id in self.tracked_events:
167 existing_event: TrackedCalendarEvent = self.tracked_events[event_id]
168 _LOGGER.info(
169 "AUTOARM Calendar %s found updated event %s no longer matching",
170 self.calendar_entity.entity_id,
171 event.summary,
172 )
173 await existing_event.remove()
174 else:
175 _LOGGER.debug("AUTOARM Ignoring untracked unmatched event")
176 else:
177 if event_id not in self.tracked_events:
178 state: AlarmControlPanelState | None = alarm_state_as_enum(state_str)
179 if state is None:
180 _LOGGER.warning(
181 "AUTOARM Calendar %s found event %s for invalid state %s",
182 self.calendar_entity.entity_id,
183 event.summary,
184 state_str,
185 )
186 else:
187 _LOGGER.debug(
188 "AUTOARM Calendar %s matched event %s for state %s",
189 self.calendar_entity.entity_id,
190 event.summary,
191 state_str,
192 )
194 self.tracked_events[event_id] = TrackedCalendarEvent(
195 self.calendar_entity.entity_id,
196 event=event,
197 arming_state=state,
198 no_event_mode=self.no_event_mode,
199 armer=self.armer,
200 hass=self.hass,
201 )
202 await self.tracked_events[event_id].initialize()
203 else:
204 existing_event = self.tracked_events[event_id]
205 if existing_event.event != event:
206 _LOGGER.info(
207 "AUTOARM Calendar %s found updated event %s for state %s",
208 self.calendar_entity.entity_id,
209 event.summary,
210 state_str,
211 )
212 await existing_event.update(event)
213 else:
214 _LOGGER.debug("AUTOARM No change to previously tracked event")
216 async def prune_events(self) -> None:
217 """Remove past events"""
218 to_remove: list[str] = []
219 min_start: dt.datetime | None = None
220 max_end: dt.datetime | None = None
221 for event_id, tevent in self.tracked_events.items():
222 if min_start is None or min_start > tevent.event.start_datetime_local:
223 min_start = tevent.event.start_datetime_local
224 if max_end is None or max_end < tevent.event.end_datetime_local:
225 max_end = tevent.event.end_datetime_local
226 if not tevent.is_current() and not tevent.is_future():
227 _LOGGER.debug("AUTOARM Pruning expire calendar event: %s", tevent.event.uid)
228 to_remove.append(event_id)
229 await tevent.end(dt_util.now())
231 if min_start and max_end:
232 live_event_ids: list[str] = [
233 e.uid for e in await self.calendar_entity.async_get_events(self.hass, min_start, max_end) if e.uid is not None
234 ]
235 for tevent in self.tracked_events.values():
236 if tevent.event.uid not in live_event_ids:
237 _LOGGER.debug("AUTOARM Pruning dead calendar event: %s", tevent.event.uid)
238 await tevent.remove()
239 to_remove.append(tevent.id)
240 for event_id in to_remove:
241 del self.tracked_events[event_id]
244class TrackedCalendarEvent:
245 """Generate alarm state changes for a Home Assistant Calendar event"""
247 def __init__(
248 self,
249 calendar_id: str,
250 event: CalendarEvent,
251 arming_state: AlarmControlPanelState,
252 no_event_mode: str | None,
253 armer: "AlarmArmer", # type: ignore # noqa: F821
254 hass: HomeAssistant,
255 ) -> None:
256 self.tracked_at: dt.datetime = dt_util.now()
257 self.calendar_id: str = calendar_id
258 self.id: str = TrackedCalendarEvent.event_id(calendar_id, event)
259 self.event: CalendarEvent = event
260 self.no_event_mode: str | None = no_event_mode
261 self.arming_state: AlarmControlPanelState = arming_state
262 self.start_listener: Callable | None = None
263 self.end_listener: Callable | None = None
264 self.armer: AlarmArmer = armer # type: ignore # noqa: F821
265 self.hass: HomeAssistant = hass
266 self.previous_state: AlarmControlPanelState | None = armer.armed_state()
267 self.track_status: str = "pending"
269 async def initialize(self) -> None:
270 if self.event.end_datetime_local < self.tracked_at:
271 _LOGGER.debug("AUTOARM Ignoring past event")
272 self.track_status = "ended"
273 return
274 if self.event.start_datetime_local > self.tracked_at:
275 self.start_listener = async_track_point_in_time(
276 self.hass,
277 self.on_calendar_event_start,
278 self.event.start_datetime_local,
279 )
280 else:
281 await self.on_calendar_event_start(dt_util.now())
282 self.track_status = "started"
283 if self.event.end_datetime_local > self.tracked_at:
284 self.end_listener = async_track_point_in_time(
285 self.hass,
286 self.end,
287 self.event.end_datetime_local,
288 )
289 _LOGGER.debug("AUTOARM Now tracking %s event %s, %s", self.calendar_id, self.event.uid, self.event.summary)
291 async def end(self, event_time: dt.datetime) -> None:
292 """Handle an event that has reached its finish date and time"""
293 _LOGGER.debug("AUTOARM Calendar event %s ended, event_time: %s", self.id, event_time)
294 self.track_status = "ended"
295 await self.on_calendar_event_end(dt_util.now())
296 self.shutdown()
298 async def update(self, new_event: CalendarEvent) -> None:
299 _LOGGER.debug("AUTOARM Calendar event updated for %s: %s", self.id, self.event.summary)
300 was_current = self.is_current()
301 self.event = new_event
302 if not self.is_current() and was_current:
303 await self.end(dt_util.now())
305 async def remove(self) -> None:
306 _LOGGER.debug("AUTOARM Calendar event deletion for %s: %s", self.id, self.event.summary)
307 if self.track_status == "started":
308 await self.end(dt_util.now())
309 else:
310 self.track_status = "ended"
312 async def on_calendar_event_start(self, triggered_at: dt.datetime) -> None:
313 _LOGGER.debug("AUTOARM on_calendar_event_start(%s,%s)", self.id, triggered_at)
314 new_state = await self.armer.arm(arming_state=self.arming_state, source=ChangeSource.CALENDAR)
315 self.hass.states.async_set(
316 f"sensor.{DOMAIN}_last_calendar_event",
317 new_state=self.event.summary or str(self.id),
318 attributes={
319 "calendar": self.calendar_id,
320 "start": self.event.start_datetime_local,
321 "end": self.event.end_datetime_local,
322 "summary": self.event.summary,
323 "description": self.event.description,
324 "uid": self.event.uid,
325 "new_state": new_state,
326 },
327 )
329 async def on_calendar_event_end(self, ended_at: dt.datetime) -> None:
330 _LOGGER.debug("AUTOARM on_calendar_event_end(%s,%s)", self.id, ended_at)
331 if self.armer.has_active_calendar_event():
332 _LOGGER.debug("AUTOARM No action on event end since other cal event active")
333 return
334 if self.no_event_mode == NO_CAL_EVENT_MODE_AUTO:
335 _LOGGER.info("AUTOARM Calendar event %s ended, and arming state", self.id)
336 # avoid having state locked in vacation by state calculator
337 await self.armer.pending_state(source=ChangeSource.CALENDAR)
338 await self.armer.reset_armed_state(source=ChangeSource.CALENDAR)
339 elif self.no_event_mode in AlarmControlPanelState:
340 _LOGGER.info("AUTOARM Calendar event %s ended, and returning to fixed state %s", self.id, self.no_event_mode)
341 await self.armer.arm(alarm_state_as_enum(self.no_event_mode), source=ChangeSource.CALENDAR)
342 else:
343 _LOGGER.debug("AUTOARM Reinstate previous state on calendar event end in manual mode")
344 await self.armer.arm(self.previous_state, source=ChangeSource.CALENDAR)
346 @classmethod
347 def event_id(cls, calendar_id: str, event: CalendarEvent) -> str:
348 """Generate an ID for the calendar even if it doesn't natively support `uid`"""
349 uid = event.uid or str(hash((event.summary, event.description, event.start.isoformat(), event.end.isoformat())))
350 return f"{calendar_id}:{uid}"
352 def is_current(self) -> bool:
353 if self.track_status == "ended":
354 return False
355 now_local: dt.datetime = dt_util.now()
356 return now_local >= self.event.start_datetime_local and now_local <= self.event.end_datetime_local
358 def is_future(self) -> bool:
359 if self.track_status == "ended":
360 return False
361 now_local: dt.datetime = dt_util.now()
362 return self.event.start_datetime_local > now_local
364 def shutdown(self) -> None:
365 unlisten(self.start_listener)
366 self.start_listener = None
367 unlisten(self.end_listener)
368 self.end_listener = None
370 def __eq__(self, other: object) -> bool:
371 """Compare two events based on underlying calendar event"""
372 if not isinstance(other, TrackedCalendarEvent):
373 return False
374 return self.event.uid == other.event.uid