Coverage for custom_components/autoarm/autoarming.py: 87%

535 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2026-01-26 21:24 +0000

1import asyncio 

2import contextlib 

3import datetime as dt 

4import json 

5import logging 

6from collections.abc import Callable 

7from dataclasses import dataclass 

8from functools import partial 

9from typing import TYPE_CHECKING, Any 

10 

11import homeassistant.util.dt as dt_util 

12import voluptuous as vol 

13from homeassistant.components.alarm_control_panel.const import ATTR_CHANGED_BY, AlarmControlPanelState 

14from homeassistant.components.calendar import CalendarEvent 

15from homeassistant.components.calendar.const import DOMAIN as CALENDAR_DOMAIN 

16from homeassistant.components.sun.const import STATE_BELOW_HORIZON 

17from homeassistant.const import CONF_CONDITIONS, CONF_ENTITY_ID, EVENT_HOMEASSISTANT_STOP, SERVICE_RELOAD, STATE_HOME 

18from homeassistant.core import ( 

19 Event, 

20 EventStateChangedData, 

21 HomeAssistant, 

22 ServiceCall, 

23 ServiceResponse, 

24 State, 

25 SupportsResponse, 

26 callback, 

27) 

28from homeassistant.exceptions import ConditionError, HomeAssistantError 

29from homeassistant.helpers import condition as condition 

30from homeassistant.helpers import config_validation as cv 

31from homeassistant.helpers import entity_platform 

32from homeassistant.helpers import issue_registry as ir 

33from homeassistant.helpers.event import ( 

34 async_track_point_in_time, 

35 async_track_state_change_event, 

36 async_track_sunrise, 

37 async_track_sunset, 

38 async_track_time_change, 

39) 

40from homeassistant.helpers.json import ExtendedJSONEncoder 

41from homeassistant.helpers.reload import ( 

42 async_integration_yaml_config, 

43) 

44from homeassistant.helpers.service import async_register_admin_service 

45from homeassistant.helpers.typing import ConfigType 

46from homeassistant.util.hass_dict import HassKey 

47 

48from custom_components.autoarm.hass_api import HomeAssistantAPI 

49 

50from .calendar import TrackedCalendar, TrackedCalendarEvent 

51from .const import ( 

52 ATTR_RESET, 

53 CONF_ALARM_PANEL, 

54 CONF_BUTTONS, 

55 CONF_CALENDAR_CONTROL, 

56 CONF_CALENDAR_NO_EVENT, 

57 CONF_CALENDARS, 

58 CONF_DAY, 

59 CONF_DELAY_TIME, 

60 CONF_DIURNAL, 

61 CONF_EARLIEST, 

62 CONF_NOTIFY, 

63 CONF_OCCUPANCY, 

64 CONF_OCCUPANCY_DEFAULT, 

65 CONF_RATE_LIMIT, 

66 CONF_RATE_LIMIT_CALLS, 

67 CONF_RATE_LIMIT_PERIOD, 

68 CONF_SUNRISE, 

69 CONF_TRANSITIONS, 

70 CONFIG_SCHEMA, 

71 DEFAULT_TRANSITIONS, 

72 DOMAIN, 

73 NO_CAL_EVENT_MODE_AUTO, 

74 NO_CAL_EVENT_MODE_MANUAL, 

75 ChangeSource, 

76 ConditionVariables, 

77) 

78from .helpers import Limiter, alarm_state_as_enum, deobjectify, safe_state 

79 

80if TYPE_CHECKING: 

81 from homeassistant.helpers.condition import ConditionCheckerType 

82 

83_LOGGER = logging.getLogger(__name__) 

84 

85OVERRIDE_STATES = (AlarmControlPanelState.ARMED_VACATION, AlarmControlPanelState.ARMED_CUSTOM_BYPASS) 

86EPHEMERAL_STATES = ( 

87 AlarmControlPanelState.PENDING, 

88 AlarmControlPanelState.ARMING, 

89 AlarmControlPanelState.DISARMING, 

90 AlarmControlPanelState.TRIGGERED, 

91) 

92ZOMBIE_STATES = ("unknown", "unavailable") 

93NS_MOBILE_ACTIONS = "mobile_actions" 

94PLATFORMS = ["autoarm"] 

95 

96HASS_DATA_KEY: HassKey["AutoArmData"] = HassKey(DOMAIN) 

97 

98 

99@dataclass 

100class AutoArmData: 

101 armer: "AlarmArmer" 

102 other_data: dict[str, str | dict[str, str] | list[str] | int | float | bool | None] 

103 

104 

105# async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool: 

106async def async_setup( 

107 hass: HomeAssistant, 

108 config: ConfigType, 

109) -> bool: 

110 _ = CONFIG_SCHEMA 

111 if DOMAIN not in config: 

112 _LOGGER.warning("AUTOARM No config found") 

113 return True 

114 config = config.get(DOMAIN, {}) 

115 

116 hass.data[HASS_DATA_KEY] = AutoArmData(_async_process_config(hass, config), {}) 

117 await hass.data[HASS_DATA_KEY].armer.initialize() 

118 

119 async def reload_service_handler(service_call: ServiceCall) -> None: 

120 """Reload yaml entities.""" 

121 config = None 

122 _LOGGER.info("AUTOARM Reloading %s.%s component, data %s", service_call.domain, service_call.service, service_call.data) 

123 with contextlib.suppress(HomeAssistantError): 

124 config = await async_integration_yaml_config(hass, DOMAIN) 

125 if config is None or DOMAIN not in config: 

126 _LOGGER.warning("AUTOARM reload rejected for lack of config: %s", config) 

127 return 

