Coverage for custom_components/autoarm/autoarming.py: 86%

597 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2026-02-17 01:14 +0000

1import asyncio 

2import datetime as dt 

3import json 

4import logging 

5import re 

6from collections.abc import Callable 

7from dataclasses import dataclass 

8from functools import partial 

9from typing import TYPE_CHECKING, Any 

10 

11import homeassistant.util.dt as dt_util 

12import voluptuous as vol 

13from homeassistant.components.alarm_control_panel.const import ATTR_CHANGED_BY, AlarmControlPanelState 

14from homeassistant.components.calendar import CalendarEvent 

15from homeassistant.components.calendar.const import DOMAIN as CALENDAR_DOMAIN 

16from homeassistant.components.sun.const import STATE_BELOW_HORIZON 

17from homeassistant.config_entries import SOURCE_IMPORT, ConfigEntry 

18from homeassistant.const import ( 

19 CONF_CONDITIONS, 

20 CONF_ENTITY_ID, 

21 CONF_SERVICE, 

22 EVENT_HOMEASSISTANT_STOP, 

23 SERVICE_RELOAD, 

24 STATE_HOME, 

25) 

26from homeassistant.core import ( 

27 Event, 

28 EventStateChangedData, 

29 HomeAssistant, 

30 ServiceCall, 

31 ServiceResponse, 

32 State, 

33 SupportsResponse, 

34 callback, 

35) 

36from homeassistant.exceptions import ConditionError, ConfigEntryNotReady, HomeAssistantError 

37from homeassistant.helpers import condition as condition 

38from homeassistant.helpers import config_validation as cv 

39from homeassistant.helpers import entity_platform 

40from homeassistant.helpers import issue_registry as ir 

41from homeassistant.helpers.event import ( 

42 async_track_point_in_time, 

43 async_track_state_change_event, 

44 async_track_sunrise, 

45 async_track_sunset, 

46 async_track_time_change, 

47) 

48from homeassistant.helpers.reload import ( 

49 async_integration_yaml_config, 

50) 

51from homeassistant.helpers.service import async_register_admin_service 

52from homeassistant.helpers.typing import ConfigType 

53from homeassistant.util.hass_dict import HassKey 

54 

55from custom_components.autoarm.hass_api import HomeAssistantAPI 

56from custom_components.autoarm.notifier import Notifier 

57 

58from .calendar import TrackedCalendar 

59from .config_flow import ( 

60 CONF_CALENDAR_ENTITIES, 

61 CONF_NO_EVENT_MODE, 

62 CONF_NOTIFY_ACTION, 

63 CONF_NOTIFY_ENABLED, 

64 CONF_NOTIFY_TARGETS, 

65 CONF_OCCUPANCY_DEFAULT_DAY, 

66 CONF_OCCUPANCY_DEFAULT_NIGHT, 

67 CONF_PERSON_ENTITIES, 

68 CONF_SUNRISE_EARLIEST, 

69 CONF_SUNRISE_LATEST, 

70 CONF_SUNSET_EARLIEST, 

71 CONF_SUNSET_LATEST, 

72 DEFAULT_NOTIFY_ACTION, 

73) 

74from .const import ( 

75 ATTR_RESET, 

76 CONF_ALARM_PANEL, 

77 CONF_BUTTONS, 

78 CONF_CALENDAR_CONTROL, 

79 CONF_CALENDAR_EVENT_STATES, 

80 CONF_CALENDAR_NO_EVENT, 

81 CONF_CALENDAR_POLL_INTERVAL, 

82 CONF_CALENDARS, 

83 CONF_DAY, 

84 CONF_DELAY_TIME, 

85 CONF_DIURNAL, 

86 CONF_EARLIEST, 

87 CONF_LATEST, 

88 CONF_NIGHT, 

89 CONF_NOTIFY, 

90 CONF_OCCUPANCY, 

91 CONF_OCCUPANCY_DEFAULT, 

92 CONF_RATE_LIMIT, 

93 CONF_RATE_LIMIT_CALLS, 

94 CONF_RATE_LIMIT_PERIOD, 

95 CONF_SUNRISE, 

96 CONF_SUNSET, 

97 CONF_TRANSITIONS, 

98 CONFIG_SCHEMA, 

99 DEFAULT_TRANSITIONS, 

100 DOMAIN, 

101 NO_CAL_EVENT_MODE_AUTO, 

102 NO_CAL_EVENT_MODE_MANUAL, 

103 NOTIFY_COMMON, 

104 YAML_DATA_KEY, 

105 ChangeSource, 

106 ConditionVariables, 

107) 

108from .helpers import AppHealthTracker, ExtendedExtendedJSONEncoder, Limiter, alarm_state_as_enum, deobjectify, safe_state 

109 

110if TYPE_CHECKING: 

111 from homeassistant.helpers.condition import ConditionCheckerType 

112 

113_LOGGER = logging.getLogger(__name__) 

114 

115OVERRIDE_STATES = (AlarmControlPanelState.ARMED_VACATION, AlarmControlPanelState.ARMED_CUSTOM_BYPASS) 

116EPHEMERAL_STATES = ( 

117 AlarmControlPanelState.PENDING, 

118 AlarmControlPanelState.ARMING, 

119 AlarmControlPanelState.DISARMING, 

120 AlarmControlPanelState.TRIGGERED, 

121) 

122ZOMBIE_STATES = ("unknown", "unavailable") 

123NS_MOBILE_ACTIONS = "mobile_actions" 

124PLATFORMS = ["autoarm"] 

125 

126HASS_DATA_KEY: HassKey["AutoArmData"] = HassKey(DOMAIN) 

127 

128 

129@dataclass 

130class AutoArmData: 

131 armer: "AlarmArmer" 

132 other_data: dict[str, str | dict[str, str] | list[str] | int | float | bool | None] 

133 

134 

135async def async_setup( 

136 hass: HomeAssistant, 

137 config: ConfigType, 

138) -> bool: 

139 _ = CONFIG_SCHEMA 

140 yaml_config: ConfigType = config.get(DOMAIN, {}) 

141 if yaml_config or YAML_DATA_KEY not in hass.data: 

142 hass.data[YAML_DATA_KEY] = yaml_config 

143 

144 has_alarm_panel = CONF_ALARM_PANEL in yaml_config 

145 existing_entries = hass.config_entries.async_entries(DOMAIN) 

146 

147 if has_alarm_panel and not existing_entries: 

148 _LOGGER.info("AUTOARM Triggering import of YAML configuration to ConfigEntry") 

149 hass.async_create_task(hass.config_entries.flow.async_init(DOMAIN, context={"source": SOURCE_IMPORT}, data=yaml_config)) 

150 elif has_alarm_panel and existing_entries: 

