Coverage for custom_components/autoarm/autoarming.py: 88%

512 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-08 20:27 +0000

1import asyncio 

2import contextlib 

3import datetime as dt 

4import json 

5import logging 

6from collections.abc import Callable 

7from dataclasses import dataclass 

8from functools import partial 

9from typing import TYPE_CHECKING, Any 

10 

11import homeassistant.util.dt as dt_util 

12import voluptuous as vol 

13from homeassistant.components.alarm_control_panel.const import ATTR_CHANGED_BY, AlarmControlPanelState 

14from homeassistant.components.calendar import CalendarEvent 

15from homeassistant.components.calendar.const import DOMAIN as CALENDAR_DOMAIN 

16from homeassistant.components.sun.const import STATE_BELOW_HORIZON 

17from homeassistant.const import CONF_CONDITIONS, CONF_ENTITY_ID, EVENT_HOMEASSISTANT_STOP, SERVICE_RELOAD, STATE_HOME 

18from homeassistant.core import ( 

19 Event, 

20 EventStateChangedData, 

21 HomeAssistant, 

22 ServiceCall, 

23 ServiceResponse, 

24 SupportsResponse, 

25 callback, 

26) 

27from homeassistant.exceptions import ConditionError, HomeAssistantError 

28from homeassistant.helpers import condition as condition 

29from homeassistant.helpers import config_validation as cv 

30from homeassistant.helpers import entity_platform 

31from homeassistant.helpers import issue_registry as ir 

32from homeassistant.helpers.event import ( 

33 async_track_point_in_time, 

34 async_track_state_change_event, 

35 async_track_sunrise, 

36 async_track_sunset, 

37 async_track_time_change, 

38) 

39from homeassistant.helpers.json import ExtendedJSONEncoder 

40from homeassistant.helpers.reload import ( 

41 async_integration_yaml_config, 

42) 

43from homeassistant.helpers.service import async_register_admin_service 

44from homeassistant.helpers.typing import ConfigType 

45from homeassistant.util.hass_dict import HassKey 

46 

47from custom_components.autoarm.hass_api import HomeAssistantAPI 

48 

49from .calendar import TrackedCalendar, TrackedCalendarEvent 

50from .const import ( 

51 ATTR_RESET, 

52 CONF_ALARM_PANEL, 

53 CONF_BUTTONS, 

54 CONF_CALENDAR_CONTROL, 

55 CONF_CALENDAR_NO_EVENT, 

56 CONF_CALENDARS, 

57 CONF_DAY, 

58 CONF_DELAY_TIME, 

59 CONF_DIURNAL, 

60 CONF_EARLIEST, 

61 CONF_NOTIFY, 

62 CONF_OCCUPANCY, 

63 CONF_OCCUPANCY_DEFAULT, 

64 CONF_RATE_LIMIT, 

65 CONF_RATE_LIMIT_CALLS, 

66 CONF_RATE_LIMIT_PERIOD, 

67 CONF_SUNRISE, 

68 CONF_TRANSITIONS, 

69 CONFIG_SCHEMA, 

70 DEFAULT_TRANSITIONS, 

71 DOMAIN, 

72 NO_CAL_EVENT_MODE_AUTO, 

73 NO_CAL_EVENT_MODE_MANUAL, 

74 ChangeSource, 

75 ConditionVariables, 

76) 

77from .helpers import Limiter, alarm_state_as_enum, deobjectify, safe_state 

78 

79if TYPE_CHECKING: 

80 from homeassistant.helpers.condition import ConditionCheckerType 

81 

82_LOGGER = logging.getLogger(__name__) 

83 

84OVERRIDE_STATES = (AlarmControlPanelState.ARMED_VACATION, AlarmControlPanelState.ARMED_CUSTOM_BYPASS) 

85EPHEMERAL_STATES = ( 

86 AlarmControlPanelState.PENDING, 

87 AlarmControlPanelState.ARMING, 

88 AlarmControlPanelState.DISARMING, 

89 AlarmControlPanelState.TRIGGERED, 

90) 

91ZOMBIE_STATES = ("unknown", "unavailable") 

92NS_MOBILE_ACTIONS = "mobile_actions" 

93PLATFORMS = ["autoarm"] 

94 

95HASS_DATA_KEY: HassKey["AutoArmData"] = HassKey(DOMAIN) 

96 

97 

98@dataclass 

99class AutoArmData: 

100 armer: "AlarmArmer" 

101 other_data: dict[str, Any] 

102 

103 

104# async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool: 

105async def async_setup( 

106 hass: HomeAssistant, 

107 config: ConfigType, 

108) -> bool: 

109 _ = CONFIG_SCHEMA 

110 if DOMAIN not in config: 

111 _LOGGER.warning("AUTOARM No config found") 

112 return True 

113 config = config.get(DOMAIN, {}) 

114 expose_config_entity(hass, config) 

115 hass.data[HASS_DATA_KEY] = AutoArmData(_async_process_config(hass, config), {}) 

116 await hass.data[HASS_DATA_KEY].armer.initialize() 

117 

118 async def reload_service_handler(service_call: ServiceCall) -> None: 

119 """Reload yaml entities.""" 

120 config = None 

121 _LOGGER.info("AUTOARM Reloading %s.%s component, data %s", service_call.domain, service_call.service, service_call.data) 

122 with contextlib.suppress(HomeAssistantError): 

123 config = await async_integration_yaml_config(hass, DOMAIN) 

