Coverage for custom_components/autoarm/autoarming.py: 87%
535 statements
« prev ^ index » next coverage.py v7.10.6, created at 2026-01-26 21:24 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2026-01-26 21:24 +0000
1import asyncio
2import contextlib
3import datetime as dt
4import json
5import logging
6from collections.abc import Callable
7from dataclasses import dataclass
8from functools import partial
9from typing import TYPE_CHECKING, Any
11import homeassistant.util.dt as dt_util
12import voluptuous as vol
13from homeassistant.components.alarm_control_panel.const import ATTR_CHANGED_BY, AlarmControlPanelState
14from homeassistant.components.calendar import CalendarEvent
15from homeassistant.components.calendar.const import DOMAIN as CALENDAR_DOMAIN
16from homeassistant.components.sun.const import STATE_BELOW_HORIZON
17from homeassistant.const import CONF_CONDITIONS, CONF_ENTITY_ID, EVENT_HOMEASSISTANT_STOP, SERVICE_RELOAD, STATE_HOME
18from homeassistant.core import (
19 Event,
20 EventStateChangedData,
21 HomeAssistant,
22 ServiceCall,
23 ServiceResponse,
24 State,
25 SupportsResponse,
26 callback,
27)
28from homeassistant.exceptions import ConditionError, HomeAssistantError
29from homeassistant.helpers import condition as condition
30from homeassistant.helpers import config_validation as cv
31from homeassistant.helpers import entity_platform
32from homeassistant.helpers import issue_registry as ir
33from homeassistant.helpers.event import (
34 async_track_point_in_time,
35 async_track_state_change_event,
36 async_track_sunrise,
37 async_track_sunset,
38 async_track_time_change,
39)
40from homeassistant.helpers.json import ExtendedJSONEncoder
41from homeassistant.helpers.reload import (
42 async_integration_yaml_config,
43)
44from homeassistant.helpers.service import async_register_admin_service
45from homeassistant.helpers.typing import ConfigType
46from homeassistant.util.hass_dict import HassKey
48from custom_components.autoarm.hass_api import HomeAssistantAPI
50from .calendar import TrackedCalendar, TrackedCalendarEvent
51from .const import (
52 ATTR_RESET,
53 CONF_ALARM_PANEL,
54 CONF_BUTTONS,
55 CONF_CALENDAR_CONTROL,
56 CONF_CALENDAR_NO_EVENT,
57 CONF_CALENDARS,
58 CONF_DAY,
59 CONF_DELAY_TIME,
60 CONF_DIURNAL,
61 CONF_EARLIEST,
62 CONF_NOTIFY,
63 CONF_OCCUPANCY,
64 CONF_OCCUPANCY_DEFAULT,
65 CONF_RATE_LIMIT,
66 CONF_RATE_LIMIT_CALLS,
67 CONF_RATE_LIMIT_PERIOD,
68 CONF_SUNRISE,
69 CONF_TRANSITIONS,
70 CONFIG_SCHEMA,
71 DEFAULT_TRANSITIONS,
72 DOMAIN,
73 NO_CAL_EVENT_MODE_AUTO,
74 NO_CAL_EVENT_MODE_MANUAL,
75 ChangeSource,
76 ConditionVariables,
77)
78from .helpers import Limiter, alarm_state_as_enum, deobjectify, safe_state
80if TYPE_CHECKING:
81 from homeassistant.helpers.condition import ConditionCheckerType
# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)

# NOTE(review): not referenced in the visible part of this module - presumably alarm
# states that automation should leave alone; confirm against the code that uses it.
OVERRIDE_STATES = (AlarmControlPanelState.ARMED_VACATION, AlarmControlPanelState.ARMED_CUSTOM_BYPASS)
# NOTE(review): not referenced in the visible part of this module - presumably
# short-lived panel states; confirm against the code that uses it.
EPHEMERAL_STATES = (
    AlarmControlPanelState.PENDING,
    AlarmControlPanelState.ARMING,
    AlarmControlPanelState.DISARMING,
    AlarmControlPanelState.TRIGGERED,
)
# Raw entity state strings, as opposed to alarm panel enum members.
ZOMBIE_STATES = ("unknown", "unavailable")
NS_MOBILE_ACTIONS = "mobile_actions"
PLATFORMS = ["autoarm"]

