Coverage for custom_components/autoarm/autoarming.py: 88%
512 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-08 20:27 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-08 20:27 +0000
1import asyncio
2import contextlib
3import datetime as dt
4import json
5import logging
6from collections.abc import Callable
7from dataclasses import dataclass
8from functools import partial
9from typing import TYPE_CHECKING, Any
11import homeassistant.util.dt as dt_util
12import voluptuous as vol
13from homeassistant.components.alarm_control_panel.const import ATTR_CHANGED_BY, AlarmControlPanelState
14from homeassistant.components.calendar import CalendarEvent
15from homeassistant.components.calendar.const import DOMAIN as CALENDAR_DOMAIN
16from homeassistant.components.sun.const import STATE_BELOW_HORIZON
17from homeassistant.const import CONF_CONDITIONS, CONF_ENTITY_ID, EVENT_HOMEASSISTANT_STOP, SERVICE_RELOAD, STATE_HOME
18from homeassistant.core import (
19 Event,
20 EventStateChangedData,
21 HomeAssistant,
22 ServiceCall,
23 ServiceResponse,
24 SupportsResponse,
25 callback,
26)
27from homeassistant.exceptions import ConditionError, HomeAssistantError
28from homeassistant.helpers import condition as condition
29from homeassistant.helpers import config_validation as cv
30from homeassistant.helpers import entity_platform
31from homeassistant.helpers import issue_registry as ir
32from homeassistant.helpers.event import (
33 async_track_point_in_time,
34 async_track_state_change_event,
35 async_track_sunrise,
36 async_track_sunset,
37 async_track_time_change,
38)
39from homeassistant.helpers.json import ExtendedJSONEncoder
40from homeassistant.helpers.reload import (
41 async_integration_yaml_config,
42)
43from homeassistant.helpers.service import async_register_admin_service
44from homeassistant.helpers.typing import ConfigType
45from homeassistant.util.hass_dict import HassKey
47from custom_components.autoarm.hass_api import HomeAssistantAPI
49from .calendar import TrackedCalendar, TrackedCalendarEvent
50from .const import (
51 ATTR_RESET,
52 CONF_ALARM_PANEL,
53 CONF_BUTTONS,
54 CONF_CALENDAR_CONTROL,
55 CONF_CALENDAR_NO_EVENT,
56 CONF_CALENDARS,
57 CONF_DAY,
58 CONF_DELAY_TIME,
59 CONF_DIURNAL,
60 CONF_EARLIEST,
61 CONF_NOTIFY,
62 CONF_OCCUPANCY,
63 CONF_OCCUPANCY_DEFAULT,
64 CONF_RATE_LIMIT,
65 CONF_RATE_LIMIT_CALLS,
66 CONF_RATE_LIMIT_PERIOD,
67 CONF_SUNRISE,
68 CONF_TRANSITIONS,
69 CONFIG_SCHEMA,
70 DEFAULT_TRANSITIONS,
71 DOMAIN,
72 NO_CAL_EVENT_MODE_AUTO,
73 NO_CAL_EVENT_MODE_MANUAL,
74 ChangeSource,
75 ConditionVariables,
76)
77from .helpers import Limiter, alarm_state_as_enum, deobjectify, safe_state
79if TYPE_CHECKING:
80 from homeassistant.helpers.condition import ConditionCheckerType
# Module-level logger; most messages in this file carry an "AUTOARM" prefix.
_LOGGER = logging.getLogger(__name__)

# NOTE(review): OVERRIDE_STATES, EPHEMERAL_STATES, NS_MOBILE_ACTIONS and PLATFORMS are not
# referenced in the visible part of this module - presumably consumed elsewhere; confirm.
OVERRIDE_STATES = (AlarmControlPanelState.ARMED_VACATION, AlarmControlPanelState.ARMED_CUSTOM_BYPASS)
EPHEMERAL_STATES = (
    AlarmControlPanelState.PENDING,
    AlarmControlPanelState.ARMING,
    AlarmControlPanelState.DISARMING,
    AlarmControlPanelState.TRIGGERED,
)
# Raw panel states that make on_panel_change() recalculate the armed state
ZOMBIE_STATES = ("unknown", "unavailable")
NS_MOBILE_ACTIONS = "mobile_actions"
PLATFORMS = ["autoarm"]

# Typed key under which async_setup() stores the AutoArmData instance in hass.data
HASS_DATA_KEY: HassKey["AutoArmData"] = HassKey(DOMAIN)
@dataclass
class AutoArmData:
    """Container stored in ``hass.data`` under ``HASS_DATA_KEY``."""

    # The active AlarmArmer; swapped for a new instance by the reload service handler
    armer: "AlarmArmer"
    # Created as an empty dict by async_setup(); not read in the visible part of this module
    other_data: dict[str, Any]
