Coverage for custom_components/autoarm/autoarming.py: 86%
597 statements
« prev ^ index » next coverage.py v7.10.6, created at 2026-02-17 01:14 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2026-02-17 01:14 +0000
1import asyncio
2import datetime as dt
3import json
4import logging
5import re
6from collections.abc import Callable
7from dataclasses import dataclass
8from functools import partial
9from typing import TYPE_CHECKING, Any
11import homeassistant.util.dt as dt_util
12import voluptuous as vol
13from homeassistant.components.alarm_control_panel.const import ATTR_CHANGED_BY, AlarmControlPanelState
14from homeassistant.components.calendar import CalendarEvent
15from homeassistant.components.calendar.const import DOMAIN as CALENDAR_DOMAIN
16from homeassistant.components.sun.const import STATE_BELOW_HORIZON
17from homeassistant.config_entries import SOURCE_IMPORT, ConfigEntry
18from homeassistant.const import (
19 CONF_CONDITIONS,
20 CONF_ENTITY_ID,
21 CONF_SERVICE,
22 EVENT_HOMEASSISTANT_STOP,
23 SERVICE_RELOAD,
24 STATE_HOME,
25)
26from homeassistant.core import (
27 Event,
28 EventStateChangedData,
29 HomeAssistant,
30 ServiceCall,
31 ServiceResponse,
32 State,
33 SupportsResponse,
34 callback,
35)
36from homeassistant.exceptions import ConditionError, ConfigEntryNotReady, HomeAssistantError
37from homeassistant.helpers import condition as condition
38from homeassistant.helpers import config_validation as cv
39from homeassistant.helpers import entity_platform
40from homeassistant.helpers import issue_registry as ir
41from homeassistant.helpers.event import (
42 async_track_point_in_time,
43 async_track_state_change_event,
44 async_track_sunrise,
45 async_track_sunset,
46 async_track_time_change,
47)
48from homeassistant.helpers.reload import (
49 async_integration_yaml_config,
50)
51from homeassistant.helpers.service import async_register_admin_service
52from homeassistant.helpers.typing import ConfigType
53from homeassistant.util.hass_dict import HassKey
55from custom_components.autoarm.hass_api import HomeAssistantAPI
56from custom_components.autoarm.notifier import Notifier
58from .calendar import TrackedCalendar
59from .config_flow import (
60 CONF_CALENDAR_ENTITIES,
61 CONF_NO_EVENT_MODE,
62 CONF_NOTIFY_ACTION,
63 CONF_NOTIFY_ENABLED,
64 CONF_NOTIFY_TARGETS,
65 CONF_OCCUPANCY_DEFAULT_DAY,
66 CONF_OCCUPANCY_DEFAULT_NIGHT,
67 CONF_PERSON_ENTITIES,
68 CONF_SUNRISE_EARLIEST,
69 CONF_SUNRISE_LATEST,
70 CONF_SUNSET_EARLIEST,
71 CONF_SUNSET_LATEST,
72 DEFAULT_NOTIFY_ACTION,
73)
74from .const import (
75 ATTR_RESET,
76 CONF_ALARM_PANEL,
77 CONF_BUTTONS,
78 CONF_CALENDAR_CONTROL,
79 CONF_CALENDAR_EVENT_STATES,
80 CONF_CALENDAR_NO_EVENT,
81 CONF_CALENDAR_POLL_INTERVAL,
82 CONF_CALENDARS,
83 CONF_DAY,
84 CONF_DELAY_TIME,
85 CONF_DIURNAL,
86 CONF_EARLIEST,
87 CONF_LATEST,
88 CONF_NIGHT,
89 CONF_NOTIFY,
90 CONF_OCCUPANCY,
91 CONF_OCCUPANCY_DEFAULT,
92 CONF_RATE_LIMIT,
93 CONF_RATE_LIMIT_CALLS,
94 CONF_RATE_LIMIT_PERIOD,
95 CONF_SUNRISE,
96 CONF_SUNSET,
97 CONF_TRANSITIONS,
98 CONFIG_SCHEMA,
99 DEFAULT_TRANSITIONS,
100 DOMAIN,
101 NO_CAL_EVENT_MODE_AUTO,
102 NO_CAL_EVENT_MODE_MANUAL,
103 NOTIFY_COMMON,
104 YAML_DATA_KEY,
105 ChangeSource,
106 ConditionVariables,
107)
108from .helpers import AppHealthTracker, ExtendedExtendedJSONEncoder, Limiter, alarm_state_as_enum, deobjectify, safe_state
110if TYPE_CHECKING:
111 from homeassistant.helpers.condition import ConditionCheckerType
# Module-level logger, named after this module's import path.
_LOGGER = logging.getLogger(__name__)

# Groupings of alarm panel states.
# NOTE(review): the names suggest "states that override automation" and
# "transient states"; their consumers are outside this section - confirm there.
OVERRIDE_STATES = (
    AlarmControlPanelState.ARMED_VACATION,
    AlarmControlPanelState.ARMED_CUSTOM_BYPASS,
)
EPHEMERAL_STATES = (
    AlarmControlPanelState.PENDING,
    AlarmControlPanelState.ARMING,
    AlarmControlPanelState.DISARMING,
    AlarmControlPanelState.TRIGGERED,
)
# Raw entity state strings rather than AlarmControlPanelState members.
ZOMBIE_STATES = ("unknown", "unavailable")
NS_MOBILE_ACTIONS = "mobile_actions"
PLATFORMS = ["autoarm"]

# Typed hass.data key under which async_setup_entry stores the AutoArmData instance.
HASS_DATA_KEY: HassKey["AutoArmData"] = HassKey(DOMAIN)
@dataclass
class AutoArmData:
    """Runtime data for the integration, stored in ``hass.data`` under ``HASS_DATA_KEY``."""

    # The AlarmArmer instance built for the config entry.
    armer: "AlarmArmer"
    # Free-form, JSON-like values keyed by name; async_setup_entry creates it empty.
    other_data: dict[str, str | dict[str, str] | list[str] | int | float | bool | None]