128 hass.data[HASS_DATA_KEY].armer.shutdown() 

129 hass.data[HASS_DATA_KEY].armer = _async_process_config(hass, config[DOMAIN]) 

130 await hass.data[HASS_DATA_KEY].armer.initialize() 

131 

132 async_register_admin_service( 

133 hass, 

134 DOMAIN, 

135 SERVICE_RELOAD, 

136 reload_service_handler, 

137 ) 

138 

139 def supplemental_action_enquire_configuration(_call: ServiceCall) -> ConfigType: 

140 data: ConfigType = { 

141 CONF_ALARM_PANEL: config.get(CONF_ALARM_PANEL, {}).get(CONF_ENTITY_ID), 

142 CONF_DIURNAL: config.get(CONF_DIURNAL), 

143 CONF_CALENDAR_CONTROL: config.get(CONF_CALENDAR_CONTROL), 

144 CONF_BUTTONS: config.get(CONF_BUTTONS, {}), 

145 CONF_OCCUPANCY: config.get(CONF_OCCUPANCY, {}), 

146 CONF_NOTIFY: config.get(CONF_NOTIFY, {}), 

147 CONF_RATE_LIMIT: config.get(CONF_RATE_LIMIT, {}), 

148 } 

149 try: 

150 jsonized: str = json.dumps(obj=data, cls=ExtendedJSONEncoder) 

151 return json.loads(jsonized) 

152 except Exception as e: 

153 _LOGGER.error("AUTOARM Failed to expose config data as entity: %s, %s", data, e) 

154 return {"error": str(e)} 

155 

156 hass.services.async_register( 

157 DOMAIN, 

158 "enquire_configuration", 

159 supplemental_action_enquire_configuration, 

160 supports_response=SupportsResponse.ONLY, 

161 ) 

162 

163 return True 

164 

165 

166def _async_process_config(hass: HomeAssistant, config: ConfigType) -> "AlarmArmer": 

167 calendar_config: ConfigType = config.get(CONF_CALENDAR_CONTROL, {}) 

168 migrate(hass) 

169 service: AlarmArmer = AlarmArmer( 

170 hass, 

171 alarm_panel=config[CONF_ALARM_PANEL].get(CONF_ENTITY_ID), 

172 diurnal=config.get(CONF_DIURNAL, {}), 

173 buttons=config.get(CONF_BUTTONS, {}), 

174 occupancy=config[CONF_OCCUPANCY], 

175 notify=config[CONF_NOTIFY], 

176 rate_limit=config.get(CONF_RATE_LIMIT, {}), 

177 calendars=calendar_config.get(CONF_CALENDARS, []), 

178 transitions=config.get(CONF_TRANSITIONS), 

179 calendar_no_event_mode=calendar_config.get(CONF_CALENDAR_NO_EVENT, NO_CAL_EVENT_MODE_AUTO), 

180 ) 

181 return service 

182 

183 

184def migrate(hass: HomeAssistant) -> None: 

185 for entity_id in ( 

186 "autoarm.configured", 

187 "autoarm.last_calendar_event", 

188 "autoarm.last_intervention", 

189 "autoarm.initialized", 

190 "autoarm.last_calculation", 

191 ): 

192 try: 

193 if hass.states.get(entity_id): 

194 _LOGGER.info("AUTOARM Migration removing legacy entity_id: %s", entity_id) 

195 hass.states.async_remove(entity_id) 

196 except Exception as e: 

197 _LOGGER.warning("AUTOARM Migration fail for %s:%s", entity_id, e) 

198 

199 

200def unlisten(listener: Callable[[], None] | None) -> None: 

201 if listener: 

202 try: 

203 listener() 

204 except Exception as e: 

205 _LOGGER.debug("AUTOARM Failure closing listener %s: %s", listener, e) 

206 

207 

208@dataclass 

209class Intervention: 

210 """Record of a manual intervention, such as a button push, mobile action or alarm panel change""" 

211 

212 created_at: dt.datetime 

213 source: ChangeSource 

214 state: AlarmControlPanelState | None 

215 

216 def as_dict(self) -> dict[str, str | None]: 

217 return { 

218 "created_at": self.created_at.isoformat(), 

219 "source": str(self.source), 

220 "state": str(self.state) if self.state is not None else None, 

221 } 

222 

223 

224class AlarmArmer: 

225 def __init__( 

226 self, 

227 hass: HomeAssistant, 

228 alarm_panel: str, 

229 buttons: dict[str, ConfigType] | None = None, 

230 occupancy: ConfigType | None = None, 

231 actions: list | None = None, 

232 notify: ConfigType | None = None, 

233 diurnal: ConfigType | None = None, 

234 rate_limit: ConfigType | None = None, 

235 calendar_no_event_mode: str | None = None, 

236 calendars: list[ConfigType] | None = None, 

237 transitions: dict[str, dict[str, list[ConfigType]]] | None = None, 

238 ) -> None: 

239 occupancy = occupancy or {} 

240 rate_limit = rate_limit or {} 

241 diurnal = diurnal or {} 

242 

243 self.hass: HomeAssistant = hass 

244 self.local_tz = dt_util.get_time_zone(self.hass.config.time_zone) 

245 self.calendar_configs: list[ConfigType] = calendars or [] 

246 self.calendar_no_event_mode: str = calendar_no_event_mode or NO_CAL_EVENT_MODE_AUTO 

247 self.calendars: list[TrackedCalendar] = [] 

248 self.alarm_panel: str = alarm_panel 

249 self.sunrise_cutoff: dt.time | None = diurnal.get(CONF_SUNRISE, {}).get(CONF_EARLIEST) 