104# async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
async def async_setup(
    hass: HomeAssistant,
    config: ConfigType,
) -> bool:
    """Set up AutoArm from YAML config and register the reload admin service.

    Returns True in every case; when the domain is missing from ``config``
    only a warning is logged and nothing else is set up.
    """
    _ = CONFIG_SCHEMA
    if DOMAIN not in config:
        _LOGGER.warning("AUTOARM No config found")
        return True

    domain_config = config[DOMAIN]
    expose_config_entity(hass, domain_config)
    first_armer = _async_process_config(hass, domain_config)
    hass.data[HASS_DATA_KEY] = AutoArmData(first_armer, {})
    await hass.data[HASS_DATA_KEY].armer.initialize()

    async def reload_service_handler(service_call: ServiceCall) -> None:
        """Reload yaml entities."""
        _LOGGER.info("AUTOARM Reloading %s.%s component, data %s", service_call.domain, service_call.service, service_call.data)
        reloaded = None
        try:
            reloaded = await async_integration_yaml_config(hass, DOMAIN)
        except HomeAssistantError:
            # A failed load is treated exactly like a missing config below
            pass
        if reloaded is None or DOMAIN not in reloaded:
            _LOGGER.warning("AUTOARM reload rejected for lack of config: %s", reloaded)
            return

        # Tear down the running armer before building its replacement
        holder = hass.data[HASS_DATA_KEY]
        holder.armer.shutdown()
        expose_config_entity(hass, reloaded[DOMAIN])
        holder.armer = _async_process_config(hass, reloaded[DOMAIN])
        await holder.armer.initialize()

    async_register_admin_service(hass, DOMAIN, SERVICE_RELOAD, reload_service_handler)
    return True
def expose_config_entity(hass: HomeAssistant, config: ConfigType) -> None:
    """Publish a summary of the configuration as the ``<DOMAIN>.configured`` state.

    The summary is round-tripped through JSON (using ExtendedJSONEncoder) before
    being set as attributes; if that fails the state becomes "partially-valid"
    with the error text as its only attribute.
    """
    panel_config = config.get(CONF_ALARM_PANEL, {})
    data: dict[str, Any] = {
        CONF_ALARM_PANEL: panel_config.get(CONF_ENTITY_ID),
        CONF_DIURNAL: config.get(CONF_DIURNAL),
        CONF_CALENDAR_CONTROL: config.get(CONF_CALENDAR_CONTROL),
    }
    # These sections default to an empty mapping when absent
    for section in (CONF_BUTTONS, CONF_OCCUPANCY, CONF_NOTIFY, CONF_RATE_LIMIT):
        data[section] = config.get(section, {})

    try:
        jsonized: str = json.dumps(obj=data, cls=ExtendedJSONEncoder)
        attributes = json.loads(jsonized)
        hass.states.async_set(f"{DOMAIN}.configured", "valid", attributes)
    except Exception as e:
        _LOGGER.error("AUTOARM Failed to expose config data as entity: %s, %s", data, e)
        hass.states.async_set(entity_id=f"{DOMAIN}.configured", new_state="partially-valid", attributes={"error": str(e)})
def _async_process_config(hass: HomeAssistant, config: ConfigType) -> "AlarmArmer":
    """Build an AlarmArmer from the AutoArm section of the configuration.

    The alarm panel, occupancy and notify sections are read with ``[]`` and
    therefore must be present; the remaining sections are optional.
    """
    alarm_panel = config[CONF_ALARM_PANEL].get(CONF_ENTITY_ID)
    diurnal = config.get(CONF_DIURNAL, {})
    buttons = config.get(CONF_BUTTONS, {})
    occupancy = config[CONF_OCCUPANCY]
    notify = config[CONF_NOTIFY]
    rate_limit = config.get(CONF_RATE_LIMIT, {})

    calendar_config: ConfigType = config.get(CONF_CALENDAR_CONTROL, {})
    calendars = calendar_config.get(CONF_CALENDARS, [])
    no_event_mode = calendar_config.get(CONF_CALENDAR_NO_EVENT, NO_CAL_EVENT_MODE_AUTO)

    return AlarmArmer(
        hass,
        alarm_panel=alarm_panel,
        diurnal=diurnal,
        buttons=buttons,
        occupancy=occupancy,
        notify=notify,
        rate_limit=rate_limit,
        calendars=calendars,
        transitions=config.get(CONF_TRANSITIONS),
        calendar_no_event_mode=no_event_mode,
    )
def unlisten(listener: Callable[[], None] | None) -> None:
    """Invoke an unsubscribe callable, if one is given, logging any failure at debug level."""
    if not listener:
        return
    try:
        listener()
    except Exception as e:
        _LOGGER.debug("AUTOARM Failure closing listener %s: %s", listener, e)
@dataclass
class Intervention:
    """Record of a manual intervention, such as a button push, mobile action or alarm panel change"""

    created_at: dt.datetime
    source: ChangeSource
    state: AlarmControlPanelState | None

    def as_dict(self) -> dict[str, str | None]:
        """Return a string-only view: ISO timestamp, source text and state text (or None)."""
        state_text: str | None = None
        if self.state is not None:
            state_text = str(self.state)
        return {
            "created_at": self.created_at.isoformat(),
            "source": str(self.source),
            "state": state_text,
        }