async def async_setup(
    hass: HomeAssistant,
    config: ConfigType,
) -> bool:
    """Prepare the integration when Home Assistant loads the domain.

    Stashes the YAML section for this domain in ``hass.data``, starts an import
    flow when YAML names an alarm panel and no config entry exists yet, and
    registers the ``reload`` admin service plus the response-only
    ``enquire_configuration`` service.

    Args:
        hass: The Home Assistant instance.
        config: Full YAML configuration; only the ``DOMAIN`` section is read.

    Returns:
        Always ``True``.
    """
    _ = CONFIG_SCHEMA
    yaml_config: ConfigType = config.get(DOMAIN, {})
    # Keep an earlier stash unless YAML has content now or nothing was stashed yet
    if yaml_config or YAML_DATA_KEY not in hass.data:
        hass.data[YAML_DATA_KEY] = yaml_config

    yaml_names_panel = CONF_ALARM_PANEL in yaml_config
    existing_entries = hass.config_entries.async_entries(DOMAIN)

    if yaml_names_panel:
        if existing_entries:
            _LOGGER.warning("AUTOARM YAML core config present but ConfigEntry already exists; ignoring YAML core settings")
            ir.async_create_issue(
                hass,
                DOMAIN,
                "yaml_core_config_deprecated",
                is_fixable=False,
                severity=ir.IssueSeverity.WARNING,
                translation_key="yaml_core_config_deprecated",
            )
        else:
            _LOGGER.info("AUTOARM Triggering import of YAML configuration to ConfigEntry")
            import_flow = hass.config_entries.flow.async_init(DOMAIN, context={"source": SOURCE_IMPORT}, data=yaml_config)
            hass.async_create_task(import_flow)

    async def reload_service_handler(service_call: ServiceCall) -> None:
        """Re-read the YAML section, stash it, then reload every config entry of this domain."""
        _LOGGER.info("AUTOARM Reloading %s.%s component, data %s", service_call.domain, service_call.service, service_call.data)
        try:
            fresh_config = await async_integration_yaml_config(hass, DOMAIN)
        except HomeAssistantError as err:
            raise HomeAssistantError(f"Failed to reload YAML configuration: {err}") from err
        domain_present = fresh_config is not None and DOMAIN in fresh_config
        hass.data[YAML_DATA_KEY] = fresh_config[DOMAIN] if domain_present else {}
        for entry in hass.config_entries.async_entries(DOMAIN):
            await hass.config_entries.async_reload(entry.entry_id)

    async_register_admin_service(
        hass,
        DOMAIN,
        SERVICE_RELOAD,
        reload_service_handler,
    )

    def supplemental_action_enquire_configuration(_call: ServiceCall) -> ConfigType:
        """Return a JSON-safe view of the first config entry merged with the stashed YAML.

        Raises:
            HomeAssistantError: If there is no config entry, or the data cannot be serialized.
        """
        entries = hass.config_entries.async_entries(DOMAIN)
        if not entries:
            raise HomeAssistantError("No config entry found for AutoArm")
        entry = entries[0]
        options = entry.options
        stashed_yaml = hass.data.get(YAML_DATA_KEY, {})

        diurnal = {
            CONF_SUNRISE: {
                CONF_EARLIEST: options.get(CONF_SUNRISE_EARLIEST),
                CONF_LATEST: options.get(CONF_SUNRISE_LATEST),
            },
            CONF_SUNSET: {
                CONF_EARLIEST: options.get(CONF_SUNSET_EARLIEST),
                CONF_LATEST: options.get(CONF_SUNSET_LATEST),
            },
        }
        # NOTE(review): the day default here ("armed_home") and the notify "enabled"
        # default (True) differ from those used in _build_armer_from_entry
        # ("disarmed" / False), and no night default is reported - confirm intent.
        occupancy = {
            CONF_ENTITY_ID: options.get(CONF_PERSON_ENTITIES, []),
            CONF_OCCUPANCY_DEFAULT: {
                CONF_DAY: options.get(CONF_OCCUPANCY_DEFAULT_DAY, "armed_home"),
            },
        }
        # The options value wins; YAML common profile, then the built-in default, are fallbacks
        notify_service = options.get(CONF_NOTIFY_ACTION) or stashed_yaml.get(CONF_NOTIFY, {}).get(NOTIFY_COMMON, {}).get(
            CONF_SERVICE, DEFAULT_NOTIFY_ACTION
        )
        notify = {
            CONF_SERVICE: notify_service,
            "targets": options.get(CONF_NOTIFY_TARGETS, []),
            "profiles": stashed_yaml.get(CONF_NOTIFY, {}),
            "enabled": options.get(CONF_NOTIFY_ENABLED, True),
        }
        data: ConfigType = {
            CONF_ALARM_PANEL: entry.data.get(CONF_ALARM_PANEL),
            CONF_DIURNAL: diurnal,
            CONF_CALENDAR_CONTROL: stashed_yaml.get(CONF_CALENDAR_CONTROL),
            CONF_BUTTONS: stashed_yaml.get(CONF_BUTTONS, {}),
            CONF_OCCUPANCY: occupancy,
            CONF_NOTIFY: notify,
            CONF_RATE_LIMIT: stashed_yaml.get(CONF_RATE_LIMIT, {}),
        }
        try:
            # Round-trip through JSON so the response holds only plain JSON types
            jsonized: str = json.dumps(obj=data, cls=ExtendedExtendedJSONEncoder)
            return json.loads(jsonized)
        except Exception as err:
            raise HomeAssistantError(f"Failed to serialize configuration: {err}") from err

    hass.services.async_register(
        DOMAIN,
        "enquire_configuration",
        supplemental_action_enquire_configuration,
        supports_response=SupportsResponse.ONLY,
    )

    return True
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
    """Set up Auto Arm from a config entry.

    Builds the armer from the entry merged with the stashed YAML, stores it in
    ``hass.data`` and runs its async initialization.

    Args:
        hass: The Home Assistant instance.
        entry: The config entry being set up.

    Returns:
        ``True`` once the armer is initialized.

    Raises:
        ConfigEntryNotReady: If building or initializing the armer fails. Anything
            the armer registered before the failure is released first, so a
            retried setup does not stack duplicate listeners.
    """
    yaml_config: ConfigType = hass.data.get(YAML_DATA_KEY, {})
    armer: AlarmArmer | None = None
    try:
        armer = _build_armer_from_entry(hass, entry, yaml_config)
        hass.data[HASS_DATA_KEY] = AutoArmData(armer, {})
        await armer.initialize()
    except Exception as err:
        if armer is not None:
            # async_unload_entry is not run for an entry whose setup failed, so
            # undo the partial setup here: drop our hass.data record ...
            stored = hass.data.get(HASS_DATA_KEY)
            if stored is not None and stored.armer is armer:
                del hass.data[HASS_DATA_KEY]
            # ... and release listeners; best effort, the armer may be half-initialized
            try:
                armer.shutdown()
            except Exception:
                _LOGGER.debug("AUTOARM Failure cleaning up after failed initialization", exc_info=True)
        raise ConfigEntryNotReady(f"Failed to initialize Auto Arm: {err}") from err
    entry.async_on_unload(entry.add_update_listener(_async_update_listener))
    return True
async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:  # noqa: ARG001
    """Unload the Auto Arm config entry.

    Shuts down the stored armer, if any, and removes it from ``hass.data``.

    Returns:
        Always ``True``.
    """
    if HASS_DATA_KEY not in hass.data:
        return True
    hass.data[HASS_DATA_KEY].armer.shutdown()
    del hass.data[HASS_DATA_KEY]
    return True
async def _async_update_listener(hass: HomeAssistant, entry: ConfigEntry) -> None:
    """Reload the config entry when it is updated, so new options take effect."""
    entry_id = entry.entry_id
    await hass.config_entries.async_reload(entry_id)