# Typed key under which the integration stores its AutoArmData in hass.data.
HASS_DATA_KEY: HassKey["AutoArmData"] = HassKey(DOMAIN)
99@dataclass
100class AutoArmData:
101 armer: "AlarmArmer"
102 other_data: dict[str, str | dict[str, str] | list[str] | int | float | bool | None]
async def async_setup(
    hass: HomeAssistant,
    config: ConfigType,
) -> bool:
    """Set up the integration from YAML configuration.

    Builds and initializes the ``AlarmArmer``, then registers the admin ``reload``
    service and the response-only ``enquire_configuration`` service.

    Args:
    ----
        hass (HomeAssistant): Home Assistant instance
        config (ConfigType): full configuration, expected to hold a section keyed by ``DOMAIN``

    Returns:
    -------
        bool: always True, including when no configuration section is present

    """
    _ = CONFIG_SCHEMA
    if DOMAIN not in config:
        _LOGGER.warning("AUTOARM No config found")
        return True
    # The active integration config; rebound on reload so that services
    # registered below always describe what is actually running.
    domain_config: ConfigType = config.get(DOMAIN, {})

    hass.data[HASS_DATA_KEY] = AutoArmData(_async_process_config(hass, domain_config), {})
    await hass.data[HASS_DATA_KEY].armer.initialize()

    async def reload_service_handler(service_call: ServiceCall) -> None:
        """Reload yaml entities."""
        nonlocal domain_config
        reloaded: ConfigType | None = None
        _LOGGER.info("AUTOARM Reloading %s.%s component, data %s", service_call.domain, service_call.service, service_call.data)
        with contextlib.suppress(HomeAssistantError):
            reloaded = await async_integration_yaml_config(hass, DOMAIN)
        if reloaded is None or DOMAIN not in reloaded:
            _LOGGER.warning("AUTOARM reload rejected for lack of config: %s", reloaded)
            return
        hass.data[HASS_DATA_KEY].armer.shutdown()
        domain_config = reloaded[DOMAIN]
        hass.data[HASS_DATA_KEY].armer = _async_process_config(hass, domain_config)
        await hass.data[HASS_DATA_KEY].armer.initialize()

    async_register_admin_service(
        hass,
        DOMAIN,
        SERVICE_RELOAD,
        reload_service_handler,
    )

    def supplemental_action_enquire_configuration(_call: ServiceCall) -> ConfigType:
        """Return a JSON-safe summary of the currently active configuration."""
        data: ConfigType = {
            CONF_ALARM_PANEL: domain_config.get(CONF_ALARM_PANEL, {}).get(CONF_ENTITY_ID),
            CONF_DIURNAL: domain_config.get(CONF_DIURNAL),
            CONF_CALENDAR_CONTROL: domain_config.get(CONF_CALENDAR_CONTROL),
            CONF_BUTTONS: domain_config.get(CONF_BUTTONS, {}),
            CONF_OCCUPANCY: domain_config.get(CONF_OCCUPANCY, {}),
            CONF_NOTIFY: domain_config.get(CONF_NOTIFY, {}),
            CONF_RATE_LIMIT: domain_config.get(CONF_RATE_LIMIT, {}),
        }
        try:
            # round-trip through the extended encoder so only plain JSON types are returned
            jsonized: str = json.dumps(obj=data, cls=ExtendedJSONEncoder)
            return json.loads(jsonized)
        except Exception as e:
            _LOGGER.error("AUTOARM Failed to expose config data as entity: %s, %s", data, e)
            return {"error": str(e)}

    hass.services.async_register(
        DOMAIN,
        "enquire_configuration",
        supplemental_action_enquire_configuration,
        supports_response=SupportsResponse.ONLY,
    )

    return True
def _async_process_config(hass: HomeAssistant, config: ConfigType) -> "AlarmArmer":
    """Create an ``AlarmArmer`` from the integration's section of the configuration.

    Legacy state entities are cleaned up via ``migrate`` before the armer is built.
    The alarm panel, occupancy and notify sections are read with direct indexing,
    so they must be present in ``config``.
    """
    calendar_section: ConfigType = config.get(CONF_CALENDAR_CONTROL, {})
    migrate(hass)
    return AlarmArmer(
        hass,
        alarm_panel=config[CONF_ALARM_PANEL].get(CONF_ENTITY_ID),
        diurnal=config.get(CONF_DIURNAL, {}),
        buttons=config.get(CONF_BUTTONS, {}),
        occupancy=config[CONF_OCCUPANCY],
        notify=config[CONF_NOTIFY],
        rate_limit=config.get(CONF_RATE_LIMIT, {}),
        calendars=calendar_section.get(CONF_CALENDARS, []),
        transitions=config.get(CONF_TRANSITIONS),
        calendar_no_event_mode=calendar_section.get(CONF_CALENDAR_NO_EVENT, NO_CAL_EVENT_MODE_AUTO),
    )
def migrate(hass: HomeAssistant) -> None:
    """Remove state entities that used legacy ``autoarm.*`` entity ids.

    Each entity is handled independently; a failure is logged and does not stop the rest.
    """
    legacy_entity_ids = (
        "autoarm.configured",
        "autoarm.last_calendar_event",
        "autoarm.last_intervention",
        "autoarm.initialized",
        "autoarm.last_calculation",
    )
    for entity_id in legacy_entity_ids:
        try:
            if not hass.states.get(entity_id):
                continue
            _LOGGER.info("AUTOARM Migration removing legacy entity_id: %s", entity_id)
            hass.states.async_remove(entity_id)
        except Exception as e:
            _LOGGER.warning("AUTOARM Migration fail for %s:%s", entity_id, e)
200def unlisten(listener: Callable[[], None] | None) -> None:
201 if listener:
202 try:
203 listener()
204 except Exception as e:
205 _LOGGER.debug("AUTOARM Failure closing listener %s: %s", listener, e)
@dataclass
class Intervention:
    """A manual intervention: a button push, mobile action or alarm panel change"""

    created_at: dt.datetime
    source: ChangeSource
    state: AlarmControlPanelState | None

    def as_dict(self) -> dict[str, str | None]:
        """Render the record as strings; ``state`` stays None when no state was requested."""
        state_text: str | None = None if self.state is None else str(self.state)
        return {
            "created_at": self.created_at.isoformat(),
            "source": str(self.source),
            "state": state_text,
        }