124 if config is None or DOMAIN not in config: 

125 _LOGGER.warning("AUTOARM reload rejected for lack of config: %s", config) 

126 return 

127 hass.data[HASS_DATA_KEY].armer.shutdown() 

128 expose_config_entity(hass, config[DOMAIN]) 

129 hass.data[HASS_DATA_KEY].armer = _async_process_config(hass, config[DOMAIN]) 

130 await hass.data[HASS_DATA_KEY].armer.initialize() 

131 

132 async_register_admin_service( 

133 hass, 

134 DOMAIN, 

135 SERVICE_RELOAD, 

136 reload_service_handler, 

137 ) 

138 return True 

139 

140 

141def expose_config_entity(hass: HomeAssistant, config: ConfigType) -> None: 

142 data: dict[str, Any] = { 

143 CONF_ALARM_PANEL: config.get(CONF_ALARM_PANEL, {}).get(CONF_ENTITY_ID), 

144 CONF_DIURNAL: config.get(CONF_DIURNAL), 

145 CONF_CALENDAR_CONTROL: config.get(CONF_CALENDAR_CONTROL), 

146 CONF_BUTTONS: config.get(CONF_BUTTONS, {}), 

147 CONF_OCCUPANCY: config.get(CONF_OCCUPANCY, {}), 

148 CONF_NOTIFY: config.get(CONF_NOTIFY, {}), 

149 CONF_RATE_LIMIT: config.get(CONF_RATE_LIMIT, {}), 

150 } 

151 try: 

152 jsonized: str = json.dumps(obj=data, cls=ExtendedJSONEncoder) 

153 hass.states.async_set(f"{DOMAIN}.configured", "valid", json.loads(jsonized)) 

154 except Exception as e: 

155 _LOGGER.error("AUTOARM Failed to expose config data as entity: %s, %s", data, e) 

156 hass.states.async_set(entity_id=f"{DOMAIN}.configured", new_state="partially-valid", attributes={"error": str(e)}) 

157 

158 

159def _async_process_config(hass: HomeAssistant, config: ConfigType) -> "AlarmArmer": 

160 calendar_config: ConfigType = config.get(CONF_CALENDAR_CONTROL, {}) 

161 return AlarmArmer( 

162 hass, 

163 alarm_panel=config[CONF_ALARM_PANEL].get(CONF_ENTITY_ID), 

164 diurnal=config.get(CONF_DIURNAL, {}), 

165 buttons=config.get(CONF_BUTTONS, {}), 

166 occupancy=config[CONF_OCCUPANCY], 

167 notify=config[CONF_NOTIFY], 

168 rate_limit=config.get(CONF_RATE_LIMIT, {}), 

169 calendars=calendar_config.get(CONF_CALENDARS, []), 

170 transitions=config.get(CONF_TRANSITIONS), 

171 calendar_no_event_mode=calendar_config.get(CONF_CALENDAR_NO_EVENT, NO_CAL_EVENT_MODE_AUTO), 

172 ) 

173 

174 

175def unlisten(listener: Callable[[], None] | None) -> None: 

176 if listener: 

177 try: 

178 listener() 

179 except Exception as e: 

180 _LOGGER.debug("AUTOARM Failure closing listener %s: %s", listener, e) 

181 

182 

183@dataclass 

184class Intervention: 

185 """Record of a manual intervention, such as a button push, mobile action or alarm panel change""" 

186 

187 created_at: dt.datetime 

188 source: ChangeSource 

189 state: AlarmControlPanelState | None 

190 

191 def as_dict(self) -> dict[str, str | None]: 

192 return { 

193 "created_at": self.created_at.isoformat(), 

194 "source": str(self.source), 

195 "state": str(self.state) if self.state is not None else None, 

196 } 

197 

198 

199class AlarmArmer: 

200 def __init__( 

201 self, 

202 hass: HomeAssistant, 

203 alarm_panel: str, 

204 buttons: dict[str, ConfigType] | None = None, 

205 occupancy: ConfigType | None = None, 

206 actions: list | None = None, 

207 notify: ConfigType | None = None, 

208 diurnal: ConfigType | None = None, 

209 rate_limit: ConfigType | None = None, 

210 calendar_no_event_mode: str | None = None, 

211 calendars: list[ConfigType] | None = None, 

212 transitions: dict[str, dict[str, list[ConfigType]]] | None = None, 

213 ) -> None: 

214 occupancy = occupancy or {} 

215 rate_limit = rate_limit or {} 

216 diurnal = diurnal or {} 

217 

218 self.hass: HomeAssistant = hass 

219 self.local_tz = dt_util.get_time_zone(self.hass.config.time_zone) 

220 self.calendar_configs: list[ConfigType] = calendars or [] 

221 self.calendar_no_event_mode: str = calendar_no_event_mode or NO_CAL_EVENT_MODE_AUTO 

222 self.calendars: list[TrackedCalendar] = [] 

223 self.alarm_panel: str = alarm_panel 

224 self.sunrise_cutoff: dt.time | None = diurnal.get(CONF_SUNRISE, {}).get(CONF_EARLIEST) 

225 self.occupants: list[str] = occupancy.get(CONF_ENTITY_ID, []) 

226 self.occupied_defaults: dict[str, AlarmControlPanelState] = occupancy.get( 

227 CONF_OCCUPANCY_DEFAULT, {CONF_DAY: AlarmControlPanelState.ARMED_HOME} 

228 ) 