151 _LOGGER.warning("AUTOARM YAML core config present but ConfigEntry already exists; ignoring YAML core settings") 

152 ir.async_create_issue( 

153 hass, 

154 DOMAIN, 

155 "yaml_core_config_deprecated", 

156 is_fixable=False, 

157 severity=ir.IssueSeverity.WARNING, 

158 translation_key="yaml_core_config_deprecated", 

159 ) 

160 

161 async def reload_service_handler(service_call: ServiceCall) -> None: 

162 """Reload yaml entities.""" 

163 _LOGGER.info("AUTOARM Reloading %s.%s component, data %s", service_call.domain, service_call.service, service_call.data) 

164 try: 

165 fresh_config = await async_integration_yaml_config(hass, DOMAIN) 

166 except HomeAssistantError as err: 

167 raise HomeAssistantError(f"Failed to reload YAML configuration: {err}") from err 

168 if fresh_config is not None and DOMAIN in fresh_config: 

169 hass.data[YAML_DATA_KEY] = fresh_config[DOMAIN] 

170 else: 

171 hass.data[YAML_DATA_KEY] = {} 

172 entries = hass.config_entries.async_entries(DOMAIN) 

173 for entry in entries: 

174 await hass.config_entries.async_reload(entry.entry_id) 

175 

176 async_register_admin_service( 

177 hass, 

178 DOMAIN, 

179 SERVICE_RELOAD, 

180 reload_service_handler, 

181 ) 

182 

183 def supplemental_action_enquire_configuration(_call: ServiceCall) -> ConfigType: 

184 entries = hass.config_entries.async_entries(DOMAIN) 

185 if not entries: 

186 raise HomeAssistantError("No config entry found for AutoArm") 

187 entry = entries[0] 

188 stashed_yaml = hass.data.get(YAML_DATA_KEY, {}) 

189 data: ConfigType = { 

190 CONF_ALARM_PANEL: entry.data.get(CONF_ALARM_PANEL), 

191 CONF_DIURNAL: { 

192 CONF_SUNRISE: { 

193 CONF_EARLIEST: entry.options.get(CONF_SUNRISE_EARLIEST), 

194 CONF_LATEST: entry.options.get(CONF_SUNRISE_LATEST), 

195 }, 

196 CONF_SUNSET: { 

197 CONF_EARLIEST: entry.options.get(CONF_SUNSET_EARLIEST), 

198 CONF_LATEST: entry.options.get(CONF_SUNSET_LATEST), 

199 }, 

200 }, 

201 CONF_CALENDAR_CONTROL: stashed_yaml.get(CONF_CALENDAR_CONTROL), 

202 CONF_BUTTONS: stashed_yaml.get(CONF_BUTTONS, {}), 

203 CONF_OCCUPANCY: { 

204 CONF_ENTITY_ID: entry.options.get(CONF_PERSON_ENTITIES, []), 

205 CONF_OCCUPANCY_DEFAULT: { 

206 CONF_DAY: entry.options.get(CONF_OCCUPANCY_DEFAULT_DAY, "armed_home"), 

207 }, 

208 }, 

209 CONF_NOTIFY: { 

210 CONF_SERVICE: entry.options.get(CONF_NOTIFY_ACTION) 

211 or stashed_yaml.get(CONF_NOTIFY, {}).get(NOTIFY_COMMON, {}).get(CONF_SERVICE, DEFAULT_NOTIFY_ACTION), 

212 "targets": entry.options.get(CONF_NOTIFY_TARGETS, []), 

213 "profiles": stashed_yaml.get(CONF_NOTIFY, {}), 

214 "enabled": entry.options.get(CONF_NOTIFY_ENABLED, True), 

215 }, 

216 CONF_RATE_LIMIT: stashed_yaml.get(CONF_RATE_LIMIT, {}), 

217 } 

218 try: 

219 jsonized: str = json.dumps(obj=data, cls=ExtendedExtendedJSONEncoder) 

220 return json.loads(jsonized) 

221 except Exception as err: 

222 raise HomeAssistantError(f"Failed to serialize configuration: {err}") from err 

223 

224 hass.services.async_register( 

225 DOMAIN, 

226 "enquire_configuration", 

227 supplemental_action_enquire_configuration, 

228 supports_response=SupportsResponse.ONLY, 

229 ) 

230 

231 return True 

232 

233 

234async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: 

235 """Set up Auto Arm from a config entry.""" 

236 yaml_config: ConfigType = hass.data.get(YAML_DATA_KEY, {}) 

237 try: 

238 armer = _build_armer_from_entry(hass, entry, yaml_config) 

239 hass.data[HASS_DATA_KEY] = AutoArmData(armer, {}) 

240 await armer.initialize() 

241 except Exception as err: 

242 raise ConfigEntryNotReady(f"Failed to initialize Auto Arm: {err}") from err 

243 entry.async_on_unload(entry.add_update_listener(_async_update_listener)) 

244 return True 

245 

246 

247async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: # noqa: ARG001 

248 """Unload Auto Arm config entry.""" 

249 if HASS_DATA_KEY in hass.data: 

250 hass.data[HASS_DATA_KEY].armer.shutdown() 

251 del hass.data[HASS_DATA_KEY] 

252 return True 

253 

254 

255async def _async_update_listener(hass: HomeAssistant, entry: ConfigEntry) -> None: 

256 """Handle options update by reloading the entry.""" 

257 await hass.config_entries.async_reload(entry.entry_id) 

258 

259 

260def _build_armer_from_entry(hass: HomeAssistant, entry: ConfigEntry, yaml_config: ConfigType) -> "AlarmArmer": 

261 """Build an AlarmArmer instance from ConfigEntry data/options merged with YAML.""" 

262 migrate(hass) 

263 

264 alarm_panel: str = entry.data[CONF_ALARM_PANEL] 

265 person_entities: list[str] = entry.options.get(CONF_PERSON_ENTITIES, []) 

266 calendar_entities: list[str] = entry.options.get(CONF_CALENDAR_ENTITIES, []) 

267 occupancy_default_day: str = entry.options.get(CONF_OCCUPANCY_DEFAULT_DAY, "disarmed") 

268 occupancy_default_night: str | None = entry.options.get(CONF_OCCUPANCY_DEFAULT_NIGHT, "armed_night") 

269 no_event_mode: str = entry.options.get(CONF_NO_EVENT_MODE, NO_CAL_EVENT_MODE_AUTO) 

270 

271 # Build occupancy config 

272 yaml_occupancy = yaml_config.get(CONF_OCCUPANCY, {}) 

273 occupancy_defaults: dict[str, str] = {CONF_DAY: occupancy_default_day} 

274 if occupancy_default_night: 

275 occupancy_defaults[CONF_NIGHT] = occupancy_default_night 