224class AlarmArmer:
225 def __init__(
226 self,
227 hass: HomeAssistant,
228 alarm_panel: str,
229 buttons: dict[str, ConfigType] | None = None,
230 occupancy: ConfigType | None = None,
231 actions: list | None = None,
232 notify: ConfigType | None = None,
233 diurnal: ConfigType | None = None,
234 rate_limit: ConfigType | None = None,
235 calendar_no_event_mode: str | None = None,
236 calendars: list[ConfigType] | None = None,
237 transitions: dict[str, dict[str, list[ConfigType]]] | None = None,
238 ) -> None:
239 occupancy = occupancy or {}
240 rate_limit = rate_limit or {}
241 diurnal = diurnal or {}
243 self.hass: HomeAssistant = hass
244 self.local_tz = dt_util.get_time_zone(self.hass.config.time_zone)
245 self.calendar_configs: list[ConfigType] = calendars or []
246 self.calendar_no_event_mode: str = calendar_no_event_mode or NO_CAL_EVENT_MODE_AUTO
247 self.calendars: list[TrackedCalendar] = []
248 self.alarm_panel: str = alarm_panel
249 self.sunrise_cutoff: dt.time | None = diurnal.get(CONF_SUNRISE, {}).get(CONF_EARLIEST)
250 self.occupants: list[str] = occupancy.get(CONF_ENTITY_ID, [])
251 self.occupied_defaults: dict[str, AlarmControlPanelState] = occupancy.get(
252 CONF_OCCUPANCY_DEFAULT, {CONF_DAY: AlarmControlPanelState.ARMED_HOME}
253 )
254 self.occupied_delay: dict[str, dt.timedelta] = occupancy.get(CONF_DELAY_TIME, {})
255 self.buttons: ConfigType = buttons or {}
257 self.actions: list[str] = actions or []
258 self.notify_profiles: dict[str, dict] = notify or {}
259 self.unsubscribes: list = []
260 self.pre_pending_state: AlarmControlPanelState | None = None
261 self.button_device: dict[str, str] = {}
262 self.arming_in_progress: asyncio.Event = asyncio.Event()
264 self.rate_limiter: Limiter = Limiter(
265 window=rate_limit.get(CONF_RATE_LIMIT_PERIOD, dt.timedelta(seconds=60)),
266 max_calls=rate_limit.get(CONF_RATE_LIMIT_CALLS, 5),
267 )
269 self.hass_api: HomeAssistantAPI = HomeAssistantAPI(hass)
270 self.transitions: dict[AlarmControlPanelState, ConditionCheckerType] = {}
271 self.transition_config: dict[str, dict[str, list[ConfigType]]] = transitions or {}
273 self.initialization_errors: dict[str, int] = {}
274 self.interventions: list[Intervention] = []
275 self.intervention_ttl: int = 60
276 self.failures = 0
278 async def initialize(self) -> None:
279 """Async initialization"""
280 _LOGGER.info("AUTOARM occupied=%s, state=%s, calendars=%s", self.is_occupied(), self.armed_state(), len(self.calendars))
282 self.initialize_alarm_panel()
283 await self.initialize_calendar()
284 await self.initialize_logic()
285 self.initialize_diurnal()
286 self.initialize_occupancy()
287 self.initialize_buttons()
288 self.initialize_integration()
289 self.initialize_housekeeping()
290 self.initialize_home_assistant()
291 await self.reset_armed_state(source=ChangeSource.STARTUP)
293 _LOGGER.info("AUTOARM Initialized, state: %s", self.armed_state())
295 def record_initialization_error(self, stage: str) -> None:
296 self.initialization_errors.setdefault(stage, 0)
297 self.initialization_errors[stage] += 1
298 self.failures += 1
299 self.hass.states.async_set(
300 f"sensor.{DOMAIN}_failures", str(self.failures), attributes={"initialization_errors": self.initialization_errors}
301 )
303 def record_runtime_error(self) -> None:
304 self.failures += 1
305 self.hass.states.async_set(f"sensor.{DOMAIN}_failures", str(self.failures))
307 def initialize_home_assistant(self) -> None:
308 self.stop_listener: Callable[[], None] | None = self.hass.bus.async_listen_once(
309 EVENT_HOMEASSISTANT_STOP, self.async_shutdown
310 )
311 self.hass.states.async_set(
312 f"binary_sensor.{DOMAIN}_initialized",
313 "valid" if not self.initialization_errors else "invalid",
314 attributes=self.initialization_errors,
315 )
316 self.hass.states.async_set(f"sensor.{DOMAIN}_failures", str(self.failures))
317 self.hass.states.async_set(f"sensor.{DOMAIN}_last_calculation", "unavailable", attributes={})
319 self.hass.services.async_register(
320 DOMAIN,
321 "reset_state",
322 self.reset_service,
323 supports_response=SupportsResponse.OPTIONAL,
324 )
326 async def reset_service(self, _call: ServiceCall) -> ServiceResponse:
327 new_state = await self.reset_armed_state(intervention=self.record_intervention(source=ChangeSource.ACTION, state=None))
328 return {"change": new_state or "NO_CHANGE"}
330 def initialize_integration(self) -> None:
331 self.hass.states.async_set(f"sensor.{DOMAIN}_last_intervention", "unavailable", attributes={})
333 self.unsubscribes.append(self.hass.bus.async_listen("mobile_app_notification_action", self.on_mobile_action))
335 def initialize_alarm_panel(self) -> None:
336 """Set up automation for Home Assistant alarm panel
338 See https://www.home-assistant.io/integrations/alarm_control_panel/
340 Succeeds even if control panel has not yet started, listener will pick up events when it does
341 """
342 self.unsubscribes.append(async_track_state_change_event(self.hass, [self.alarm_panel], self.on_panel_change))
343 _LOGGER.debug("AUTOARM Auto-arming %s", self.alarm_panel)
345 def initialize_housekeeping(self) -> None:
346 self.unsubscribes.append(
347 async_track_time_change(
348 self.hass,
349 action=self.housekeeping,
350 minute=0,
351 )
352 )
354 def initialize_diurnal(self) -> None:
355 # events API expects a function, however underlying HassJob is fine with coroutines
356 self.unsubscribes.append(async_track_sunrise(self.hass, self.on_sunrise, None)) # type: ignore
357 self.unsubscribes.append(async_track_sunset(self.hass, self.on_sunset, None)) # type: ignore
359 def initialize_occupancy(self) -> None:
360 """Configure occupants, and listen for changes in their state"""
361 _LOGGER.info("AUTOARM Occupancy determined by %s", ",".join(self.occupants))
362 self.unsubscribes.append(async_track_state_change_event(self.hass, self.occupants, self.on_occupancy_change))
364 def initialize_buttons(self) -> None:
365 """Initialize (optional) physical alarm state control buttons"""
367 def setup_button(state_name: str, button_entity: str, cb: Callable) -> None:
368 self.button_device[state_name] = button_entity
369 if self.button_device[state_name]:
370 self.unsubscribes.append(async_track_state_change_event(self.hass, [button_entity], cb))
372 _LOGGER.debug(
373 "AUTOARM Configured %s button for %s",
374 state_name,
375 self.button_device[state_name],
376 )
378 for button_use, button_config in self.buttons.items():
379 delay: dt.timedelta | None = button_config.get(CONF_DELAY_TIME)
380 for entity_id in button_config[CONF_ENTITY_ID]:
381 if button_use == ATTR_RESET:
382 setup_button(ATTR_RESET, entity_id, partial(self.on_reset_button, delay))
383 else:
384 setup_button(
385 button_use, entity_id, partial(self.on_alarm_state_button, AlarmControlPanelState(button_use), delay)
386 )
388 async def initialize_calendar(self) -> None:
389 """Configure calendar polling (optional)"""
390 stage: str = "calendar"
391 self.hass.states.async_set(f"sensor.{DOMAIN}_last_calendar_event", "unavailable", attributes={})
392 if not self.calendar_configs:
393 return
394 try:
395 platforms: list[entity_platform.EntityPlatform] = entity_platform.async_get_platforms(self.hass, CALENDAR_DOMAIN)
396 if platforms:
397 platform: entity_platform.EntityPlatform = platforms[0]
398 else:
399 self.record_initialization_error(stage)
400 _LOGGER.error("AUTOARM Calendar platform not available from Home Assistant")
401 return
402 except Exception as _e:
403 self.record_initialization_error(stage)
404 _LOGGER.exception("AUTOARM Unable to access calendar platform")
405 return
406 for calendar_config in self.calendar_configs:
407 tracked_calendar = TrackedCalendar(calendar_config, self)
408 await tracked_calendar.initialize(platform)
409 self.calendars.append(tracked_calendar)
    async def initialize_logic(self) -> None:
        """Build and validate the condition checker for each state transition

        Transitions missing from the configuration are filled in from ``DEFAULT_TRANSITIONS``.
        Each transition that validates is stored in ``self.transitions``; any that does not
        has an issue raised via ``hass_api.raise_issue`` describing the error.
        """
        stage: str = "logic"
        # fill in defaults for any state without configured conditions
        for state_str, raw_condition in DEFAULT_TRANSITIONS.items():
            if state_str not in self.transition_config:
                _LOGGER.info("AUTOARM Defaulting transition condition for %s", state_str)
                self.transition_config[state_str] = {CONF_CONDITIONS: cv.CONDITIONS_SCHEMA(raw_condition)}

        for state_str, transition_config in self.transition_config.items():
            # a non-empty error at the end of the iteration raises an issue for this transition
            error: str = ""
            condition_config = transition_config.get(CONF_CONDITIONS)
            if condition_config is None:
                error = "Empty conditions"
                _LOGGER.warning(f"AUTOARM Found no conditions for {state_str} transition")
            else:
                try:
                    # raises ValueError when state_str is not a valid alarm panel state
                    state = AlarmControlPanelState(state_str)
                    # first pass builds the condition in strict, validating mode
                    cond: ConditionCheckerType | None = await self.hass_api.build_condition(
                        condition_config, strict=True, validate=True, name=state_str
                    )

                    if cond:
                        # re-run without strict wrapper
                        cond = await self.hass_api.build_condition(condition_config, name=state_str)
                    if cond:
                        _LOGGER.debug(f"AUTOARM Validated transition logic for {state_str}")
                        self.transitions[state] = cond
                    else:
                        _LOGGER.warning(f"AUTOARM Failed to validate transition logic for {state_str}")
                        error = "Condition validation failed"
                except ValueError as ve:
                    self.record_initialization_error(stage)
                    error = f"Invalid state {ve}"
                    _LOGGER.error(f"AUTOARM Invalid state in {state_str} transition - {ve}")
                except vol.Invalid as vi:
                    self.record_initialization_error(stage)
                    _LOGGER.error(f"AUTOARM Transition {state_str} conditions fails Home Assistant schema check {vi}")
                    error = f"Schema error {vi}"
                except ConditionError as ce:
                    # NOTE(review): unlike the other handlers, this one does not call
                    # record_initialization_error - confirm whether that is intended
                    _LOGGER.error(f"AUTOARM Transition {state_str} conditions fails Home Assistant condition check {ce}")
                    # use the most specific message the exception exposes
                    if hasattr(ce, "message"):
                        error = ce.message  # type: ignore
                    elif hasattr(ce, "error") and hasattr(ce.error, "message"):  # type: ignore[attr-defined]
                        error = ce.error.message  # type: ignore
                    else:
                        error = str(ce)
                except Exception as e:
                    self.record_initialization_error(stage)
                    _LOGGER.exception("AUTOARM Disabling transition %s with error validating %s", state_str, condition_config)
                    error = f"Unknown exception {e}"
            if error:
                _LOGGER.warning(f"AUTOARM raising report issue for {error} on {state_str}")
                self.hass_api.raise_issue(
                    f"transition_condition_{state_str}",
                    is_fixable=False,
                    issue_key="transition_condition",
                    issue_map={"state": state_str, "error": error},
                    severity=ir.IssueSeverity.ERROR,
                )