229 self.occupied_delay: dict[str, dt.timedelta] = occupancy.get(CONF_DELAY_TIME, {}) 

230 self.buttons: ConfigType = buttons or {} 

231 

232 self.actions: list[str] = actions or [] 

233 self.notify_profiles: dict[str, dict] = notify or {} 

234 self.unsubscribes: list = [] 

235 self.pre_pending_state: AlarmControlPanelState | None = None 

236 self.button_device: dict[str, str] = {} 

237 self.arming_in_progress: asyncio.Event = asyncio.Event() 

238 

239 self.rate_limiter: Limiter = Limiter( 

240 window=rate_limit.get(CONF_RATE_LIMIT_PERIOD, dt.timedelta(seconds=60)), 

241 max_calls=rate_limit.get(CONF_RATE_LIMIT_CALLS, 5), 

242 ) 

243 

244 self.hass_api: HomeAssistantAPI = HomeAssistantAPI(hass) 

245 self.transitions: dict[AlarmControlPanelState, ConditionCheckerType] = {} 

246 self.transition_config: dict[str, dict[str, list[ConfigType]]] = transitions or {} 

247 

248 self.initialization_errors: dict[str, int] = {} 

249 self.interventions: list[Intervention] = [] 

250 self.intervention_ttl: int = 60 

251 

252 async def initialize(self) -> None: 

253 """Async initialization""" 

254 _LOGGER.info("AUTOARM occupied=%s, state=%s, calendars=%s", self.is_occupied(), self.armed_state(), len(self.calendars)) 

255 

256 self.initialize_alarm_panel() 

257 await self.initialize_calendar() 

258 await self.initialize_logic() 

259 self.initialize_diurnal() 

260 self.initialize_occupancy() 

261 self.initialize_buttons() 

262 self.initialize_integration() 

263 self.initialize_housekeeping() 

264 self.initialize_home_assistant() 

265 await self.reset_armed_state(source=ChangeSource.STARTUP) 

266 

267 _LOGGER.info("AUTOARM Initialized, state: %s", self.armed_state()) 

268 

269 def record_error(self, stage: str) -> None: 

270 self.initialization_errors.setdefault(stage, 0) 

271 self.initialization_errors[stage] += 1 

272 

273 def initialize_home_assistant(self) -> None: 

274 self.stop_listener: Callable[[], None] | None = self.hass.bus.async_listen_once( 

275 EVENT_HOMEASSISTANT_STOP, self.async_shutdown 

276 ) 

277 self.hass.states.async_set( 

278 f"{DOMAIN}.initialized", 

279 "valid" if not self.initialization_errors else "invalid", 

280 attributes=self.initialization_errors, 

281 ) 

282 self.hass.states.async_set(f"{DOMAIN}.last_calculation", "unavailable", attributes={}) 

283 

284 self.hass.services.async_register( 

285 DOMAIN, 

286 "reset_state", 

287 self.reset_service, 

288 supports_response=SupportsResponse.OPTIONAL, 

289 ) 

290 

291 async def reset_service(self, _call: ServiceCall) -> ServiceResponse: 

292 new_state = await self.reset_armed_state(intervention=self.record_intervention(source=ChangeSource.ACTION, state=None)) 

293 return {"change": new_state or "NO_CHANGE"} 

294 

295 def initialize_integration(self) -> None: 

296 self.hass.states.async_set(f"{DOMAIN}.last_intervention", "unavailable", attributes={}) 

297 

298 self.unsubscribes.append(self.hass.bus.async_listen("mobile_app_notification_action", self.on_mobile_action)) 

299 

300 def initialize_alarm_panel(self) -> None: 

301 """Set up automation for Home Assistant alarm panel 

302 

303 See https://www.home-assistant.io/integrations/alarm_control_panel/ 

304 

305 Succeeds even if control panel has not yet started, listener will pick up events when it does 

306 """ 

307 self.unsubscribes.append(async_track_state_change_event(self.hass, [self.alarm_panel], self.on_panel_change)) 

308 _LOGGER.debug("AUTOARM Auto-arming %s", self.alarm_panel) 

309 

310 def initialize_housekeeping(self) -> None: 

311 self.unsubscribes.append( 

312 async_track_time_change( 

313 self.hass, 

314 action=self.housekeeping, 

315 minute=0, 

316 ) 

317 ) 

318 

319 def initialize_diurnal(self) -> None: 

320 # events API expects a function, however underlying HassJob is fine with coroutines 

321 self.unsubscribes.append(async_track_sunrise(self.hass, self.on_sunrise, None)) # type: ignore 

322 self.unsubscribes.append(async_track_sunset(self.hass, self.on_sunset, None)) # type: ignore 

323 

324 def initialize_occupancy(self) -> None: 

325 """Configure occupants, and listen for changes in their state""" 

326 _LOGGER.info("AUTOARM Occupancy determined by %s", ",".join(self.occupants)) 

327 self.unsubscribes.append(async_track_state_change_event(self.hass, self.occupants, self.on_occupancy_change)) 

328 

329 def initialize_buttons(self) -> None: 

330 """Initialize (optional) physical alarm state control buttons""" 

331 

332 def setup_button(state_name: str, button_entity: str, cb: Callable) -> None: 

333 self.button_device[state_name] = button_entity 

334 if self.button_device[state_name]: 

335 self.unsubscribes.append(async_track_state_change_event(self.hass, [button_entity], cb)) 

