Coverage for custom_components/autoarm/autoarming.py: 86%
618 statements
« prev ^ index » next coverage.py v7.10.6, created at 2026-05-22 21:57 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2026-05-22 21:57 +0000
1import asyncio
2import datetime as dt
3import json
4import logging
5import re
6from collections.abc import Callable, Coroutine
7from dataclasses import dataclass
8from functools import partial
9from typing import TYPE_CHECKING, Any, cast
11import homeassistant.util.dt as dt_util
12import voluptuous as vol
13from homeassistant.components.alarm_control_panel.const import ATTR_CHANGED_BY, AlarmControlPanelState
14from homeassistant.components.calendar.const import DOMAIN as CALENDAR_DOMAIN
15from homeassistant.components.sun.const import STATE_BELOW_HORIZON
16from homeassistant.config_entries import SOURCE_IMPORT, ConfigEntry
17from homeassistant.const import (
18 CONF_CONDITIONS,
19 CONF_DELAY_TIME,
20 CONF_ENTITY_ID,
21 CONF_SERVICE,
22 EVENT_HOMEASSISTANT_STOP,
23 SERVICE_RELOAD,
24 STATE_HOME,
25)
26from homeassistant.core import (
27 Event,
28 EventStateChangedData,
29 HomeAssistant,
30 ServiceCall,
31 ServiceResponse,
32 State,
33 SupportsResponse,
34 callback,
35)
36from homeassistant.exceptions import ConditionError, ConfigEntryNotReady, HomeAssistantError
37from homeassistant.helpers import condition as condition
38from homeassistant.helpers import config_validation as cv
39from homeassistant.helpers import entity_platform
40from homeassistant.helpers import issue_registry as ir
41from homeassistant.helpers.event import (
42 async_track_point_in_time,
43 async_track_state_change_event,
44 async_track_sunrise,
45 async_track_sunset,
46 async_track_time_change,
47)
48from homeassistant.helpers.reload import (
49 async_integration_yaml_config,
50)
51from homeassistant.helpers.service import async_register_admin_service
52from homeassistant.helpers.typing import ConfigType
53from homeassistant.util.hass_dict import HassKey
55from custom_components.autoarm.hass_api import HomeAssistantAPI
56from custom_components.autoarm.notifier import Notifier
58from .calendar_events import TrackedCalendar, TrackedCalendarEvent
59from .config_flow import (
60 CONF_CALENDAR_ENTITIES,
61 CONF_CALENDAR_OCCUPANCY_OVERRIDE_STATES,
62 CONF_NO_EVENT_MODE,
63 CONF_NOTIFY_ACTION,
64 CONF_NOTIFY_ENABLED,
65 CONF_NOTIFY_TARGETS,
66 CONF_OCCUPANCY_DEFAULT_DAY,
67 CONF_OCCUPANCY_DEFAULT_NIGHT,
68 CONF_PERSON_ENTITIES,
69 CONF_SUNRISE_EARLIEST,
70 CONF_SUNRISE_LATEST,
71 CONF_SUNSET_EARLIEST,
72 CONF_SUNSET_LATEST,
73 DEFAULT_CALENDAR_OCCUPANCY_OVERRIDE_STATES,
74 DEFAULT_NOTIFY_ACTION,
75)
76from .const import (
77 ATTR_RESET,
78 CONF_ALARM_PANEL,
79 CONF_BUTTONS,
80 CONF_CALENDAR_CONTROL,
81 CONF_CALENDAR_EVENT_STATES,
82 CONF_CALENDAR_NO_EVENT,
83 CONF_CALENDAR_POLL_INTERVAL,
84 CONF_CALENDARS,
85 CONF_DAY,
86 CONF_DIURNAL,
87 CONF_EARLIEST,
88 CONF_LATEST,
89 CONF_NIGHT,
90 CONF_NOTIFY,
91 CONF_OCCUPANCY,
92 CONF_OCCUPANCY_DEFAULT,
93 CONF_RATE_LIMIT,
94 CONF_RATE_LIMIT_CALLS,
95 CONF_RATE_LIMIT_PERIOD,
96 CONF_SUNRISE,
97 CONF_SUNSET,
98 CONF_TRANSITIONS,
99 CONFIG_SCHEMA,
100 DEFAULT_TRANSITIONS,
101 DOMAIN,
102 NO_CAL_EVENT_MODE_AUTO,
103 NO_CAL_EVENT_MODE_MANUAL,
104 NOTIFY_COMMON,
105 YAML_DATA_KEY,
106 ChangeSource,
107 ConditionVariables,
108)
109from .helpers import (
110 AppHealthTracker,
111 ExtendedExtendedJSONEncoder,
112 Limiter,
113 alarm_state_as_enum,
114 change_source_as_enum,
115 deobjectify,
116 safe_state,
117)
119if TYPE_CHECKING:
120 from collections.abc import Mapping
122 ConditionCheckerType = Callable[[Mapping[str, Any] | None], bool]
124_LOGGER = logging.getLogger(__name__)
126OVERRIDE_STATES = (AlarmControlPanelState.ARMED_VACATION, AlarmControlPanelState.ARMED_CUSTOM_BYPASS)
127EPHEMERAL_STATES = (
128 AlarmControlPanelState.PENDING,
129 AlarmControlPanelState.ARMING,
130 AlarmControlPanelState.DISARMING,
131 AlarmControlPanelState.TRIGGERED,
132)
133ZOMBIE_STATES = ("unknown", "unavailable")
134NS_MOBILE_ACTIONS = "mobile_actions"
135PLATFORMS = ["autoarm"]
137HASS_DATA_KEY: HassKey["AutoArmData"] = HassKey(DOMAIN)
140@dataclass
141class AutoArmData:
142 armer: "AlarmArmer"
143 other_data: dict[str, str | dict[str, str] | list[str] | int | float | bool | None]
146async def async_setup( # noqa: RUF029
147 hass: HomeAssistant,
148 config: ConfigType,
149) -> bool:
150 _ = CONFIG_SCHEMA
151 yaml_config: ConfigType = config.get(DOMAIN, {})
152 if yaml_config or YAML_DATA_KEY not in hass.data:
153 hass.data[YAML_DATA_KEY] = yaml_config
155 has_alarm_panel = CONF_ALARM_PANEL in yaml_config
156 existing_entries = hass.config_entries.async_entries(DOMAIN)
158 if has_alarm_panel and not existing_entries:
159 _LOGGER.info("AUTOARM Triggering import of YAML configuration to ConfigEntry")
160 hass.async_create_task(hass.config_entries.flow.async_init(DOMAIN, context={"source": SOURCE_IMPORT}, data=yaml_config))
161 elif has_alarm_panel and existing_entries:
162 _LOGGER.warning("AUTOARM YAML core config present but ConfigEntry already exists; ignoring YAML core settings")
163 ir.async_create_issue(
164 hass,
165 DOMAIN,
166 "yaml_core_config_deprecated",
167 is_fixable=False,
168 severity=ir.IssueSeverity.WARNING,
169 translation_key="yaml_core_config_deprecated",
170 )
172 async def reload_service_handler(service_call: ServiceCall) -> None:
173 """Reload yaml entities."""
174 _LOGGER.info("AUTOARM Reloading %s.%s component, data %s", service_call.domain, service_call.service, service_call.data)
175 try:
176 fresh_config = await async_integration_yaml_config(hass, DOMAIN)
177 except HomeAssistantError as err:
178 raise HomeAssistantError(f"Failed to reload YAML configuration: {err}") from err
179 if fresh_config is not None and DOMAIN in fresh_config:
180 hass.data[YAML_DATA_KEY] = fresh_config[DOMAIN]
181 else:
182 hass.data[YAML_DATA_KEY] = {}
183 entries = hass.config_entries.async_entries(DOMAIN)
184 for entry in entries:
185 await hass.config_entries.async_reload(entry.entry_id)
187 async_register_admin_service(
188 hass,
189 DOMAIN,
190 SERVICE_RELOAD,
191 reload_service_handler,
192 )
194 def supplemental_action_enquire_configuration(_call: ServiceCall) -> ConfigType:
195 entries = hass.config_entries.async_entries(DOMAIN)
196 if not entries:
197 raise HomeAssistantError("No config entry found for AutoArm")
198 entry = entries[0]
199 stashed_yaml = hass.data.get(YAML_DATA_KEY, {})
200 data: ConfigType = {
201 CONF_ALARM_PANEL: entry.data.get(CONF_ALARM_PANEL),
202 CONF_DIURNAL: {
203 CONF_SUNRISE: {
204 CONF_EARLIEST: entry.options.get(CONF_SUNRISE_EARLIEST),
205 CONF_LATEST: entry.options.get(CONF_SUNRISE_LATEST),
206 },
207 CONF_SUNSET: {
208 CONF_EARLIEST: entry.options.get(CONF_SUNSET_EARLIEST),
209 CONF_LATEST: entry.options.get(CONF_SUNSET_LATEST),
210 },
211 },
212 CONF_CALENDAR_CONTROL: stashed_yaml.get(CONF_CALENDAR_CONTROL),
213 CONF_BUTTONS: stashed_yaml.get(CONF_BUTTONS, {}),
214 CONF_OCCUPANCY: {
215 CONF_ENTITY_ID: entry.options.get(CONF_PERSON_ENTITIES, []),
216 CONF_OCCUPANCY_DEFAULT: {
217 CONF_DAY: entry.options.get(CONF_OCCUPANCY_DEFAULT_DAY, "armed_home"),
218 },
219 },
220 CONF_NOTIFY: {
221 CONF_SERVICE: entry.options.get(CONF_NOTIFY_ACTION)
222 or stashed_yaml.get(CONF_NOTIFY, {}).get(NOTIFY_COMMON, {}).get(CONF_SERVICE, DEFAULT_NOTIFY_ACTION),
223 "targets": entry.options.get(CONF_NOTIFY_TARGETS, []),
224 "profiles": stashed_yaml.get(CONF_NOTIFY, {}),
225 "enabled": entry.options.get(CONF_NOTIFY_ENABLED, True),
226 },
227 CONF_RATE_LIMIT: stashed_yaml.get(CONF_RATE_LIMIT, {}),
228 }
229 try:
230 jsonized: str = json.dumps(obj=data, cls=ExtendedExtendedJSONEncoder)
231 return cast("dict[str,Any]", json.loads(jsonized))
232 except Exception as err:
233 raise HomeAssistantError(f"Failed to serialize configuration: {err}") from err
235 hass.services.async_register(
236 DOMAIN,
237 "enquire_configuration",
238 supplemental_action_enquire_configuration,
239 supports_response=SupportsResponse.ONLY,
240 )
242 return True
245async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
246 """Set up Auto Arm from a config entry."""
247 yaml_config: ConfigType = hass.data.get(YAML_DATA_KEY, {})
248 try:
249 armer = _build_armer_from_entry(hass, entry, yaml_config)
250 hass.data[HASS_DATA_KEY] = AutoArmData(armer, {})
251 await armer.initialize()
252 except Exception as err:
253 raise ConfigEntryNotReady(f"Failed to initialize Auto Arm: {err}") from err
254 entry.async_on_unload(entry.add_update_listener(_async_update_listener))
255 return True
258async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: # noqa: ARG001, RUF029
259 """Unload Auto Arm config entry."""
260 if HASS_DATA_KEY in hass.data:
261 hass.data[HASS_DATA_KEY].armer.shutdown()
262 del hass.data[HASS_DATA_KEY]
263 return True
266async def _async_update_listener(hass: HomeAssistant, entry: ConfigEntry) -> None:
267 """Handle options update by reloading the entry."""
268 await hass.config_entries.async_reload(entry.entry_id)
271def _build_armer_from_entry(hass: HomeAssistant, entry: ConfigEntry, yaml_config: ConfigType) -> "AlarmArmer":
272 """Build an AlarmArmer instance from ConfigEntry data/options merged with YAML."""
273 migrate(hass)
275 alarm_panel: str = entry.data[CONF_ALARM_PANEL]
276 person_entities: list[str] = entry.options.get(CONF_PERSON_ENTITIES, [])
277 calendar_entities: list[str] = entry.options.get(CONF_CALENDAR_ENTITIES, [])
278 occupancy_default_day: str = entry.options.get(CONF_OCCUPANCY_DEFAULT_DAY, "disarmed")
279 occupancy_default_night: str | None = entry.options.get(CONF_OCCUPANCY_DEFAULT_NIGHT, "armed_night")
280 no_event_mode: str = entry.options.get(CONF_NO_EVENT_MODE, NO_CAL_EVENT_MODE_AUTO)
282 # Build occupancy config
283 yaml_occupancy = yaml_config.get(CONF_OCCUPANCY, {})
284 occupancy_defaults: dict[str, str] = {CONF_DAY: occupancy_default_day}
285 if occupancy_default_night:
286 occupancy_defaults[CONF_NIGHT] = occupancy_default_night
287 occupancy: ConfigType = {
288 CONF_ENTITY_ID: person_entities,
289 CONF_OCCUPANCY_DEFAULT: occupancy_defaults,
290 }
291 yaml_delay_time = yaml_occupancy.get(CONF_DELAY_TIME) if isinstance(yaml_occupancy, dict) else None
292 if yaml_delay_time:
293 occupancy[CONF_DELAY_TIME] = yaml_delay_time
295 # Build calendar config
296 yaml_calendar_control = yaml_config.get(CONF_CALENDAR_CONTROL, {})
297 yaml_calendars: list[ConfigType] = yaml_calendar_control.get(CONF_CALENDARS, []) if yaml_calendar_control else []
298 yaml_cal_by_entity: dict[str, ConfigType] = {cal[CONF_ENTITY_ID]: cal for cal in yaml_calendars if CONF_ENTITY_ID in cal}
300 calendar_list: list[ConfigType] = []
301 for cal_entity_id in calendar_entities:
302 yaml_override = yaml_cal_by_entity.get(cal_entity_id, {})
303 cal_config: ConfigType = {
304 CONF_ENTITY_ID: cal_entity_id,
305 CONF_CALENDAR_POLL_INTERVAL: yaml_override.get(CONF_CALENDAR_POLL_INTERVAL, 15),
306 CONF_CALENDAR_EVENT_STATES: yaml_override.get(CONF_CALENDAR_EVENT_STATES, _validated_default_calendar_mappings()),
307 }
308 calendar_list.append(cal_config)
310 calendar_config: ConfigType = {}
311 if calendar_list:
312 calendar_config = {
313 CONF_CALENDAR_NO_EVENT: no_event_mode,
314 CONF_CALENDARS: calendar_list,
315 }
317 # Build notify config: service from options overrides YAML when explicitly set
318 notify_profiles = yaml_config.get(CONF_NOTIFY, {})
320 # Build diurnal cutoffs: options take priority, YAML is fallback
321 yaml_diurnal = yaml_config.get(CONF_DIURNAL, {}) or {}
322 yaml_sunrise = yaml_diurnal.get(CONF_SUNRISE, {}) or {}
323 yaml_sunset = yaml_diurnal.get(CONF_SUNSET, {}) or {}
325 def _parse_time(option_key: str, yaml_fallback: dt.time | None) -> dt.time | None:
326 val = entry.options.get(option_key)
327 if val is not None:
328 return cv.time(val) if isinstance(val, str) else val
329 return yaml_fallback
331 return AlarmArmer(
332 hass,
333 alarm_panel=alarm_panel,
334 sunrise_earliest=_parse_time(CONF_SUNRISE_EARLIEST, yaml_sunrise.get(CONF_EARLIEST)),
335 sunrise_latest=_parse_time(CONF_SUNRISE_LATEST, yaml_sunrise.get(CONF_LATEST)),
336 sunset_earliest=_parse_time(CONF_SUNSET_EARLIEST, yaml_sunset.get(CONF_EARLIEST)),
337 sunset_latest=_parse_time(CONF_SUNSET_LATEST, yaml_sunset.get(CONF_LATEST)),
338 buttons=yaml_config.get(CONF_BUTTONS, {}),
339 occupancy=occupancy,
340 notify_profiles=notify_profiles,
341 notify_enabled=entry.options.get(CONF_NOTIFY_ENABLED, False),
342 notify_action=entry.options.get(CONF_NOTIFY_ACTION),
343 notify_targets=entry.options.get(CONF_NOTIFY_TARGETS, []),
344 rate_limit=yaml_config.get(CONF_RATE_LIMIT, {}),
345 calendar_config=calendar_config,
346 transitions=yaml_config.get(CONF_TRANSITIONS),
347 calendar_occupancy_override_states=entry.options.get(
348 CONF_CALENDAR_OCCUPANCY_OVERRIDE_STATES, DEFAULT_CALENDAR_OCCUPANCY_OVERRIDE_STATES
349 ),
350 )
353def _validated_default_calendar_mappings() -> dict[str, list[re.Pattern[str]]]:
354 """Build default calendar event state mappings with compiled regex patterns.
356 Mirrors the schema validation that CALENDAR_SCHEMA applies (ensure_list + is_regex).
357 """
358 from .const import DEFAULT_CALENDAR_MAPPINGS
360 result: dict[str, list[re.Pattern[str]]] = {}
361 for state, patterns in DEFAULT_CALENDAR_MAPPINGS.items():
362 state_str = str(state)
363 if isinstance(patterns, str):
364 patterns = [patterns]
365 result[state_str] = [re.compile(p) if isinstance(p, str) else p for p in patterns]
366 return result
369def migrate(hass: HomeAssistant) -> None:
370 for entity_id in (
371 "autoarm.configured",
372 "autoarm.last_calendar_event",
373 "autoarm.last_intervention",
374 "autoarm.initialized",
375 "autoarm.last_calculation",
376 ):
377 try:
378 if hass.states.get(entity_id):
379 _LOGGER.info("AUTOARM Migration removing legacy entity_id: %s", entity_id)
380 hass.states.async_remove(entity_id)
381 except Exception as e:
382 _LOGGER.warning("AUTOARM Migration fail for %s:%s", entity_id, e)
385def unlisten(listener: Callable[[], None] | None) -> None:
386 if listener:
387 try:
388 listener()
389 except Exception as e:
390 _LOGGER.debug("AUTOARM Failure closing listener %s: %s", listener, e)
393@dataclass
394class Intervention:
395 """Record of a manual intervention, such as a button push, mobile action or alarm panel change"""
397 created_at: dt.datetime
398 source: ChangeSource
399 state: AlarmControlPanelState | None
401 def as_dict(self) -> dict[str, str | None]:
402 return {
403 "created_at": self.created_at.isoformat(),
404 "source": str(self.source),
405 "state": str(self.state) if self.state is not None else None,
406 }
409@dataclass
410class AlarmStateWithAttributes:
411 state: AlarmControlPanelState
412 source: ChangeSource
413 attributes: dict[str, str]
416class AlarmArmer:
417 def __init__(
418 self,
419 hass: HomeAssistant,
420 alarm_panel: str,
421 buttons: dict[str, ConfigType] | None = None,
422 occupancy: ConfigType | None = None,
423 actions: list[str] | None = None,
424 notify_enabled: bool = True,
425 notify_action: str | None = None,
426 notify_targets: list[str] | None = None,
427 notify_profiles: ConfigType | None = None,
428 sunrise_earliest: dt.time | None = None,
429 sunrise_latest: dt.time | None = None,
430 sunset_earliest: dt.time | None = None,
431 sunset_latest: dt.time | None = None,
432 rate_limit: ConfigType | None = None,
433 calendar_config: ConfigType | None = None,
434 transitions: dict[str, dict[str, list[ConfigType]]] | None = None,
435 calendar_occupancy_override_states: list[str] | None = None,
436 ) -> None:
437 occupancy = occupancy or {}
438 rate_limit = rate_limit or {}
440 self.hass: HomeAssistant = hass
441 self.app_health_tracker: AppHealthTracker = AppHealthTracker(hass)
442 if notify_enabled and not notify_profiles and not notify_action:
443 _LOGGER.warning("AUTOARM Notification disabled - no config")
444 notify_enabled = False
445 if notify_enabled:
446 self.notifier: Notifier | None = Notifier(
447 notify_profiles, hass, self.app_health_tracker, notify_action, notify_targets
448 )
449 else:
450 self.notifier = None
451 self.local_tz = dt_util.get_time_zone(self.hass.config.time_zone)
452 calendar_config = calendar_config or {}
453 self.calendar_configs: list[ConfigType] = calendar_config.get(CONF_CALENDARS, []) or []
454 self.calendars: list[TrackedCalendar] = []
455 self.calendar_no_event_mode: str | None = calendar_config.get(CONF_CALENDAR_NO_EVENT, NO_CAL_EVENT_MODE_AUTO)
456 self.calendar_occupancy_override_states: list[str] = (
457 calendar_occupancy_override_states
458 if calendar_occupancy_override_states is not None
459 else DEFAULT_CALENDAR_OCCUPANCY_OVERRIDE_STATES
460 )
461 self.alarm_panel: str = alarm_panel
462 self.sunrise_earliest: dt.time | None = sunrise_earliest
463 self.sunrise_latest: dt.time | None = sunrise_latest
464 self.sunset_earliest: dt.time | None = sunset_earliest
465 self.sunset_latest: dt.time | None = sunset_latest
466 self.occupants: list[str] = occupancy.get(CONF_ENTITY_ID, [])
467 self.occupied_defaults: dict[str, AlarmControlPanelState] = occupancy.get(
468 CONF_OCCUPANCY_DEFAULT, {CONF_DAY: AlarmControlPanelState.ARMED_HOME}
469 )
470 self.occupied_delay: dict[str, dt.timedelta] = occupancy.get(CONF_DELAY_TIME, {})
471 self.buttons: ConfigType = buttons or {}
473 self.actions: list[str] = actions or []
474 self.unsubscribes: list[Callable[[], None]] = []
475 self.pre_pending_state: AlarmControlPanelState | None = None
476 self.button_device: dict[str, str] = {}
477 self.arming_in_progress: asyncio.Event = asyncio.Event()
479 self.rate_limiter: Limiter = Limiter(
480 window=rate_limit.get(CONF_RATE_LIMIT_PERIOD, dt.timedelta(seconds=60)),
481 max_calls=rate_limit.get(CONF_RATE_LIMIT_CALLS, 5),
482 )
484 self.hass_api: HomeAssistantAPI = HomeAssistantAPI(hass)
485 self.transitions: dict[AlarmControlPanelState, ConditionCheckerType] = {}
486 self.transition_config: dict[str, dict[str, list[ConfigType]]] = transitions or {}
488 self.interventions: list[Intervention] = []
489 self.intervention_ttl: int = 60
491 async def initialize(self) -> None:
492 """Async initialization"""
493 _LOGGER.info("AUTOARM occupied=%s, state=%s, calendars=%s", self.is_occupied(), self.armed_state(), len(self.calendars))
495 self.initialize_alarm_panel()
496 await self.initialize_calendar()
497 await self.initialize_logic()
498 self.initialize_diurnal()
499 self.initialize_occupancy()
500 self.initialize_buttons()
501 self.initialize_integration()
502 self.initialize_housekeeping()
503 self.initialize_home_assistant()
504 await self.reset_armed_state(source=ChangeSource.STARTUP)
506 _LOGGER.info("AUTOARM Initialized, state: %s", self.armed_state())
508 def initialize_home_assistant(self) -> None:
509 self.stop_listener: Callable[[], None] | None = self.hass.bus.async_listen_once(
510 EVENT_HOMEASSISTANT_STOP, self.async_shutdown
511 )
512 self.app_health_tracker.app_initialized()
513 self.hass.states.async_set(f"sensor.{DOMAIN}_last_calculation", "unavailable", attributes={})
515 self.hass.services.async_register(
516 DOMAIN,
517 "reset_state",
518 self.reset_service,
519 supports_response=SupportsResponse.OPTIONAL,
520 )
522 async def reset_service(self, _call: ServiceCall) -> ServiceResponse:
523 new_state = await self.reset_armed_state(intervention=self.record_intervention(source=ChangeSource.ACTION, state=None))
524 return {"change": new_state or "NO_CHANGE"}
526 def initialize_integration(self) -> None:
527 self.hass.states.async_set(f"sensor.{DOMAIN}_last_intervention", "unavailable", attributes={})
529 self.unsubscribes.append(self.hass.bus.async_listen("mobile_app_notification_action", self.on_mobile_action))
531 def initialize_alarm_panel(self) -> None:
532 """Set up automation for Home Assistant alarm panel
534 See https://www.home-assistant.io/integrations/alarm_control_panel/
536 Succeeds even if control panel has not yet started, listener will pick up events when it does
537 """
538 self.unsubscribes.append(async_track_state_change_event(self.hass, [self.alarm_panel], self.on_panel_change))
539 _LOGGER.debug("AUTOARM Auto-arming %s", self.alarm_panel)
541 def initialize_housekeeping(self) -> None:
542 self.unsubscribes.append(
543 async_track_time_change(
544 self.hass,
545 action=self.housekeeping,
546 minute=0,
547 )
548 )
550 def initialize_diurnal(self) -> None:
551 # events API expects a function, however underlying HassJob is fine with coroutines
552 self.unsubscribes.append(async_track_sunrise(self.hass, self.on_sunrise, None)) # type: ignore
553 self.unsubscribes.append(async_track_sunset(self.hass, self.on_sunset, None)) # type: ignore
554 if self.sunrise_latest:
555 self.unsubscribes.append(
556 async_track_time_change(
557 self.hass,
558 self.on_sunrise_latest,
559 hour=self.sunrise_latest.hour,
560 minute=self.sunrise_latest.minute,
561 second=self.sunrise_latest.second,
562 )
563 )
564 if self.sunset_latest:
565 self.unsubscribes.append(
566 async_track_time_change(
567 self.hass,
568 self.on_sunset_latest,
569 hour=self.sunset_latest.hour,
570 minute=self.sunset_latest.minute,
571 second=self.sunset_latest.second,
572 )
573 )
575 def initialize_occupancy(self) -> None:
576 """Configure occupants, and listen for changes in their state"""
577 if self.occupants:
578 _LOGGER.info("AUTOARM Occupancy determined by %s", ",".join(self.occupants))
579 self.unsubscribes.append(async_track_state_change_event(self.hass, self.occupants, self.on_occupancy_change))
580 else:
581 _LOGGER.info("AUTOARM Occupancy not configured")
583 def initialize_buttons(self) -> None:
584 """Initialize (optional) physical alarm state control buttons"""
586 def setup_button(state_name: str, button_entity: str, cb: Callable[..., Coroutine[Any, Any, None]]) -> None:
587 self.button_device[state_name] = button_entity
588 if self.button_device[state_name]:
589 self.unsubscribes.append(async_track_state_change_event(self.hass, [button_entity], cb))
591 _LOGGER.debug(
592 "AUTOARM Configured %s button for %s",
593 state_name,
594 self.button_device[state_name],
595 )
597 for button_use, button_config in self.buttons.items():
598 delay: dt.timedelta | None = button_config.get(CONF_DELAY_TIME)
599 for entity_id in button_config[CONF_ENTITY_ID]:
600 if button_use == ATTR_RESET:
601 setup_button(ATTR_RESET, entity_id, partial(self.on_reset_button, delay))
602 else:
603 setup_button(
604 button_use, entity_id, partial(self.on_alarm_state_button, AlarmControlPanelState(button_use), delay)
605 )
607 async def initialize_calendar(self) -> None:
608 """Configure calendar polling (optional)"""
609 stage: str = "calendar"
610 self.hass.states.async_set(f"sensor.{DOMAIN}_last_calendar_event", "unavailable", attributes={})
611 if not self.calendar_configs:
612 return
613 try:
614 platforms: list[entity_platform.EntityPlatform] = entity_platform.async_get_platforms(self.hass, CALENDAR_DOMAIN)
615 if platforms:
616 platform: entity_platform.EntityPlatform = platforms[0]
617 else:
618 self.app_health_tracker.record_initialization_error(stage)
619 _LOGGER.error("AUTOARM Calendar platform not available from Home Assistant")
620 return
621 except Exception as _e:
622 self.app_health_tracker.record_initialization_error(stage)
623 _LOGGER.exception("AUTOARM Unable to access calendar platform")
624 return
625 for calendar_config in self.calendar_configs:
626 tracked_calendar = TrackedCalendar(
627 self.hass, calendar_config, self.calendar_no_event_mode, self, self.app_health_tracker
628 )
629 await tracked_calendar.initialize(platform)
630 self.calendars.append(tracked_calendar)
632 async def initialize_logic(self) -> None:
633 stage: str = "logic"
634 for state_str, raw_condition in DEFAULT_TRANSITIONS.items():
635 if state_str not in self.transition_config:
636 _LOGGER.info("AUTOARM Defaulting transition condition for %s", state_str)
637 self.transition_config[state_str] = {CONF_CONDITIONS: cv.CONDITIONS_SCHEMA(raw_condition)}
639 for state_str, transition_config in self.transition_config.items():
640 error: str = ""
641 condition_config = transition_config.get(CONF_CONDITIONS)
642 if condition_config is None:
643 error = "Empty conditions"
644 _LOGGER.warning(f"AUTOARM Found no conditions for {state_str} transition")
645 else:
646 try:
647 state = AlarmControlPanelState(state_str)
648 cond: ConditionCheckerType | None = await self.hass_api.build_condition(
649 condition_config, strict=True, validate=True, name=state_str
650 )
652 if cond:
653 # re-run without strict wrapper
654 cond = await self.hass_api.build_condition(condition_config, name=state_str)
655 if cond:
656 _LOGGER.debug(f"AUTOARM Validated transition logic for {state_str}")
657 self.transitions[state] = cond
658 else:
659 _LOGGER.warning(f"AUTOARM Failed to validate transition logic for {state_str}")
660 error = "Condition validation failed"
661 except ValueError as ve:
662 self.app_health_tracker.record_initialization_error(stage)
663 error = f"Invalid state {ve}"
664 _LOGGER.error(f"AUTOARM Invalid state in {state_str} transition - {ve}")
665 except vol.Invalid as vi:
666 self.app_health_tracker.record_initialization_error(stage)
667 _LOGGER.error(f"AUTOARM Transition {state_str} conditions fails Home Assistant schema check {vi}")
668 error = f"Schema error {vi}"
669 except ConditionError as ce:
670 _LOGGER.error(f"AUTOARM Transition {state_str} conditions fails Home Assistant condition check {ce}")
671 if hasattr(ce, "message"):
672 error = ce.message # type: ignore[attr-defined,unused-ignore]
673 elif hasattr(ce, "error") and hasattr(ce.error, "message"): # type: ignore[attr-defined,unused-ignore]
674 error = ce.error.message # type: ignore[attr-defined,unused-ignore]
675 else:
676 error = str(ce)
677 except Exception as e:
678 self.app_health_tracker.record_initialization_error(stage)
679 _LOGGER.exception("AUTOARM Disabling transition %s with error validating %s", state_str, condition_config)
680 error = f"Unknown exception {e}"
681 if error:
682 _LOGGER.warning(f"AUTOARM raising report issue for {error} on {state_str}")
683 self.hass_api.raise_issue(
684 f"transition_condition_{state_str}",
685 is_fixable=False,
686 issue_key="transition_condition",
687 issue_map={"state": state_str, "error": error},
688 severity=ir.IssueSeverity.ERROR,
689 )
691 async def async_shutdown(self, _event: Event) -> None:
692 _LOGGER.info("AUTOARM shut down event received")
693 self.stop_listener = None
694 self.shutdown()
696 def shutdown(self) -> None:
697 _LOGGER.info("AUTOARM shutting down")
698 for calendar in self.calendars:
699 calendar.shutdown()
700 while self.unsubscribes:
701 unlisten(self.unsubscribes.pop())
702 unlisten(self.stop_listener)
703 self.stop_listener = None
704 _LOGGER.info("AUTOARM shut down")
706 def active_calendar_event(self) -> TrackedCalendarEvent | None:
707 events: list[TrackedCalendarEvent] = []
708 for cal in self.calendars:
709 events.extend(cal.active_events())
710 if events:
711 # TODO: consider sorting events to LIFO
712 return events[0]
713 return None
715 def has_active_calendar_event(self) -> bool:
716 return any(cal.has_active_event() for cal in self.calendars)
718 def is_occupied(self) -> bool | None:
719 """Ternary - true at least one person entity has state home, false none of them, null if no occupants defined"""
720 if self.occupants:
721 return any(safe_state(self.hass.states.get(p)) == STATE_HOME for p in self.occupants)
722 return None
724 def at_home(self) -> list[str] | None:
725 if self.occupants:
726 return [p for p in self.occupants if safe_state(self.hass.states.get(p)) == STATE_HOME]
727 return None
729 def not_home(self) -> list[str] | None:
730 if self.occupants:
731 return [p for p in self.occupants if safe_state(self.hass.states.get(p)) != STATE_HOME]
732 return None
734 def is_unoccupied(self) -> bool | None:
735 """Ternary - false at least one person entity has state home, true none of them, null if no occupants defined"""
736 if self.occupants:
737 return all(safe_state(self.hass.states.get(p)) != STATE_HOME for p in self.occupants)
738 return None
740 def is_night(self) -> bool:
741 return safe_state(self.hass.states.get("sun.sun")) == STATE_BELOW_HORIZON
743 def armed_state(self) -> AlarmControlPanelState:
744 raw_state: str | None = safe_state(self.hass.states.get(self.alarm_panel))
745 alarm_state: AlarmControlPanelState | None = alarm_state_as_enum(raw_state)
746 if alarm_state is None:
747 _LOGGER.warning("AUTOARM No alarm state available - treating as PENDING")
748 return AlarmControlPanelState.PENDING
749 return alarm_state
751 def current_state(self) -> AlarmStateWithAttributes:
752 state: State | None = self.hass.states.get(self.alarm_panel)
753 source: ChangeSource | None = None
754 alarm_state: AlarmControlPanelState | None = (
755 alarm_state_as_enum(state.state) if state and state.state is not None else None
756 )
758 if state and state.attributes and state.attributes.get(ATTR_CHANGED_BY):
759 source = change_source_as_enum(state.attributes[ATTR_CHANGED_BY].split("_", 1)[-1])
760 return AlarmStateWithAttributes(
761 state=alarm_state or AlarmControlPanelState.PENDING,
762 source=source or ChangeSource.UNKNOWN,
763 attributes=state.attributes if state else {},
764 )
766 def _extract_event(self, event: Event[EventStateChangedData]) -> tuple[str | None, str | None, str | None, dict[str, str]]:
767 entity_id = old = new = None
768 new_attributes: dict[str, str] = {}
769 if event and event.data:
770 entity_id = event.data.get("entity_id")
771 old_obj = event.data.get("old_state")
772 if old_obj:
773 old = old_obj.state
774 new_obj = event.data.get("new_state")
775 if new_obj:
776 new = new_obj.state
777 new_attributes = new_obj.attributes
778 return entity_id, old, new, new_attributes
780 async def pending_state(self, source: ChangeSource | None, change_context: dict[str, Any] | None = None) -> None:
781 self.pre_pending_state = self.armed_state()
782 change_context = change_context or {}
783 change_context.update({
784 "source": str(source),
785 "original_caller": change_context.get("caller"),
786 "caller": "pending_state",
787 "pre_pending_state": self.pre_pending_state,
788 })
789 await self.arm(
790 AlarmControlPanelState.PENDING,
791 source=source,
792 change_context=change_context,
793 )
795 @callback
796 async def delayed_reset_armed_state(
797 self, triggered_at: dt.datetime, requested_at: dt.datetime | None, **kwargs: Any
798 ) -> None:
799 _LOGGER.debug("AUTOARM delayed_arm at %s, requested_at: %s", triggered_at, requested_at)
800 if self.is_intervention_since_request(requested_at):
801 return
802 await self.reset_armed_state(**kwargs)
804 async def reset_armed_state(
805 self, intervention: Intervention | None = None, source: ChangeSource | None = None
806 ) -> str | None:
807 """Logic to automatically work out appropriate current armed state"""
808 state: AlarmControlPanelState | None = None
809 existing_state: AlarmControlPanelState | None = None
810 must_change_state: bool = False
811 last_state_intervention: Intervention | None = None
812 active_calendar_event: TrackedCalendarEvent | None = None
814 if source is None and intervention is not None:
815 source = intervention.source
816 _LOGGER.debug(
817 "AUTOARM reset_armed_state(intervention=%s,source=%s)",
818 intervention,
819 source,
820 )
821 reset_decision: str = "no_change"
822 try:
823 existing_state = self.armed_state()
824 state = existing_state
825 if self.calendars:
826 active_calendar_event = self.active_calendar_event()
827 if active_calendar_event:
828 cal_state: AlarmControlPanelState = active_calendar_event.arming_state
829 if (
830 source == ChangeSource.OCCUPANCY
831 and cal_state is not None
832 and active_calendar_event.is_recurring()
833 and str(cal_state) in self.calendar_occupancy_override_states
834 ):
835 _LOGGER.debug("AUTOARM Allowing occupancy reset for recurring overridable calendar event %s", cal_state)
836 else:
837 _LOGGER.debug("AUTOARM Ignoring reset while calendar event active")
838 reset_decision = "ignore_for_active_calendar_event"
839 return existing_state
840 if self.calendar_no_event_mode == NO_CAL_EVENT_MODE_MANUAL:
841 _LOGGER.debug(
842 "AUTOARM Ignoring reset while calendar configured, no active event, and default mode is manual"
843 )
844 reset_decision = "ignore_for_calendar_manual_default"
845 return existing_state
846 if self.calendar_no_event_mode in AlarmControlPanelState:
847 # TODO: may be dupe logic with on_cal event
848 _LOGGER.debug("AUTOARM Applying fixed reset on end of calendar event, %s", self.calendar_no_event_mode)
849 reset_decision = "reset_on_calendar_event_end"
850 return await self.arm(
851 alarm_state_as_enum(self.calendar_no_event_mode),
852 source=ChangeSource.CALENDAR,
853 change_context={
854 "reset_decision": reset_decision,
855 "calendar_no_event_mode": self.calendar_no_event_mode,
856 "caller": "reset_armed_state",
857 },
858 )
859 if self.calendar_no_event_mode == NO_CAL_EVENT_MODE_AUTO:
860 _LOGGER.debug("AUTOARM Applying reset while calendar configured, no active event, and default mode is auto")
861 else:
862 _LOGGER.warning("AUTOARM Unexpected state for calendar no event mode: %s", self.calendar_no_event_mode)
864 # TODO: expose as config ( for manual disarm override ) and condition logic
865 must_change_state = existing_state is None or existing_state == AlarmControlPanelState.PENDING
866 if intervention or source in (ChangeSource.CALENDAR, ChangeSource.OCCUPANCY) or must_change_state:
867 _LOGGER.debug("AUTOARM Ignoring previous interventions")
868 else:
869 last_state_intervention = self.last_state_intervention()
870 if last_state_intervention:
871 _LOGGER.debug(
872 "AUTOARM Ignoring automated reset for %s set by %s at %s",
873 last_state_intervention.state,
874 last_state_intervention.source,
875 last_state_intervention.created_at,
876 )
877 reset_decision = "ignore_after_manual_intervention"
878 return existing_state
879 state = self.determine_state()
880 if state is not None and state != AlarmControlPanelState.PENDING and state != existing_state:
881 reset_decision = "change_state"
882 state = await self.arm(
883 state, source=source, change_context={"reset_decision": reset_decision, "caller": "reset_armed_state"}
884 )
886 finally:
887 self.hass.states.async_set(
888 f"sensor.{DOMAIN}_last_calculation",
889 str(state is not None and state != existing_state),
890 attributes={
891 "new_state": str(state),
892 "old_state": str(existing_state),
893 "source": str(source),
894 "active_calendar_event": deobjectify(active_calendar_event.event) if active_calendar_event else None,
895 "occupied": self.is_occupied(),
896 "night": self.is_night(),
897 "must_change_state": str(must_change_state),
898 "last_state_intervention": deobjectify(last_state_intervention),
899 "intervention": intervention.as_dict() if intervention else None,
900 "time": dt_util.now().isoformat(),
901 "reset_decision": reset_decision,
902 },
903 )
905 return state
907 def is_intervention_since_request(self, requested_at: dt.datetime | None) -> bool:
908 if requested_at is not None and self.has_intervention_since(requested_at):
909 _LOGGER.debug(
910 "AUTOARM Cancelling delayed operation since subsequent manual action",
911 )
912 return True
913 return False
915 def determine_state(self) -> AlarmControlPanelState | None:
916 """Compute a new state using occupancy, sun and transition conditions"""
917 evaluated_state: AlarmControlPanelState | None = None
918 active_calendar_event: TrackedCalendarEvent | None = self.active_calendar_event()
919 condition_vars: ConditionVariables = ConditionVariables(
920 occupied=self.is_occupied(),
921 unoccupied=self.is_unoccupied(),
922 night=self.is_night(),
923 state=self.armed_state(),
924 calendar_event=active_calendar_event.event if active_calendar_event else None,
925 occupied_defaults=self.occupied_defaults,
926 at_home=self.at_home(),
927 not_home=self.not_home(),
928 )
929 for state, checker in self.transitions.items():
930 if self.hass_api.evaluate_condition(checker, condition_vars):
931 _LOGGER.debug("AUTOARM Computed state as %s from condition", state)
932 evaluated_state = state
933 break
934 if evaluated_state is None:
935 return None
936 return AlarmControlPanelState(evaluated_state)
938 @callback
939 async def delayed_arm(self, triggered_at: dt.datetime, requested_at: dt.datetime | None, **kwargs: Any) -> None:
940 _LOGGER.debug("AUTOARM delayed_arm at %s, requested_at: %s", triggered_at, requested_at)
941 if self.is_intervention_since_request(requested_at):
942 return
943 await self.arm(**kwargs)
945 async def arm(
946 self,
947 arming_state: AlarmControlPanelState | None,
948 source: ChangeSource | None = None,
949 change_context: dict[str, Any] | None = None,
950 ) -> AlarmControlPanelState | None:
951 """Change alarm panel state
953 Args:
954 ----
955 arming_state (str, optional): _description_. Defaults to None.
956 source (str,optional): Source of the change, for example 'calendar' or 'button'
957 change_context (dict,optional): Detailed context for the reason arm triggered
959 Returns:
960 -------
961 str: New arming state
963 """
964 _LOGGER.debug("AUTOARM arm(arming_state=%s,source=%s,change_context=%s", arming_state, source, change_context)
965 if arming_state is None:
966 return None
967 if self.armed_state() == arming_state:
968 return None
969 if self.arming_in_progress.is_set():
970 _LOGGER.warning("AUTOARM arming already in progress, skipping for %s", source)
971 return None
972 if self.rate_limiter.triggered():
973 _LOGGER.debug("AUTOARM Rate limit triggered by %s, skipping arm", source)
974 return None
975 try:
976 self.arming_in_progress.set()
977 existing_state: AlarmControlPanelState | None = self.armed_state()
978 if arming_state != existing_state:
979 attrs: dict[str, str] = {}
980 panel_state: State | None = self.hass.states.get(self.alarm_panel)
981 if panel_state:
982 attrs.update(panel_state.attributes)
983 attrs[ATTR_CHANGED_BY] = f"{DOMAIN}.{source}"
984 self.hass.states.async_set(entity_id=self.alarm_panel, new_state=str(arming_state), attributes=attrs)
986 _LOGGER.info("AUTOARM Setting %s from %s to %s for %s", self.alarm_panel, existing_state, arming_state, source)
987 if self.notifier and source and arming_state:
988 await self.notifier.notify(source=source, from_state=existing_state, to_state=arming_state)
990 self.hass_api.fire_event(
991 event_name="change",
992 event_data={
993 "panel": self.alarm_panel,
994 "panel_state": panel_state,
995 "original_state": existing_state,
996 "new_state": arming_state,
997 "change_source": source,
998 "occupied": self.is_occupied(),
999 "night": self.is_night(),
1000 "context": change_context or {},
1001 },
1002 )
1003 return arming_state
1004 _LOGGER.debug("AUTOARM Skipping arm for %s, as %s already %s", source, self.alarm_panel, arming_state)
1005 return existing_state
1006 except Exception as e:
1007 _LOGGER.error("AUTOARM Failed to arm: %s", e)
1008 self.app_health_tracker.record_runtime_error()
1009 finally:
1010 self.arming_in_progress.clear()
1011 return None
1013 def schedule_state(
1014 self,
1015 trigger_time: dt.datetime,
1016 state: AlarmControlPanelState | None,
1017 intervention: Intervention | None,
1018 source: ChangeSource | None = None,
1019 ) -> None:
1020 source = source or intervention.source if intervention else None
1022 job: Callable[[dt.datetime], Coroutine[Any, Any, None] | None]
1023 if state is None:
1024 _LOGGER.debug("AUTOARM Delayed reset, triggered at: %s, source%s", trigger_time, source)
1025 job = partial(self.delayed_reset_armed_state, intervention=intervention, source=source, requested_at=dt_util.now())
1026 else:
1027 _LOGGER.debug("AUTOARM Delayed arm %s, triggered at: %s, source%s", state, trigger_time, source)
1029 job = partial(self.delayed_arm, arming_state=state, source=source, requested_at=dt_util.now())
1031 self.unsubscribes.append(
1032 async_track_point_in_time(
1033 self.hass,
1034 job,
1035 trigger_time,
1036 )
1037 )
1039 def record_intervention(self, source: ChangeSource, state: AlarmControlPanelState | None) -> Intervention:
1040 intervention = Intervention(dt_util.now(), source, state)
1041 self.interventions.append(intervention)
1042 self.hass.states.async_set(f"sensor.{DOMAIN}_last_intervention", source, attributes=intervention.as_dict())
1044 return intervention
1046 def has_intervention_since(self, cutoff: dt.datetime) -> bool:
1047 """Has there been a manual intervention since the cutoff time"""
1048 if not self.interventions:
1049 return False
1050 return any(intervention.created_at > cutoff for intervention in self.interventions)
1052 def last_state_intervention(self) -> Intervention | None:
1053 candidates: list[Intervention] = [i for i in self.interventions if i.state is not None]
1054 if candidates:
1055 return candidates[-1]
1056 return None
1058 @callback
1059 async def on_sunrise(self, *args: Any) -> None: # noqa: ARG002
1060 _LOGGER.debug("AUTOARM Sunrise")
1061 now = dt_util.now()
1062 if not self.sunrise_earliest or now.time() >= self.sunrise_earliest:
1063 await self.reset_armed_state(source=ChangeSource.SUNRISE)
1064 else:
1065 _LOGGER.debug("AUTOARM Rescheduling delayed sunrise action to %s", self.sunrise_earliest)
1066 self.schedule_state(
1067 dt.datetime.combine(now.date(), self.sunrise_earliest, tzinfo=dt_util.DEFAULT_TIME_ZONE),
1068 intervention=None,
1069 state=None,
1070 source=ChangeSource.SUNRISE,
1071 )
1073 @callback
1074 async def on_sunrise_latest(self, *args: Any) -> None: # noqa: ARG002
1075 _LOGGER.debug("AUTOARM Sunrise latest cutoff reached")
1076 await self.reset_armed_state(source=ChangeSource.SUNRISE)
1078 @callback
1079 async def on_sunset(self, *args: Any) -> None: # noqa: ARG002
1080 _LOGGER.debug("AUTOARM Sunset")
1081 now = dt_util.now()
1082 if not self.sunset_earliest or now.time() >= self.sunset_earliest:
1083 await self.reset_armed_state(source=ChangeSource.SUNSET)
1084 else:
1085 _LOGGER.debug("AUTOARM Rescheduling delayed sunset action to %s", self.sunset_earliest)
1086 self.schedule_state(
1087 dt.datetime.combine(now.date(), self.sunset_earliest, tzinfo=dt_util.DEFAULT_TIME_ZONE),
1088 intervention=None,
1089 state=None,
1090 source=ChangeSource.SUNSET,
1091 )
1093 @callback
1094 async def on_sunset_latest(self, *args: Any) -> None: # noqa: ARG002
1095 _LOGGER.debug("AUTOARM Sunset latest cutoff reached")
1096 await self.reset_armed_state(source=ChangeSource.SUNSET)
1098 @callback
1099 async def on_mobile_action(self, event: Event) -> None:
1100 _LOGGER.debug("AUTOARM Mobile Action: %s", event)
1101 source: ChangeSource = ChangeSource.MOBILE
1103 match event.data.get("action"):
1104 case "ALARM_PANEL_DISARM":
1105 self.record_intervention(source=source, state=AlarmControlPanelState.DISARMED)
1106 await self.arm(
1107 AlarmControlPanelState.DISARMED,
1108 source=source,
1109 change_context={"caller": "on_mobile_action", "event_data": event.data, "event_type": event.event_type},
1110 )
1111 case "ALARM_PANEL_RESET":
1112 await self.reset_armed_state(intervention=self.record_intervention(source=ChangeSource.BUTTON, state=None))
1113 case "ALARM_PANEL_AWAY":
1114 self.record_intervention(source=source, state=AlarmControlPanelState.ARMED_AWAY)
1115 await self.arm(
1116 AlarmControlPanelState.ARMED_AWAY,
1117 source=source,
1118 change_context={"caller": "on_mobile_action", "event_data": event.data, "event_type": event.event_type},
1119 )
1120 case _:
1121 _LOGGER.debug("AUTOARM Ignoring mobile action: %s", event.data)
1123 @callback
1124 async def on_alarm_state_button(self, state: AlarmControlPanelState, delay: dt.timedelta | None, event: Event) -> None:
1125 _LOGGER.debug("AUTOARM Alarm %s Button: %s", state, event)
1126 intervention = self.record_intervention(source=ChangeSource.BUTTON, state=state)
1127 if delay:
1128 self.schedule_state(dt_util.now() + delay, state, intervention, source=ChangeSource.BUTTON)
1129 if self.notifier:
1130 await self.notifier.notify(
1131 ChangeSource.BUTTON,
1132 from_state=self.armed_state(),
1133 to_state=state,
1134 message=f"Alarm will be set to {state} in {delay}",
1135 title=f"Arm set to {state} process starting",
1136 )
1137 else:
1138 await self.arm(
1139 state,
1140 source=ChangeSource.BUTTON,
1141 change_context={
1142 "caller": "on_alarm_state_button",
1143 "event_data": event.data,
1144 "delay": str(delay),
1145 "event_type": event.event_type,
1146 },
1147 )
1149 @callback
1150 async def on_reset_button(self, delay: dt.timedelta | None, event: Event) -> None:
1151 _LOGGER.debug("AUTOARM Reset Button: %s", event)
1152 intervention = self.record_intervention(source=ChangeSource.BUTTON, state=None)
1153 if delay:
1154 self.schedule_state(dt_util.now() + delay, None, intervention, ChangeSource.BUTTON)
1155 if self.notifier:
1156 await self.notifier.notify(
1157 ChangeSource.BUTTON,
1158 message=f"Alarm will be reset in {delay}",
1159 title="Alarm reset wait initiated",
1160 )
1161 else:
1162 await self.reset_armed_state(intervention=self.record_intervention(source=ChangeSource.BUTTON, state=None))
1164 @callback
1165 async def on_occupancy_change(self, event: Event[EventStateChangedData]) -> None:
1166 """Listen for person state events
1168 Args:
1169 ----
1170 event (Event[EventStateChangedData]): state change event
1172 """
1173 entity_id, old, new, new_attributes = self._extract_event(event)
1174 if old == new:
1175 _LOGGER.debug(
1176 "AUTOARM Occupancy Non-state Change: %s, state:%s->%s, event: %s, attrs:%s",
1177 entity_id,
1178 old,
1179 new,
1180 event,
1181 new_attributes,
1182 )
1183 return
1184 _LOGGER.debug(
1185 "AUTOARM Occupancy state Change: %s, state:%s->%s, event: %s, attrs:%s", entity_id, old, new, event, new_attributes
1186 )
1187 if new in self.occupied_delay:
1188 self.schedule_state(
1189 dt_util.now() + self.occupied_delay[new], state=None, intervention=None, source=ChangeSource.OCCUPANCY
1190 )
1191 else:
1192 await self.reset_armed_state(source=ChangeSource.OCCUPANCY)
1194 @callback
1195 async def on_panel_change(self, event: Event[EventStateChangedData]) -> None:
1196 """Alarm Control Panel has been changed outside of AutoArm"""
1197 entity_id, old, new, new_attributes = self._extract_event(event)
1198 if new_attributes:
1199 changed_by = new_attributes.get(ATTR_CHANGED_BY)
1200 if changed_by and changed_by.startswith(f"{DOMAIN}."):
1201 _LOGGER.debug(
1202 "AUTOARM Panel Change Ignored: %s,%s: %s-->%s",
1203 entity_id,
1204 event.event_type,
1205 old,
1206 new,
1207 )
1208 return
1209 new_state: AlarmControlPanelState | None = alarm_state_as_enum(new)
1210 old_state: AlarmControlPanelState | None = alarm_state_as_enum(old)
1212 _LOGGER.info(
1213 "AUTOARM Panel Change: %s,%s: %s-->%s",
1214 entity_id,
1215 event.event_type,
1216 old,
1217 new,
1218 )
1219 self.record_intervention(ChangeSource.ALARM_PANEL, new_state)
1220 if new in ZOMBIE_STATES:
1221 _LOGGER.warning("AUTOARM Dezombifying %s ...", new)
1222 await self.reset_armed_state(source=ChangeSource.ZOMBIFICATION)
1223 elif new != old:
1224 if self.notifier:
1225 await self.notifier.notify(ChangeSource.ALARM_PANEL, old_state, new_state)
1226 else:
1227 _LOGGER.debug("AUTOARM panel change leaves state unchanged at %s", new)
1229 @callback
1230 async def housekeeping(self, triggered_at: dt.datetime) -> None:
1231 _LOGGER.debug("AUTOARM Housekeeping starting, triggered at %s", triggered_at)
1232 now = dt_util.now()
1233 self.interventions = [i for i in self.interventions if now < i.created_at + dt.timedelta(minutes=self.intervention_ttl)]
1234 for cal in self.calendars:
1235 await cal.prune_events()
1236 _LOGGER.debug("AUTOARM Housekeeping finished")