Coverage for custom_components/autoarm/calendar.py: 91%
137 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-08 20:27 +0000
1import datetime as dt
2import logging
3import re
4from collections.abc import Callable
5from functools import partial
6from typing import TYPE_CHECKING, cast
8import homeassistant.util.dt as dt_util
9from homeassistant.components.alarm_control_panel.const import AlarmControlPanelState
10from homeassistant.components.calendar import CalendarEntity, CalendarEvent
11from homeassistant.const import CONF_ALIAS, CONF_ENTITY_ID
12from homeassistant.helpers import entity_platform
13from homeassistant.helpers.event import (
14 async_track_point_in_time,
15 async_track_utc_time_change,
16)
17from homeassistant.helpers.typing import ConfigType
19from custom_components.autoarm.helpers import alarm_state_as_enum
21from .const import (
22 CONF_CALENDAR_EVENT_STATES,
23 CONF_CALENDAR_POLL_INTERVAL,
24)
26if TYPE_CHECKING:
27 from homeassistant.core import CALLBACK_TYPE
29_LOGGER = logging.getLogger(__name__)
32def unlisten(listener: Callable[[], None] | None) -> None:
33 if listener:
34 try:
35 listener()
36 except Exception as e:
37 _LOGGER.debug("AUTOARM Failure closing calendar listener %s: %s", listener, e)
class TrackedCalendar:
    """Listener for a Home Assistant Calendar.

    Polls a single calendar entity on a timer, matches event summaries against
    the configured regular expressions for each alarm state, and keeps a
    ``TrackedCalendarEvent`` for every matching event until it is pruned.
    """

    def __init__(self, calendar_config: ConfigType, armer: "AlarmArmer") -> None:  # type: ignore # noqa: F821
        """Read the calendar configuration; no listeners are registered yet.

        Args:
            calendar_config: Mapping read for ``CONF_ALIAS`` (default ``""``),
                ``CONF_ENTITY_ID``, ``CONF_CALENDAR_POLL_INTERVAL`` (default 30,
                used as minutes) and ``CONF_CALENDAR_EVENT_STATES``.
            armer: The owning armer; supplies ``hass`` and receives event callbacks.
        """
        self.enabled = False
        self.armer = armer
        self.alias: str = cast("str", calendar_config.get(CONF_ALIAS, ""))
        self.entity_id: str = cast("str", calendar_config.get(CONF_ENTITY_ID))
        self.poll_interval: int = calendar_config.get(CONF_CALENDAR_POLL_INTERVAL, 30)
        # state name -> list of regex patterns tried against the event summary;
        # a missing mapping is treated as "no patterns" rather than failing later
        self.state_mappings: dict[str, list[str]] = cast("dict", calendar_config.get(CONF_CALENDAR_EVENT_STATES) or {})
        self.tracked_events: dict[str, TrackedCalendarEvent] = {}
        self.poller_listener: CALLBACK_TYPE | None = None
        # Set by initialize() once the entity has been found on the platform
        self.calendar_entity: CalendarEntity | None = None

    async def initialize(self, calendar_platform: entity_platform.EntityPlatform) -> None:
        """Bind to the calendar entity, start the poll timer and run a first poll.

        Any failure is logged and swallowed, leaving the calendar disabled.

        Args:
            calendar_platform: Platform whose ``domain_entities`` is searched
                for ``self.entity_id``.
        """
        try:
            # .get() rather than indexing, so an unknown entity reaches the
            # warning below instead of raising KeyError
            calendar_entity = cast("CalendarEntity | None", calendar_platform.domain_entities.get(self.entity_id))
            if calendar_entity is None:
                _LOGGER.warning("AUTOARM Unable to access calendar %s", self.entity_id)
                return

            self.calendar_entity = calendar_entity
            _LOGGER.info("AUTOARM Configured calendar %s from %s", self.entity_id, calendar_platform.platform_name)
            self.poller_listener = async_track_utc_time_change(
                self.armer.hass,
                self.on_timed_poll,
                "*",
                minute=f"/{self.poll_interval}",
                second=0,
                local=True,
            )
            self.enabled = True
            # force an initial poll
            await self.match_events()

        except Exception:
            _LOGGER.exception("AUTOARM Failed to initialize calendar entity %s", self.entity_id)

    def shutdown(self) -> None:
        """Cancel the poll timer and every tracked event's listeners, then forget the events."""
        unlisten(self.poller_listener)
        self.poller_listener = None
        for tracked_event in self.tracked_events.values():
            tracked_event.shutdown()
        self.enabled = False
        self.tracked_events.clear()

    async def on_timed_poll(self, _called_time: dt.datetime) -> None:
        """Check for new and dead events, entry point for the timed calendar tracker listener"""
        _LOGGER.debug("AUTOARM Calendar Poll")
        await self.match_events()
        await self.prune_events()

    def has_active_event(self) -> bool:
        """Is there any event matching a state pattern that is currently open"""
        return any(tevent.is_current() for tevent in self.tracked_events.values())

    def active_events(self) -> list[CalendarEvent]:
        """List all the events matching a state pattern that are currently open"""
        return [v.event for v in self.tracked_events.values() if v.is_current()]

    async def match_events(self) -> None:
        """Query the calendar for events that match state patterns.

        The query window runs from 15 minutes ago to ``poll_interval + 5``
        minutes ahead. Each event not already tracked is tested against the
        patterns of every configured state; the first state with a matching
        pattern and a valid alarm state starts tracking the event.
        """
        calendar_entity = self.calendar_entity
        if calendar_entity is None:
            # initialize() has not (successfully) run, so there is nothing to query
            return

        now_local = dt_util.now()
        start_dt = now_local - dt.timedelta(minutes=15)
        end_dt = now_local + dt.timedelta(minutes=self.poll_interval + 5)

        events: list[CalendarEvent] = await calendar_entity.async_get_events(self.armer.hass, start_dt, end_dt)

        for event in events:
            # presume the events are sorted by start time
            event_id = TrackedCalendarEvent.event_id(calendar_entity.entity_id, event)
            _LOGGER.debug("AUTOARM Calendar Event: %s", event_id)
            if event_id in self.tracked_events:
                # Already tracked: no need to evaluate any patterns again
                continue
            for state_str, patterns in self.state_mappings.items():
                if not any(re.match(patt, event.summary) for patt in patterns):
                    continue
                state: AlarmControlPanelState | None = alarm_state_as_enum(state_str)
                if state is None:
                    _LOGGER.warning(
                        "AUTOARM Calendar %s found event %s for invalid state %s",
                        calendar_entity.entity_id,
                        event.summary,
                        state_str,
                    )
                    # A later mapping may still match with a valid state
                    continue
                _LOGGER.info(
                    "AUTOARM Calendar %s matched event %s for state %s",
                    calendar_entity.entity_id,
                    event.summary,
                    state_str,
                )
                tracked = TrackedCalendarEvent(calendar_entity.entity_id, event, state, self.armer)
                self.tracked_events[event_id] = tracked
                await tracked.initialize()
                # Once tracked, remaining mappings cannot add this event again
                break

    async def prune_events(self) -> None:
        """Remove past events.

        An event that is neither current nor future is dropped. If it has not
        yet been marked ended it is ended first, which notifies the armer;
        an event already marked ended only has its listeners released, so
        the armer is not told about the same ending twice.
        """
        to_remove: list[str] = []
        # Snapshot the items: the loop awaits, and the dict must not be iterated live
        for event_id, tevent in list(self.tracked_events.items()):
            if not tevent.is_current() and not tevent.is_future():
                _LOGGER.debug("AUTOARM Pruning expire calendar event: %s", tevent.event.uid)
                to_remove.append(event_id)
                if tevent.track_status == "ended":
                    tevent.shutdown()
                else:
                    await tevent.end(dt_util.now())
        for event_id in to_remove:
            self.tracked_events.pop(event_id, None)