336 

337 _LOGGER.debug( 

338 "AUTOARM Configured %s button for %s", 

339 state_name, 

340 self.button_device[state_name], 

341 ) 

342 

343 for button_use, button_config in self.buttons.items(): 

344 delay: dt.timedelta | None = button_config.get(CONF_DELAY_TIME) 

345 for entity_id in button_config[CONF_ENTITY_ID]: 

346 if button_use == ATTR_RESET: 

347 setup_button(ATTR_RESET, entity_id, partial(self.on_reset_button, delay)) 

348 else: 

349 setup_button( 

350 button_use, entity_id, partial(self.on_alarm_state_button, AlarmControlPanelState(button_use), delay) 

351 ) 

352 

353 async def initialize_calendar(self) -> None: 

354 """Configure calendar polling (optional)""" 

355 stage: str = "calendar" 

356 self.hass.states.async_set(f"{DOMAIN}.last_calendar_event", "unavailable", attributes={}) 

357 if not self.calendar_configs: 

358 return 

359 try: 

360 platforms: list[entity_platform.EntityPlatform] = entity_platform.async_get_platforms(self.hass, CALENDAR_DOMAIN) 

361 if platforms: 

362 platform: entity_platform.EntityPlatform = platforms[0] 

363 else: 

364 self.record_error(stage) 

365 _LOGGER.error("AUTOARM Calendar platform not available from Home Assistant") 

366 return 

367 except Exception as _e: 

368 self.record_error(stage) 

369 _LOGGER.exception("AUTOARM Unable to access calendar platform") 

370 return 

371 for calendar_config in self.calendar_configs: 

372 tracked_calendar = TrackedCalendar(calendar_config, self) 

373 await tracked_calendar.initialize(platform) 

374 self.calendars.append(tracked_calendar) 

375 

376 async def initialize_logic(self) -> None: 

377 stage: str = "logic" 

378 for state_str, raw_condition in DEFAULT_TRANSITIONS.items(): 

379 if state_str not in self.transition_config: 

380 _LOGGER.info("AUTOARM Defaulting transition condition for %s", state_str) 

381 self.transition_config[state_str] = {CONF_CONDITIONS: cv.CONDITIONS_SCHEMA(raw_condition)} 

382 

383 for state_str, transition_config in self.transition_config.items(): 

384 error: str = "" 

385 condition_config = transition_config.get(CONF_CONDITIONS) 

386 if condition_config is None: 

387 error = "Empty conditions" 

388 _LOGGER.warning(f"AUTOARM Found no conditions for {state_str} transition") 

389 else: 

390 try: 

391 state = AlarmControlPanelState(state_str) 

392 cond: ConditionCheckerType | None = await self.hass_api.build_condition( 

393 condition_config, strict=True, validate=True, name=state_str 

394 ) 

395 

396 if cond: 

397 # re-run without strict wrapper 

398 cond = await self.hass_api.build_condition(condition_config, name=state_str) 

399 if cond: 

400 _LOGGER.debug(f"AUTOARM Validated transition logic for {state_str}") 

401 self.transitions[state] = cond 

402 else: 

403 _LOGGER.warning(f"AUTOARM Failed to validate transition logic for {state_str}") 

404 error = "Condition validation failed" 

405 except ValueError as ve: 

406 self.record_error(stage) 

407 error = f"Invalid state {ve}" 

408 _LOGGER.error(f"AUTOARM Invalid state in {state_str} transition - {ve}") 

409 except vol.Invalid as vi: 

410 self.record_error(stage) 

411 _LOGGER.error(f"AUTOARM Transition {state_str} conditions fails Home Assistant schema check {vi}") 

412 error = f"Schema error {vi}" 

413 except ConditionError as ce: 

414 _LOGGER.error(f"AUTOARM Transition {state_str} conditions fails Home Assistant condition check {ce}") 

415 if hasattr(ce, "message"): 

416 error = ce.message # type: ignore 

417 elif hasattr(ce, "error") and hasattr(ce.error, "message"): # type: ignore 

418 error = ce.error.message # type: ignore 

419 else: 

420 error = str(ce) 

421 except Exception as e: 

422 self.record_error(stage) 

423 _LOGGER.exception("AUTOARM Disabling transition %s with error validating %s", state_str, condition_config) 

424 error = f"Unknown exception {e}" 

425 if error: 

426 _LOGGER.warning(f"AUTOARM raising report issue for {error} on {state_str}") 

427 self.hass_api.raise_issue( 

428 f"transition_condition_{state_str}", 

429 is_fixable=False, 

430 issue_key="transition_condition", 

431 issue_map={"state": state_str, "error": error}, 

432 severity=ir.IssueSeverity.ERROR, 

433 ) 

434 

435 async def async_shutdown(self, _event: Event) -> None: 

436 _LOGGER.info("AUTOARM shut down event received") 

437 self.stop_listener = None 

438 self.shutdown() 

439 

440 def shutdown(self) -> None: 

441 _LOGGER.info("AUTOARM shutting down") 

442 for calendar in self.calendars: 

443 calendar.shutdown() 

444 while self.unsubscribes: 

445 unlisten(self.unsubscribes.pop()) 

446 unlisten(self.stop_listener) 

447 self.stop_listener = None 

448 _LOGGER.info("AUTOARM shut down") 

449 

450 def active_calendar_event(self) -> CalendarEvent | None: 

451 events: list[CalendarEvent] = [] 