276 occupancy: ConfigType = { 

277 CONF_ENTITY_ID: person_entities, 

278 CONF_OCCUPANCY_DEFAULT: occupancy_defaults, 

279 } 

280 yaml_delay_time = yaml_occupancy.get(CONF_DELAY_TIME) if isinstance(yaml_occupancy, dict) else None 

281 if yaml_delay_time: 

282 occupancy[CONF_DELAY_TIME] = yaml_delay_time 

283 

284 # Build calendar config 

285 yaml_calendar_control = yaml_config.get(CONF_CALENDAR_CONTROL, {}) 

286 yaml_calendars: list[ConfigType] = yaml_calendar_control.get(CONF_CALENDARS, []) if yaml_calendar_control else [] 

287 yaml_cal_by_entity: dict[str, ConfigType] = {cal[CONF_ENTITY_ID]: cal for cal in yaml_calendars if CONF_ENTITY_ID in cal} 

288 

289 calendar_list: list[ConfigType] = [] 

290 for cal_entity_id in calendar_entities: 

291 yaml_override = yaml_cal_by_entity.get(cal_entity_id, {}) 

292 cal_config: ConfigType = { 

293 CONF_ENTITY_ID: cal_entity_id, 

294 CONF_CALENDAR_POLL_INTERVAL: yaml_override.get(CONF_CALENDAR_POLL_INTERVAL, 15), 

295 CONF_CALENDAR_EVENT_STATES: yaml_override.get(CONF_CALENDAR_EVENT_STATES, _validated_default_calendar_mappings()), 

296 } 

297 calendar_list.append(cal_config) 

298 

299 calendar_config: ConfigType = {} 

300 if calendar_list: 

301 calendar_config = { 

302 CONF_CALENDAR_NO_EVENT: no_event_mode, 

303 CONF_CALENDARS: calendar_list, 

304 } 

305 

306 # Build notify config: service from options overrides YAML when explicitly set 

307 notify_profiles = yaml_config.get(CONF_NOTIFY, {}) 

308 

309 # Build diurnal cutoffs: options take priority, YAML is fallback 

310 yaml_diurnal = yaml_config.get(CONF_DIURNAL, {}) or {} 

311 yaml_sunrise = yaml_diurnal.get(CONF_SUNRISE, {}) or {} 

312 yaml_sunset = yaml_diurnal.get(CONF_SUNSET, {}) or {} 

313 

314 def _parse_time(option_key: str, yaml_fallback: dt.time | None) -> dt.time | None: 

315 val = entry.options.get(option_key) 

316 if val is not None: 

317 return cv.time(val) if isinstance(val, str) else val 

318 return yaml_fallback 

319 

320 return AlarmArmer( 

321 hass, 

322 alarm_panel=alarm_panel, 

323 sunrise_earliest=_parse_time(CONF_SUNRISE_EARLIEST, yaml_sunrise.get(CONF_EARLIEST)), 

324 sunrise_latest=_parse_time(CONF_SUNRISE_LATEST, yaml_sunrise.get(CONF_LATEST)), 

325 sunset_earliest=_parse_time(CONF_SUNSET_EARLIEST, yaml_sunset.get(CONF_EARLIEST)), 

326 sunset_latest=_parse_time(CONF_SUNSET_LATEST, yaml_sunset.get(CONF_LATEST)), 

327 buttons=yaml_config.get(CONF_BUTTONS, {}), 

328 occupancy=occupancy, 

329 notify_profiles=notify_profiles, 

330 notify_enabled=entry.options.get(CONF_NOTIFY_ENABLED, False), 

331 notify_action=entry.options.get(CONF_NOTIFY_ACTION), 

332 notify_targets=entry.options.get(CONF_NOTIFY_TARGETS, []), 

333 rate_limit=yaml_config.get(CONF_RATE_LIMIT, {}), 

334 calendar_config=calendar_config, 

335 transitions=yaml_config.get(CONF_TRANSITIONS), 

336 ) 

337 

338 

339def _validated_default_calendar_mappings() -> dict[str, list[re.Pattern[str]]]: 

340 """Build default calendar event state mappings with compiled regex patterns. 

341 

342 Mirrors the schema validation that CALENDAR_SCHEMA applies (ensure_list + is_regex). 

343 """ 

344 from .const import DEFAULT_CALENDAR_MAPPINGS 

345 

346 result: dict[str, list[re.Pattern[str]]] = {} 

347 for state, patterns in DEFAULT_CALENDAR_MAPPINGS.items(): 

348 state_str = str(state) 

349 if isinstance(patterns, str): 

350 patterns = [patterns] 

351 result[state_str] = [re.compile(p) if isinstance(p, str) else p for p in patterns] 

352 return result 

353 

354 

355def migrate(hass: HomeAssistant) -> None: 

356 for entity_id in ( 

357 "autoarm.configured", 

358 "autoarm.last_calendar_event", 

359 "autoarm.last_intervention", 

360 "autoarm.initialized", 

361 "autoarm.last_calculation", 

362 ): 

363 try: 

364 if hass.states.get(entity_id): 

365 _LOGGER.info("AUTOARM Migration removing legacy entity_id: %s", entity_id) 

366 hass.states.async_remove(entity_id) 

367 except Exception as e: 

368 _LOGGER.warning("AUTOARM Migration fail for %s:%s", entity_id, e) 

369 

370 

371def unlisten(listener: Callable[[], None] | None) -> None: 

372 if listener: 

373 try: 

374 listener() 

375 except Exception as e: 

376 _LOGGER.debug("AUTOARM Failure closing listener %s: %s", listener, e) 

377 

378 

379@dataclass 

380class Intervention: 

381 """Record of a manual intervention, such as a button push, mobile action or alarm panel change""" 

382 

383 created_at: dt.datetime 

384 source: ChangeSource 

385 state: AlarmControlPanelState | None 

386 

387 def as_dict(self) -> dict[str, str | None]: 

388 return { 

389 "created_at": self.created_at.isoformat(), 

390 "source": str(self.source), 

391 "state": str(self.state) if self.state is not None else None, 

392 } 

393 

394 

395class AlarmArmer: 

396 def __init__( 

397 self, 

398 hass: HomeAssistant, 

399 alarm_panel: str, 

400 buttons: dict[str, ConfigType] | None = None, 

401 occupancy: ConfigType | None = None, 

402 actions: list[str] | None = None, 

403 notify_enabled: bool = True, 

404 notify_action: str | None = None, 

405 notify_targets: list[str] | None = None, 

406 notify_profiles: ConfigType | None = None, 

407 sunrise_earliest: dt.time | None = None, 

408 sunrise_latest: dt.time | None = None, 

409 sunset_earliest: dt.time | None = None, 

410 sunset_latest: dt.time | None = None, 

411 rate_limit: ConfigType | None = None, 

412 calendar_config: ConfigType | None = None, 

413 transitions: dict[str, dict[str, list[ConfigType]]] | None = None, 

414 ) -> None: 