199class AlarmArmer:
200 def __init__(
201 self,
202 hass: HomeAssistant,
203 alarm_panel: str,
204 buttons: dict[str, ConfigType] | None = None,
205 occupancy: ConfigType | None = None,
206 actions: list | None = None,
207 notify: ConfigType | None = None,
208 diurnal: ConfigType | None = None,
209 rate_limit: ConfigType | None = None,
210 calendar_no_event_mode: str | None = None,
211 calendars: list[ConfigType] | None = None,
212 transitions: dict[str, dict[str, list[ConfigType]]] | None = None,
213 ) -> None:
214 occupancy = occupancy or {}
215 rate_limit = rate_limit or {}
216 diurnal = diurnal or {}
218 self.hass: HomeAssistant = hass
219 self.local_tz = dt_util.get_time_zone(self.hass.config.time_zone)
220 self.calendar_configs: list[ConfigType] = calendars or []
221 self.calendar_no_event_mode: str = calendar_no_event_mode or NO_CAL_EVENT_MODE_AUTO
222 self.calendars: list[TrackedCalendar] = []
223 self.alarm_panel: str = alarm_panel
224 self.sunrise_cutoff: dt.time | None = diurnal.get(CONF_SUNRISE, {}).get(CONF_EARLIEST)
225 self.occupants: list[str] = occupancy.get(CONF_ENTITY_ID, [])
226 self.occupied_defaults: dict[str, AlarmControlPanelState] = occupancy.get(
227 CONF_OCCUPANCY_DEFAULT, {CONF_DAY: AlarmControlPanelState.ARMED_HOME}
228 )
229 self.occupied_delay: dict[str, dt.timedelta] = occupancy.get(CONF_DELAY_TIME, {})
230 self.buttons: ConfigType = buttons or {}
232 self.actions: list[str] = actions or []
233 self.notify_profiles: dict[str, dict] = notify or {}
234 self.unsubscribes: list = []
235 self.pre_pending_state: AlarmControlPanelState | None = None
236 self.button_device: dict[str, str] = {}
237 self.arming_in_progress: asyncio.Event = asyncio.Event()
239 self.rate_limiter: Limiter = Limiter(
240 window=rate_limit.get(CONF_RATE_LIMIT_PERIOD, dt.timedelta(seconds=60)),
241 max_calls=rate_limit.get(CONF_RATE_LIMIT_CALLS, 5),
242 )
244 self.hass_api: HomeAssistantAPI = HomeAssistantAPI(hass)
245 self.transitions: dict[AlarmControlPanelState, ConditionCheckerType] = {}
246 self.transition_config: dict[str, dict[str, list[ConfigType]]] = transitions or {}
248 self.initialization_errors: dict[str, int] = {}
249 self.interventions: list[Intervention] = []
250 self.intervention_ttl: int = 60
    async def initialize(self) -> None:
        """Async initialization

        Runs every ``initialize_*`` step in a fixed order and then performs a
        start-up recalculation of the armed state.
        """
        # NOTE(review): on a fresh instance self.calendars is still empty at this point,
        # since it is only filled by initialize_calendar() below
        _LOGGER.info("AUTOARM occupied=%s, state=%s, calendars=%s", self.is_occupied(), self.armed_state(), len(self.calendars))

        self.initialize_alarm_panel()
        await self.initialize_calendar()
        await self.initialize_logic()
        self.initialize_diurnal()
        self.initialize_occupancy()
        self.initialize_buttons()
        self.initialize_integration()
        self.initialize_housekeeping()
        # Publishes the `initialized` state, so runs after the steps that can record errors
        self.initialize_home_assistant()
        await self.reset_armed_state(source=ChangeSource.STARTUP)

        _LOGGER.info("AUTOARM Initialized, state: %s", self.armed_state())