452 for cal in self.calendars: 

453 events.extend(cal.active_events()) 

454 if events: 

455 # TODO: consider sorting events to LIFO 

456 return events[0] 

457 return None 

458 

459 def is_occupied(self) -> bool: 

460 return any(safe_state(self.hass.states.get(p)) == STATE_HOME for p in self.occupants) 

461 

462 def at_home(self) -> list[str]: 

463 return [p for p in self.occupants if safe_state(self.hass.states.get(p)) == STATE_HOME] 

464 

465 def not_home(self) -> list[str]: 

466 return [p for p in self.occupants if safe_state(self.hass.states.get(p)) != STATE_HOME] 

467 

468 def is_unoccupied(self) -> bool: 

469 return all(safe_state(self.hass.states.get(p)) != STATE_HOME for p in self.occupants) 

470 

471 def is_night(self) -> bool: 

472 return safe_state(self.hass.states.get("sun.sun")) == STATE_BELOW_HORIZON 

473 

474 def armed_state(self) -> AlarmControlPanelState: 

475 raw_state: str | None = safe_state(self.hass.states.get(self.alarm_panel)) 

476 alarm_state = alarm_state_as_enum(raw_state) 

477 if alarm_state is None: 

478 _LOGGER.warning("AUTOARM No alarm state available - treating as PENDING") 

479 return AlarmControlPanelState.PENDING 

480 return alarm_state 

481 

482 def _extract_event(self, event: Event[EventStateChangedData]) -> tuple[str | None, str | None, str | None, dict[str, Any]]: 

483 entity_id = old = new = None 

484 new_attributes: dict[str, Any] = {} 

485 if event and event.data: 

486 entity_id = event.data.get("entity_id") 

487 old_obj = event.data.get("old_state") 

488 if old_obj: 

489 old = old_obj.state 

490 new_obj = event.data.get("new_state") 

491 if new_obj: 

492 new = new_obj.state 

493 new_attributes = new_obj.attributes 

494 return entity_id, old, new, new_attributes 

495 

496 async def pending_state(self, source: ChangeSource | None) -> None: 

497 self.pre_pending_state = self.armed_state() 

498 await self.arm(AlarmControlPanelState.PENDING, source=source) 

499 

500 @callback 

501 async def delayed_reset_armed_state(self, triggered_at: dt.datetime, requested_at: dt.datetime | None, **kwargs) -> None: 

502 _LOGGER.debug("AUTOARM delayed_arm at %s, requested_at: %s", triggered_at, requested_at) 

503 if self.is_intervention_since_request(requested_at): 

504 return 

505 await self.reset_armed_state(**kwargs) 

506 

507 async def reset_armed_state( 

508 self, intervention: Intervention | None = None, source: ChangeSource | None = None 

509 ) -> str | None: 

510 """Logic to automatically work out appropriate current armed state""" 

511 state: AlarmControlPanelState | None = None 

512 existing_state: AlarmControlPanelState | None = None 

513 must_change_state: bool = False 

514 last_state_intervention: Intervention | None = None 

515 active_calendar_event: CalendarEvent | None = None 

516 

517 if source is None and intervention is not None: 

518 source = intervention.source 

519 _LOGGER.debug( 

520 "AUTOARM reset_armed_state(intervention=%s,source=%s)", 

521 intervention, 

522 source, 

523 ) 

524 

525 try: 

526 existing_state = self.armed_state() 

527 state = existing_state 

528 if self.calendars: 

529 active_calendar_event = self.active_calendar_event() 

530 if active_calendar_event: 

531 _LOGGER.debug("AUTOARM Ignoring reset while calendar event active") 

532 return existing_state 

533 if self.calendar_no_event_mode == NO_CAL_EVENT_MODE_MANUAL: 

534 _LOGGER.debug( 

535 "AUTOARM Ignoring reset while calendar configured, no active event, and default mode is manual" 

536 ) 

537 return existing_state 

538 if self.calendar_no_event_mode in AlarmControlPanelState: 

539 # TODO: may be dupe logic with on_cal event 

540 _LOGGER.debug("AUTOARM Applying fixed reset on end of calendar event, %s", self.calendar_no_event_mode) 

541 return await self.arm(alarm_state_as_enum(self.calendar_no_event_mode), ChangeSource.CALENDAR) 

542 if self.calendar_no_event_mode == NO_CAL_EVENT_MODE_AUTO: 

543 _LOGGER.debug("AUTOARM Applying reset while calendar configured, no active event, and default mode is auto") 

544 else: 

545 _LOGGER.warning("AUTOARM Unexpected state for calendar no event mode: %s", self.calendar_no_event_mode) 

546 

547 # TODO: expose as config ( for manual disarm override ) and condition logic 

548 must_change_state = existing_state is None or existing_state == AlarmControlPanelState.PENDING 

549 if intervention or source in (ChangeSource.CALENDAR, ChangeSource.OCCUPANCY) or must_change_state: 

550 _LOGGER.debug("AUTOARM Ignoring previous interventions") 

551 else: 

552 last_state_intervention = self.last_state_intervention() 

553 if last_state_intervention: 

554 _LOGGER.debug( 

555 "AUTOARM Ignoring automated reset for %s set by %s at %s", 

556 last_state_intervention.state, 

557 last_state_intervention.source, 

558 last_state_intervention.created_at, 

559 ) 

560 return existing_state 

561 state = self.determine_state() 