415 occupancy = occupancy or {} 

416 rate_limit = rate_limit or {} 

417 

418 self.hass: HomeAssistant = hass 

419 self.app_health_tracker: AppHealthTracker = AppHealthTracker(hass) 

420 if notify_enabled and not notify_profiles and not notify_action: 

421 _LOGGER.warning("AUTOARM Notification disabled - no config") 

422 notify_enabled = False 

423 if notify_enabled: 

424 self.notifier: Notifier | None = Notifier( 

425 notify_profiles, hass, self.app_health_tracker, notify_action, notify_targets 

426 ) 

427 else: 

428 self.notifier = None 

429 self.local_tz = dt_util.get_time_zone(self.hass.config.time_zone) 

430 calendar_config = calendar_config or {} 

431 self.calendar_configs: list[ConfigType] = calendar_config.get(CONF_CALENDARS, []) or [] 

432 self.calendars: list[TrackedCalendar] = [] 

433 self.calendar_no_event_mode: str | None = calendar_config.get(CONF_CALENDAR_NO_EVENT, NO_CAL_EVENT_MODE_AUTO) 

434 self.alarm_panel: str = alarm_panel 

435 self.sunrise_earliest: dt.time | None = sunrise_earliest 

436 self.sunrise_latest: dt.time | None = sunrise_latest 

437 self.sunset_earliest: dt.time | None = sunset_earliest 

438 self.sunset_latest: dt.time | None = sunset_latest 

439 self.occupants: list[str] = occupancy.get(CONF_ENTITY_ID, []) 

440 self.occupied_defaults: dict[str, AlarmControlPanelState] = occupancy.get( 

441 CONF_OCCUPANCY_DEFAULT, {CONF_DAY: AlarmControlPanelState.ARMED_HOME} 

442 ) 

443 self.occupied_delay: dict[str, dt.timedelta] = occupancy.get(CONF_DELAY_TIME, {}) 

444 self.buttons: ConfigType = buttons or {} 

445 

446 self.actions: list[str] = actions or [] 

447 self.unsubscribes: list[Callable[[], None]] = [] 

448 self.pre_pending_state: AlarmControlPanelState | None = None 

449 self.button_device: dict[str, str] = {} 

450 self.arming_in_progress: asyncio.Event = asyncio.Event() 

451 

452 self.rate_limiter: Limiter = Limiter( 

453 window=rate_limit.get(CONF_RATE_LIMIT_PERIOD, dt.timedelta(seconds=60)), 

454 max_calls=rate_limit.get(CONF_RATE_LIMIT_CALLS, 5), 

455 ) 

456 

457 self.hass_api: HomeAssistantAPI = HomeAssistantAPI(hass) 

458 self.transitions: dict[AlarmControlPanelState, ConditionCheckerType] = {} 

459 self.transition_config: dict[str, dict[str, list[ConfigType]]] = transitions or {} 

460 

461 self.interventions: list[Intervention] = [] 

462 self.intervention_ttl: int = 60 

463 

464 async def initialize(self) -> None: 

465 """Async initialization""" 

466 _LOGGER.info("AUTOARM occupied=%s, state=%s, calendars=%s", self.is_occupied(), self.armed_state(), len(self.calendars)) 

467 

468 self.initialize_alarm_panel() 

469 await self.initialize_calendar() 

470 await self.initialize_logic() 

471 self.initialize_diurnal() 

472 self.initialize_occupancy() 

473 self.initialize_buttons() 

474 self.initialize_integration() 

475 self.initialize_housekeeping() 

476 self.initialize_home_assistant() 

477 await self.reset_armed_state(source=ChangeSource.STARTUP) 

478 

479 _LOGGER.info("AUTOARM Initialized, state: %s", self.armed_state()) 

480 

481 def initialize_home_assistant(self) -> None: 

482 self.stop_listener: Callable[[], None] | None = self.hass.bus.async_listen_once( 

483 EVENT_HOMEASSISTANT_STOP, self.async_shutdown 

484 ) 

485 self.app_health_tracker.app_initialized() 

486 self.hass.states.async_set(f"sensor.{DOMAIN}_last_calculation", "unavailable", attributes={}) 

487 

488 self.hass.services.async_register( 

489 DOMAIN, 

490 "reset_state", 

491 self.reset_service, 

492 supports_response=SupportsResponse.OPTIONAL, 

493 ) 

494 

495 async def reset_service(self, _call: ServiceCall) -> ServiceResponse: 

496 new_state = await self.reset_armed_state(intervention=self.record_intervention(source=ChangeSource.ACTION, state=None)) 

497 return {"change": new_state or "NO_CHANGE"} 

498 

499 def initialize_integration(self) -> None: 

500 self.hass.states.async_set(f"sensor.{DOMAIN}_last_intervention", "unavailable", attributes={}) 

501 

502 self.unsubscribes.append(self.hass.bus.async_listen("mobile_app_notification_action", self.on_mobile_action)) 

503 

504 def initialize_alarm_panel(self) -> None: 

505 """Set up automation for Home Assistant alarm panel 

506 

507 See https://www.home-assistant.io/integrations/alarm_control_panel/ 

508 

509 Succeeds even if control panel has not yet started, listener will pick up events when it does 

510 """ 

511 self.unsubscribes.append(async_track_state_change_event(self.hass, [self.alarm_panel], self.on_panel_change)) 

512 _LOGGER.debug("AUTOARM Auto-arming %s", self.alarm_panel) 

513 

514 def initialize_housekeeping(self) -> None: 

515 self.unsubscribes.append( 

516 async_track_time_change( 

517 self.hass, 

518 action=self.housekeeping, 

519 minute=0, 

520 ) 

521 ) 

522 

523 def initialize_diurnal(self) -> None: 

524 # events API expects a function, however underlying HassJob is fine with coroutines 

525 self.unsubscribes.append(async_track_sunrise(self.hass, self.on_sunrise, None)) # type: ignore 

526 self.unsubscribes.append(async_track_sunset(self.hass, self.on_sunset, None)) # type: ignore 

527 if self.sunrise_latest: 

528 self.unsubscribes.append( 

529 async_track_time_change( 

530 self.hass, 

531 self.on_sunrise_latest, 

532 hour=self.sunrise_latest.hour, 

533 minute=self.sunrise_latest.minute, 

534 second=self.sunrise_latest.second, 

535 ) 

536 ) 

537 if self.sunset_latest: 

538 self.unsubscribes.append( 

539 async_track_time_change( 

540 self.hass, 

541 self.on_sunset_latest, 

542 hour=self.sunset_latest.hour, 

543 minute=self.sunset_latest.minute, 

544 second=self.sunset_latest.second, 

545 ) 

546 ) 