470 async def async_shutdown(self, _event: Event) -> None:
471 _LOGGER.info("AUTOARM shut down event received")
472 self.stop_listener = None
473 self.shutdown()
475 def shutdown(self) -> None:
476 _LOGGER.info("AUTOARM shutting down")
477 for calendar in self.calendars:
478 calendar.shutdown()
479 while self.unsubscribes:
480 unlisten(self.unsubscribes.pop())
481 unlisten(self.stop_listener)
482 self.stop_listener = None
483 _LOGGER.info("AUTOARM shut down")
485 def active_calendar_event(self) -> CalendarEvent | None:
486 events: list[CalendarEvent] = []
487 for cal in self.calendars:
488 events.extend(cal.active_events())
489 if events:
490 # TODO: consider sorting events to LIFO
491 return events[0]
492 return None
494 def is_occupied(self) -> bool:
495 return any(safe_state(self.hass.states.get(p)) == STATE_HOME for p in self.occupants)
497 def at_home(self) -> list[str]:
498 return [p for p in self.occupants if safe_state(self.hass.states.get(p)) == STATE_HOME]
500 def not_home(self) -> list[str]:
501 return [p for p in self.occupants if safe_state(self.hass.states.get(p)) != STATE_HOME]
503 def is_unoccupied(self) -> bool:
504 return all(safe_state(self.hass.states.get(p)) != STATE_HOME for p in self.occupants)
506 def is_night(self) -> bool:
507 return safe_state(self.hass.states.get("sun.sun")) == STATE_BELOW_HORIZON
509 def armed_state(self) -> AlarmControlPanelState:
510 raw_state: str | None = safe_state(self.hass.states.get(self.alarm_panel))
511 alarm_state = alarm_state_as_enum(raw_state)
512 if alarm_state is None:
513 _LOGGER.warning("AUTOARM No alarm state available - treating as PENDING")
514 return AlarmControlPanelState.PENDING
515 return alarm_state
517 def _extract_event(self, event: Event[EventStateChangedData]) -> tuple[str | None, str | None, str | None, dict[str, str]]:
518 entity_id = old = new = None
519 new_attributes: dict[str, str] = {}
520 if event and event.data:
521 entity_id = event.data.get("entity_id")
522 old_obj = event.data.get("old_state")
523 if old_obj:
524 old = old_obj.state
525 new_obj = event.data.get("new_state")
526 if new_obj:
527 new = new_obj.state
528 new_attributes = new_obj.attributes
529 return entity_id, old, new, new_attributes
531 async def pending_state(self, source: ChangeSource | None) -> None:
532 self.pre_pending_state = self.armed_state()
533 await self.arm(AlarmControlPanelState.PENDING, source=source)
535 @callback
536 async def delayed_reset_armed_state(self, triggered_at: dt.datetime, requested_at: dt.datetime | None, **kwargs) -> None:
537 _LOGGER.debug("AUTOARM delayed_arm at %s, requested_at: %s", triggered_at, requested_at)
538 if self.is_intervention_since_request(requested_at):
539 return
540 await self.reset_armed_state(**kwargs)
    async def reset_armed_state(
        self, intervention: Intervention | None = None, source: ChangeSource | None = None
    ) -> str | None:
        """Logic to automatically work out appropriate current armed state

        Args:
        ----
            intervention (Intervention, optional): manual intervention that triggered the reset
            source (ChangeSource, optional): origin of the request, defaults to the intervention's source

        Returns:
        -------
            The existing state when the reset is skipped, otherwise the outcome of arming,
            or the computed state when no change was needed.
            Every exit path publishes the outcome to the last-calculation sensor.

        """
        state: AlarmControlPanelState | None = None
        existing_state: AlarmControlPanelState | None = None
        must_change_state: bool = False
        last_state_intervention: Intervention | None = None
        active_calendar_event: CalendarEvent | None = None

        if source is None and intervention is not None:
            source = intervention.source
        _LOGGER.debug(
            "AUTOARM reset_armed_state(intervention=%s,source=%s)",
            intervention,
            source,
        )

        try:
            existing_state = self.armed_state()
            state = existing_state
            # calendar handling only applies when calendars are being tracked
            if self.calendars:
                active_calendar_event = self.active_calendar_event()
                if active_calendar_event:
                    _LOGGER.debug("AUTOARM Ignoring reset while calendar event active")
                    return existing_state
                if self.calendar_no_event_mode == NO_CAL_EVENT_MODE_MANUAL:
                    _LOGGER.debug(
                        "AUTOARM Ignoring reset while calendar configured, no active event, and default mode is manual"
                    )
                    return existing_state
                # the no-event mode may itself name an alarm state to apply directly
                if self.calendar_no_event_mode in AlarmControlPanelState:
                    # TODO: may be dupe logic with on_cal event
                    _LOGGER.debug("AUTOARM Applying fixed reset on end of calendar event, %s", self.calendar_no_event_mode)
                    return await self.arm(alarm_state_as_enum(self.calendar_no_event_mode), ChangeSource.CALENDAR)
                if self.calendar_no_event_mode == NO_CAL_EVENT_MODE_AUTO:
                    _LOGGER.debug("AUTOARM Applying reset while calendar configured, no active event, and default mode is auto")
                else:
                    _LOGGER.warning("AUTOARM Unexpected state for calendar no event mode: %s", self.calendar_no_event_mode)

            # TODO: expose as config ( for manual disarm override ) and condition logic
            must_change_state = existing_state is None or existing_state == AlarmControlPanelState.PENDING
            if intervention or source in (ChangeSource.CALENDAR, ChangeSource.OCCUPANCY) or must_change_state:
                _LOGGER.debug("AUTOARM Ignoring previous interventions")
            else:
                # an earlier manual state choice blocks an automated reset
                last_state_intervention = self.last_state_intervention()
                if last_state_intervention:
                    _LOGGER.debug(
                        "AUTOARM Ignoring automated reset for %s set by %s at %s",
                        last_state_intervention.state,
                        last_state_intervention.source,
                        last_state_intervention.created_at,
                    )
                    return existing_state
            state = self.determine_state()
            if state is not None and state != AlarmControlPanelState.PENDING and state != existing_state:
                state = await self.arm(state, source=source)
        finally:
            # runs on every exit, including the early returns above
            self.hass.states.async_set(
                f"sensor.{DOMAIN}_last_calculation",
                str(state is not None and state != existing_state),
                attributes={
                    "new_state": str(state),
                    "old_state": str(existing_state),
                    "source": source,
                    "active_calendar_event": deobjectify(active_calendar_event),
                    "occupied": self.is_occupied(),
                    "night": self.is_night(),
                    "must_change_state": str(must_change_state),
                    "last_state_intervention": deobjectify(last_state_intervention),
                    "intervention": intervention.as_dict() if intervention else None,
                    "time": dt_util.now().isoformat(),
                },
            )

        return state