562 if state is not None and state != AlarmControlPanelState.PENDING and state != existing_state: 

563 state = await self.arm(state, source=source) 

564 finally: 

565 self.hass.states.async_set( 

566 f"{DOMAIN}.last_calculation", 

567 str(state is not None and state != existing_state), 

568 attributes={ 

569 "new_state": str(state), 

570 "old_state": str(existing_state), 

571 "source": source, 

572 "active_calendar_event": deobjectify(active_calendar_event), 

573 "occupied": self.is_occupied(), 

574 "night": self.is_night(), 

575 "must_change_state": str(must_change_state), 

576 "last_state_intervention": deobjectify(last_state_intervention), 

577 "intervention": intervention.as_dict() if intervention else None, 

578 "time": dt_util.now().isoformat(), 

579 }, 

580 ) 

581 

582 return state 

583 

584 def is_intervention_since_request(self, requested_at: dt.datetime | None) -> bool: 

585 if requested_at is not None and self.has_intervention_since(requested_at): 

586 _LOGGER.debug( 

587 "AUTOARM Cancelling delayed operation since subsequent manual action", 

588 ) 

589 return True 

590 return False 

591 

592 def determine_state(self) -> AlarmControlPanelState | None: 

593 """Compute a new state using occupancy, sun and transition conditions""" 

594 evaluated_state: AlarmControlPanelState | None = None 

595 condition_vars: ConditionVariables = ConditionVariables( 

596 self.is_occupied(), 

597 self.is_night(), 

598 state=self.armed_state(), 

599 calendar_event=self.active_calendar_event(), 

600 occupied_defaults=self.occupied_defaults, 

601 at_home=self.at_home(), 

602 not_home=self.not_home(), 

603 ) 

604 for state, checker in self.transitions.items(): 

605 if self.hass_api.evaluate_condition(checker, condition_vars): 

606 _LOGGER.debug("AUTOARM Computed state as % from condition", state) 

607 evaluated_state = state 

608 break 

609 if evaluated_state is None: 

610 return None 

611 return AlarmControlPanelState(evaluated_state) 

612 

613 @callback 

614 async def delayed_arm(self, triggered_at: dt.datetime, requested_at: dt.datetime | None, **kwargs) -> None: 

615 _LOGGER.debug("AUTOARM delayed_arm at %s, requested_at: %s", triggered_at, requested_at) 

616 if self.is_intervention_since_request(requested_at): 

617 return 

618 await self.arm(**kwargs) 

619 

620 async def arm( 

621 self, arming_state: AlarmControlPanelState | None = None, source: ChangeSource | None = None 

622 ) -> AlarmControlPanelState | None: 

623 """Change alarm panel state 

624 

625 Args: 

626 ---- 

627 arming_state (str, optional): _description_. Defaults to None. 

628 source (str,optional): Source of the change, for example 'calendar' or 'button' 

629 

630 Returns: 

631 ------- 

632 str: New arming state 

633 

634 """ 

635 if self.rate_limiter.triggered(): 

636 _LOGGER.debug("AUTOARM Rate limit triggered by %s, skipping arm", source) 

637 return None 

638 try: 

639 self.arming_in_progress.set() 

640 existing_state: AlarmControlPanelState | None = self.armed_state() 

641 if arming_state != existing_state: 

642 self.hass.states.async_set( 

643 entity_id=self.alarm_panel, new_state=str(arming_state), attributes={ATTR_CHANGED_BY: f"{DOMAIN}.{source}"} 

644 ) 

645 _LOGGER.info("AUTOARM Setting %s from %s to %s for %s", self.alarm_panel, existing_state, arming_state, source) 

646 return arming_state 

647 _LOGGER.debug("Skipping arm for %s, as %s already %s", source, self.alarm_panel, arming_state) 

648 return existing_state 

649 except Exception as e: 

650 _LOGGER.debug("AUTOARM Failed to arm: %s", e) 

651 finally: 

652 self.arming_in_progress.clear() 

653 return None 

654 

655 async def notify(self, message: str, profile: str = "normal", title: str | None = None) -> None: 

656 notify_service = None 

657 try: 

658 # separately merge base dict and data sub-dict as cheap and nasty semi-deep-merge 

659 selected_profile = self.notify_profiles.get(profile) 

660 base_profile = self.notify_profiles.get("common", {}) 

661 base_profile_data = base_profile.get("data", {}) 

662 merged_profile = dict(base_profile) 

663 merged_profile_data = dict(base_profile_data) 

664 if selected_profile: 

665 selected_profile_data: dict = selected_profile.get("data", {}) 

666 merged_profile.update(selected_profile) 

667 merged_profile_data.update(selected_profile_data) 

668 merged_profile["data"] = merged_profile_data 

669 notify_service = merged_profile.get("service", "").replace("notify.", "") 

670 

671 title = title or "Alarm Auto Arming" 

672 if notify_service and merged_profile: 

673 data = merged_profile.get("data", {}) 

674 await self.hass.services.async_call( 

675 "notify", 

676 notify_service, 

677 service_data={"message": message, "title": title, "data": data}, 

678 ) 

679 else: 

680 _LOGGER.debug("AUTOARM Skipped notification, service: %s, data: %s", notify_service, merged_profile) 

681 

682 except Exception: 

683 _LOGGER.exception("AUTOARM notify.%s failed", notify_service) 

684 