547 

548 def initialize_occupancy(self) -> None: 

549 """Configure occupants, and listen for changes in their state""" 

550 if self.occupants: 

551 _LOGGER.info("AUTOARM Occupancy determined by %s", ",".join(self.occupants)) 

552 self.unsubscribes.append(async_track_state_change_event(self.hass, self.occupants, self.on_occupancy_change)) 

553 else: 

554 _LOGGER.info("AUTOARM Occupancy not configured") 

555 

556 def initialize_buttons(self) -> None: 

557 """Initialize (optional) physical alarm state control buttons""" 

558 

559 def setup_button(state_name: str, button_entity: str, cb: Callable) -> None: 

560 self.button_device[state_name] = button_entity 

561 if self.button_device[state_name]: 

562 self.unsubscribes.append(async_track_state_change_event(self.hass, [button_entity], cb)) 

563 

564 _LOGGER.debug( 

565 "AUTOARM Configured %s button for %s", 

566 state_name, 

567 self.button_device[state_name], 

568 ) 

569 

570 for button_use, button_config in self.buttons.items(): 

571 delay: dt.timedelta | None = button_config.get(CONF_DELAY_TIME) 

572 for entity_id in button_config[CONF_ENTITY_ID]: 

573 if button_use == ATTR_RESET: 

574 setup_button(ATTR_RESET, entity_id, partial(self.on_reset_button, delay)) 

575 else: 

576 setup_button( 

577 button_use, entity_id, partial(self.on_alarm_state_button, AlarmControlPanelState(button_use), delay) 

578 ) 

579 

580 async def initialize_calendar(self) -> None: 

581 """Configure calendar polling (optional)""" 

582 stage: str = "calendar" 

583 self.hass.states.async_set(f"sensor.{DOMAIN}_last_calendar_event", "unavailable", attributes={}) 

584 if not self.calendar_configs: 

585 return 

586 try: 

587 platforms: list[entity_platform.EntityPlatform] = entity_platform.async_get_platforms(self.hass, CALENDAR_DOMAIN) 

588 if platforms: 

589 platform: entity_platform.EntityPlatform = platforms[0] 

590 else: 

591 self.app_health_tracker.record_initialization_error(stage) 

592 _LOGGER.error("AUTOARM Calendar platform not available from Home Assistant") 

593 return 

594 except Exception as _e: 

595 self.app_health_tracker.record_initialization_error(stage) 

596 _LOGGER.exception("AUTOARM Unable to access calendar platform") 

597 return 

598 for calendar_config in self.calendar_configs: 

599 tracked_calendar = TrackedCalendar( 

600 self.hass, calendar_config, self.calendar_no_event_mode, self, self.app_health_tracker 

601 ) 

602 await tracked_calendar.initialize(platform) 

603 self.calendars.append(tracked_calendar) 

604 

605 async def initialize_logic(self) -> None: 

606 stage: str = "logic" 

607 for state_str, raw_condition in DEFAULT_TRANSITIONS.items(): 

608 if state_str not in self.transition_config: 

609 _LOGGER.info("AUTOARM Defaulting transition condition for %s", state_str) 

610 self.transition_config[state_str] = {CONF_CONDITIONS: cv.CONDITIONS_SCHEMA(raw_condition)} 

611 

612 for state_str, transition_config in self.transition_config.items(): 

613 error: str = "" 

614 condition_config = transition_config.get(CONF_CONDITIONS) 

615 if condition_config is None: 

616 error = "Empty conditions" 

617 _LOGGER.warning(f"AUTOARM Found no conditions for {state_str} transition") 

618 else: 

619 try: 

620 state = AlarmControlPanelState(state_str) 

621 cond: ConditionCheckerType | None = await self.hass_api.build_condition( 

622 condition_config, strict=True, validate=True, name=state_str 

623 ) 

624 

625 if cond: 

626 # re-run without strict wrapper 

627 cond = await self.hass_api.build_condition(condition_config, name=state_str) 

628 if cond: 

629 _LOGGER.debug(f"AUTOARM Validated transition logic for {state_str}") 

630 self.transitions[state] = cond 

631 else: 

632 _LOGGER.warning(f"AUTOARM Failed to validate transition logic for {state_str}") 

633 error = "Condition validation failed" 

634 except ValueError as ve: 

635 self.app_health_tracker.record_initialization_error(stage) 

636 error = f"Invalid state {ve}" 

637 _LOGGER.error(f"AUTOARM Invalid state in {state_str} transition - {ve}") 

638 except vol.Invalid as vi: 

639 self.app_health_tracker.record_initialization_error(stage) 

640 _LOGGER.error(f"AUTOARM Transition {state_str} conditions fails Home Assistant schema check {vi}") 

641 error = f"Schema error {vi}" 

642 except ConditionError as ce: 

643 _LOGGER.error(f"AUTOARM Transition {state_str} conditions fails Home Assistant condition check {ce}") 

644 if hasattr(ce, "message"): 

645 error = ce.message # type: ignore 

646 elif hasattr(ce, "error") and hasattr(ce.error, "message"): # type: ignore[attr-defined] 

647 error = ce.error.message # type: ignore 

648 else: 

649 error = str(ce) 

650 except Exception as e: 

651 self.app_health_tracker.record_initialization_error(stage) 

652 _LOGGER.exception("AUTOARM Disabling transition %s with error validating %s", state_str, condition_config) 

653 error = f"Unknown exception {e}" 

654 if error: 

655 _LOGGER.warning(f"AUTOARM raising report issue for {error} on {state_str}") 

656 self.hass_api.raise_issue( 

657 f"transition_condition_{state_str}", 

658 is_fixable=False, 

659 issue_key="transition_condition", 

660 issue_map={"state": state_str, "error": error}, 

661 severity=ir.IssueSeverity.ERROR, 

662 ) 

663 

664 async def async_shutdown(self, _event: Event) -> None: 

665 _LOGGER.info("AUTOARM shut down event received") 

666 self.stop_listener = None 

667 self.shutdown() 

668 

669 def shutdown(self) -> None: 

670 _LOGGER.info("AUTOARM shutting down") 

671 for calendar in self.calendars: 

672 calendar.shutdown() 

673 while self.unsubscribes: 

674 unlisten(self.unsubscribes.pop()) 

675 unlisten(self.stop_listener) 

676 self.stop_listener = None 

677 _LOGGER.info("AUTOARM shut down") 

678 

679 def active_calendar_event(self) -> CalendarEvent | None: 

680 events: list[CalendarEvent] = [] 

681 for cal in self.calendars: 

682 events.extend(cal.active_events()) 

683 if events: 

684 # TODO: consider sorting events to LIFO 

685 return events[0] 