def _build_armer_from_entry(hass: HomeAssistant, entry: ConfigEntry, yaml_config: ConfigType) -> "AlarmArmer":
    """Create an AlarmArmer from a config entry merged with the stashed YAML.

    The alarm panel comes from ``entry.data``; people, calendars, occupancy
    defaults, notification settings and diurnal cut-offs come from
    ``entry.options``. YAML supplies buttons, notify profiles, rate limit,
    transitions, the occupancy delay, per-calendar overrides and fallback
    diurnal cut-offs.

    Args:
        hass: The Home Assistant instance.
        entry: Config entry providing data and options.
        yaml_config: The YAML section for this domain (may be empty).

    Returns:
        A constructed, not yet initialized, AlarmArmer.
    """
    migrate(hass)

    options = entry.options
    alarm_panel: str = entry.data[CONF_ALARM_PANEL]
    person_entities: list[str] = options.get(CONF_PERSON_ENTITIES, [])
    calendar_entities: list[str] = options.get(CONF_CALENDAR_ENTITIES, [])
    day_default: str = options.get(CONF_OCCUPANCY_DEFAULT_DAY, "disarmed")
    night_default: str | None = options.get(CONF_OCCUPANCY_DEFAULT_NIGHT, "armed_night")
    no_event_mode: str = options.get(CONF_NO_EVENT_MODE, NO_CAL_EVENT_MODE_AUTO)

    # Occupancy: people and defaults from options, delay time only from YAML
    occupancy_defaults: dict[str, str] = {CONF_DAY: day_default}
    if night_default:
        occupancy_defaults[CONF_NIGHT] = night_default
    occupancy: ConfigType = {
        CONF_ENTITY_ID: person_entities,
        CONF_OCCUPANCY_DEFAULT: occupancy_defaults,
    }
    yaml_occupancy = yaml_config.get(CONF_OCCUPANCY, {})
    if isinstance(yaml_occupancy, dict):
        yaml_delay_time = yaml_occupancy.get(CONF_DELAY_TIME)
        if yaml_delay_time:
            occupancy[CONF_DELAY_TIME] = yaml_delay_time

    # Calendars: entity list from options, per-entity poll interval / event states from YAML
    yaml_calendar_control = yaml_config.get(CONF_CALENDAR_CONTROL, {})
    yaml_calendars: list[ConfigType] = []
    if yaml_calendar_control:
        yaml_calendars = yaml_calendar_control.get(CONF_CALENDARS, [])
    yaml_cal_by_entity: dict[str, ConfigType] = {cal[CONF_ENTITY_ID]: cal for cal in yaml_calendars if CONF_ENTITY_ID in cal}

    def _calendar_entry(cal_entity_id: str) -> ConfigType:
        """Config for one calendar, using YAML overrides where present."""
        yaml_override = yaml_cal_by_entity.get(cal_entity_id, {})
        return {
            CONF_ENTITY_ID: cal_entity_id,
            CONF_CALENDAR_POLL_INTERVAL: yaml_override.get(CONF_CALENDAR_POLL_INTERVAL, 15),
            CONF_CALENDAR_EVENT_STATES: yaml_override.get(CONF_CALENDAR_EVENT_STATES, _validated_default_calendar_mappings()),
        }

    calendar_list: list[ConfigType] = [_calendar_entry(cal_entity_id) for cal_entity_id in calendar_entities]
    calendar_config: ConfigType = (
        {
            CONF_CALENDAR_NO_EVENT: no_event_mode,
            CONF_CALENDARS: calendar_list,
        }
        if calendar_list
        else {}
    )

    # Notify profiles come straight from YAML; action and targets are read from options below
    notify_profiles = yaml_config.get(CONF_NOTIFY, {})

    # Diurnal cut-offs: options take priority, YAML is the fallback
    yaml_diurnal = yaml_config.get(CONF_DIURNAL, {}) or {}
    yaml_sunrise = yaml_diurnal.get(CONF_SUNRISE, {}) or {}
    yaml_sunset = yaml_diurnal.get(CONF_SUNSET, {}) or {}

    def _parse_time(option_key: str, yaml_fallback: dt.time | None) -> dt.time | None:
        """Option value (strings parsed with cv.time) if set, otherwise the YAML fallback."""
        val = options.get(option_key)
        if val is None:
            return yaml_fallback
        if isinstance(val, str):
            return cv.time(val)
        return val

    return AlarmArmer(
        hass,
        alarm_panel=alarm_panel,
        sunrise_earliest=_parse_time(CONF_SUNRISE_EARLIEST, yaml_sunrise.get(CONF_EARLIEST)),
        sunrise_latest=_parse_time(CONF_SUNRISE_LATEST, yaml_sunrise.get(CONF_LATEST)),
        sunset_earliest=_parse_time(CONF_SUNSET_EARLIEST, yaml_sunset.get(CONF_EARLIEST)),
        sunset_latest=_parse_time(CONF_SUNSET_LATEST, yaml_sunset.get(CONF_LATEST)),
        buttons=yaml_config.get(CONF_BUTTONS, {}),
        occupancy=occupancy,
        notify_profiles=notify_profiles,
        notify_enabled=options.get(CONF_NOTIFY_ENABLED, False),
        notify_action=options.get(CONF_NOTIFY_ACTION),
        notify_targets=options.get(CONF_NOTIFY_TARGETS, []),
        rate_limit=yaml_config.get(CONF_RATE_LIMIT, {}),
        calendar_config=calendar_config,
        transitions=yaml_config.get(CONF_TRANSITIONS),
    )
def _validated_default_calendar_mappings() -> dict[str, list[re.Pattern[str]]]:
    """Return the default calendar event mappings with every pattern compiled.

    Mirrors the schema validation that CALENDAR_SCHEMA applies (ensure_list + is_regex):
    a single string becomes a one-element list and string patterns are compiled.
    A fresh dict is built on every call.
    """
    from .const import DEFAULT_CALENDAR_MAPPINGS

    def _compiled(patterns) -> list[re.Pattern[str]]:  # noqa: ANN001
        """Normalise one mapping value to a list of compiled patterns."""
        as_list = [patterns] if isinstance(patterns, str) else patterns
        return [re.compile(p) if isinstance(p, str) else p for p in as_list]

    return {str(state): _compiled(patterns) for state, patterns in DEFAULT_CALENDAR_MAPPINGS.items()}
def migrate(hass: HomeAssistant) -> None:
    """Remove legacy ``autoarm.*`` states from the state machine if they exist.

    Failures are logged per entity and never raised.
    """
    legacy_entity_ids = (
        "autoarm.configured",
        "autoarm.last_calendar_event",
        "autoarm.last_intervention",
        "autoarm.initialized",
        "autoarm.last_calculation",
    )
    for entity_id in legacy_entity_ids:
        try:
            if not hass.states.get(entity_id):
                continue
            _LOGGER.info("AUTOARM Migration removing legacy entity_id: %s", entity_id)
            hass.states.async_remove(entity_id)
        except Exception as e:
            _LOGGER.warning("AUTOARM Migration fail for %s:%s", entity_id, e)
371def unlisten(listener: Callable[[], None] | None) -> None:
372 if listener:
373 try:
374 listener()
375 except Exception as e:
376 _LOGGER.debug("AUTOARM Failure closing listener %s: %s", listener, e)
@dataclass
class Intervention:
    """Record of a manual intervention, such as a button push, mobile action or alarm panel change"""

    # When the intervention was recorded
    created_at: dt.datetime
    # What triggered it
    source: ChangeSource
    # Alarm state associated with the intervention, if any
    state: AlarmControlPanelState | None

    def as_dict(self) -> dict[str, str | None]:
        """Return a string-only dict view: ISO timestamp, source text, state text or None."""
        state_text = None if self.state is None else str(self.state)
        return {
            "created_at": self.created_at.isoformat(),
            "source": str(self.source),
            "state": state_text,
        }