250 self.occupants: list[str] = occupancy.get(CONF_ENTITY_ID, []) 

251 self.occupied_defaults: dict[str, AlarmControlPanelState] = occupancy.get( 

252 CONF_OCCUPANCY_DEFAULT, {CONF_DAY: AlarmControlPanelState.ARMED_HOME} 

253 ) 

254 self.occupied_delay: dict[str, dt.timedelta] = occupancy.get(CONF_DELAY_TIME, {}) 

255 self.buttons: ConfigType = buttons or {} 

256 

257 self.actions: list[str] = actions or [] 

258 self.notify_profiles: dict[str, dict] = notify or {} 

259 self.unsubscribes: list = [] 

260 self.pre_pending_state: AlarmControlPanelState | None = None 

261 self.button_device: dict[str, str] = {} 

262 self.arming_in_progress: asyncio.Event = asyncio.Event() 

263 

264 self.rate_limiter: Limiter = Limiter( 

265 window=rate_limit.get(CONF_RATE_LIMIT_PERIOD, dt.timedelta(seconds=60)), 

266 max_calls=rate_limit.get(CONF_RATE_LIMIT_CALLS, 5), 

267 ) 

268 

269 self.hass_api: HomeAssistantAPI = HomeAssistantAPI(hass) 

270 self.transitions: dict[AlarmControlPanelState, ConditionCheckerType] = {} 

271 self.transition_config: dict[str, dict[str, list[ConfigType]]] = transitions or {} 

272 

273 self.initialization_errors: dict[str, int] = {} 

274 self.interventions: list[Intervention] = [] 

275 self.intervention_ttl: int = 60 

276 self.failures = 0 

277 

278 async def initialize(self) -> None: 

279 """Async initialization""" 

280 _LOGGER.info("AUTOARM occupied=%s, state=%s, calendars=%s", self.is_occupied(), self.armed_state(), len(self.calendars)) 

281 

282 self.initialize_alarm_panel() 

283 await self.initialize_calendar() 

284 await self.initialize_logic() 

285 self.initialize_diurnal() 

286 self.initialize_occupancy() 

287 self.initialize_buttons() 

288 self.initialize_integration() 

289 self.initialize_housekeeping() 

290 self.initialize_home_assistant() 

291 await self.reset_armed_state(source=ChangeSource.STARTUP) 

292 

293 _LOGGER.info("AUTOARM Initialized, state: %s", self.armed_state()) 

294 

295 def record_initialization_error(self, stage: str) -> None: 

296 self.initialization_errors.setdefault(stage, 0) 

297 self.initialization_errors[stage] += 1 

298 self.failures += 1 

299 self.hass.states.async_set( 

300 f"sensor.{DOMAIN}_failures", str(self.failures), attributes={"initialization_errors": self.initialization_errors} 

301 ) 

302 

303 def record_runtime_error(self) -> None: 

304 self.failures += 1 

305 self.hass.states.async_set(f"sensor.{DOMAIN}_failures", str(self.failures)) 

306 

307 def initialize_home_assistant(self) -> None: 

308 self.stop_listener: Callable[[], None] | None = self.hass.bus.async_listen_once( 

309 EVENT_HOMEASSISTANT_STOP, self.async_shutdown 

310 ) 

311 self.hass.states.async_set( 

312 f"binary_sensor.{DOMAIN}_initialized", 

313 "valid" if not self.initialization_errors else "invalid", 

314 attributes=self.initialization_errors, 

315 ) 

316 self.hass.states.async_set(f"sensor.{DOMAIN}_failures", str(self.failures)) 

317 self.hass.states.async_set(f"sensor.{DOMAIN}_last_calculation", "unavailable", attributes={}) 

318 

319 self.hass.services.async_register( 

320 DOMAIN, 

321 "reset_state", 

322 self.reset_service, 

323 supports_response=SupportsResponse.OPTIONAL, 

324 ) 

325 

326 async def reset_service(self, _call: ServiceCall) -> ServiceResponse: 

327 new_state = await self.reset_armed_state(intervention=self.record_intervention(source=ChangeSource.ACTION, state=None)) 

328 return {"change": new_state or "NO_CHANGE"} 

329 

330 def initialize_integration(self) -> None: 

331 self.hass.states.async_set(f"sensor.{DOMAIN}_last_intervention", "unavailable", attributes={}) 

332 

333 self.unsubscribes.append(self.hass.bus.async_listen("mobile_app_notification_action", self.on_mobile_action)) 

334 

335 def initialize_alarm_panel(self) -> None: 

336 """Set up automation for Home Assistant alarm panel 

337 

338 See https://www.home-assistant.io/integrations/alarm_control_panel/ 

339 

340 Succeeds even if control panel has not yet started, listener will pick up events when it does 

341 """ 

342 self.unsubscribes.append(async_track_state_change_event(self.hass, [self.alarm_panel], self.on_panel_change)) 

343 _LOGGER.debug("AUTOARM Auto-arming %s", self.alarm_panel) 

344 

345 def initialize_housekeeping(self) -> None: 

346 self.unsubscribes.append( 

347 async_track_time_change( 

348 self.hass, 

349 action=self.housekeeping, 

350 minute=0, 

351 ) 

352 ) 

353 

354 def initialize_diurnal(self) -> None: 

355 # events API expects a function, however underlying HassJob is fine with coroutines 

356 self.unsubscribes.append(async_track_sunrise(self.hass, self.on_sunrise, None)) # type: ignore 

357 self.unsubscribes.append(async_track_sunset(self.hass, self.on_sunset, None)) # type: ignore 

358 

359 def initialize_occupancy(self) -> None: 