686 return None 

687 

688 def has_active_calendar_event(self) -> bool: 

689 return any(cal.has_active_event() for cal in self.calendars) 

690 

691 def is_occupied(self) -> bool | None: 

692 """Ternary - true at least one person entity has state home, false none of them, null if no occupants defined""" 

693 if self.occupants: 

694 return any(safe_state(self.hass.states.get(p)) == STATE_HOME for p in self.occupants) 

695 return None 

696 

697 def at_home(self) -> list[str] | None: 

698 if self.occupants: 

699 return [p for p in self.occupants if safe_state(self.hass.states.get(p)) == STATE_HOME] 

700 return None 

701 

702 def not_home(self) -> list[str] | None: 

703 if self.occupants: 

704 return [p for p in self.occupants if safe_state(self.hass.states.get(p)) != STATE_HOME] 

705 return None 

706 

707 def is_unoccupied(self) -> bool | None: 

708 """Ternary - false at least one person entity has state home, true none of them, null if no occupants defined""" 

709 if self.occupants: 

710 return all(safe_state(self.hass.states.get(p)) != STATE_HOME for p in self.occupants) 

711 return None 

712 

713 def is_night(self) -> bool: 

714 return safe_state(self.hass.states.get("sun.sun")) == STATE_BELOW_HORIZON 

715 

716 def armed_state(self) -> AlarmControlPanelState: 

717 raw_state: str | None = safe_state(self.hass.states.get(self.alarm_panel)) 

718 alarm_state = alarm_state_as_enum(raw_state) 

719 if alarm_state is None: 

720 _LOGGER.warning("AUTOARM No alarm state available - treating as PENDING") 

721 return AlarmControlPanelState.PENDING 

722 return alarm_state 

723 

724 def _extract_event(self, event: Event[EventStateChangedData]) -> tuple[str | None, str | None, str | None, dict[str, str]]: 

725 entity_id = old = new = None 

726 new_attributes: dict[str, str] = {} 

727 if event and event.data: 

728 entity_id = event.data.get("entity_id") 

729 old_obj = event.data.get("old_state") 

730 if old_obj: 

731 old = old_obj.state 

732 new_obj = event.data.get("new_state") 

733 if new_obj: 

734 new = new_obj.state 

735 new_attributes = new_obj.attributes 

736 return entity_id, old, new, new_attributes 

737 

738 async def pending_state(self, source: ChangeSource | None) -> None: 

739 self.pre_pending_state = self.armed_state() 

740 await self.arm(AlarmControlPanelState.PENDING, source=source) 

741 

742 @callback 

743 async def delayed_reset_armed_state(self, triggered_at: dt.datetime, requested_at: dt.datetime | None, **kwargs) -> None: 

744 _LOGGER.debug("AUTOARM delayed_arm at %s, requested_at: %s", triggered_at, requested_at) 

745 if self.is_intervention_since_request(requested_at): 

746 return 

747 await self.reset_armed_state(**kwargs) 

748 

749 async def reset_armed_state( 

750 self, intervention: Intervention | None = None, source: ChangeSource | None = None 

751 ) -> str | None: 

752 """Logic to automatically work out appropriate current armed state""" 

753 state: AlarmControlPanelState | None = None 

754 existing_state: AlarmControlPanelState | None = None 

755 must_change_state: bool = False 

756 last_state_intervention: Intervention | None = None 

757 active_calendar_event: CalendarEvent | None = None 

758 

759 if source is None and intervention is not None: 

760 source = intervention.source 

761 _LOGGER.debug( 

762 "AUTOARM reset_armed_state(intervention=%s,source=%s)", 

763 intervention, 

764 source, 

765 ) 

766 

767 try: 

768 existing_state = self.armed_state() 

769 state = existing_state 

770 if self.calendars: 

771 active_calendar_event = self.active_calendar_event() 

772 if active_calendar_event: 

773 _LOGGER.debug("AUTOARM Ignoring reset while calendar event active") 

774 return existing_state 

775 if self.calendar_no_event_mode == NO_CAL_EVENT_MODE_MANUAL: 

776 _LOGGER.debug( 

777 "AUTOARM Ignoring reset while calendar configured, no active event, and default mode is manual" 

778 ) 

779 return existing_state 

780 if self.calendar_no_event_mode in AlarmControlPanelState: 

781 # TODO: may be dupe logic with on_cal event 

782 _LOGGER.debug("AUTOARM Applying fixed reset on end of calendar event, %s", self.calendar_no_event_mode) 

783 return await self.arm(alarm_state_as_enum(self.calendar_no_event_mode), ChangeSource.CALENDAR) 

784 if self.calendar_no_event_mode == NO_CAL_EVENT_MODE_AUTO: 

785 _LOGGER.debug("AUTOARM Applying reset while calendar configured, no active event, and default mode is auto") 

786 else: 

787 _LOGGER.warning("AUTOARM Unexpected state for calendar no event mode: %s", self.calendar_no_event_mode) 

788 

789 # TODO: expose as config ( for manual disarm override ) and condition logic 

790 must_change_state = existing_state is None or existing_state == AlarmControlPanelState.PENDING 

791 if intervention or source in (ChangeSource.CALENDAR, ChangeSource.OCCUPANCY) or must_change_state: 

792 _LOGGER.debug("AUTOARM Ignoring previous interventions") 

793 else: 

794 last_state_intervention = self.last_state_intervention() 

795 if last_state_intervention: 

796 _LOGGER.debug( 

797 "AUTOARM Ignoring automated reset for %s set by %s at %s", 

798 last_state_intervention.state, 

799 last_state_intervention.source, 

800 last_state_intervention.created_at, 

801 ) 

802 return existing_state 

803 state = self.determine_state() 

804 if state is not None and state != AlarmControlPanelState.PENDING and state != existing_state: 

805 state = await self.arm(state, source=source) 

806 finally: 

807 self.hass.states.async_set( 

808 f"sensor.{DOMAIN}_last_calculation", 

809 str(state is not None and state != existing_state), 

810 attributes={ 

811 "new_state": str(state), 

812 "old_state": str(existing_state), 

813 "source": source, 

814 "active_calendar_event": deobjectify(active_calendar_event), 

815 "occupied": self.is_occupied(), 

816 "night": self.is_night(), 

817 "must_change_state": str(must_change_state), 

818 "last_state_intervention": deobjectify(last_state_intervention), 

819 "intervention": intervention.as_dict() if intervention else None, 

820 "time": dt_util.now().isoformat(), 

821 }, 

822 ) 

823 

824 return state 

825 

826 def is_intervention_since_request(self, requested_at: dt.datetime | None) -> bool: 

827 if requested_at is not None and self.has_intervention_since(requested_at): 