685 def schedule_state( 

686 self, 

687 trigger_time: dt.datetime, 

688 state: AlarmControlPanelState | None, 

689 intervention: Intervention | None, 

690 source: ChangeSource | None = None, 

691 ) -> None: 

692 source = source or intervention.source if intervention else None 

693 

694 job: Any 

695 if state is None: 

696 _LOGGER.debug("Delayed reset, triggered at: %s, source%s", trigger_time, source) 

697 job = partial(self.delayed_reset_armed_state, intervention=intervention, source=source, requested_at=dt_util.now()) 

698 else: 

699 _LOGGER.debug("Delayed arm %s, triggered at: %s, source%s", state, trigger_time, source) 

700 

701 job = partial(self.delayed_arm, arming_state=state, source=source, requested_at=dt_util.now()) 

702 

703 self.unsubscribes.append( 

704 async_track_point_in_time( 

705 self.hass, 

706 job, 

707 trigger_time, 

708 ) 

709 ) 

710 

711 def record_intervention(self, source: ChangeSource, state: AlarmControlPanelState | None) -> Intervention: 

712 intervention = Intervention(dt_util.now(), source, state) 

713 self.interventions.append(intervention) 

714 self.hass.states.async_set(f"{DOMAIN}.last_intervention", source, attributes=intervention.as_dict()) 

715 

716 return intervention 

717 

718 def has_intervention_since(self, cutoff: dt.datetime) -> bool: 

719 """Has there been a manual intervention since the cutoff time""" 

720 if not self.interventions: 

721 return False 

722 return any(intervention.created_at > cutoff for intervention in self.interventions) 

723 

724 def last_state_intervention(self) -> Intervention | None: 

725 candidates: list[Intervention] = [i for i in self.interventions if i.state is not None] 

726 if candidates: 

727 return candidates[-1] 

728 return None 

729 

730 @callback 

731 async def on_sunrise(self, *args: Any) -> None: # noqa: ARG002 

732 _LOGGER.debug("AUTOARM Sunrise") 

733 now = dt_util.now() # uses Home Assistant's time zone setting 

734 if not self.sunrise_cutoff or now.time() >= self.sunrise_cutoff: 

735 # sun is up, and not earlier than cutoff 

736 await self.reset_armed_state(source=ChangeSource.SUNRISE) 

737 elif self.sunrise_cutoff and now.time() < self.sunrise_cutoff: 

738 _LOGGER.debug( 

739 "AUTOARM Rescheduling delayed sunrise action to %s", 

740 self.sunrise_cutoff, 

741 ) 

742 self.schedule_state( 

743 dt.datetime.combine(now.date(), self.sunrise_cutoff, tzinfo=dt_util.DEFAULT_TIME_ZONE), 

744 intervention=None, 

745 state=None, 

746 source=ChangeSource.SUNRISE, 

747 ) 

748 

749 @callback 

750 async def on_sunset(self, *args: Any) -> None: # noqa: ARG002 

751 _LOGGER.debug("AUTOARM Sunset") 

752 await self.reset_armed_state(source=ChangeSource.SUNSET) 

753 

754 @callback 

755 async def on_mobile_action(self, event: Event) -> None: 

756 _LOGGER.debug("AUTOARM Mobile Action: %s", event) 

757 source: ChangeSource = ChangeSource.MOBILE 

758 

759 match event.data.get("action"): 

760 case "ALARM_PANEL_DISARM": 

761 self.record_intervention(source=source, state=AlarmControlPanelState.DISARMED) 

762 await self.arm(AlarmControlPanelState.DISARMED, source=source) 

763 case "ALARM_PANEL_RESET": 

764 await self.reset_armed_state(intervention=self.record_intervention(source=ChangeSource.BUTTON, state=None)) 

765 case "ALARM_PANEL_AWAY": 

766 self.record_intervention(source=source, state=AlarmControlPanelState.ARMED_AWAY) 

767 await self.arm(AlarmControlPanelState.ARMED_AWAY, source=source) 

768 case _: 

769 _LOGGER.debug("AUTOARM Ignoring mobile action: %s", event.data) 

770 

771 @callback 

772 async def on_alarm_state_button(self, state: AlarmControlPanelState, delay: dt.timedelta | None, event: Event) -> None: 

773 _LOGGER.debug("AUTOARM Alarm %s Button: %s", state, event) 

774 intervention = self.record_intervention(source=ChangeSource.BUTTON, state=state) 

775 if delay: 

776 self.schedule_state(dt_util.now() + delay, state, intervention, source=ChangeSource.BUTTON) 

777 await self.notify( 

778 f"Alarm will be set to {state} in {delay}", 

779 title=f"Arm set to {state} process starting", 

780 ) 

781 else: 

782 await self.arm(state, source=ChangeSource.BUTTON) 

783 

784 @callback 

785 async def on_reset_button(self, delay: dt.timedelta | None, event: Event) -> None: 

786 _LOGGER.debug("AUTOARM Reset Button: %s", event) 

787 intervention = self.record_intervention(source=ChangeSource.BUTTON, state=None) 

788 if delay: 

789 self.schedule_state(dt_util.now() + delay, None, intervention, ChangeSource.BUTTON) 

790 

791 await self.notify( 

792 f"Alarm will be reset in {delay}", 

793 title="Alarm reset wait initiated", 

794 ) 

795 else: 

796 await self.reset_armed_state(intervention=self.record_intervention(source=ChangeSource.BUTTON, state=None)) 

797 

798 @callback 