395class AlarmArmer:
396 def __init__(
397 self,
398 hass: HomeAssistant,
399 alarm_panel: str,
400 buttons: dict[str, ConfigType] | None = None,
401 occupancy: ConfigType | None = None,
402 actions: list[str] | None = None,
403 notify_enabled: bool = True,
404 notify_action: str | None = None,
405 notify_targets: list[str] | None = None,
406 notify_profiles: ConfigType | None = None,
407 sunrise_earliest: dt.time | None = None,
408 sunrise_latest: dt.time | None = None,
409 sunset_earliest: dt.time | None = None,
410 sunset_latest: dt.time | None = None,
411 rate_limit: ConfigType | None = None,
412 calendar_config: ConfigType | None = None,
413 transitions: dict[str, dict[str, list[ConfigType]]] | None = None,
414 ) -> None:
415 occupancy = occupancy or {}
416 rate_limit = rate_limit or {}
418 self.hass: HomeAssistant = hass
419 self.app_health_tracker: AppHealthTracker = AppHealthTracker(hass)
420 if notify_enabled and not notify_profiles and not notify_action:
421 _LOGGER.warning("AUTOARM Notification disabled - no config")
422 notify_enabled = False
423 if notify_enabled:
424 self.notifier: Notifier | None = Notifier(
425 notify_profiles, hass, self.app_health_tracker, notify_action, notify_targets
426 )
427 else:
428 self.notifier = None
429 self.local_tz = dt_util.get_time_zone(self.hass.config.time_zone)
430 calendar_config = calendar_config or {}
431 self.calendar_configs: list[ConfigType] = calendar_config.get(CONF_CALENDARS, []) or []
432 self.calendars: list[TrackedCalendar] = []
433 self.calendar_no_event_mode: str | None = calendar_config.get(CONF_CALENDAR_NO_EVENT, NO_CAL_EVENT_MODE_AUTO)
434 self.alarm_panel: str = alarm_panel
435 self.sunrise_earliest: dt.time | None = sunrise_earliest
436 self.sunrise_latest: dt.time | None = sunrise_latest
437 self.sunset_earliest: dt.time | None = sunset_earliest
438 self.sunset_latest: dt.time | None = sunset_latest
439 self.occupants: list[str] = occupancy.get(CONF_ENTITY_ID, [])
440 self.occupied_defaults: dict[str, AlarmControlPanelState] = occupancy.get(
441 CONF_OCCUPANCY_DEFAULT, {CONF_DAY: AlarmControlPanelState.ARMED_HOME}
442 )
443 self.occupied_delay: dict[str, dt.timedelta] = occupancy.get(CONF_DELAY_TIME, {})
444 self.buttons: ConfigType = buttons or {}
446 self.actions: list[str] = actions or []
447 self.unsubscribes: list[Callable[[], None]] = []
448 self.pre_pending_state: AlarmControlPanelState | None = None
449 self.button_device: dict[str, str] = {}
450 self.arming_in_progress: asyncio.Event = asyncio.Event()
452 self.rate_limiter: Limiter = Limiter(
453 window=rate_limit.get(CONF_RATE_LIMIT_PERIOD, dt.timedelta(seconds=60)),
454 max_calls=rate_limit.get(CONF_RATE_LIMIT_CALLS, 5),
455 )
457 self.hass_api: HomeAssistantAPI = HomeAssistantAPI(hass)
458 self.transitions: dict[AlarmControlPanelState, ConditionCheckerType] = {}
459 self.transition_config: dict[str, dict[str, list[ConfigType]]] = transitions or {}
461 self.interventions: list[Intervention] = []
462 self.intervention_ttl: int = 60
464 async def initialize(self) -> None:
465 """Async initialization"""
466 _LOGGER.info("AUTOARM occupied=%s, state=%s, calendars=%s", self.is_occupied(), self.armed_state(), len(self.calendars))
468 self.initialize_alarm_panel()
469 await self.initialize_calendar()
470 await self.initialize_logic()
471 self.initialize_diurnal()
472 self.initialize_occupancy()
473 self.initialize_buttons()
474 self.initialize_integration()
475 self.initialize_housekeeping()
476 self.initialize_home_assistant()
477 await self.reset_armed_state(source=ChangeSource.STARTUP)
479 _LOGGER.info("AUTOARM Initialized, state: %s", self.armed_state())
481 def initialize_home_assistant(self) -> None:
482 self.stop_listener: Callable[[], None] | None = self.hass.bus.async_listen_once(
483 EVENT_HOMEASSISTANT_STOP, self.async_shutdown
484 )
485 self.app_health_tracker.app_initialized()
486 self.hass.states.async_set(f"sensor.{DOMAIN}_last_calculation", "unavailable", attributes={})
488 self.hass.services.async_register(
489 DOMAIN,
490 "reset_state",
491 self.reset_service,
492 supports_response=SupportsResponse.OPTIONAL,
493 )
495 async def reset_service(self, _call: ServiceCall) -> ServiceResponse:
496 new_state = await self.reset_armed_state(intervention=self.record_intervention(source=ChangeSource.ACTION, state=None))
497 return {"change": new_state or "NO_CHANGE"}
499 def initialize_integration(self) -> None:
500 self.hass.states.async_set(f"sensor.{DOMAIN}_last_intervention", "unavailable", attributes={})
502 self.unsubscribes.append(self.hass.bus.async_listen("mobile_app_notification_action", self.on_mobile_action))
504 def initialize_alarm_panel(self) -> None:
505 """Set up automation for Home Assistant alarm panel
507 See https://www.home-assistant.io/integrations/alarm_control_panel/
509 Succeeds even if control panel has not yet started, listener will pick up events when it does
510 """
511 self.unsubscribes.append(async_track_state_change_event(self.hass, [self.alarm_panel], self.on_panel_change))
512 _LOGGER.debug("AUTOARM Auto-arming %s", self.alarm_panel)
514 def initialize_housekeeping(self) -> None:
515 self.unsubscribes.append(
516 async_track_time_change(
517 self.hass,
518 action=self.housekeeping,
519 minute=0,
520 )
521 )
523 def initialize_diurnal(self) -> None:
524 # events API expects a function, however underlying HassJob is fine with coroutines
525 self.unsubscribes.append(async_track_sunrise(self.hass, self.on_sunrise, None)) # type: ignore
526 self.unsubscribes.append(async_track_sunset(self.hass, self.on_sunset, None)) # type: ignore
527 if self.sunrise_latest:
528 self.unsubscribes.append(
529 async_track_time_change(
530 self.hass,
531 self.on_sunrise_latest,
532 hour=self.sunrise_latest.hour,
533 minute=self.sunrise_latest.minute,
534 second=self.sunrise_latest.second,
535 )
536 )
537 if self.sunset_latest:
538 self.unsubscribes.append(
539 async_track_time_change(
540 self.hass,
541 self.on_sunset_latest,
542 hour=self.sunset_latest.hour,
543 minute=self.sunset_latest.minute,
544 second=self.sunset_latest.second,
545 )
546 )
548 def initialize_occupancy(self) -> None:
549 """Configure occupants, and listen for changes in their state"""
550 if self.occupants:
551 _LOGGER.info("AUTOARM Occupancy determined by %s", ",".join(self.occupants))
552 self.unsubscribes.append(async_track_state_change_event(self.hass, self.occupants, self.on_occupancy_change))
553 else:
554 _LOGGER.info("AUTOARM Occupancy not configured")
556 def initialize_buttons(self) -> None:
557 """Initialize (optional) physical alarm state control buttons"""
559 def setup_button(state_name: str, button_entity: str, cb: Callable) -> None:
560 self.button_device[state_name] = button_entity
561 if self.button_device[state_name]:
562 self.unsubscribes.append(async_track_state_change_event(self.hass, [button_entity], cb))
564 _LOGGER.debug(
565 "AUTOARM Configured %s button for %s",
566 state_name,
567 self.button_device[state_name],
568 )
570 for button_use, button_config in self.buttons.items():
571 delay: dt.timedelta | None = button_config.get(CONF_DELAY_TIME)
572 for entity_id in button_config[CONF_ENTITY_ID]:
573 if button_use == ATTR_RESET:
574 setup_button(ATTR_RESET, entity_id, partial(self.on_reset_button, delay))
575 else:
576 setup_button(
577 button_use, entity_id, partial(self.on_alarm_state_button, AlarmControlPanelState(button_use), delay)
578 )
580 async def initialize_calendar(self) -> None:
581 """Configure calendar polling (optional)"""
582 stage: str = "calendar"
583 self.hass.states.async_set(f"sensor.{DOMAIN}_last_calendar_event", "unavailable", attributes={})
584 if not self.calendar_configs:
585 return
586 try:
587 platforms: list[entity_platform.EntityPlatform] = entity_platform.async_get_platforms(self.hass, CALENDAR_DOMAIN)
588 if platforms:
589 platform: entity_platform.EntityPlatform = platforms[0]
590 else:
591 self.app_health_tracker.record_initialization_error(stage)
592 _LOGGER.error("AUTOARM Calendar platform not available from Home Assistant")
593 return
594 except Exception as _e:
595 self.app_health_tracker.record_initialization_error(stage)
596 _LOGGER.exception("AUTOARM Unable to access calendar platform")
597 return
598 for calendar_config in self.calendar_configs:
599 tracked_calendar = TrackedCalendar(
600 self.hass, calendar_config, self.calendar_no_event_mode, self, self.app_health_tracker
601 )
602 await tracked_calendar.initialize(platform)
603 self.calendars.append(tracked_calendar)
605 async def initialize_logic(self) -> None:
606 stage: str = "logic"
607 for state_str, raw_condition in DEFAULT_TRANSITIONS.items():
608 if state_str not in self.transition_config:
609 _LOGGER.info("AUTOARM Defaulting transition condition for %s", state_str)
610 self.transition_config[state_str] = {CONF_CONDITIONS: cv.CONDITIONS_SCHEMA(raw_condition)}
612 for state_str, transition_config in self.transition_config.items():
613 error: str = ""
614 condition_config = transition_config.get(CONF_CONDITIONS)
615 if condition_config is None:
616 error = "Empty conditions"
617 _LOGGER.warning(f"AUTOARM Found no conditions for {state_str} transition")
618 else:
619 try:
620 state = AlarmControlPanelState(state_str)
621 cond: ConditionCheckerType | None = await self.hass_api.build_condition(
622 condition_config, strict=True, validate=True, name=state_str
623 )
625 if cond:
626 # re-run without strict wrapper
627 cond = await self.hass_api.build_condition(condition_config, name=state_str)
628 if cond:
629 _LOGGER.debug(f"AUTOARM Validated transition logic for {state_str}")
630 self.transitions[state] = cond
631 else:
632 _LOGGER.warning(f"AUTOARM Failed to validate transition logic for {state_str}")
633 error = "Condition validation failed"
634 except ValueError as ve:
635 self.app_health_tracker.record_initialization_error(stage)
636 error = f"Invalid state {ve}"
637 _LOGGER.error(f"AUTOARM Invalid state in {state_str} transition - {ve}")
638 except vol.Invalid as vi:
639 self.app_health_tracker.record_initialization_error(stage)
640 _LOGGER.error(f"AUTOARM Transition {state_str} conditions fails Home Assistant schema check {vi}")
641 error = f"Schema error {vi}"
642 except ConditionError as ce:
643 _LOGGER.error(f"AUTOARM Transition {state_str} conditions fails Home Assistant condition check {ce}")
644 if hasattr(ce, "message"):
645 error = ce.message # type: ignore
646 elif hasattr(ce, "error") and hasattr(ce.error, "message"): # type: ignore[attr-defined]
647 error = ce.error.message # type: ignore
648 else:
649 error = str(ce)
650 except Exception as e:
651 self.app_health_tracker.record_initialization_error(stage)
652 _LOGGER.exception("AUTOARM Disabling transition %s with error validating %s", state_str, condition_config)
653 error = f"Unknown exception {e}"
654 if error:
655 _LOGGER.warning(f"AUTOARM raising report issue for {error} on {state_str}")
656 self.hass_api.raise_issue(
657 f"transition_condition_{state_str}",
658 is_fixable=False,
659 issue_key="transition_condition",
660 issue_map={"state": state_str, "error": error},
661 severity=ir.IssueSeverity.ERROR,
662 )
664 async def async_shutdown(self, _event: Event) -> None:
665 _LOGGER.info("AUTOARM shut down event received")
666 self.stop_listener = None
667 self.shutdown()
669 def shutdown(self) -> None:
670 _LOGGER.info("AUTOARM shutting down")
671 for calendar in self.calendars:
672 calendar.shutdown()
673 while self.unsubscribes:
674 unlisten(self.unsubscribes.pop())
675 unlisten(self.stop_listener)
676 self.stop_listener = None
677 _LOGGER.info("AUTOARM shut down")
679 def active_calendar_event(self) -> CalendarEvent | None:
680 events: list[CalendarEvent] = []
681 for cal in self.calendars:
682 events.extend(cal.active_events())
683 if events:
684 # TODO: consider sorting events to LIFO
685 return events[0]
686 return None
688 def has_active_calendar_event(self) -> bool:
689 return any(cal.has_active_event() for cal in self.calendars)
691 def is_occupied(self) -> bool | None:
692 """Ternary - true at least one person entity has state home, false none of them, null if no occupants defined"""
693 if self.occupants:
694 return any(safe_state(self.hass.states.get(p)) == STATE_HOME for p in self.occupants)
695 return None
697 def at_home(self) -> list[str] | None:
698 if self.occupants:
699 return [p for p in self.occupants if safe_state(self.hass.states.get(p)) == STATE_HOME]
700 return None
702 def not_home(self) -> list[str] | None:
703 if self.occupants:
704 return [p for p in self.occupants if safe_state(self.hass.states.get(p)) != STATE_HOME]
705 return None
707 def is_unoccupied(self) -> bool | None:
708 """Ternary - false at least one person entity has state home, true none of them, null if no occupants defined"""
709 if self.occupants:
710 return all(safe_state(self.hass.states.get(p)) != STATE_HOME for p in self.occupants)
711 return None
713 def is_night(self) -> bool:
714 return safe_state(self.hass.states.get("sun.sun")) == STATE_BELOW_HORIZON
716 def armed_state(self) -> AlarmControlPanelState:
717 raw_state: str | None = safe_state(self.hass.states.get(self.alarm_panel))
718 alarm_state = alarm_state_as_enum(raw_state)
719 if alarm_state is None:
720 _LOGGER.warning("AUTOARM No alarm state available - treating as PENDING")
721 return AlarmControlPanelState.PENDING
722 return alarm_state
724 def _extract_event(self, event: Event[EventStateChangedData]) -> tuple[str | None, str | None, str | None, dict[str, str]]:
725 entity_id = old = new = None
726 new_attributes: dict[str, str] = {}
727 if event and event.data:
728 entity_id = event.data.get("entity_id")
729 old_obj = event.data.get("old_state")
730 if old_obj:
731 old = old_obj.state
732 new_obj = event.data.get("new_state")
733 if new_obj:
734 new = new_obj.state
735 new_attributes = new_obj.attributes
736 return entity_id, old, new, new_attributes
738 async def pending_state(self, source: ChangeSource | None) -> None:
739 self.pre_pending_state = self.armed_state()
740 await self.arm(AlarmControlPanelState.PENDING, source=source)
742 @callback
743 async def delayed_reset_armed_state(self, triggered_at: dt.datetime, requested_at: dt.datetime | None, **kwargs) -> None:
744 _LOGGER.debug("AUTOARM delayed_arm at %s, requested_at: %s", triggered_at, requested_at)
745 if self.is_intervention_since_request(requested_at):
746 return
747 await self.reset_armed_state(**kwargs)
749 async def reset_armed_state(
750 self, intervention: Intervention | None = None, source: ChangeSource | None = None
751 ) -> str | None:
752 """Logic to automatically work out appropriate current armed state"""
753 state: AlarmControlPanelState | None = None
754 existing_state: AlarmControlPanelState | None = None
755 must_change_state: bool = False
756 last_state_intervention: Intervention | None = None
757 active_calendar_event: CalendarEvent | None = None
759 if source is None and intervention is not None:
760 source = intervention.source
761 _LOGGER.debug(
762 "AUTOARM reset_armed_state(intervention=%s,source=%s)",
763 intervention,
764 source,
765 )
767 try:
768 existing_state = self.armed_state()
769 state = existing_state
770 if self.calendars:
771 active_calendar_event = self.active_calendar_event()
772 if active_calendar_event:
773 _LOGGER.debug("AUTOARM Ignoring reset while calendar event active")
774 return existing_state
775 if self.calendar_no_event_mode == NO_CAL_EVENT_MODE_MANUAL:
776 _LOGGER.debug(
777 "AUTOARM Ignoring reset while calendar configured, no active event, and default mode is manual"
778 )
779 return existing_state
780 if self.calendar_no_event_mode in AlarmControlPanelState:
781 # TODO: may be dupe logic with on_cal event
782 _LOGGER.debug("AUTOARM Applying fixed reset on end of calendar event, %s", self.calendar_no_event_mode)
783 return await self.arm(alarm_state_as_enum(self.calendar_no_event_mode), ChangeSource.CALENDAR)
784 if self.calendar_no_event_mode == NO_CAL_EVENT_MODE_AUTO:
785 _LOGGER.debug("AUTOARM Applying reset while calendar configured, no active event, and default mode is auto")
786 else:
787 _LOGGER.warning("AUTOARM Unexpected state for calendar no event mode: %s", self.calendar_no_event_mode)
789 # TODO: expose as config ( for manual disarm override ) and condition logic
790 must_change_state = existing_state is None or existing_state == AlarmControlPanelState.PENDING
791 if intervention or source in (ChangeSource.CALENDAR, ChangeSource.OCCUPANCY) or must_change_state:
792 _LOGGER.debug("AUTOARM Ignoring previous interventions")
793 else:
794 last_state_intervention = self.last_state_intervention()
795 if last_state_intervention:
796 _LOGGER.debug(
797 "AUTOARM Ignoring automated reset for %s set by %s at %s",
798 last_state_intervention.state,
799 last_state_intervention.source,
800 last_state_intervention.created_at,
801 )
802 return existing_state
803 state = self.determine_state()
804 if state is not None and state != AlarmControlPanelState.PENDING and state != existing_state:
805 state = await self.arm(state, source=source)
806 finally:
807 self.hass.states.async_set(
808 f"sensor.{DOMAIN}_last_calculation",
809 str(state is not None and state != existing_state),
810 attributes={
811 "new_state": str(state),
812 "old_state": str(existing_state),
813 "source": source,
814 "active_calendar_event": deobjectify(active_calendar_event),
815 "occupied": self.is_occupied(),
816 "night": self.is_night(),
817 "must_change_state": str(must_change_state),
818 "last_state_intervention": deobjectify(last_state_intervention),
819 "intervention": intervention.as_dict() if intervention else None,
820 "time": dt_util.now().isoformat(),
821 },
822 )
824 return state
826 def is_intervention_since_request(self, requested_at: dt.datetime | None) -> bool:
827 if requested_at is not None and self.has_intervention_since(requested_at):
828 _LOGGER.debug(
829 "AUTOARM Cancelling delayed operation since subsequent manual action",
830 )
831 return True
832 return False
def determine_state(self) -> AlarmControlPanelState | None:
    """Compute a new state using occupancy, sun and transition conditions

    The configured transitions are evaluated in iteration order against a
    snapshot of the current condition variables; the first transition whose
    condition passes wins.

    Returns
    -------
        The alarm state of the first matching transition, or None if no
        transition matches.

    """
    snapshot: ConditionVariables = ConditionVariables(
        occupied=self.is_occupied(),
        unoccupied=self.is_unoccupied(),
        night=self.is_night(),
        state=self.armed_state(),
        calendar_event=self.active_calendar_event(),
        occupied_defaults=self.occupied_defaults,
        at_home=self.at_home(),
        not_home=self.not_home(),
    )
    for candidate, checker in self.transitions.items():
        if not self.hass_api.evaluate_condition(checker, snapshot):
            continue
        _LOGGER.debug("AUTOARM Computed state as %s from condition", candidate)
        # A None key stops the search but yields no state to apply
        return None if candidate is None else AlarmControlPanelState(candidate)
    return None