360 """Configure occupants, and listen for changes in their state""" 

361 _LOGGER.info("AUTOARM Occupancy determined by %s", ",".join(self.occupants)) 

362 self.unsubscribes.append(async_track_state_change_event(self.hass, self.occupants, self.on_occupancy_change)) 

363 

364 def initialize_buttons(self) -> None: 

365 """Initialize (optional) physical alarm state control buttons""" 

366 

367 def setup_button(state_name: str, button_entity: str, cb: Callable) -> None: 

368 self.button_device[state_name] = button_entity 

369 if self.button_device[state_name]: 

370 self.unsubscribes.append(async_track_state_change_event(self.hass, [button_entity], cb)) 

371 

372 _LOGGER.debug( 

373 "AUTOARM Configured %s button for %s", 

374 state_name, 

375 self.button_device[state_name], 

376 ) 

377 

378 for button_use, button_config in self.buttons.items(): 

379 delay: dt.timedelta | None = button_config.get(CONF_DELAY_TIME) 

380 for entity_id in button_config[CONF_ENTITY_ID]: 

381 if button_use == ATTR_RESET: 

382 setup_button(ATTR_RESET, entity_id, partial(self.on_reset_button, delay)) 

383 else: 

384 setup_button( 

385 button_use, entity_id, partial(self.on_alarm_state_button, AlarmControlPanelState(button_use), delay) 

386 ) 

387 

388 async def initialize_calendar(self) -> None: 

389 """Configure calendar polling (optional)""" 

390 stage: str = "calendar" 

391 self.hass.states.async_set(f"sensor.{DOMAIN}_last_calendar_event", "unavailable", attributes={}) 

392 if not self.calendar_configs: 

393 return 

394 try: 

395 platforms: list[entity_platform.EntityPlatform] = entity_platform.async_get_platforms(self.hass, CALENDAR_DOMAIN) 

396 if platforms: 

397 platform: entity_platform.EntityPlatform = platforms[0] 

398 else: 

399 self.record_initialization_error(stage) 

400 _LOGGER.error("AUTOARM Calendar platform not available from Home Assistant") 

401 return 

402 except Exception as _e: 

403 self.record_initialization_error(stage) 

404 _LOGGER.exception("AUTOARM Unable to access calendar platform") 

405 return 

406 for calendar_config in self.calendar_configs: 

407 tracked_calendar = TrackedCalendar(calendar_config, self) 

408 await tracked_calendar.initialize(platform) 

409 self.calendars.append(tracked_calendar) 

410 

411 async def initialize_logic(self) -> None: 

412 stage: str = "logic" 

413 for state_str, raw_condition in DEFAULT_TRANSITIONS.items(): 

414 if state_str not in self.transition_config: 

415 _LOGGER.info("AUTOARM Defaulting transition condition for %s", state_str) 

416 self.transition_config[state_str] = {CONF_CONDITIONS: cv.CONDITIONS_SCHEMA(raw_condition)} 

417 

418 for state_str, transition_config in self.transition_config.items(): 

419 error: str = "" 

420 condition_config = transition_config.get(CONF_CONDITIONS) 

421 if condition_config is None: 

422 error = "Empty conditions" 

423 _LOGGER.warning(f"AUTOARM Found no conditions for {state_str} transition") 

424 else: 

425 try: 

426 state = AlarmControlPanelState(state_str) 

427 cond: ConditionCheckerType | None = await self.hass_api.build_condition( 

428 condition_config, strict=True, validate=True, name=state_str 

429 ) 

430 

431 if cond: 

432 # re-run without strict wrapper 

433 cond = await self.hass_api.build_condition(condition_config, name=state_str) 

434 if cond: 

435 _LOGGER.debug(f"AUTOARM Validated transition logic for {state_str}") 

436 self.transitions[state] = cond 

437 else: 

438 _LOGGER.warning(f"AUTOARM Failed to validate transition logic for {state_str}") 

439 error = "Condition validation failed" 

440 except ValueError as ve: 

441 self.record_initialization_error(stage) 

442 error = f"Invalid state {ve}" 

443 _LOGGER.error(f"AUTOARM Invalid state in {state_str} transition - {ve}") 

444 except vol.Invalid as vi: 

445 self.record_initialization_error(stage) 

446 _LOGGER.error(f"AUTOARM Transition {state_str} conditions fails Home Assistant schema check {vi}") 

447 error = f"Schema error {vi}" 

448 except ConditionError as ce: 

449 _LOGGER.error(f"AUTOARM Transition {state_str} conditions fails Home Assistant condition check {ce}") 

450 if hasattr(ce, "message"): 

451 error = ce.message # type: ignore 

452 elif hasattr(ce, "error") and hasattr(ce.error, "message"): # type: ignore[attr-defined] 

453 error = ce.error.message # type: ignore 

454 else: 

455 error = str(ce) 

456 except Exception as e: 

457 self.record_initialization_error(stage) 

458 _LOGGER.exception("AUTOARM Disabling transition %s with error validating %s", state_str, condition_config) 

459 error = f"Unknown exception {e}" 

460 if error: 

461 _LOGGER.warning(f"AUTOARM raising report issue for {error} on {state_str}") 

462 self.hass_api.raise_issue( 

463 f"transition_condition_{state_str}", 

464 is_fixable=False, 

465 issue_key="transition_condition", 

466 issue_map={"state": state_str, "error": error}, 

467 severity=ir.IssueSeverity.ERROR, 

468 ) 

469 

470 async def async_shutdown(self, _event: Event) -> None: 

471 _LOGGER.info("AUTOARM shut down event received") 

472 self.stop_listener = None 