619 def is_intervention_since_request(self, requested_at: dt.datetime | None) -> bool:
620 if requested_at is not None and self.has_intervention_since(requested_at):
621 _LOGGER.debug(
622 "AUTOARM Cancelling delayed operation since subsequent manual action",
623 )
624 return True
625 return False
627 def determine_state(self) -> AlarmControlPanelState | None:
628 """Compute a new state using occupancy, sun and transition conditions"""
629 evaluated_state: AlarmControlPanelState | None = None
630 condition_vars: ConditionVariables = ConditionVariables(
631 self.is_occupied(),
632 self.is_night(),
633 state=self.armed_state(),
634 calendar_event=self.active_calendar_event(),
635 occupied_defaults=self.occupied_defaults,
636 at_home=self.at_home(),
637 not_home=self.not_home(),
638 )
639 for state, checker in self.transitions.items():
640 if self.hass_api.evaluate_condition(checker, condition_vars):
641 _LOGGER.debug("AUTOARM Computed state as % from condition", state)
642 evaluated_state = state
643 break
644 if evaluated_state is None:
645 return None
646 return AlarmControlPanelState(evaluated_state)
648 @callback
649 async def delayed_arm(self, triggered_at: dt.datetime, requested_at: dt.datetime | None, **kwargs) -> None:
650 _LOGGER.debug("AUTOARM delayed_arm at %s, requested_at: %s", triggered_at, requested_at)
651 if self.is_intervention_since_request(requested_at):
652 return
653 await self.arm(**kwargs)
    async def arm(
        self, arming_state: AlarmControlPanelState | None = None, source: ChangeSource | None = None
    ) -> AlarmControlPanelState | None:
        """Change alarm panel state

        The panel entity's state is written directly, with the ``changed_by`` attribute
        set to ``<DOMAIN>.<source>``. ``arming_in_progress`` is set for the duration of
        the attempt and always cleared afterwards.

        Args:
        ----
            arming_state (AlarmControlPanelState, optional): state to apply. Defaults to None.
            source (ChangeSource, optional): Source of the change, for example 'calendar' or 'button'

        Returns:
        -------
            AlarmControlPanelState | None: the new state when changed, the existing state when
            it already matches, or None when rate limited or when arming failed

        """
        if self.rate_limiter.triggered():
            _LOGGER.debug("AUTOARM Rate limit triggered by %s, skipping arm", source)
            return None
        try:
            self.arming_in_progress.set()
            existing_state: AlarmControlPanelState | None = self.armed_state()
            if arming_state != existing_state:
                # carry over the panel's current attributes so only state and changed_by differ
                attrs: dict[str, str] = {}
                panel_state: State | None = self.hass.states.get(self.alarm_panel)
                if panel_state:
                    attrs.update(panel_state.attributes)
                # on_panel_change checks for this DOMAIN prefix on changed_by
                attrs[ATTR_CHANGED_BY] = f"{DOMAIN}.{source}"
                self.hass.states.async_set(entity_id=self.alarm_panel, new_state=str(arming_state), attributes=attrs)
                _LOGGER.info("AUTOARM Setting %s from %s to %s for %s", self.alarm_panel, existing_state, arming_state, source)
                return arming_state
            _LOGGER.debug("Skipping arm for %s, as %s already %s", source, self.alarm_panel, arming_state)
            return existing_state
        except Exception as e:
            # failures are counted and reported as None rather than raised to the caller
            _LOGGER.error("AUTOARM Failed to arm: %s", e)
            self.record_runtime_error()
        finally:
            self.arming_in_progress.clear()
        return None
    async def notify(self, message: str, profile: str = "normal", title: str | None = None) -> None:
        """Send a notification using the named profile layered over the ``common`` profile

        Args:
        ----
            message (str): notification body
            profile (str): key into the configured notify profiles. Defaults to "normal".
            title (str, optional): notification title. Defaults to "Alarm Auto Arming".

        Nothing is sent when the merged profile names no service. Failures are logged
        and counted as runtime errors rather than raised.
        """
        notify_service = None
        try:
            # separately merge base dict and data sub-dict as cheap and nasty semi-deep-merge
            selected_profile = self.notify_profiles.get(profile)
            base_profile = self.notify_profiles.get("common", {})
            base_profile_data = base_profile.get("data", {})
            # copies, so the configured profiles are never mutated
            merged_profile = dict(base_profile)
            merged_profile_data = dict(base_profile_data)
            if selected_profile:
                selected_profile_data: dict = selected_profile.get("data", {})
                merged_profile.update(selected_profile)
                merged_profile_data.update(selected_profile_data)
            merged_profile["data"] = merged_profile_data
            # the service is called in the notify domain, so strip any "notify." prefix
            notify_service = merged_profile.get("service", "").replace("notify.", "")

            title = title or "Alarm Auto Arming"
            if notify_service and merged_profile:
                data = merged_profile.get("data", {})
                await self.hass.services.async_call(
                    "notify",
                    notify_service,
                    service_data={"message": message, "title": title, "data": data},
                )
            else:
                _LOGGER.debug("AUTOARM Skipped notification, service: %s, data: %s", notify_service, merged_profile)

        except Exception:
            self.record_runtime_error()
            _LOGGER.exception("AUTOARM notify.%s failed", notify_service)