class TrackedCalendarEvent:
    """Generate alarm state changes for a Home Assistant Calendar event.

    ``track_status`` is ``"pending"`` on construction, ``"started"`` when the
    event was already under way at ``initialize()``, and ``"ended"`` once the
    event is finished or was already in the past.
    """

    def __init__(
        self,
        calendar_id: str,
        event: CalendarEvent,
        arming_state: AlarmControlPanelState,
        armer: "AlarmArmer",  # type: ignore # noqa: F821
    ) -> None:
        """Record the event and the alarm state in force when tracking began.

        Args:
            calendar_id: Entity id of the calendar the event came from.
            event: The calendar event being tracked.
            arming_state: Alarm state associated with this event.
            armer: Owner that supplies ``hass`` and receives start/end callbacks.
        """
        self.tracked_at = dt_util.now()
        self.calendar_id = calendar_id
        self.id = TrackedCalendarEvent.event_id(calendar_id, event)
        self.event: CalendarEvent = event
        self.arming_state: AlarmControlPanelState = arming_state
        self.start_listener: Callable | None = None
        self.end_listener: Callable | None = None
        self.armer = armer
        # Whatever the armer reports at the moment tracking starts
        self.previous_state: AlarmControlPanelState | None = armer.armed_state()
        self.track_status: str = "pending"

    async def initialize(self) -> None:
        """Schedule, or immediately fire, the start and end handling for the event.

        All comparisons are against ``tracked_at``, the construction time.
        An event that has already finished is marked ended and ignored.
        """
        if self.event.end_datetime_local < self.tracked_at:
            _LOGGER.debug("AUTOARM Ignoring past event")
            self.track_status = "ended"
            return
        if self.event.start_datetime_local > self.tracked_at:
            # Not started yet: let the armer be called back at the start time.
            # NOTE(review): nothing in this class moves track_status to
            # "started" when this listener fires - confirm that is intended.
            self.start_listener = async_track_point_in_time(
                self.armer.hass,
                partial(self.armer.on_calendar_event_start, self),
                self.event.start_datetime_local,
            )
        else:
            # Already in progress: notify the armer right away
            await self.armer.on_calendar_event_start(self, dt_util.now())
            self.track_status = "started"
        if self.event.end_datetime_local > self.tracked_at:
            self.end_listener = async_track_point_in_time(
                self.armer.hass,
                self.end,
                self.event.end_datetime_local,
            )
        _LOGGER.debug("AUTOARM Now tracking %s event %s, %s", self.calendar_id, self.event.uid, self.event.summary)

    async def end(self, event_time: dt.datetime) -> None:
        """Handle an event that has reached its finish date and time.

        Args:
            event_time: Time passed by the caller; only logged. The armer is
                notified with the current time instead.
        """
        _LOGGER.debug("AUTOARM Calendar event %s ended, event_time: %s", self.id, event_time)
        self.track_status = "ended"
        await self.armer.on_calendar_event_end(self, dt_util.now())
        self.shutdown()

    @staticmethod
    def _event_key(event: CalendarEvent) -> str:
        """Return the event's ``uid``, or a key synthesised from its content when it has none."""
        return event.uid or str(hash((event.summary, event.description, event.start.isoformat(), event.end.isoformat())))

    @classmethod
    def event_id(cls, calendar_id: str, event: CalendarEvent) -> str:
        """Generate an ID for the calendar even if it doesn't natively support `uid`"""
        return f"{calendar_id}:{cls._event_key(event)}"

    def is_current(self) -> bool:
        """Return True when now lies within the event's start/end (inclusive) and it has not ended."""
        if self.track_status == "ended":
            return False
        now_local: dt.datetime = dt_util.now()
        return self.event.start_datetime_local <= now_local <= self.event.end_datetime_local

    def is_future(self) -> bool:
        """Return True when the event starts after now and it has not been marked ended."""
        if self.track_status == "ended":
            return False
        now_local: dt.datetime = dt_util.now()
        return self.event.start_datetime_local > now_local

    def shutdown(self) -> None:
        """Cancel the start and end listeners, if any are registered."""
        unlisten(self.start_listener)
        self.start_listener = None
        unlisten(self.end_listener)
        self.end_listener = None

    def __eq__(self, other: object) -> bool:
        """Compare two events based on underlying calendar event.

        Uses the same uid-or-synthesised key as ``event_id`` so that events
        from calendars without ``uid`` support are not all considered equal.
        """
        if not isinstance(other, TrackedCalendarEvent):
            return False
        return self._event_key(self.event) == self._event_key(other.event)

    def __hash__(self) -> int:
        """Hash consistently with ``__eq__`` so instances remain usable in sets and dict keys."""
        return hash(self._event_key(self.event))