@callback
async def delayed_arm(self, triggered_at: dt.datetime, requested_at: dt.datetime | None, **kwargs: Any) -> None:
    """Run a previously scheduled arm unless an intervention has happened since.

    Args:
    ----
        triggered_at: Time at which the scheduled job fired (logged only).
        requested_at: Time at which the delayed arm was requested; passed to
            `is_intervention_since_request` to decide whether to cancel.
        **kwargs: Forwarded unchanged to `arm`.

    """
    _LOGGER.debug("AUTOARM delayed_arm at %s, requested_at: %s", triggered_at, requested_at)
    if not self.is_intervention_since_request(requested_at):
        await self.arm(**kwargs)
async def arm(
    self, arming_state: AlarmControlPanelState | None, source: ChangeSource | None = None
) -> AlarmControlPanelState | None:
    """Change alarm panel state

    Writes the new state directly onto the alarm panel entity, tagging the
    `changed_by` attribute with the integration domain and `source`, then
    notifies if a notifier is configured and a source was given.

    Args:
    ----
        arming_state: Target state; nothing is done when None.
        source: Source of the change, for example calendar or button.

    Returns:
    -------
        The new arming state when the panel was updated. None when there was
        no target state, the panel already reported that state, the rate
        limiter triggered, or an exception was caught (logged and recorded
        on the health tracker).

    """
    if arming_state is None or self.armed_state() == arming_state:
        return None
    if self.rate_limiter.triggered():
        _LOGGER.debug("AUTOARM Rate limit triggered by %s, skipping arm", source)
        return None
    try:
        # Flag stays set for the duration of the change and is always cleared in `finally`
        self.arming_in_progress.set()
        current: AlarmControlPanelState | None = self.armed_state()
        if arming_state == current:
            _LOGGER.debug("Skipping arm for %s, as %s already %s", source, self.alarm_panel, arming_state)
            return current

        panel_state: State | None = self.hass.states.get(self.alarm_panel)
        # Carry over the panel's existing attributes so only changed_by is overwritten
        attrs: dict[str, Any] = dict(panel_state.attributes) if panel_state else {}
        attrs[ATTR_CHANGED_BY] = f"{DOMAIN}.{source}"
        self.hass.states.async_set(entity_id=self.alarm_panel, new_state=str(arming_state), attributes=attrs)
        _LOGGER.info("AUTOARM Setting %s from %s to %s for %s", self.alarm_panel, current, arming_state, source)
        if self.notifier and source and arming_state:
            await self.notifier.notify(source=source, from_state=current, to_state=arming_state)
        return arming_state
    except Exception as e:
        _LOGGER.error("AUTOARM Failed to arm: %s", e)
        self.app_health_tracker.record_runtime_error()
    finally:
        self.arming_in_progress.clear()
    return None