473 self.shutdown() 

474 

475 def shutdown(self) -> None: 

476 _LOGGER.info("AUTOARM shutting down") 

477 for calendar in self.calendars: 

478 calendar.shutdown() 

479 while self.unsubscribes: 

480 unlisten(self.unsubscribes.pop()) 

481 unlisten(self.stop_listener) 

482 self.stop_listener = None 

483 _LOGGER.info("AUTOARM shut down") 

484 

485 def active_calendar_event(self) -> CalendarEvent | None: 

486 events: list[CalendarEvent] = [] 

487 for cal in self.calendars: 

488 events.extend(cal.active_events()) 

489 if events: 

490 # TODO: consider sorting events to LIFO 

491 return events[0] 

492 return None 

493 

494 def is_occupied(self) -> bool: 

495 return any(safe_state(self.hass.states.get(p)) == STATE_HOME for p in self.occupants) 

496 

497 def at_home(self) -> list[str]: 

498 return [p for p in self.occupants if safe_state(self.hass.states.get(p)) == STATE_HOME] 

499 

500 def not_home(self) -> list[str]: 

501 return [p for p in self.occupants if safe_state(self.hass.states.get(p)) != STATE_HOME] 

502 

503 def is_unoccupied(self) -> bool: 

504 return all(safe_state(self.hass.states.get(p)) != STATE_HOME for p in self.occupants) 

505 

506 def is_night(self) -> bool: 

507 return safe_state(self.hass.states.get("sun.sun")) == STATE_BELOW_HORIZON 

508 

509 def armed_state(self) -> AlarmControlPanelState: 

510 raw_state: str | None = safe_state(self.hass.states.get(self.alarm_panel)) 

511 alarm_state = alarm_state_as_enum(raw_state) 

512 if alarm_state is None: 

513 _LOGGER.warning("AUTOARM No alarm state available - treating as PENDING") 

514 return AlarmControlPanelState.PENDING 

515 return alarm_state 

516 

517 def _extract_event(self, event: Event[EventStateChangedData]) -> tuple[str | None, str | None, str | None, dict[str, str]]: 

518 entity_id = old = new = None 

519 new_attributes: dict[str, str] = {} 

520 if event and event.data: 

521 entity_id = event.data.get("entity_id") 

522 old_obj = event.data.get("old_state") 

523 if old_obj: 

524 old = old_obj.state 

525 new_obj = event.data.get("new_state") 

526 if new_obj: 

527 new = new_obj.state 

528 new_attributes = new_obj.attributes 

529 return entity_id, old, new, new_attributes 

530 

531 async def pending_state(self, source: ChangeSource | None) -> None: 

532 self.pre_pending_state = self.armed_state() 

533 await self.arm(AlarmControlPanelState.PENDING, source=source) 

534 

535 @callback 

536 async def delayed_reset_armed_state(self, triggered_at: dt.datetime, requested_at: dt.datetime | None, **kwargs) -> None: 

537 _LOGGER.debug("AUTOARM delayed_arm at %s, requested_at: %s", triggered_at, requested_at) 

538 if self.is_intervention_since_request(requested_at): 

539 return 

540 await self.reset_armed_state(**kwargs) 

541 

542 async def reset_armed_state( 

543 self, intervention: Intervention | None = None, source: ChangeSource | None = None 

544 ) -> str | None: 

545 """Logic to automatically work out appropriate current armed state""" 

546 state: AlarmControlPanelState | None = None 

547 existing_state: AlarmControlPanelState | None = None 

548 must_change_state: bool = False 

549 last_state_intervention: Intervention | None = None 

550 active_calendar_event: CalendarEvent | None = None 

551 

552 if source is None and intervention is not None: 

553 source = intervention.source 

554 _LOGGER.debug( 

555 "AUTOARM reset_armed_state(intervention=%s,source=%s)", 

556 intervention, 

557 source, 

558 ) 

559 

560 try: 

561 existing_state = self.armed_state() 

562 state = existing_state 

563 if self.calendars: 

564 active_calendar_event = self.active_calendar_event() 

565 if active_calendar_event: 

566 _LOGGER.debug("AUTOARM Ignoring reset while calendar event active") 

567 return existing_state 

568 if self.calendar_no_event_mode == NO_CAL_EVENT_MODE_MANUAL: 

569 _LOGGER.debug( 

570 "AUTOARM Ignoring reset while calendar configured, no active event, and default mode is manual" 

571 ) 

572 return existing_state 

573 if self.calendar_no_event_mode in AlarmControlPanelState: 

574 # TODO: may be dupe logic with on_cal event 

575 _LOGGER.debug("AUTOARM Applying fixed reset on end of calendar event, %s", self.calendar_no_event_mode) 

576 return await self.arm(alarm_state_as_enum(self.calendar_no_event_mode), ChangeSource.CALENDAR) 

577 if self.calendar_no_event_mode == NO_CAL_EVENT_MODE_AUTO: 

578 _LOGGER.debug("AUTOARM Applying reset while calendar configured, no active event, and default mode is auto") 

579 else: 

580 _LOGGER.warning("AUTOARM Unexpected state for calendar no event mode: %s", self.calendar_no_event_mode) 

581 

582 # TODO: expose as config ( for manual disarm override ) and condition logic 

583 must_change_state = existing_state is None or existing_state == AlarmControlPanelState.PENDING 

584 if intervention or source in (ChangeSource.CALENDAR, ChangeSource.OCCUPANCY) or must_change_state: 

585 _LOGGER.debug("AUTOARM Ignoring previous interventions") 

586 else: 

587 last_state_intervention = self.last_state_intervention() 

588 if last_state_intervention: 