799 async def on_occupancy_change(self, event: Event[EventStateChangedData]) -> None: 

800 """Listen for person state events 

801 

802 Args: 

803 ---- 

804 event (Event[EventStateChangedData]): state change event 

805 

806 """ 

807 entity_id, old, new, new_attributes = self._extract_event(event) 

808 if old == new: 

809 _LOGGER.debug( 

810 "AUTOARM Occupancy Non-state Change: %s, state:%s->%s, event: %s, attrs:%s", 

811 entity_id, 

812 old, 

813 new, 

814 event, 

815 new_attributes, 

816 ) 

817 return 

818 _LOGGER.debug( 

819 "AUTOARM Occupancy state Change: %s, state:%s->%s, event: %s, attrs:%s", entity_id, old, new, event, new_attributes 

820 ) 

821 if new in self.occupied_delay: 

822 self.schedule_state( 

823 dt_util.now() + self.occupied_delay[new], state=None, intervention=None, source=ChangeSource.OCCUPANCY 

824 ) 

825 else: 

826 await self.reset_armed_state(source=ChangeSource.OCCUPANCY) 

827 

828 @callback 

829 async def on_panel_change(self, event: Event[EventStateChangedData]) -> None: 

830 """Alarm Control Panel has been changed outside of AutoArm""" 

831 entity_id, old, new, new_attributes = self._extract_event(event) 

832 if new_attributes: 

833 changed_by = new_attributes.get(ATTR_CHANGED_BY) 

834 if changed_by and changed_by.startswith(f"{DOMAIN}."): 

835 _LOGGER.debug( 

836 "AUTOARM Panel Change Ignored: %s,%s: %s-->%s", 

837 entity_id, 

838 event.event_type, 

839 old, 

840 new, 

841 ) 

842 return 

843 new_state = alarm_state_as_enum(new) 

844 

845 _LOGGER.info( 

846 "AUTOARM Panel Change: %s,%s: %s-->%s", 

847 entity_id, 

848 event.event_type, 

849 old, 

850 new, 

851 ) 

852 self.record_intervention(ChangeSource.ALARM_PANEL, new_state) 

853 if new in ZOMBIE_STATES: 

854 _LOGGER.warning("AUTOARM Dezombifying %s ...", new) 

855 await self.reset_armed_state(source=ChangeSource.ZOMBIFICATION) 

856 elif new != old: 

857 message = f"Home Assistant alarm level now set from {old} to {new}" 

858 await self.notify(message, title=f"Alarm now {new}", profile="quiet") 

859 else: 

860 _LOGGER.debug("AUTOARM panel change leaves state unchanged at %s", new) 

861 

862 async def on_calendar_event_start(self, event: TrackedCalendarEvent, triggered_at: dt.datetime) -> None: 

863 _LOGGER.debug("AUTOARM on_calendar_event_start(%s,%s)", event.id, triggered_at) 

864 if event.arming_state != self.armed_state(): 

865 _LOGGER.info("AUTOARM Calendar event %s changing arming to %s at %s", event.id, event.arming_state, triggered_at) 

866 await self.arm(arming_state=event.arming_state, source=ChangeSource.CALENDAR) 

867 self.hass.states.async_set( 

868 f"{DOMAIN}.last_calendar_event", 

869 new_state=event.event.summary or str(event.id), 

870 attributes={ 

871 "calendar": event.calendar_id, 

872 "start": event.event.start_datetime_local, 

873 "end": event.event.end_datetime_local, 

874 "summary": event.event.summary, 

875 "description": event.event.description, 

876 "uid": event.event.uid, 

877 }, 

878 ) 

879 

880 async def on_calendar_event_end(self, event: TrackedCalendarEvent, ended_at: dt.datetime) -> None: 

881 _LOGGER.debug("AUTOARM on_calendar_event_start(%s,%s)", event.id, ended_at) 

882 if any(cal.has_active_event() for cal in self.calendars): 

883 _LOGGER.debug("AUTOARM No action on event end since other cal event active") 

884 return 

885 if self.calendar_no_event_mode == NO_CAL_EVENT_MODE_AUTO: 

886 _LOGGER.info("AUTOARM Calendar event %s ended, and arming state", event.id) 

887 # avoid having state locked in vacation by state calculator 

888 await self.pending_state(source=ChangeSource.CALENDAR) 

889 await self.reset_armed_state(source=ChangeSource.CALENDAR) 

890 elif self.calendar_no_event_mode in AlarmControlPanelState: 

891 _LOGGER.info( 

892 "AUTOARM Calendar event %s ended, and returning to fixed state %s", event.id, self.calendar_no_event_mode 

893 ) 

894 await self.arm(alarm_state_as_enum(self.calendar_no_event_mode), source=ChangeSource.CALENDAR) 

895 else: 

896 _LOGGER.debug("AUTOARM Reinstate previous state on calendar event end in manual mode") 

897 await self.arm(event.previous_state, source=ChangeSource.CALENDAR) 

898 

899 @callback 

900 async def housekeeping(self, triggered_at: dt.datetime) -> None: 

901 _LOGGER.debug("AUTOARM Housekeeping starting, triggered at %s", triggered_at) 

902 now = dt_util.now() 

903 self.interventions = [i for i in self.interventions if now < i.created_at + dt.timedelta(minutes=self.intervention_ttl)] 

904 for cal in self.calendars: 

905 await cal.prune_events() 

906 _LOGGER.debug("AUTOARM Housekeeping finished")