828 _LOGGER.debug( 

829 "AUTOARM Cancelling delayed operation since subsequent manual action", 

830 ) 

831 return True 

832 return False 

833 

834 def determine_state(self) -> AlarmControlPanelState | None: 

835 """Compute a new state using occupancy, sun and transition conditions""" 

836 evaluated_state: AlarmControlPanelState | None = None 

837 condition_vars: ConditionVariables = ConditionVariables( 

838 occupied=self.is_occupied(), 

839 unoccupied=self.is_unoccupied(), 

840 night=self.is_night(), 

841 state=self.armed_state(), 

842 calendar_event=self.active_calendar_event(), 

843 occupied_defaults=self.occupied_defaults, 

844 at_home=self.at_home(), 

845 not_home=self.not_home(), 

846 ) 

847 for state, checker in self.transitions.items(): 

848 if self.hass_api.evaluate_condition(checker, condition_vars): 

849 _LOGGER.debug("AUTOARM Computed state as %s from condition", state) 

850 evaluated_state = state 

851 break 

852 if evaluated_state is None: 

853 return None 

854 return AlarmControlPanelState(evaluated_state) 

855 

856 @callback 

857 async def delayed_arm(self, triggered_at: dt.datetime, requested_at: dt.datetime | None, **kwargs: Any) -> None: 

858 _LOGGER.debug("AUTOARM delayed_arm at %s, requested_at: %s", triggered_at, requested_at) 

859 if self.is_intervention_since_request(requested_at): 

860 return 

861 await self.arm(**kwargs) 

862 

863 async def arm( 

864 self, arming_state: AlarmControlPanelState | None, source: ChangeSource | None = None 

865 ) -> AlarmControlPanelState | None: 

866 """Change alarm panel state 

867 

868 Args: 

869 ---- 

870 arming_state (str, optional): _description_. Defaults to None. 

871 source (str,optional): Source of the change, for example 'calendar' or 'button' 

872 

873 Returns: 

874 ------- 

875 str: New arming state 

876 

877 """ 

878 if arming_state is None: 

879 return None 

880 if self.armed_state() == arming_state: 

881 return None 

882 if self.rate_limiter.triggered(): 

883 _LOGGER.debug("AUTOARM Rate limit triggered by %s, skipping arm", source) 

884 return None 

885 try: 

886 self.arming_in_progress.set() 

887 existing_state: AlarmControlPanelState | None = self.armed_state() 

888 if arming_state != existing_state: 

889 attrs: dict[str, str] = {} 

890 panel_state: State | None = self.hass.states.get(self.alarm_panel) 

891 if panel_state: 

892 attrs.update(panel_state.attributes) 

893 attrs[ATTR_CHANGED_BY] = f"{DOMAIN}.{source}" 

894 self.hass.states.async_set(entity_id=self.alarm_panel, new_state=str(arming_state), attributes=attrs) 

895 _LOGGER.info("AUTOARM Setting %s from %s to %s for %s", self.alarm_panel, existing_state, arming_state, source) 

896 if self.notifier and source and arming_state: 

897 await self.notifier.notify(source=source, from_state=existing_state, to_state=arming_state) 

898 return arming_state 

899 _LOGGER.debug("Skipping arm for %s, as %s already %s", source, self.alarm_panel, arming_state) 

900 return existing_state 

901 except Exception as e: 

902 _LOGGER.error("AUTOARM Failed to arm: %s", e) 

903 self.app_health_tracker.record_runtime_error() 

904 finally: 

905 self.arming_in_progress.clear() 

906 return None 

907 

908 def schedule_state( 

909 self, 

910 trigger_time: dt.datetime, 

911 state: AlarmControlPanelState | None, 

912 intervention: Intervention | None, 

913 source: ChangeSource | None = None, 

914 ) -> None: 

915 source = source or intervention.source if intervention else None 

916 

917 job: Callable 

918 if state is None: 

919 _LOGGER.debug("Delayed reset, triggered at: %s, source%s", trigger_time, source) 

920 job = partial(self.delayed_reset_armed_state, intervention=intervention, source=source, requested_at=dt_util.now()) 

921 else: 

922 _LOGGER.debug("Delayed arm %s, triggered at: %s, source%s", state, trigger_time, source) 

923 

924 job = partial(self.delayed_arm, arming_state=state, source=source, requested_at=dt_util.now()) 

925 

926 self.unsubscribes.append( 

927 async_track_point_in_time( 

928 self.hass, 

929 job, 

930 trigger_time, 

931 ) 

932 ) 

933 

934 def record_intervention(self, source: ChangeSource, state: AlarmControlPanelState | None) -> Intervention: 

935 intervention = Intervention(dt_util.now(), source, state) 

936 self.interventions.append(intervention) 

937 self.hass.states.async_set(f"sensor.{DOMAIN}_last_intervention", source, attributes=intervention.as_dict()) 

938 

939 return intervention 

940 

941 def has_intervention_since(self, cutoff: dt.datetime) -> bool: 

942 """Has there been a manual intervention since the cutoff time""" 

943 if not self.interventions: 

944 return False 

945 return any(intervention.created_at > cutoff for intervention in self.interventions) 

946 

947 def last_state_intervention(self) -> Intervention | None: 

948 candidates: list[Intervention] = [i for i in self.interventions if i.state is not None] 

949 if candidates: 

950 return candidates[-1] 

951 return None 

952 

953 @callback 

954 async def on_sunrise(self, *args: Any) -> None: # noqa: ARG002 

955 _LOGGER.debug("AUTOARM Sunrise") 

956 now = dt_util.now() 

957 if not self.sunrise_earliest or now.time() >= self.sunrise_earliest: 

958 await self.reset_armed_state(source=ChangeSource.SUNRISE) 

959 else: 

960 _LOGGER.debug("AUTOARM Rescheduling delayed sunrise action to %s", self.sunrise_earliest) 

961 self.schedule_state( 

962 dt.datetime.combine(now.date(), self.sunrise_earliest, tzinfo=dt_util.DEFAULT_TIME_ZONE), 

963 intervention=None, 

964 state=None, 

965 source=ChangeSource.SUNRISE, 

966 ) 

967 

968 @callback 

969 async def on_sunrise_latest(self, *args: Any) -> None: # noqa: ARG002 

970 _LOGGER.debug("AUTOARM Sunrise latest cutoff reached") 

971 await self.reset_armed_state(source=ChangeSource.SUNRISE) 

972 

973 @callback 

974 async def on_sunset(self, *args: Any) -> None: # noqa: ARG002 

975 _LOGGER.debug("AUTOARM Sunset") 

976 now = dt_util.now() 

977 if not self.sunset_earliest or now.time() >= self.sunset_earliest: 

978 await self.reset_armed_state(source=ChangeSource.SUNSET) 

979 else: 