269 def record_error(self, stage: str) -> None:
270 self.initialization_errors.setdefault(stage, 0)
271 self.initialization_errors[stage] += 1
273 def initialize_home_assistant(self) -> None:
274 self.stop_listener: Callable[[], None] | None = self.hass.bus.async_listen_once(
275 EVENT_HOMEASSISTANT_STOP, self.async_shutdown
276 )
277 self.hass.states.async_set(
278 f"{DOMAIN}.initialized",
279 "valid" if not self.initialization_errors else "invalid",
280 attributes=self.initialization_errors,
281 )
282 self.hass.states.async_set(f"{DOMAIN}.last_calculation", "unavailable", attributes={})
284 self.hass.services.async_register(
285 DOMAIN,
286 "reset_state",
287 self.reset_service,
288 supports_response=SupportsResponse.OPTIONAL,
289 )
291 async def reset_service(self, _call: ServiceCall) -> ServiceResponse:
292 new_state = await self.reset_armed_state(intervention=self.record_intervention(source=ChangeSource.ACTION, state=None))
293 return {"change": new_state or "NO_CHANGE"}
295 def initialize_integration(self) -> None:
296 self.hass.states.async_set(f"{DOMAIN}.last_intervention", "unavailable", attributes={})
298 self.unsubscribes.append(self.hass.bus.async_listen("mobile_app_notification_action", self.on_mobile_action))
300 def initialize_alarm_panel(self) -> None:
301 """Set up automation for Home Assistant alarm panel
303 See https://www.home-assistant.io/integrations/alarm_control_panel/
305 Succeeds even if control panel has not yet started, listener will pick up events when it does
306 """
307 self.unsubscribes.append(async_track_state_change_event(self.hass, [self.alarm_panel], self.on_panel_change))
308 _LOGGER.debug("AUTOARM Auto-arming %s", self.alarm_panel)
310 def initialize_housekeeping(self) -> None:
311 self.unsubscribes.append(
312 async_track_time_change(
313 self.hass,
314 action=self.housekeeping,
315 minute=0,
316 )
317 )
319 def initialize_diurnal(self) -> None:
320 # events API expects a function, however underlying HassJob is fine with coroutines
321 self.unsubscribes.append(async_track_sunrise(self.hass, self.on_sunrise, None)) # type: ignore
322 self.unsubscribes.append(async_track_sunset(self.hass, self.on_sunset, None)) # type: ignore
324 def initialize_occupancy(self) -> None:
325 """Configure occupants, and listen for changes in their state"""
326 _LOGGER.info("AUTOARM Occupancy determined by %s", ",".join(self.occupants))
327 self.unsubscribes.append(async_track_state_change_event(self.hass, self.occupants, self.on_occupancy_change))
329 def initialize_buttons(self) -> None:
330 """Initialize (optional) physical alarm state control buttons"""
332 def setup_button(state_name: str, button_entity: str, cb: Callable) -> None:
333 self.button_device[state_name] = button_entity
334 if self.button_device[state_name]:
335 self.unsubscribes.append(async_track_state_change_event(self.hass, [button_entity], cb))
337 _LOGGER.debug(
338 "AUTOARM Configured %s button for %s",
339 state_name,
340 self.button_device[state_name],
341 )
343 for button_use, button_config in self.buttons.items():
344 delay: dt.timedelta | None = button_config.get(CONF_DELAY_TIME)
345 for entity_id in button_config[CONF_ENTITY_ID]:
346 if button_use == ATTR_RESET:
347 setup_button(ATTR_RESET, entity_id, partial(self.on_reset_button, delay))
348 else:
349 setup_button(
350 button_use, entity_id, partial(self.on_alarm_state_button, AlarmControlPanelState(button_use), delay)
351 )
353 async def initialize_calendar(self) -> None:
354 """Configure calendar polling (optional)"""
355 stage: str = "calendar"
356 self.hass.states.async_set(f"{DOMAIN}.last_calendar_event", "unavailable", attributes={})
357 if not self.calendar_configs:
358 return
359 try:
360 platforms: list[entity_platform.EntityPlatform] = entity_platform.async_get_platforms(self.hass, CALENDAR_DOMAIN)
361 if platforms:
362 platform: entity_platform.EntityPlatform = platforms[0]
363 else:
364 self.record_error(stage)
365 _LOGGER.error("AUTOARM Calendar platform not available from Home Assistant")
366 return
367 except Exception as _e:
368 self.record_error(stage)
369 _LOGGER.exception("AUTOARM Unable to access calendar platform")
370 return
371 for calendar_config in self.calendar_configs:
372 tracked_calendar = TrackedCalendar(calendar_config, self)
373 await tracked_calendar.initialize(platform)
374 self.calendars.append(tracked_calendar)
    async def initialize_logic(self) -> None:
        """Build the condition checker for each configured state transition.

        Defaults from DEFAULT_TRANSITIONS are filled in for states that have no
        configuration. Each transition's conditions are built first with
        ``strict=True, validate=True`` and then rebuilt without those flags;
        the second result is stored in ``self.transitions``. Any failure is
        reported through ``hass_api.raise_issue``.
        """
        stage: str = "logic"
        for state_str, raw_condition in DEFAULT_TRANSITIONS.items():
            if state_str not in self.transition_config:
                _LOGGER.info("AUTOARM Defaulting transition condition for %s", state_str)
                self.transition_config[state_str] = {CONF_CONDITIONS: cv.CONDITIONS_SCHEMA(raw_condition)}

        for state_str, transition_config in self.transition_config.items():
            # Non-empty `error` at the end of the iteration triggers an issue report
            error: str = ""
            condition_config = transition_config.get(CONF_CONDITIONS)
            if condition_config is None:
                error = "Empty conditions"
                _LOGGER.warning(f"AUTOARM Found no conditions for {state_str} transition")
            else:
                try:
                    # Raises ValueError when state_str is not a known alarm state
                    state = AlarmControlPanelState(state_str)
                    cond: ConditionCheckerType | None = await self.hass_api.build_condition(
                        condition_config, strict=True, validate=True, name=state_str
                    )

                    if cond:
                        # re-run without strict wrapper
                        cond = await self.hass_api.build_condition(condition_config, name=state_str)
                    if cond:
                        _LOGGER.debug(f"AUTOARM Validated transition logic for {state_str}")
                        self.transitions[state] = cond
                    else:
                        _LOGGER.warning(f"AUTOARM Failed to validate transition logic for {state_str}")
                        error = "Condition validation failed"
                except ValueError as ve:
                    self.record_error(stage)
                    error = f"Invalid state {ve}"
                    _LOGGER.error(f"AUTOARM Invalid state in {state_str} transition - {ve}")
                except vol.Invalid as vi:
                    self.record_error(stage)
                    _LOGGER.error(f"AUTOARM Transition {state_str} conditions fails Home Assistant schema check {vi}")
                    error = f"Schema error {vi}"
                except ConditionError as ce:
                    # NOTE(review): unlike the other handlers this one does not call record_error() - confirm intent
                    _LOGGER.error(f"AUTOARM Transition {state_str} conditions fails Home Assistant condition check {ce}")
                    if hasattr(ce, "message"):
                        error = ce.message  # type: ignore
                    elif hasattr(ce, "error") and hasattr(ce.error, "message"):  # type: ignore
                        error = ce.error.message  # type: ignore
                    else:
                        error = str(ce)
                except Exception as e:
                    self.record_error(stage)
                    _LOGGER.exception("AUTOARM Disabling transition %s with error validating %s", state_str, condition_config)
                    error = f"Unknown exception {e}"
            if error:
                _LOGGER.warning(f"AUTOARM raising report issue for {error} on {state_str}")
                self.hass_api.raise_issue(
                    f"transition_condition_{state_str}",
                    is_fixable=False,
                    issue_key="transition_condition",
                    issue_map={"state": state_str, "error": error},
                    severity=ir.IssueSeverity.ERROR,
                )
    async def async_shutdown(self, _event: Event) -> None:
        """Handle the Home Assistant stop event by shutting this armer down."""
        _LOGGER.info("AUTOARM shut down event received")
        # Registered with async_listen_once, so the handle is dropped before shutdown() would try to call it
        self.stop_listener = None
        self.shutdown()