725 def schedule_state(
726 self,
727 trigger_time: dt.datetime,
728 state: AlarmControlPanelState | None,
729 intervention: Intervention | None,
730 source: ChangeSource | None = None,
731 ) -> None:
732 source = source or intervention.source if intervention else None
734 job: Callable
735 if state is None:
736 _LOGGER.debug("Delayed reset, triggered at: %s, source%s", trigger_time, source)
737 job = partial(self.delayed_reset_armed_state, intervention=intervention, source=source, requested_at=dt_util.now())
738 else:
739 _LOGGER.debug("Delayed arm %s, triggered at: %s, source%s", state, trigger_time, source)
741 job = partial(self.delayed_arm, arming_state=state, source=source, requested_at=dt_util.now())
743 self.unsubscribes.append(
744 async_track_point_in_time(
745 self.hass,
746 job,
747 trigger_time,
748 )
749 )
751 def record_intervention(self, source: ChangeSource, state: AlarmControlPanelState | None) -> Intervention:
752 intervention = Intervention(dt_util.now(), source, state)
753 self.interventions.append(intervention)
754 self.hass.states.async_set(f"sensor.{DOMAIN}_last_intervention", source, attributes=intervention.as_dict())
756 return intervention
758 def has_intervention_since(self, cutoff: dt.datetime) -> bool:
759 """Has there been a manual intervention since the cutoff time"""
760 if not self.interventions:
761 return False
762 return any(intervention.created_at > cutoff for intervention in self.interventions)
764 def last_state_intervention(self) -> Intervention | None:
765 candidates: list[Intervention] = [i for i in self.interventions if i.state is not None]
766 if candidates:
767 return candidates[-1]
768 return None
770 @callback
771 async def on_sunrise(self, *args: Any) -> None: # noqa: ARG002
772 _LOGGER.debug("AUTOARM Sunrise")
773 now = dt_util.now() # uses Home Assistant's time zone setting
774 if not self.sunrise_cutoff or now.time() >= self.sunrise_cutoff:
775 # sun is up, and not earlier than cutoff
776 await self.reset_armed_state(source=ChangeSource.SUNRISE)
777 elif self.sunrise_cutoff and now.time() < self.sunrise_cutoff:
778 _LOGGER.debug(
779 "AUTOARM Rescheduling delayed sunrise action to %s",
780 self.sunrise_cutoff,
781 )
782 self.schedule_state(
783 dt.datetime.combine(now.date(), self.sunrise_cutoff, tzinfo=dt_util.DEFAULT_TIME_ZONE),
784 intervention=None,
785 state=None,
786 source=ChangeSource.SUNRISE,
787 )
789 @callback
790 async def on_sunset(self, *args: Any) -> None: # noqa: ARG002
791 _LOGGER.debug("AUTOARM Sunset")
792 await self.reset_armed_state(source=ChangeSource.SUNSET)
794 @callback
795 async def on_mobile_action(self, event: Event) -> None:
796 _LOGGER.debug("AUTOARM Mobile Action: %s", event)
797 source: ChangeSource = ChangeSource.MOBILE
799 match event.data.get("action"):
800 case "ALARM_PANEL_DISARM":
801 self.record_intervention(source=source, state=AlarmControlPanelState.DISARMED)
802 await self.arm(AlarmControlPanelState.DISARMED, source=source)
803 case "ALARM_PANEL_RESET":
804 await self.reset_armed_state(intervention=self.record_intervention(source=ChangeSource.BUTTON, state=None))
805 case "ALARM_PANEL_AWAY":
806 self.record_intervention(source=source, state=AlarmControlPanelState.ARMED_AWAY)
807 await self.arm(AlarmControlPanelState.ARMED_AWAY, source=source)
808 case _:
809 _LOGGER.debug("AUTOARM Ignoring mobile action: %s", event.data)
811 @callback
812 async def on_alarm_state_button(self, state: AlarmControlPanelState, delay: dt.timedelta | None, event: Event) -> None:
813 _LOGGER.debug("AUTOARM Alarm %s Button: %s", state, event)
814 intervention = self.record_intervention(source=ChangeSource.BUTTON, state=state)
815 if delay:
816 self.schedule_state(dt_util.now() + delay, state, intervention, source=ChangeSource.BUTTON)
817 await self.notify(
818 f"Alarm will be set to {state} in {delay}",
819 title=f"Arm set to {state} process starting",
820 )
821 else:
822 await self.arm(state, source=ChangeSource.BUTTON)
@callback
async def on_reset_button(self, delay: dt.timedelta | None, event: Event) -> None:
    """Handle a press of the reset button.

    One intervention (with no target state) is recorded per press. When
    ``delay`` is truthy a stateless change is handed to ``schedule_state`` for
    ``now + delay`` and a notification is sent; otherwise ``reset_armed_state``
    is awaited immediately with that same intervention.

    Args:
    ----
        delay (dt.timedelta | None): optional wait before the reset is applied
        event (Event): triggering event, used here only for debug logging

    """
    _LOGGER.debug("AUTOARM Reset Button: %s", event)
    intervention = self.record_intervention(source=ChangeSource.BUTTON, state=None)
    if delay:
        self.schedule_state(dt_util.now() + delay, None, intervention, ChangeSource.BUTTON)

        await self.notify(
            f"Alarm will be reset in {delay}",
            title="Alarm reset wait initiated",
        )
    else:
        # Reuse the intervention recorded above; recording a second one here
        # would register the same button press twice.
        await self.reset_armed_state(intervention=intervention)