def schedule_state(
    self,
    trigger_time: dt.datetime,
    state: AlarmControlPanelState | None,
    intervention: Intervention | None,
    source: ChangeSource | None = None,
) -> None:
    """Schedule a delayed arm, or a delayed reset, at a point in time.

    Args:
    ----
        trigger_time: When the scheduled job should fire.
        state: State to arm to; when None a reset of the armed state is
            scheduled instead.
        intervention: Intervention that prompted the schedule, if any. It is
            passed on to the delayed reset and supplies the source when
            `source` is not given.
        source: Source of the change; takes priority over the intervention's
            source.

    """
    # Parenthesised deliberately: a conditional expression binds looser than `or`,
    # so without the brackets an explicit source was dropped whenever there was
    # no intervention.
    source = source or (intervention.source if intervention else None)

    job: Callable
    if state is None:
        _LOGGER.debug("Delayed reset, triggered at: %s, source%s", trigger_time, source)
        job = partial(self.delayed_reset_armed_state, intervention=intervention, source=source, requested_at=dt_util.now())
    else:
        _LOGGER.debug("Delayed arm %s, triggered at: %s, source%s", state, trigger_time, source)

        job = partial(self.delayed_arm, arming_state=state, source=source, requested_at=dt_util.now())

    # Keep the unsubscribe handle alongside the other listeners in self.unsubscribes
    self.unsubscribes.append(
        async_track_point_in_time(
            self.hass,
            job,
            trigger_time,
        )
    )