440 def shutdown(self) -> None:
441 _LOGGER.info("AUTOARM shutting down")
442 for calendar in self.calendars:
443 calendar.shutdown()
444 while self.unsubscribes:
445 unlisten(self.unsubscribes.pop())
446 unlisten(self.stop_listener)
447 self.stop_listener = None
448 _LOGGER.info("AUTOARM shut down")
450 def active_calendar_event(self) -> CalendarEvent | None:
451 events: list[CalendarEvent] = []
452 for cal in self.calendars:
453 events.extend(cal.active_events())
454 if events:
455 # TODO: consider sorting events to LIFO
456 return events[0]
457 return None
459 def is_occupied(self) -> bool:
460 return any(safe_state(self.hass.states.get(p)) == STATE_HOME for p in self.occupants)
462 def at_home(self) -> list[str]:
463 return [p for p in self.occupants if safe_state(self.hass.states.get(p)) == STATE_HOME]
465 def not_home(self) -> list[str]:
466 return [p for p in self.occupants if safe_state(self.hass.states.get(p)) != STATE_HOME]
468 def is_unoccupied(self) -> bool:
469 return all(safe_state(self.hass.states.get(p)) != STATE_HOME for p in self.occupants)
471 def is_night(self) -> bool:
472 return safe_state(self.hass.states.get("sun.sun")) == STATE_BELOW_HORIZON
474 def armed_state(self) -> AlarmControlPanelState:
475 raw_state: str | None = safe_state(self.hass.states.get(self.alarm_panel))
476 alarm_state = alarm_state_as_enum(raw_state)
477 if alarm_state is None:
478 _LOGGER.warning("AUTOARM No alarm state available - treating as PENDING")
479 return AlarmControlPanelState.PENDING
480 return alarm_state
482 def _extract_event(self, event: Event[EventStateChangedData]) -> tuple[str | None, str | None, str | None, dict[str, Any]]:
483 entity_id = old = new = None
484 new_attributes: dict[str, Any] = {}
485 if event and event.data:
486 entity_id = event.data.get("entity_id")
487 old_obj = event.data.get("old_state")
488 if old_obj:
489 old = old_obj.state
490 new_obj = event.data.get("new_state")
491 if new_obj:
492 new = new_obj.state
493 new_attributes = new_obj.attributes
494 return entity_id, old, new, new_attributes
496 async def pending_state(self, source: ChangeSource | None) -> None:
497 self.pre_pending_state = self.armed_state()
498 await self.arm(AlarmControlPanelState.PENDING, source=source)
500 @callback
501 async def delayed_reset_armed_state(self, triggered_at: dt.datetime, requested_at: dt.datetime | None, **kwargs) -> None:
502 _LOGGER.debug("AUTOARM delayed_arm at %s, requested_at: %s", triggered_at, requested_at)
503 if self.is_intervention_since_request(requested_at):
504 return
505 await self.reset_armed_state(**kwargs)
507 async def reset_armed_state(
508 self, intervention: Intervention | None = None, source: ChangeSource | None = None
509 ) -> str | None:
510 """Logic to automatically work out appropriate current armed state"""
511 state: AlarmControlPanelState | None = None
512 existing_state: AlarmControlPanelState | None = None
513 must_change_state: bool = False
514 last_state_intervention: Intervention | None = None
515 active_calendar_event: CalendarEvent | None = None
517 if source is None and intervention is not None:
518 source = intervention.source
519 _LOGGER.debug(
520 "AUTOARM reset_armed_state(intervention=%s,source=%s)",
521 intervention,
522 source,
523 )
525 try:
526 existing_state = self.armed_state()
527 state = existing_state
528 if self.calendars:
529 active_calendar_event = self.active_calendar_event()
530 if active_calendar_event:
531 _LOGGER.debug("AUTOARM Ignoring reset while calendar event active")
532 return existing_state
533 if self.calendar_no_event_mode == NO_CAL_EVENT_MODE_MANUAL:
534 _LOGGER.debug(
535 "AUTOARM Ignoring reset while calendar configured, no active event, and default mode is manual"
536 )
537 return existing_state
538 if self.calendar_no_event_mode in AlarmControlPanelState:
539 # TODO: may be dupe logic with on_cal event
540 _LOGGER.debug("AUTOARM Applying fixed reset on end of calendar event, %s", self.calendar_no_event_mode)
541 return await self.arm(alarm_state_as_enum(self.calendar_no_event_mode), ChangeSource.CALENDAR)
542 if self.calendar_no_event_mode == NO_CAL_EVENT_MODE_AUTO:
543 _LOGGER.debug("AUTOARM Applying reset while calendar configured, no active event, and default mode is auto")
544 else:
545 _LOGGER.warning("AUTOARM Unexpected state for calendar no event mode: %s", self.calendar_no_event_mode)
547 # TODO: expose as config ( for manual disarm override ) and condition logic
548 must_change_state = existing_state is None or existing_state == AlarmControlPanelState.PENDING
549 if intervention or source in (ChangeSource.CALENDAR, ChangeSource.OCCUPANCY) or must_change_state:
550 _LOGGER.debug("AUTOARM Ignoring previous interventions")
551 else:
552 last_state_intervention = self.last_state_intervention()
553 if last_state_intervention:
554 _LOGGER.debug(
555 "AUTOARM Ignoring automated reset for %s set by %s at %s",
556 last_state_intervention.state,
557 last_state_intervention.source,
558 last_state_intervention.created_at,
559 )
560 return existing_state
561 state = self.determine_state()
562 if state is not None and state != AlarmControlPanelState.PENDING and state != existing_state:
563 state = await self.arm(state, source=source)
564 finally:
565 self.hass.states.async_set(
566 f"{DOMAIN}.last_calculation",
567 str(state is not None and state != existing_state),
568 attributes={
569 "new_state": str(state),
570 "old_state": str(existing_state),
571 "source": source,
572 "active_calendar_event": deobjectify(active_calendar_event),
573 "occupied": self.is_occupied(),
574 "night": self.is_night(),
575 "must_change_state": str(must_change_state),
576 "last_state_intervention": deobjectify(last_state_intervention),
577 "intervention": intervention.as_dict() if intervention else None,
578 "time": dt_util.now().isoformat(),
579 },
580 )
582 return state
584 def is_intervention_since_request(self, requested_at: dt.datetime | None) -> bool:
585 if requested_at is not None and self.has_intervention_since(requested_at):
586 _LOGGER.debug(
587 "AUTOARM Cancelling delayed operation since subsequent manual action",
588 )
589 return True
590 return False
592 def determine_state(self) -> AlarmControlPanelState | None:
593 """Compute a new state using occupancy, sun and transition conditions"""
594 evaluated_state: AlarmControlPanelState | None = None
595 condition_vars: ConditionVariables = ConditionVariables(
596 self.is_occupied(),
597 self.is_night(),
598 state=self.armed_state(),
599 calendar_event=self.active_calendar_event(),
600 occupied_defaults=self.occupied_defaults,
601 at_home=self.at_home(),
602 not_home=self.not_home(),
603 )
604 for state, checker in self.transitions.items():
605 if self.hass_api.evaluate_condition(checker, condition_vars):
606 _LOGGER.debug("AUTOARM Computed state as % from condition", state)
607 evaluated_state = state
608 break
609 if evaluated_state is None:
610 return None
611 return AlarmControlPanelState(evaluated_state)
613 @callback
614 async def delayed_arm(self, triggered_at: dt.datetime, requested_at: dt.datetime | None, **kwargs) -> None:
615 _LOGGER.debug("AUTOARM delayed_arm at %s, requested_at: %s", triggered_at, requested_at)
616 if self.is_intervention_since_request(requested_at):
617 return
618 await self.arm(**kwargs)
620 async def arm(
621 self, arming_state: AlarmControlPanelState | None = None, source: ChangeSource | None = None
622 ) -> AlarmControlPanelState | None:
623 """Change alarm panel state
625 Args:
626 ----
627 arming_state (str, optional): _description_. Defaults to None.
628 source (str,optional): Source of the change, for example 'calendar' or 'button'
630 Returns:
631 -------
632 str: New arming state
634 """
635 if self.rate_limiter.triggered():
636 _LOGGER.debug("AUTOARM Rate limit triggered by %s, skipping arm", source)
637 return None
638 try:
639 self.arming_in_progress.set()
640 existing_state: AlarmControlPanelState | None = self.armed_state()
641 if arming_state != existing_state:
642 self.hass.states.async_set(
643 entity_id=self.alarm_panel, new_state=str(arming_state), attributes={ATTR_CHANGED_BY: f"{DOMAIN}.{source}"}
644 )
645 _LOGGER.info("AUTOARM Setting %s from %s to %s for %s", self.alarm_panel, existing_state, arming_state, source)
646 return arming_state
647 _LOGGER.debug("Skipping arm for %s, as %s already %s", source, self.alarm_panel, arming_state)
648 return existing_state
649 except Exception as e:
650 _LOGGER.debug("AUTOARM Failed to arm: %s", e)
651 finally:
652 self.arming_in_progress.clear()
653 return None
655 async def notify(self, message: str, profile: str = "normal", title: str | None = None) -> None:
656 notify_service = None
657 try:
658 # separately merge base dict and data sub-dict as cheap and nasty semi-deep-merge
659 selected_profile = self.notify_profiles.get(profile)
660 base_profile = self.notify_profiles.get("common", {})
661 base_profile_data = base_profile.get("data", {})
662 merged_profile = dict(base_profile)
663 merged_profile_data = dict(base_profile_data)
664 if selected_profile:
665 selected_profile_data: dict = selected_profile.get("data", {})
666 merged_profile.update(selected_profile)
667 merged_profile_data.update(selected_profile_data)
668 merged_profile["data"] = merged_profile_data
669 notify_service = merged_profile.get("service", "").replace("notify.", "")
671 title = title or "Alarm Auto Arming"
672 if notify_service and merged_profile:
673 data = merged_profile.get("data", {})
674 await self.hass.services.async_call(
675 "notify",
676 notify_service,
677 service_data={"message": message, "title": title, "data": data},
678 )
679 else:
680 _LOGGER.debug("AUTOARM Skipped notification, service: %s, data: %s", notify_service, merged_profile)
682 except Exception:
683 _LOGGER.exception("AUTOARM notify.%s failed", notify_service)
685 def schedule_state(
686 self,
687 trigger_time: dt.datetime,
688 state: AlarmControlPanelState | None,
689 intervention: Intervention | None,
690 source: ChangeSource | None = None,
691 ) -> None:
692 source = source or intervention.source if intervention else None
694 job: Any
695 if state is None:
696 _LOGGER.debug("Delayed reset, triggered at: %s, source%s", trigger_time, source)
697 job = partial(self.delayed_reset_armed_state, intervention=intervention, source=source, requested_at=dt_util.now())
698 else:
699 _LOGGER.debug("Delayed arm %s, triggered at: %s, source%s", state, trigger_time, source)
701 job = partial(self.delayed_arm, arming_state=state, source=source, requested_at=dt_util.now())
703 self.unsubscribes.append(
704 async_track_point_in_time(
705 self.hass,
706 job,
707 trigger_time,
708 )
709 )
711 def record_intervention(self, source: ChangeSource, state: AlarmControlPanelState | None) -> Intervention:
712 intervention = Intervention(dt_util.now(), source, state)
713 self.interventions.append(intervention)
714 self.hass.states.async_set(f"{DOMAIN}.last_intervention", source, attributes=intervention.as_dict())
716 return intervention
718 def has_intervention_since(self, cutoff: dt.datetime) -> bool:
719 """Has there been a manual intervention since the cutoff time"""
720 if not self.interventions:
721 return False
722 return any(intervention.created_at > cutoff for intervention in self.interventions)
724 def last_state_intervention(self) -> Intervention | None:
725 candidates: list[Intervention] = [i for i in self.interventions if i.state is not None]
726 if candidates:
727 return candidates[-1]
728 return None
730 @callback
731 async def on_sunrise(self, *args: Any) -> None: # noqa: ARG002
732 _LOGGER.debug("AUTOARM Sunrise")
733 now = dt_util.now() # uses Home Assistant's time zone setting
734 if not self.sunrise_cutoff or now.time() >= self.sunrise_cutoff:
735 # sun is up, and not earlier than cutoff
736 await self.reset_armed_state(source=ChangeSource.SUNRISE)
737 elif self.sunrise_cutoff and now.time() < self.sunrise_cutoff:
738 _LOGGER.debug(
739 "AUTOARM Rescheduling delayed sunrise action to %s",
740 self.sunrise_cutoff,
741 )
742 self.schedule_state(
743 dt.datetime.combine(now.date(), self.sunrise_cutoff, tzinfo=dt_util.DEFAULT_TIME_ZONE),
744 intervention=None,
745 state=None,
746 source=ChangeSource.SUNRISE,
747 )
    @callback
    async def on_sunset(self, *args: Any) -> None:  # noqa: ARG002
        """Sunset handler: recalculate the armed state with SUNSET as the source."""
        _LOGGER.debug("AUTOARM Sunset")
        await self.reset_armed_state(source=ChangeSource.SUNSET)