589 _LOGGER.debug( 

590 "AUTOARM Ignoring automated reset for %s set by %s at %s", 

591 last_state_intervention.state, 

592 last_state_intervention.source, 

593 last_state_intervention.created_at, 

594 ) 

595 return existing_state 

596 state = self.determine_state() 

597 if state is not None and state != AlarmControlPanelState.PENDING and state != existing_state: 

598 state = await self.arm(state, source=source) 

599 finally: 

600 self.hass.states.async_set( 

601 f"sensor.{DOMAIN}_last_calculation", 

602 str(state is not None and state != existing_state), 

603 attributes={ 

604 "new_state": str(state), 

605 "old_state": str(existing_state), 

606 "source": source, 

607 "active_calendar_event": deobjectify(active_calendar_event), 

608 "occupied": self.is_occupied(), 

609 "night": self.is_night(), 

610 "must_change_state": str(must_change_state), 

611 "last_state_intervention": deobjectify(last_state_intervention), 

612 "intervention": intervention.as_dict() if intervention else None, 

613 "time": dt_util.now().isoformat(), 

614 }, 

615 ) 

616 

617 return state 

618 

619 def is_intervention_since_request(self, requested_at: dt.datetime | None) -> bool: 

620 if requested_at is not None and self.has_intervention_since(requested_at): 

621 _LOGGER.debug( 

622 "AUTOARM Cancelling delayed operation since subsequent manual action", 

623 ) 

624 return True 

625 return False 

626 

627 def determine_state(self) -> AlarmControlPanelState | None: 

628 """Compute a new state using occupancy, sun and transition conditions""" 

629 evaluated_state: AlarmControlPanelState | None = None 

630 condition_vars: ConditionVariables = ConditionVariables( 

631 self.is_occupied(), 

632 self.is_night(), 

633 state=self.armed_state(), 

634 calendar_event=self.active_calendar_event(), 

635 occupied_defaults=self.occupied_defaults, 

636 at_home=self.at_home(), 

637 not_home=self.not_home(), 

638 ) 

639 for state, checker in self.transitions.items(): 

640 if self.hass_api.evaluate_condition(checker, condition_vars): 

641 _LOGGER.debug("AUTOARM Computed state as % from condition", state) 

642 evaluated_state = state 

643 break 

644 if evaluated_state is None: 

645 return None 

646 return AlarmControlPanelState(evaluated_state) 

647 

648 @callback 

649 async def delayed_arm(self, triggered_at: dt.datetime, requested_at: dt.datetime | None, **kwargs) -> None: 

650 _LOGGER.debug("AUTOARM delayed_arm at %s, requested_at: %s", triggered_at, requested_at) 

651 if self.is_intervention_since_request(requested_at): 

652 return 

653 await self.arm(**kwargs) 

654 

655 async def arm( 

656 self, arming_state: AlarmControlPanelState | None = None, source: ChangeSource | None = None 

657 ) -> AlarmControlPanelState | None: 

658 """Change alarm panel state 

659 

660 Args: 

661 ---- 

662 arming_state (str, optional): _description_. Defaults to None. 

663 source (str,optional): Source of the change, for example 'calendar' or 'button' 

664 

665 Returns: 

666 ------- 

667 str: New arming state 

668 

669 """ 

670 if self.rate_limiter.triggered(): 

671 _LOGGER.debug("AUTOARM Rate limit triggered by %s, skipping arm", source) 

672 return None 

673 try: 

674 self.arming_in_progress.set() 

675 existing_state: AlarmControlPanelState | None = self.armed_state() 

676 if arming_state != existing_state: 

677 attrs: dict[str, str] = {} 

678 panel_state: State | None = self.hass.states.get(self.alarm_panel) 

679 if panel_state: 

680 attrs.update(panel_state.attributes) 

681 attrs[ATTR_CHANGED_BY] = f"{DOMAIN}.{source}" 

682 self.hass.states.async_set(entity_id=self.alarm_panel, new_state=str(arming_state), attributes=attrs) 

683 _LOGGER.info("AUTOARM Setting %s from %s to %s for %s", self.alarm_panel, existing_state, arming_state, source) 

684 return arming_state 

685 _LOGGER.debug("Skipping arm for %s, as %s already %s", source, self.alarm_panel, arming_state) 

686 return existing_state 

687 except Exception as e: 

688 _LOGGER.error("AUTOARM Failed to arm: %s", e) 

689 self.record_runtime_error() 

690 finally: 

691 self.arming_in_progress.clear() 

692 return None 

693 

694 async def notify(self, message: str, profile: str = "normal", title: str | None = None) -> None: 

695 notify_service = None 

696 try: 

697 # separately merge base dict and data sub-dict as cheap and nasty semi-deep-merge 

698 selected_profile = self.notify_profiles.get(profile) 

699 base_profile = self.notify_profiles.get("common", {}) 

700 base_profile_data = base_profile.get("data", {}) 

701 merged_profile = dict(base_profile) 

702 merged_profile_data = dict(base_profile_data) 

703 if selected_profile: 

704 selected_profile_data: dict = selected_profile.get("data", {}) 

705 merged_profile.update(selected_profile) 

706 merged_profile_data.update(selected_profile_data) 

707 merged_profile["data"] = merged_profile_data 

708 notify_service = merged_profile.get("service", "").replace("notify.", "") 

709 

710 title = title or "Alarm Auto Arming" 

711 if notify_service and merged_profile: 

712 data = merged_profile.get("data", {}) 

713 await self.hass.services.async_call( 

714 "notify", 

715 notify_service, 

716 service_data={"message": message, "title": title, "data": data}, 

717 ) 