@callback
async def on_occupancy_change(self, event: Event[EventStateChangedData]) -> None:
    """Listen for person state events

    Events whose old and new state are equal are logged and otherwise ignored.
    For a real change, a new state that has an entry in ``occupied_delay``
    schedules a stateless change after that delay; any other new state
    triggers ``reset_armed_state`` straight away.

    Args:
    ----
        event (Event[EventStateChangedData]): state change event

    """
    entity_id, old, new, new_attributes = self._extract_event(event)
    log_args = (entity_id, old, new, event, new_attributes)

    if old == new:
        _LOGGER.debug("AUTOARM Occupancy Non-state Change: %s, state:%s->%s, event: %s, attrs:%s", *log_args)
        return

    _LOGGER.debug("AUTOARM Occupancy state Change: %s, state:%s->%s, event: %s, attrs:%s", *log_args)

    if new not in self.occupied_delay:
        await self.reset_armed_state(source=ChangeSource.OCCUPANCY)
        return

    due_at = dt_util.now() + self.occupied_delay[new]
    self.schedule_state(due_at, state=None, intervention=None, source=ChangeSource.OCCUPANCY)
@callback
async def on_panel_change(self, event: Event[EventStateChangedData]) -> None:
    """Alarm Control Panel has been changed outside of AutoArm

    Changes whose ``changed_by`` attribute starts with ``"{DOMAIN}."`` are
    logged and ignored. Every other change is recorded as an intervention;
    then a new state in ``ZOMBIE_STATES`` triggers ``reset_armed_state``, a
    changed state sends a notification, and an unchanged state is only logged.

    Args:
    ----
        event (Event[EventStateChangedData]): state change event for the panel

    """
    entity_id, old, new, new_attributes = self._extract_event(event)

    changed_by = new_attributes.get(ATTR_CHANGED_BY) if new_attributes else None
    if changed_by and changed_by.startswith(f"{DOMAIN}."):
        _LOGGER.debug("AUTOARM Panel Change Ignored: %s,%s: %s-->%s", entity_id, event.event_type, old, new)
        return

    new_state = alarm_state_as_enum(new)
    _LOGGER.info("AUTOARM Panel Change: %s,%s: %s-->%s", entity_id, event.event_type, old, new)
    self.record_intervention(ChangeSource.ALARM_PANEL, new_state)

    if new in ZOMBIE_STATES:
        _LOGGER.warning("AUTOARM Dezombifying %s ...", new)
        await self.reset_armed_state(source=ChangeSource.ZOMBIFICATION)
        return

    if new == old:
        _LOGGER.debug("AUTOARM panel change leaves state unchanged at %s", new)
        return

    message = f"Home Assistant alarm level now set from {old} to {new}"
    await self.notify(message, title=f"Alarm now {new}", profile="quiet")