def record_intervention(self, source: ChangeSource, state: AlarmControlPanelState | None) -> Intervention:
    """Record an intervention stamped with the current time and publish it.

    The intervention is appended to `self.interventions` and mirrored to the
    `sensor.<DOMAIN>_last_intervention` state, whose value is the source and
    whose attributes are the intervention's dict form.

    Args:
    ----
        source: Where the intervention came from.
        state: Alarm state associated with the intervention, or None.

    Returns:
    -------
        The newly created intervention.

    """
    recorded = Intervention(dt_util.now(), source, state)
    self.interventions.append(recorded)
    self.hass.states.async_set(
        f"sensor.{DOMAIN}_last_intervention",
        source,
        attributes=recorded.as_dict(),
    )

    return recorded
def has_intervention_since(self, cutoff: dt.datetime) -> bool:
    """Has there been a manual intervention since the cutoff time

    Returns True only if some recorded intervention was created strictly
    after `cutoff`; an empty (or unset) intervention list yields False.
    """
    for recorded in self.interventions or ():
        if recorded.created_at > cutoff:
            return True
    return False
def last_state_intervention(self) -> Intervention | None:
    """Return the most recently recorded intervention that carries a state.

    Interventions whose state is None are skipped; returns None when no
    recorded intervention has a state.
    """
    # Scan from the newest entry backwards and stop at the first with a state
    return next(
        (recorded for recorded in reversed(self.interventions) if recorded.state is not None),
        None,
    )
@callback
async def on_sunrise(self, *args: Any) -> None:  # noqa: ARG002
    """Handle the sunrise trigger.

    Resets the armed state immediately unless `sunrise_earliest` is set and
    the current local time is still before it, in which case the reset is
    rescheduled for `sunrise_earliest` today.
    """
    _LOGGER.debug("AUTOARM Sunrise")
    now = dt_util.now()
    too_early = bool(self.sunrise_earliest) and now.time() < self.sunrise_earliest
    if too_early:
        _LOGGER.debug("AUTOARM Rescheduling delayed sunrise action to %s", self.sunrise_earliest)
        self.schedule_state(
            dt.datetime.combine(now.date(), self.sunrise_earliest, tzinfo=dt_util.DEFAULT_TIME_ZONE),
            intervention=None,
            state=None,
            source=ChangeSource.SUNRISE,
        )
        return
    await self.reset_armed_state(source=ChangeSource.SUNRISE)
@callback
async def on_sunrise_latest(self, *args: Any) -> None:  # noqa: ARG002
    """Reset the armed state, attributed to sunrise, when the latest-sunrise trigger fires."""
    _LOGGER.debug("AUTOARM Sunrise latest cutoff reached")
    sunrise_source = ChangeSource.SUNRISE
    await self.reset_armed_state(source=sunrise_source)
@callback
async def on_sunset(self, *args: Any) -> None:  # noqa: ARG002
    """Handle the sunset trigger.

    Resets the armed state immediately unless `sunset_earliest` is set and
    the current local time is still before it, in which case the reset is
    rescheduled for `sunset_earliest` today.
    """
    _LOGGER.debug("AUTOARM Sunset")
    now = dt_util.now()
    too_early = bool(self.sunset_earliest) and now.time() < self.sunset_earliest
    if too_early:
        _LOGGER.debug("AUTOARM Rescheduling delayed sunset action to %s", self.sunset_earliest)
        self.schedule_state(
            dt.datetime.combine(now.date(), self.sunset_earliest, tzinfo=dt_util.DEFAULT_TIME_ZONE),
            intervention=None,
            state=None,
            source=ChangeSource.SUNSET,
        )
        return
    await self.reset_armed_state(source=ChangeSource.SUNSET)
@callback
async def on_sunset_latest(self, *args: Any) -> None:  # noqa: ARG002
    """Reset the armed state, attributed to sunset, when the latest-sunset trigger fires."""
    _LOGGER.debug("AUTOARM Sunset latest cutoff reached")
    sunset_source = ChangeSource.SUNSET
    await self.reset_armed_state(source=sunset_source)