718 else: 

719 _LOGGER.debug("AUTOARM Skipped notification, service: %s, data: %s", notify_service, merged_profile) 

720 

721 except Exception: 

722 self.record_runtime_error() 

723 _LOGGER.exception("AUTOARM notify.%s failed", notify_service) 

724 

725 def schedule_state( 

726 self, 

727 trigger_time: dt.datetime, 

728 state: AlarmControlPanelState | None, 

729 intervention: Intervention | None, 

730 source: ChangeSource | None = None, 

731 ) -> None: 

732 source = source or intervention.source if intervention else None 

733 

734 job: Callable 

735 if state is None: 

736 _LOGGER.debug("Delayed reset, triggered at: %s, source%s", trigger_time, source) 

737 job = partial(self.delayed_reset_armed_state, intervention=intervention, source=source, requested_at=dt_util.now()) 

738 else: 

739 _LOGGER.debug("Delayed arm %s, triggered at: %s, source%s", state, trigger_time, source) 

740 

741 job = partial(self.delayed_arm, arming_state=state, source=source, requested_at=dt_util.now()) 

742 

743 self.unsubscribes.append( 

744 async_track_point_in_time( 

745 self.hass, 

746 job, 

747 trigger_time, 

748 ) 

749 ) 

750 

751 def record_intervention(self, source: ChangeSource, state: AlarmControlPanelState | None) -> Intervention: 

752 intervention = Intervention(dt_util.now(), source, state) 

753 self.interventions.append(intervention) 

754 self.hass.states.async_set(f"sensor.{DOMAIN}_last_intervention", source, attributes=intervention.as_dict()) 

755 

756 return intervention 

757 

758 def has_intervention_since(self, cutoff: dt.datetime) -> bool: 

759 """Has there been a manual intervention since the cutoff time""" 

760 if not self.interventions: 

761 return False 

762 return any(intervention.created_at > cutoff for intervention in self.interventions) 

763 

764 def last_state_intervention(self) -> Intervention | None: 

765 candidates: list[Intervention] = [i for i in self.interventions if i.state is not None] 

766 if candidates: 

767 return candidates[-1] 

768 return None 

769 

770 @callback 

771 async def on_sunrise(self, *args: Any) -> None: # noqa: ARG002 

772 _LOGGER.debug("AUTOARM Sunrise") 

773 now = dt_util.now() # uses Home Assistant's time zone setting 

774 if not self.sunrise_cutoff or now.time() >= self.sunrise_cutoff: 

775 # sun is up, and not earlier than cutoff 

776 await self.reset_armed_state(source=ChangeSource.SUNRISE) 

777 elif self.sunrise_cutoff and now.time() < self.sunrise_cutoff: 

778 _LOGGER.debug( 

779 "AUTOARM Rescheduling delayed sunrise action to %s", 

780 self.sunrise_cutoff, 

781 ) 

782 self.schedule_state( 

783 dt.datetime.combine(now.date(), self.sunrise_cutoff, tzinfo=dt_util.DEFAULT_TIME_ZONE), 

784 intervention=None, 

785 state=None, 

786 source=ChangeSource.SUNRISE, 

787 ) 

788 

789 @callback 

790 async def on_sunset(self, *args: Any) -> None: # noqa: ARG002 

791 _LOGGER.debug("AUTOARM Sunset") 

792 await self.reset_armed_state(source=ChangeSource.SUNSET) 

793 

794 @callback 

795 async def on_mobile_action(self, event: Event) -> None: 

796 _LOGGER.debug("AUTOARM Mobile Action: %s", event) 

797 source: ChangeSource = ChangeSource.MOBILE 

798 

799 match event.data.get("action"): 

800 case "ALARM_PANEL_DISARM": 

801 self.record_intervention(source=source, state=AlarmControlPanelState.DISARMED) 

802 await self.arm(AlarmControlPanelState.DISARMED, source=source) 

803 case "ALARM_PANEL_RESET": 

804 await self.reset_armed_state(intervention=self.record_intervention(source=ChangeSource.BUTTON, state=None)) 

805 case "ALARM_PANEL_AWAY": 

806 self.record_intervention(source=source, state=AlarmControlPanelState.ARMED_AWAY) 

807 await self.arm(AlarmControlPanelState.ARMED_AWAY, source=source) 

808 case _: 

809 _LOGGER.debug("AUTOARM Ignoring mobile action: %s", event.data) 

810 

811 @callback 

812 async def on_alarm_state_button(self, state: AlarmControlPanelState, delay: dt.timedelta | None, event: Event) -> None: 

813 _LOGGER.debug("AUTOARM Alarm %s Button: %s", state, event) 

814 intervention = self.record_intervention(source=ChangeSource.BUTTON, state=state) 

815 if delay: 

816 self.schedule_state(dt_util.now() + delay, state, intervention, source=ChangeSource.BUTTON) 

817 await self.notify( 

818 f"Alarm will be set to {state} in {delay}", 

819 title=f"Arm set to {state} process starting", 

820 ) 

821 else: 

822 await self.arm(state, source=ChangeSource.BUTTON) 

823 

824 @callback 

825 async def on_reset_button(self, delay: dt.timedelta | None, event: Event) -> None: 

826 _LOGGER.debug("AUTOARM Reset Button: %s", event) 

827 intervention = self.record_intervention(source=ChangeSource.BUTTON, state=None) 

828 if delay: 

829 self.schedule_state(dt_util.now() + delay, None, intervention, ChangeSource.BUTTON) 

830 

831 await self.notify( 

832 f"Alarm will be reset in {delay}", 

833 title="Alarm reset wait initiated", 

834 ) 