async def on_calendar_event_start(self, event: TrackedCalendarEvent, triggered_at: dt.datetime) -> None:
    """Handle the start of a tracked calendar event.

    Arms to the event's arming state when it differs from the current armed
    state, then publishes the event details as the state and attributes of
    ``sensor.{DOMAIN}_last_calendar_event``.

    Args:
    ----
        event (TrackedCalendarEvent): tracked event that has started
        triggered_at (dt.datetime): trigger time, used here only for logging

    """
    _LOGGER.debug("AUTOARM on_calendar_event_start(%s,%s)", event.id, triggered_at)

    target_state = event.arming_state
    if target_state != self.armed_state():
        _LOGGER.info("AUTOARM Calendar event %s changing arming to %s at %s", event.id, target_state, triggered_at)
        await self.arm(arming_state=target_state, source=ChangeSource.CALENDAR)

    cal_event = event.event
    summary = cal_event.summary
    sensor_attributes = {
        "calendar": event.calendar_id,
        "start": cal_event.start_datetime_local,
        "end": cal_event.end_datetime_local,
        "summary": summary,
        "description": cal_event.description,
        "uid": cal_event.uid,
    }
    # Fall back to the tracked event id when the calendar event has no summary.
    self.hass.states.async_set(
        f"sensor.{DOMAIN}_last_calendar_event",
        new_state=summary or str(event.id),
        attributes=sensor_attributes,
    )
async def on_calendar_event_end(self, event: TrackedCalendarEvent, ended_at: dt.datetime) -> None:
    """Handle the end of a tracked calendar event.

    Nothing is done while any calendar still reports an active event.
    Otherwise the action depends on ``calendar_no_event_mode``:

    * ``NO_CAL_EVENT_MODE_AUTO``: await ``pending_state`` and then
      ``reset_armed_state``
    * a member of ``AlarmControlPanelState``: arm to that fixed state
    * anything else: arm to the event's ``previous_state``

    Args:
    ----
        event (TrackedCalendarEvent): tracked event that has ended
        ended_at (dt.datetime): end time, used here only for logging

    """
    # Log under this handler's own name; the message previously said
    # on_calendar_event_start, which made end events look like start events.
    _LOGGER.debug("AUTOARM on_calendar_event_end(%s,%s)", event.id, ended_at)
    if any(cal.has_active_event() for cal in self.calendars):
        _LOGGER.debug("AUTOARM No action on event end since other cal event active")
        return
    if self.calendar_no_event_mode == NO_CAL_EVENT_MODE_AUTO:
        _LOGGER.info("AUTOARM Calendar event %s ended, and arming state", event.id)
        # avoid having state locked in vacation by state calculator
        await self.pending_state(source=ChangeSource.CALENDAR)
        await self.reset_armed_state(source=ChangeSource.CALENDAR)
    elif self.calendar_no_event_mode in AlarmControlPanelState:
        _LOGGER.info(
            "AUTOARM Calendar event %s ended, and returning to fixed state %s", event.id, self.calendar_no_event_mode
        )
        await self.arm(alarm_state_as_enum(self.calendar_no_event_mode), source=ChangeSource.CALENDAR)
    else:
        _LOGGER.debug("AUTOARM Reinstate previous state on calendar event end in manual mode")
        await self.arm(event.previous_state, source=ChangeSource.CALENDAR)
@callback
async def housekeeping(self, triggered_at: dt.datetime) -> None:
    """Drop expired interventions and prune every calendar's events.

    An intervention is kept only while the current time is earlier than its
    ``created_at`` plus ``intervention_ttl`` minutes.

    Args:
    ----
        triggered_at (dt.datetime): trigger time, used here only for logging

    """
    _LOGGER.debug("AUTOARM Housekeeping starting, triggered at %s", triggered_at)
    now = dt_util.now()

    def unexpired(intervention: Any) -> bool:
        expires_at = intervention.created_at + dt.timedelta(minutes=self.intervention_ttl)
        return now < expires_at

    self.interventions = list(filter(unexpired, self.interventions))

    for calendar in self.calendars:
        await calendar.prune_events()

    _LOGGER.debug("AUTOARM Housekeeping finished")