980 _LOGGER.debug("AUTOARM Rescheduling delayed sunset action to %s", self.sunset_earliest) 

981 self.schedule_state( 

982 dt.datetime.combine(now.date(), self.sunset_earliest, tzinfo=dt_util.DEFAULT_TIME_ZONE), 

983 intervention=None, 

984 state=None, 

985 source=ChangeSource.SUNSET, 

986 ) 

987 

988 @callback 

989 async def on_sunset_latest(self, *args: Any) -> None: # noqa: ARG002 

990 _LOGGER.debug("AUTOARM Sunset latest cutoff reached") 

991 await self.reset_armed_state(source=ChangeSource.SUNSET) 

992 

993 @callback 

994 async def on_mobile_action(self, event: Event) -> None: 

995 _LOGGER.debug("AUTOARM Mobile Action: %s", event) 

996 source: ChangeSource = ChangeSource.MOBILE 

997 

998 match event.data.get("action"): 

999 case "ALARM_PANEL_DISARM": 

1000 self.record_intervention(source=source, state=AlarmControlPanelState.DISARMED) 

1001 await self.arm(AlarmControlPanelState.DISARMED, source=source) 

1002 case "ALARM_PANEL_RESET": 

1003 await self.reset_armed_state(intervention=self.record_intervention(source=ChangeSource.BUTTON, state=None)) 

1004 case "ALARM_PANEL_AWAY": 

1005 self.record_intervention(source=source, state=AlarmControlPanelState.ARMED_AWAY) 

1006 await self.arm(AlarmControlPanelState.ARMED_AWAY, source=source) 

1007 case _: 

1008 _LOGGER.debug("AUTOARM Ignoring mobile action: %s", event.data) 

1009 

1010 @callback 

1011 async def on_alarm_state_button(self, state: AlarmControlPanelState, delay: dt.timedelta | None, event: Event) -> None: 

1012 _LOGGER.debug("AUTOARM Alarm %s Button: %s", state, event) 

1013 intervention = self.record_intervention(source=ChangeSource.BUTTON, state=state) 

1014 if delay: 

1015 self.schedule_state(dt_util.now() + delay, state, intervention, source=ChangeSource.BUTTON) 

1016 if self.notifier: 

1017 await self.notifier.notify( 

1018 ChangeSource.BUTTON, 

1019 from_state=self.armed_state(), 

1020 to_state=state, 

1021 message=f"Alarm will be set to {state} in {delay}", 

1022 title=f"Arm set to {state} process starting", 

1023 ) 

1024 else: 

1025 await self.arm(state, source=ChangeSource.BUTTON) 

1026 

1027 @callback 

1028 async def on_reset_button(self, delay: dt.timedelta | None, event: Event) -> None: 

1029 _LOGGER.debug("AUTOARM Reset Button: %s", event) 

1030 intervention = self.record_intervention(source=ChangeSource.BUTTON, state=None) 

1031 if delay: 

1032 self.schedule_state(dt_util.now() + delay, None, intervention, ChangeSource.BUTTON) 

1033 if self.notifier: 

1034 await self.notifier.notify( 

1035 ChangeSource.BUTTON, 

1036 message=f"Alarm will be reset in {delay}", 

1037 title="Alarm reset wait initiated", 

1038 ) 

1039 else: 

1040 await self.reset_armed_state(intervention=self.record_intervention(source=ChangeSource.BUTTON, state=None)) 

1041 

1042 @callback 

1043 async def on_occupancy_change(self, event: Event[EventStateChangedData]) -> None: 

1044 """Listen for person state events 

1045 

1046 Args: 

1047 ---- 

1048 event (Event[EventStateChangedData]): state change event 

1049 

1050 """ 

1051 entity_id, old, new, new_attributes = self._extract_event(event) 

1052 if old == new: 

1053 _LOGGER.debug( 

1054 "AUTOARM Occupancy Non-state Change: %s, state:%s->%s, event: %s, attrs:%s", 

1055 entity_id, 

1056 old, 

1057 new, 

1058 event, 

1059 new_attributes, 

1060 ) 

1061 return 

1062 _LOGGER.debug( 

1063 "AUTOARM Occupancy state Change: %s, state:%s->%s, event: %s, attrs:%s", entity_id, old, new, event, new_attributes 

1064 ) 

1065 if new in self.occupied_delay: 

1066 self.schedule_state( 

1067 dt_util.now() + self.occupied_delay[new], state=None, intervention=None, source=ChangeSource.OCCUPANCY 

1068 ) 

1069 else: 

1070 await self.reset_armed_state(source=ChangeSource.OCCUPANCY) 

1071 

1072 @callback 

1073 async def on_panel_change(self, event: Event[EventStateChangedData]) -> None: 

1074 """Alarm Control Panel has been changed outside of AutoArm""" 

1075 entity_id, old, new, new_attributes = self._extract_event(event) 

1076 if new_attributes: 

1077 changed_by = new_attributes.get(ATTR_CHANGED_BY) 

1078 if changed_by and changed_by.startswith(f"{DOMAIN}."): 

1079 _LOGGER.debug( 

1080 "AUTOARM Panel Change Ignored: %s,%s: %s-->%s", 

1081 entity_id, 

1082 event.event_type, 

1083 old, 

1084 new, 

1085 ) 

1086 return 

1087 new_state: AlarmControlPanelState | None = alarm_state_as_enum(new) 

1088 old_state: AlarmControlPanelState | None = alarm_state_as_enum(old) 

1089 

1090 _LOGGER.info( 

1091 "AUTOARM Panel Change: %s,%s: %s-->%s", 

1092 entity_id, 

1093 event.event_type, 

1094 old, 

1095 new, 

1096 ) 

1097 self.record_intervention(ChangeSource.ALARM_PANEL, new_state) 

1098 if new in ZOMBIE_STATES: 

1099 _LOGGER.warning("AUTOARM Dezombifying %s ...", new) 

1100 await self.reset_armed_state(source=ChangeSource.ZOMBIFICATION) 

1101 elif new != old: 

1102 if self.notifier: 

1103 await self.notifier.notify(ChangeSource.ALARM_PANEL, old_state, new_state) 

1104 else: 

1105 _LOGGER.debug("AUTOARM panel change leaves state unchanged at %s", new) 

1106 

1107 @callback 

1108 async def housekeeping(self, triggered_at: dt.datetime) -> None: 

1109 _LOGGER.debug("AUTOARM Housekeeping starting, triggered at %s", triggered_at) 

1110 now = dt_util.now() 

1111 self.interventions = [i for i in self.interventions if now < i.created_at + dt.timedelta(minutes=self.intervention_ttl)] 

1112 for cal in self.calendars: 

1113 await cal.prune_events() 

1114 _LOGGER.debug("AUTOARM Housekeeping finished")