835 else: 

836 await self.reset_armed_state(intervention=self.record_intervention(source=ChangeSource.BUTTON, state=None)) 

837 

838 @callback 

839 async def on_occupancy_change(self, event: Event[EventStateChangedData]) -> None: 

840 """Listen for person state events 

841 

842 Args: 

843 ---- 

844 event (Event[EventStateChangedData]): state change event 

845 

846 """ 

847 entity_id, old, new, new_attributes = self._extract_event(event) 

848 if old == new: 

849 _LOGGER.debug( 

850 "AUTOARM Occupancy Non-state Change: %s, state:%s->%s, event: %s, attrs:%s", 

851 entity_id, 

852 old, 

853 new, 

854 event, 

855 new_attributes, 

856 ) 

857 return 

858 _LOGGER.debug( 

859 "AUTOARM Occupancy state Change: %s, state:%s->%s, event: %s, attrs:%s", entity_id, old, new, event, new_attributes 

860 ) 

861 if new in self.occupied_delay: 

862 self.schedule_state( 

863 dt_util.now() + self.occupied_delay[new], state=None, intervention=None, source=ChangeSource.OCCUPANCY 

864 ) 

865 else: 

866 await self.reset_armed_state(source=ChangeSource.OCCUPANCY) 

867 

868 @callback 

869 async def on_panel_change(self, event: Event[EventStateChangedData]) -> None: 

870 """Alarm Control Panel has been changed outside of AutoArm""" 

871 entity_id, old, new, new_attributes = self._extract_event(event) 

872 if new_attributes: 

873 changed_by = new_attributes.get(ATTR_CHANGED_BY) 

874 if changed_by and changed_by.startswith(f"{DOMAIN}."): 

875 _LOGGER.debug( 

876 "AUTOARM Panel Change Ignored: %s,%s: %s-->%s", 

877 entity_id, 

878 event.event_type, 

879 old, 

880 new, 

881 ) 

882 return 

883 new_state = alarm_state_as_enum(new) 

884 

885 _LOGGER.info( 

886 "AUTOARM Panel Change: %s,%s: %s-->%s", 

887 entity_id, 

888 event.event_type, 

889 old, 

890 new, 

891 ) 

892 self.record_intervention(ChangeSource.ALARM_PANEL, new_state) 

893 if new in ZOMBIE_STATES: 

894 _LOGGER.warning("AUTOARM Dezombifying %s ...", new) 

895 await self.reset_armed_state(source=ChangeSource.ZOMBIFICATION) 

896 elif new != old: 

897 message = f"Home Assistant alarm level now set from {old} to {new}" 

898 await self.notify(message, title=f"Alarm now {new}", profile="quiet") 

899 else: 

900 _LOGGER.debug("AUTOARM panel change leaves state unchanged at %s", new) 

901 

902 async def on_calendar_event_start(self, event: TrackedCalendarEvent, triggered_at: dt.datetime) -> None: 

903 _LOGGER.debug("AUTOARM on_calendar_event_start(%s,%s)", event.id, triggered_at) 

904 if event.arming_state != self.armed_state(): 

905 _LOGGER.info("AUTOARM Calendar event %s changing arming to %s at %s", event.id, event.arming_state, triggered_at) 

906 await self.arm(arming_state=event.arming_state, source=ChangeSource.CALENDAR) 

907 self.hass.states.async_set( 

908 f"sensor.{DOMAIN}_last_calendar_event", 

909 new_state=event.event.summary or str(event.id), 

910 attributes={ 

911 "calendar": event.calendar_id, 

912 "start": event.event.start_datetime_local, 

913 "end": event.event.end_datetime_local, 

914 "summary": event.event.summary, 

915 "description": event.event.description, 

916 "uid": event.event.uid, 

917 }, 

918 ) 

919 

920 async def on_calendar_event_end(self, event: TrackedCalendarEvent, ended_at: dt.datetime) -> None: 

921 _LOGGER.debug("AUTOARM on_calendar_event_start(%s,%s)", event.id, ended_at) 

922 if any(cal.has_active_event() for cal in self.calendars): 

923 _LOGGER.debug("AUTOARM No action on event end since other cal event active") 

924 return 

925 if self.calendar_no_event_mode == NO_CAL_EVENT_MODE_AUTO: 

926 _LOGGER.info("AUTOARM Calendar event %s ended, and arming state", event.id) 

927 # avoid having state locked in vacation by state calculator 

928 await self.pending_state(source=ChangeSource.CALENDAR) 

929 await self.reset_armed_state(source=ChangeSource.CALENDAR) 

930 elif self.calendar_no_event_mode in AlarmControlPanelState: 

931 _LOGGER.info( 

932 "AUTOARM Calendar event %s ended, and returning to fixed state %s", event.id, self.calendar_no_event_mode 

933 ) 

934 await self.arm(alarm_state_as_enum(self.calendar_no_event_mode), source=ChangeSource.CALENDAR) 

935 else: 

936 _LOGGER.debug("AUTOARM Reinstate previous state on calendar event end in manual mode") 

937 await self.arm(event.previous_state, source=ChangeSource.CALENDAR) 

938 

939 @callback 

940 async def housekeeping(self, triggered_at: dt.datetime) -> None: 

941 _LOGGER.debug("AUTOARM Housekeeping starting, triggered at %s", triggered_at) 

942 now = dt_util.now() 

943 self.interventions = [i for i in self.interventions if now < i.created_at + dt.timedelta(minutes=self.intervention_ttl)] 

944 for cal in self.calendars: 

945 await cal.prune_events() 

946 _LOGGER.debug("AUTOARM Housekeeping finished")