754 @callback
755 async def on_mobile_action(self, event: Event) -> None:
756 _LOGGER.debug("AUTOARM Mobile Action: %s", event)
757 source: ChangeSource = ChangeSource.MOBILE
759 match event.data.get("action"):
760 case "ALARM_PANEL_DISARM":
761 self.record_intervention(source=source, state=AlarmControlPanelState.DISARMED)
762 await self.arm(AlarmControlPanelState.DISARMED, source=source)
763 case "ALARM_PANEL_RESET":
764 await self.reset_armed_state(intervention=self.record_intervention(source=ChangeSource.BUTTON, state=None))
765 case "ALARM_PANEL_AWAY":
766 self.record_intervention(source=source, state=AlarmControlPanelState.ARMED_AWAY)
767 await self.arm(AlarmControlPanelState.ARMED_AWAY, source=source)
768 case _:
769 _LOGGER.debug("AUTOARM Ignoring mobile action: %s", event.data)
771 @callback
772 async def on_alarm_state_button(self, state: AlarmControlPanelState, delay: dt.timedelta | None, event: Event) -> None:
773 _LOGGER.debug("AUTOARM Alarm %s Button: %s", state, event)
774 intervention = self.record_intervention(source=ChangeSource.BUTTON, state=state)
775 if delay:
776 self.schedule_state(dt_util.now() + delay, state, intervention, source=ChangeSource.BUTTON)
777 await self.notify(
778 f"Alarm will be set to {state} in {delay}",
779 title=f"Arm set to {state} process starting",
780 )
781 else:
782 await self.arm(state, source=ChangeSource.BUTTON)
784 @callback
785 async def on_reset_button(self, delay: dt.timedelta | None, event: Event) -> None:
786 _LOGGER.debug("AUTOARM Reset Button: %s", event)
787 intervention = self.record_intervention(source=ChangeSource.BUTTON, state=None)
788 if delay:
789 self.schedule_state(dt_util.now() + delay, None, intervention, ChangeSource.BUTTON)
791 await self.notify(
792 f"Alarm will be reset in {delay}",
793 title="Alarm reset wait initiated",
794 )
795 else:
796 await self.reset_armed_state(intervention=self.record_intervention(source=ChangeSource.BUTTON, state=None))
798 @callback
799 async def on_occupancy_change(self, event: Event[EventStateChangedData]) -> None:
800 """Listen for person state events
802 Args:
803 ----
804 event (Event[EventStateChangedData]): state change event
806 """
807 entity_id, old, new, new_attributes = self._extract_event(event)
808 if old == new:
809 _LOGGER.debug(
810 "AUTOARM Occupancy Non-state Change: %s, state:%s->%s, event: %s, attrs:%s",
811 entity_id,
812 old,
813 new,
814 event,
815 new_attributes,
816 )
817 return
818 _LOGGER.debug(
819 "AUTOARM Occupancy state Change: %s, state:%s->%s, event: %s, attrs:%s", entity_id, old, new, event, new_attributes
820 )
821 if new in self.occupied_delay:
822 self.schedule_state(
823 dt_util.now() + self.occupied_delay[new], state=None, intervention=None, source=ChangeSource.OCCUPANCY
824 )
825 else:
826 await self.reset_armed_state(source=ChangeSource.OCCUPANCY)
828 @callback
829 async def on_panel_change(self, event: Event[EventStateChangedData]) -> None:
830 """Alarm Control Panel has been changed outside of AutoArm"""
831 entity_id, old, new, new_attributes = self._extract_event(event)
832 if new_attributes:
833 changed_by = new_attributes.get(ATTR_CHANGED_BY)
834 if changed_by and changed_by.startswith(f"{DOMAIN}."):
835 _LOGGER.debug(
836 "AUTOARM Panel Change Ignored: %s,%s: %s-->%s",
837 entity_id,
838 event.event_type,
839 old,
840 new,
841 )
842 return
843 new_state = alarm_state_as_enum(new)
845 _LOGGER.info(
846 "AUTOARM Panel Change: %s,%s: %s-->%s",
847 entity_id,
848 event.event_type,
849 old,
850 new,
851 )
852 self.record_intervention(ChangeSource.ALARM_PANEL, new_state)
853 if new in ZOMBIE_STATES:
854 _LOGGER.warning("AUTOARM Dezombifying %s ...", new)
855 await self.reset_armed_state(source=ChangeSource.ZOMBIFICATION)
856 elif new != old:
857 message = f"Home Assistant alarm level now set from {old} to {new}"
858 await self.notify(message, title=f"Alarm now {new}", profile="quiet")
859 else:
860 _LOGGER.debug("AUTOARM panel change leaves state unchanged at %s", new)
862 async def on_calendar_event_start(self, event: TrackedCalendarEvent, triggered_at: dt.datetime) -> None:
863 _LOGGER.debug("AUTOARM on_calendar_event_start(%s,%s)", event.id, triggered_at)
864 if event.arming_state != self.armed_state():
865 _LOGGER.info("AUTOARM Calendar event %s changing arming to %s at %s", event.id, event.arming_state, triggered_at)
866 await self.arm(arming_state=event.arming_state, source=ChangeSource.CALENDAR)
867 self.hass.states.async_set(
868 f"{DOMAIN}.last_calendar_event",
869 new_state=event.event.summary or str(event.id),
870 attributes={
871 "calendar": event.calendar_id,
872 "start": event.event.start_datetime_local,
873 "end": event.event.end_datetime_local,
874 "summary": event.event.summary,
875 "description": event.event.description,
876 "uid": event.event.uid,
877 },
878 )
880 async def on_calendar_event_end(self, event: TrackedCalendarEvent, ended_at: dt.datetime) -> None:
881 _LOGGER.debug("AUTOARM on_calendar_event_start(%s,%s)", event.id, ended_at)
882 if any(cal.has_active_event() for cal in self.calendars):
883 _LOGGER.debug("AUTOARM No action on event end since other cal event active")
884 return
885 if self.calendar_no_event_mode == NO_CAL_EVENT_MODE_AUTO:
886 _LOGGER.info("AUTOARM Calendar event %s ended, and arming state", event.id)
887 # avoid having state locked in vacation by state calculator
888 await self.pending_state(source=ChangeSource.CALENDAR)
889 await self.reset_armed_state(source=ChangeSource.CALENDAR)
890 elif self.calendar_no_event_mode in AlarmControlPanelState:
891 _LOGGER.info(
892 "AUTOARM Calendar event %s ended, and returning to fixed state %s", event.id, self.calendar_no_event_mode
893 )
894 await self.arm(alarm_state_as_enum(self.calendar_no_event_mode), source=ChangeSource.CALENDAR)
895 else:
896 _LOGGER.debug("AUTOARM Reinstate previous state on calendar event end in manual mode")
897 await self.arm(event.previous_state, source=ChangeSource.CALENDAR)
899 @callback
900 async def housekeeping(self, triggered_at: dt.datetime) -> None:
901 _LOGGER.debug("AUTOARM Housekeeping starting, triggered at %s", triggered_at)
902 now = dt_util.now()
903 self.interventions = [i for i in self.interventions if now < i.created_at + dt.timedelta(minutes=self.intervention_ttl)]
904 for cal in self.calendars:
905 await cal.prune_events()
906 _LOGGER.debug("AUTOARM Housekeeping finished")