@callback
async def on_mobile_action(self, event: Event) -> None:
    """Act on a mobile action event.

    The `action` entry of the event data selects the behaviour: disarm,
    arm away, or reset. Any other action is logged and ignored. Disarm and
    arm-away record an intervention before arming.

    Args:
    ----
        event: Event whose data carries the `action` identifier.

    """
    _LOGGER.debug("AUTOARM Mobile Action: %s", event)
    source: ChangeSource = ChangeSource.MOBILE

    action = event.data.get("action")
    if action == "ALARM_PANEL_DISARM":
        self.record_intervention(source=source, state=AlarmControlPanelState.DISARMED)
        await self.arm(AlarmControlPanelState.DISARMED, source=source)
    elif action == "ALARM_PANEL_RESET":
        # NOTE(review): this branch records the intervention as BUTTON, not MOBILE
        # like the other two - confirm whether that is intentional
        reset_intervention = self.record_intervention(source=ChangeSource.BUTTON, state=None)
        await self.reset_armed_state(intervention=reset_intervention)
    elif action == "ALARM_PANEL_AWAY":
        self.record_intervention(source=source, state=AlarmControlPanelState.ARMED_AWAY)
        await self.arm(AlarmControlPanelState.ARMED_AWAY, source=source)
    else:
        _LOGGER.debug("AUTOARM Ignoring mobile action: %s", event.data)
@callback
async def on_alarm_state_button(self, state: AlarmControlPanelState, delay: dt.timedelta | None, event: Event) -> None:
    """Handle a button press that requests a specific alarm state.

    An intervention is always recorded. Without a delay the panel is armed
    straight away; with a delay the arm is scheduled and, if a notifier is
    configured, a message announcing the pending change is sent.

    Args:
    ----
        state: Alarm state the button requests.
        delay: How long to wait before arming, or None/zero for immediately.
        event: The triggering event (logged only).

    """
    _LOGGER.debug("AUTOARM Alarm %s Button: %s", state, event)
    intervention = self.record_intervention(source=ChangeSource.BUTTON, state=state)
    if not delay:
        await self.arm(state, source=ChangeSource.BUTTON)
        return

    self.schedule_state(dt_util.now() + delay, state, intervention, source=ChangeSource.BUTTON)
    if not self.notifier:
        return
    await self.notifier.notify(
        ChangeSource.BUTTON,
        from_state=self.armed_state(),
        to_state=state,
        message=f"Alarm will be set to {state} in {delay}",
        title=f"Arm set to {state} process starting",
    )
@callback
async def on_reset_button(self, delay: dt.timedelta | None, event: Event) -> None:
    """Handle a press of the reset button.

    A single stateless intervention is recorded for the press. With a delay
    the reset is scheduled and, if a notifier is configured, a message
    announcing the wait is sent; without one the armed state is reset
    immediately.

    Args:
    ----
        delay: How long to wait before resetting, or None/zero for immediately.
        event: The triggering event (logged only).

    """
    _LOGGER.debug("AUTOARM Reset Button: %s", event)
    intervention = self.record_intervention(source=ChangeSource.BUTTON, state=None)
    if delay:
        self.schedule_state(dt_util.now() + delay, None, intervention, ChangeSource.BUTTON)
        if self.notifier:
            await self.notifier.notify(
                ChangeSource.BUTTON,
                message=f"Alarm will be reset in {delay}",
                title="Alarm reset wait initiated",
            )
    else:
        # Reuse the intervention recorded above; recording again here would log
        # one button press as two interventions
        await self.reset_armed_state(intervention=intervention)
@callback
async def on_occupancy_change(self, event: Event[EventStateChangedData]) -> None:
    """Listen for person state events

    Events whose old and new state are equal are logged and ignored. For a
    real change, the reset is scheduled after the delay configured for the
    new state in `occupied_delay`, or performed immediately when no delay
    is configured for that state.

    Args:
    ----
        event (Event[EventStateChangedData]): state change event

    """
    entity_id, old, new, new_attributes = self._extract_event(event)
    if old == new:
        _LOGGER.debug(
            "AUTOARM Occupancy Non-state Change: %s, state:%s->%s, event: %s, attrs:%s",
            entity_id,
            old,
            new,
            event,
            new_attributes,
        )
        return

    _LOGGER.debug(
        "AUTOARM Occupancy state Change: %s, state:%s->%s, event: %s, attrs:%s", entity_id, old, new, event, new_attributes
    )
    if new not in self.occupied_delay:
        await self.reset_armed_state(source=ChangeSource.OCCUPANCY)
        return
    fire_at = dt_util.now() + self.occupied_delay[new]
    self.schedule_state(fire_at, state=None, intervention=None, source=ChangeSource.OCCUPANCY)
@callback
async def on_panel_change(self, event: Event[EventStateChangedData]) -> None:
    """Alarm Control Panel has been changed outside of AutoArm

    Changes whose `changed_by` attribute starts with this integration's
    domain are ignored. Anything else is recorded as an alarm panel
    intervention; a new state listed in ZOMBIE_STATES triggers a reset,
    and any other real change is passed to the notifier if one is configured.

    Args:
    ----
        event (Event[EventStateChangedData]): state change event

    """
    entity_id, old, new, new_attributes = self._extract_event(event)
    changed_by = new_attributes.get(ATTR_CHANGED_BY) if new_attributes else None
    if changed_by and changed_by.startswith(f"{DOMAIN}."):
        _LOGGER.debug(
            "AUTOARM Panel Change Ignored: %s,%s: %s-->%s",
            entity_id,
            event.event_type,
            old,
            new,
        )
        return

    new_state: AlarmControlPanelState | None = alarm_state_as_enum(new)
    old_state: AlarmControlPanelState | None = alarm_state_as_enum(old)

    _LOGGER.info(
        "AUTOARM Panel Change: %s,%s: %s-->%s",
        entity_id,
        event.event_type,
        old,
        new,
    )
    self.record_intervention(ChangeSource.ALARM_PANEL, new_state)

    if new in ZOMBIE_STATES:
        _LOGGER.warning("AUTOARM Dezombifying %s ...", new)
        await self.reset_armed_state(source=ChangeSource.ZOMBIFICATION)
        return
    if new == old:
        _LOGGER.debug("AUTOARM panel change leaves state unchanged at %s", new)
        return
    if self.notifier:
        await self.notifier.notify(ChangeSource.ALARM_PANEL, old_state, new_state)
@callback
async def housekeeping(self, triggered_at: dt.datetime) -> None:
    """Drop expired interventions and prune calendar events.

    An intervention is kept only while the current time is before its
    creation time plus `intervention_ttl` minutes.

    Args:
    ----
        triggered_at: Time at which the housekeeping job fired (logged only).

    """
    _LOGGER.debug("AUTOARM Housekeeping starting, triggered at %s", triggered_at)
    now = dt_util.now()
    unexpired = []
    for recorded in self.interventions:
        expires_at = recorded.created_at + dt.timedelta(minutes=self.intervention_ttl)
        if now < expires_at:
            unexpired.append(recorded)
    self.interventions = unexpired
    for calendar in self.calendars:
        await calendar.prune_events()
    _LOGGER.debug("AUTOARM Housekeeping finished")