Coverage for custom_components/autoarm/autoarming.py: 86%

618 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2026-05-22 21:57 +0000

1import asyncio 

2import datetime as dt 

3import json 

4import logging 

5import re 

6from collections.abc import Callable, Coroutine 

7from dataclasses import dataclass 

8from functools import partial 

9from typing import TYPE_CHECKING, Any, cast 

10 

11import homeassistant.util.dt as dt_util 

12import voluptuous as vol 

13from homeassistant.components.alarm_control_panel.const import ATTR_CHANGED_BY, AlarmControlPanelState 

14from homeassistant.components.calendar.const import DOMAIN as CALENDAR_DOMAIN 

15from homeassistant.components.sun.const import STATE_BELOW_HORIZON 

16from homeassistant.config_entries import SOURCE_IMPORT, ConfigEntry 

17from homeassistant.const import ( 

18 CONF_CONDITIONS, 

19 CONF_DELAY_TIME, 

20 CONF_ENTITY_ID, 

21 CONF_SERVICE, 

22 EVENT_HOMEASSISTANT_STOP, 

23 SERVICE_RELOAD, 

24 STATE_HOME, 

25) 

26from homeassistant.core import ( 

27 Event, 

28 EventStateChangedData, 

29 HomeAssistant, 

30 ServiceCall, 

31 ServiceResponse, 

32 State, 

33 SupportsResponse, 

34 callback, 

35) 

36from homeassistant.exceptions import ConditionError, ConfigEntryNotReady, HomeAssistantError 

37from homeassistant.helpers import condition as condition 

38from homeassistant.helpers import config_validation as cv 

39from homeassistant.helpers import entity_platform 

40from homeassistant.helpers import issue_registry as ir 

41from homeassistant.helpers.event import ( 

42 async_track_point_in_time, 

43 async_track_state_change_event, 

44 async_track_sunrise, 

45 async_track_sunset, 

46 async_track_time_change, 

47) 

48from homeassistant.helpers.reload import ( 

49 async_integration_yaml_config, 

50) 

51from homeassistant.helpers.service import async_register_admin_service 

52from homeassistant.helpers.typing import ConfigType 

53from homeassistant.util.hass_dict import HassKey 

54 

55from custom_components.autoarm.hass_api import HomeAssistantAPI 

56from custom_components.autoarm.notifier import Notifier 

57 

58from .calendar_events import TrackedCalendar, TrackedCalendarEvent 

59from .config_flow import ( 

60 CONF_CALENDAR_ENTITIES, 

61 CONF_CALENDAR_OCCUPANCY_OVERRIDE_STATES, 

62 CONF_NO_EVENT_MODE, 

63 CONF_NOTIFY_ACTION, 

64 CONF_NOTIFY_ENABLED, 

65 CONF_NOTIFY_TARGETS, 

66 CONF_OCCUPANCY_DEFAULT_DAY, 

67 CONF_OCCUPANCY_DEFAULT_NIGHT, 

68 CONF_PERSON_ENTITIES, 

69 CONF_SUNRISE_EARLIEST, 

70 CONF_SUNRISE_LATEST, 

71 CONF_SUNSET_EARLIEST, 

72 CONF_SUNSET_LATEST, 

73 DEFAULT_CALENDAR_OCCUPANCY_OVERRIDE_STATES, 

74 DEFAULT_NOTIFY_ACTION, 

75) 

76from .const import ( 

77 ATTR_RESET, 

78 CONF_ALARM_PANEL, 

79 CONF_BUTTONS, 

80 CONF_CALENDAR_CONTROL, 

81 CONF_CALENDAR_EVENT_STATES, 

82 CONF_CALENDAR_NO_EVENT, 

83 CONF_CALENDAR_POLL_INTERVAL, 

84 CONF_CALENDARS, 

85 CONF_DAY, 

86 CONF_DIURNAL, 

87 CONF_EARLIEST, 

88 CONF_LATEST, 

89 CONF_NIGHT, 

90 CONF_NOTIFY, 

91 CONF_OCCUPANCY, 

92 CONF_OCCUPANCY_DEFAULT, 

93 CONF_RATE_LIMIT, 

94 CONF_RATE_LIMIT_CALLS, 

95 CONF_RATE_LIMIT_PERIOD, 

96 CONF_SUNRISE, 

97 CONF_SUNSET, 

98 CONF_TRANSITIONS, 

99 CONFIG_SCHEMA, 

100 DEFAULT_TRANSITIONS, 

101 DOMAIN, 

102 NO_CAL_EVENT_MODE_AUTO, 

103 NO_CAL_EVENT_MODE_MANUAL, 

104 NOTIFY_COMMON, 

105 YAML_DATA_KEY, 

106 ChangeSource, 

107 ConditionVariables, 

108) 

109from .helpers import ( 

110 AppHealthTracker, 

111 ExtendedExtendedJSONEncoder, 

112 Limiter, 

113 alarm_state_as_enum, 

114 change_source_as_enum, 

115 deobjectify, 

116 safe_state, 

117) 

118 

if TYPE_CHECKING:
    # Imported for annotations only, so it is never loaded at runtime.
    from collections.abc import Mapping

    # Alias for a condition checker: called with an optional variables mapping, returns a bool.
    ConditionCheckerType = Callable[[Mapping[str, Any] | None], bool]

_LOGGER = logging.getLogger(__name__)

# NOTE(review): presumably states chosen manually that automation should leave alone - confirm against usage.
OVERRIDE_STATES = (AlarmControlPanelState.ARMED_VACATION, AlarmControlPanelState.ARMED_CUSTOM_BYPASS)
# NOTE(review): presumably transitional panel states - confirm against usage.
EPHEMERAL_STATES = (
    AlarmControlPanelState.PENDING,
    AlarmControlPanelState.ARMING,
    AlarmControlPanelState.DISARMING,
    AlarmControlPanelState.TRIGGERED,
)
# Raw state strings rather than AlarmControlPanelState members.
ZOMBIE_STATES = ("unknown", "unavailable")
NS_MOBILE_ACTIONS = "mobile_actions"
PLATFORMS = ["autoarm"]

# Typed key under which the AutoArmData for this integration is kept in hass.data.
HASS_DATA_KEY: HassKey["AutoArmData"] = HassKey(DOMAIN)

138 

139 

140@dataclass 

141class AutoArmData: 

142 armer: "AlarmArmer" 

143 other_data: dict[str, str | dict[str, str] | list[str] | int | float | bool | None] 

144 

145 

async def async_setup(  # noqa: RUF029
    hass: HomeAssistant,
    config: ConfigType,
) -> bool:
    """Set up the integration from YAML and register its domain-level services.

    The YAML section for this domain is stashed in ``hass.data`` so config
    entries can merge it in later. If YAML names an alarm panel and no config
    entry exists, an import flow is started; if an entry already exists, a
    repair issue is raised instead. Registers the admin ``reload`` service and
    the response-only ``enquire_configuration`` service.

    Args:
        hass: Home Assistant instance.
        config: Full Home Assistant configuration.

    Returns:
        Always ``True``.
    """
    _ = CONFIG_SCHEMA
    yaml_config: ConfigType = config.get(DOMAIN, {})
    # Do not overwrite a previously stashed YAML section with an empty one.
    if yaml_config or YAML_DATA_KEY not in hass.data:
        hass.data[YAML_DATA_KEY] = yaml_config

    has_alarm_panel = CONF_ALARM_PANEL in yaml_config
    existing_entries = hass.config_entries.async_entries(DOMAIN)

    if has_alarm_panel and not existing_entries:
        _LOGGER.info("AUTOARM Triggering import of YAML configuration to ConfigEntry")
        hass.async_create_task(hass.config_entries.flow.async_init(DOMAIN, context={"source": SOURCE_IMPORT}, data=yaml_config))
    elif has_alarm_panel and existing_entries:
        _LOGGER.warning("AUTOARM YAML core config present but ConfigEntry already exists; ignoring YAML core settings")
        ir.async_create_issue(
            hass,
            DOMAIN,
            "yaml_core_config_deprecated",
            is_fixable=False,
            severity=ir.IssueSeverity.WARNING,
            translation_key="yaml_core_config_deprecated",
        )

    async def reload_service_handler(service_call: ServiceCall) -> None:
        """Reload yaml entities."""
        _LOGGER.info("AUTOARM Reloading %s.%s component, data %s", service_call.domain, service_call.service, service_call.data)
        try:
            fresh_config = await async_integration_yaml_config(hass, DOMAIN)
        except HomeAssistantError as err:
            raise HomeAssistantError(f"Failed to reload YAML configuration: {err}") from err
        if fresh_config is not None and DOMAIN in fresh_config:
            hass.data[YAML_DATA_KEY] = fresh_config[DOMAIN]
        else:
            hass.data[YAML_DATA_KEY] = {}
        entries = hass.config_entries.async_entries(DOMAIN)
        for entry in entries:
            await hass.config_entries.async_reload(entry.entry_id)

    async_register_admin_service(
        hass,
        DOMAIN,
        SERVICE_RELOAD,
        reload_service_handler,
    )

    def supplemental_action_enquire_configuration(_call: ServiceCall) -> ConfigType:
        """Report the merged entry/YAML configuration as a JSON-safe dict.

        Option defaults mirror those applied by ``_build_armer_from_entry`` so
        the report matches what the running armer was built with.

        Raises:
            HomeAssistantError: if there is no config entry, or the result
                cannot be serialized.
        """
        entries = hass.config_entries.async_entries(DOMAIN)
        if not entries:
            raise HomeAssistantError("No config entry found for AutoArm")
        entry = entries[0]
        stashed_yaml = hass.data.get(YAML_DATA_KEY, {})
        # A bare `notify:` key in YAML yields None; treat that as empty.
        yaml_notify = stashed_yaml.get(CONF_NOTIFY) or {}
        yaml_common_notify = yaml_notify.get(NOTIFY_COMMON) or {}

        occupancy_defaults: dict[str, Any] = {CONF_DAY: entry.options.get(CONF_OCCUPANCY_DEFAULT_DAY, "disarmed")}
        night_default = entry.options.get(CONF_OCCUPANCY_DEFAULT_NIGHT, "armed_night")
        if night_default:
            occupancy_defaults[CONF_NIGHT] = night_default

        data: ConfigType = {
            CONF_ALARM_PANEL: entry.data.get(CONF_ALARM_PANEL),
            CONF_DIURNAL: {
                CONF_SUNRISE: {
                    CONF_EARLIEST: entry.options.get(CONF_SUNRISE_EARLIEST),
                    CONF_LATEST: entry.options.get(CONF_SUNRISE_LATEST),
                },
                CONF_SUNSET: {
                    CONF_EARLIEST: entry.options.get(CONF_SUNSET_EARLIEST),
                    CONF_LATEST: entry.options.get(CONF_SUNSET_LATEST),
                },
            },
            CONF_CALENDAR_CONTROL: stashed_yaml.get(CONF_CALENDAR_CONTROL),
            CONF_BUTTONS: stashed_yaml.get(CONF_BUTTONS, {}),
            CONF_OCCUPANCY: {
                CONF_ENTITY_ID: entry.options.get(CONF_PERSON_ENTITIES, []),
                CONF_OCCUPANCY_DEFAULT: occupancy_defaults,
            },
            CONF_NOTIFY: {
                CONF_SERVICE: entry.options.get(CONF_NOTIFY_ACTION)
                or yaml_common_notify.get(CONF_SERVICE, DEFAULT_NOTIFY_ACTION),
                "targets": entry.options.get(CONF_NOTIFY_TARGETS, []),
                "profiles": yaml_notify,
                "enabled": entry.options.get(CONF_NOTIFY_ENABLED, False),
            },
            CONF_RATE_LIMIT: stashed_yaml.get(CONF_RATE_LIMIT, {}),
        }
        try:
            # Round-trip through JSON so non-primitive values are flattened by the project encoder.
            jsonized: str = json.dumps(obj=data, cls=ExtendedExtendedJSONEncoder)
            return cast("dict[str,Any]", json.loads(jsonized))
        except Exception as err:
            raise HomeAssistantError(f"Failed to serialize configuration: {err}") from err

    hass.services.async_register(
        DOMAIN,
        "enquire_configuration",
        supplemental_action_enquire_configuration,
        supports_response=SupportsResponse.ONLY,
    )

    return True

243 

244 

async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
    """Set up Auto Arm from a config entry.

    Builds the armer from the entry merged with stashed YAML, stores it in
    ``hass.data`` and initializes it. If any step fails, whatever was already
    registered is torn down and the ``hass.data`` slot is cleared, so a retry
    starts clean instead of stacking listeners from a half-built armer.

    Raises:
        ConfigEntryNotReady: if the armer cannot be built or initialized.
    """
    yaml_config: ConfigType = hass.data.get(YAML_DATA_KEY, {})
    armer: "AlarmArmer | None" = None
    try:
        armer = _build_armer_from_entry(hass, entry, yaml_config)
        hass.data[HASS_DATA_KEY] = AutoArmData(armer, {})
        await armer.initialize()
    except Exception as err:
        if armer is not None:
            # Best-effort cleanup; a failure here must not mask the original error.
            try:
                armer.shutdown()
            except Exception:
                _LOGGER.debug("AUTOARM Cleanup after failed initialization was incomplete", exc_info=True)
        hass.data.pop(HASS_DATA_KEY, None)
        raise ConfigEntryNotReady(f"Failed to initialize Auto Arm: {err}") from err
    entry.async_on_unload(entry.add_update_listener(_async_update_listener))
    return True

256 

257 

async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:  # noqa: ARG001, RUF029
    """Shut down the stored armer, if any, and drop it from ``hass.data``; always returns True."""
    if HASS_DATA_KEY not in hass.data:
        return True
    hass.data[HASS_DATA_KEY].armer.shutdown()
    del hass.data[HASS_DATA_KEY]
    return True

264 

265 

async def _async_update_listener(hass: HomeAssistant, entry: ConfigEntry) -> None:
    """Reload the config entry whenever its options are updated."""
    entry_id = entry.entry_id
    await hass.config_entries.async_reload(entry_id)

269 

270 

def _build_armer_from_entry(hass: HomeAssistant, entry: ConfigEntry, yaml_config: ConfigType) -> "AlarmArmer":
    """Create the AlarmArmer for a config entry, layering entry options over YAML.

    Legacy state entries are removed first via ``migrate``. The alarm panel,
    occupants, calendars and notification switches come from the entry;
    buttons, rate limit, transitions, notify profiles and per-calendar
    overrides come from YAML. Diurnal cut-offs prefer entry options and fall
    back to YAML.
    """
    migrate(hass)

    options = entry.options
    alarm_panel: str = entry.data[CONF_ALARM_PANEL]

    # Occupancy: the night default is only included when it is set
    defaults_by_period: dict[str, str] = {CONF_DAY: options.get(CONF_OCCUPANCY_DEFAULT_DAY, "disarmed")}
    night_default: str | None = options.get(CONF_OCCUPANCY_DEFAULT_NIGHT, "armed_night")
    if night_default:
        defaults_by_period[CONF_NIGHT] = night_default
    occupancy: ConfigType = {
        CONF_ENTITY_ID: options.get(CONF_PERSON_ENTITIES, []),
        CONF_OCCUPANCY_DEFAULT: defaults_by_period,
    }
    occupancy_yaml = yaml_config.get(CONF_OCCUPANCY, {})
    if isinstance(occupancy_yaml, dict):
        delay_from_yaml = occupancy_yaml.get(CONF_DELAY_TIME)
        if delay_from_yaml:
            occupancy[CONF_DELAY_TIME] = delay_from_yaml

    # Calendars: the entity list comes from options, per-calendar tuning from YAML
    calendar_control_yaml = yaml_config.get(CONF_CALENDAR_CONTROL, {})
    overrides_by_entity: dict[str, ConfigType] = {}
    if calendar_control_yaml:
        for yaml_cal in calendar_control_yaml.get(CONF_CALENDARS, []):
            if CONF_ENTITY_ID in yaml_cal:
                overrides_by_entity[yaml_cal[CONF_ENTITY_ID]] = yaml_cal

    def _calendar_entry(cal_entity_id: str) -> ConfigType:
        override = overrides_by_entity.get(cal_entity_id, {})
        return {
            CONF_ENTITY_ID: cal_entity_id,
            CONF_CALENDAR_POLL_INTERVAL: override.get(CONF_CALENDAR_POLL_INTERVAL, 15),
            CONF_CALENDAR_EVENT_STATES: override.get(CONF_CALENDAR_EVENT_STATES, _validated_default_calendar_mappings()),
        }

    calendar_list: list[ConfigType] = [_calendar_entry(cal_id) for cal_id in options.get(CONF_CALENDAR_ENTITIES, [])]
    calendar_config: ConfigType = (
        {
            CONF_CALENDAR_NO_EVENT: options.get(CONF_NO_EVENT_MODE, NO_CAL_EVENT_MODE_AUTO),
            CONF_CALENDARS: calendar_list,
        }
        if calendar_list
        else {}
    )

    # Diurnal cut-offs: options take priority, YAML is the fallback
    diurnal_yaml = yaml_config.get(CONF_DIURNAL, {}) or {}
    sunrise_yaml = diurnal_yaml.get(CONF_SUNRISE, {}) or {}
    sunset_yaml = diurnal_yaml.get(CONF_SUNSET, {}) or {}

    def _cutoff(option_key: str, yaml_fallback: dt.time | None) -> dt.time | None:
        configured = options.get(option_key)
        if configured is None:
            return yaml_fallback
        if isinstance(configured, str):
            return cv.time(configured)
        return configured

    return AlarmArmer(
        hass,
        alarm_panel=alarm_panel,
        sunrise_earliest=_cutoff(CONF_SUNRISE_EARLIEST, sunrise_yaml.get(CONF_EARLIEST)),
        sunrise_latest=_cutoff(CONF_SUNRISE_LATEST, sunrise_yaml.get(CONF_LATEST)),
        sunset_earliest=_cutoff(CONF_SUNSET_EARLIEST, sunset_yaml.get(CONF_EARLIEST)),
        sunset_latest=_cutoff(CONF_SUNSET_LATEST, sunset_yaml.get(CONF_LATEST)),
        buttons=yaml_config.get(CONF_BUTTONS, {}),
        occupancy=occupancy,
        notify_profiles=yaml_config.get(CONF_NOTIFY, {}),
        notify_enabled=options.get(CONF_NOTIFY_ENABLED, False),
        notify_action=options.get(CONF_NOTIFY_ACTION),
        notify_targets=options.get(CONF_NOTIFY_TARGETS, []),
        rate_limit=yaml_config.get(CONF_RATE_LIMIT, {}),
        calendar_config=calendar_config,
        transitions=yaml_config.get(CONF_TRANSITIONS),
        calendar_occupancy_override_states=options.get(
            CONF_CALENDAR_OCCUPANCY_OVERRIDE_STATES, DEFAULT_CALENDAR_OCCUPANCY_OVERRIDE_STATES
        ),
    )

351 

352 

def _validated_default_calendar_mappings() -> dict[str, list[re.Pattern[str]]]:
    """Return the default calendar event mappings with every pattern compiled.

    Keys are the stringified states. A single string pattern is treated as a
    one-element list, and entries that are not strings are passed through
    unchanged. Mirrors the schema validation that CALENDAR_SCHEMA applies
    (ensure_list + is_regex).
    """
    from .const import DEFAULT_CALENDAR_MAPPINGS

    def _compiled(raw: Any) -> list[re.Pattern[str]]:
        candidates = [raw] if isinstance(raw, str) else raw
        return [re.compile(item) if isinstance(item, str) else item for item in candidates]

    return {str(state): _compiled(patterns) for state, patterns in DEFAULT_CALENDAR_MAPPINGS.items()}

367 

368 

def migrate(hass: HomeAssistant) -> None:
    """Remove states still present under the legacy ``autoarm.*`` entity ids.

    Each id is handled independently; a failure is logged as a warning and
    does not stop the remaining ids from being processed.
    """
    legacy_entity_ids = (
        "autoarm.configured",
        "autoarm.last_calendar_event",
        "autoarm.last_intervention",
        "autoarm.initialized",
        "autoarm.last_calculation",
    )
    for entity_id in legacy_entity_ids:
        try:
            if not hass.states.get(entity_id):
                continue
            _LOGGER.info("AUTOARM Migration removing legacy entity_id: %s", entity_id)
            hass.states.async_remove(entity_id)
        except Exception as e:
            _LOGGER.warning("AUTOARM Migration fail for %s:%s", entity_id, e)

383 

384 

385def unlisten(listener: Callable[[], None] | None) -> None: 

386 if listener: 

387 try: 

388 listener() 

389 except Exception as e: 

390 _LOGGER.debug("AUTOARM Failure closing listener %s: %s", listener, e) 

391 

392 

@dataclass
class Intervention:
    """Record of a manual intervention, such as a button push, mobile action or alarm panel change"""

    # When the intervention was recorded
    created_at: dt.datetime
    # What caused it
    source: ChangeSource
    # Requested alarm state, or None when no particular state was asked for
    state: AlarmControlPanelState | None

    def as_dict(self) -> dict[str, str | None]:
        """Return a string-only view: ISO timestamp, stringified source, and state (or None)."""
        state_text: str | None = None if self.state is None else str(self.state)
        return {
            "created_at": self.created_at.isoformat(),
            "source": str(self.source),
            "state": state_text,
        }

407 

408 

@dataclass
class AlarmStateWithAttributes:
    """Alarm panel state together with the source of the change and the raw state attributes."""

    # Alarm state as an enum member
    state: AlarmControlPanelState
    # Source attributed to the change
    source: ChangeSource
    # Attributes taken from the panel's state object
    attributes: dict[str, str]

414 

415 

416class AlarmArmer: 

def __init__(
    self,
    hass: HomeAssistant,
    alarm_panel: str,
    buttons: dict[str, ConfigType] | None = None,
    occupancy: ConfigType | None = None,
    actions: list[str] | None = None,
    notify_enabled: bool = True,
    notify_action: str | None = None,
    notify_targets: list[str] | None = None,
    notify_profiles: ConfigType | None = None,
    sunrise_earliest: dt.time | None = None,
    sunrise_latest: dt.time | None = None,
    sunset_earliest: dt.time | None = None,
    sunset_latest: dt.time | None = None,
    rate_limit: ConfigType | None = None,
    calendar_config: ConfigType | None = None,
    transitions: dict[str, dict[str, list[ConfigType]]] | None = None,
    calendar_occupancy_override_states: list[str] | None = None,
) -> None:
    """Store configuration and build the helper objects used by the armer.

    Args:
        hass: Home Assistant instance.
        alarm_panel: Entity id of the alarm control panel to manage.
        buttons: Button configuration keyed by button use.
        occupancy: Occupancy configuration (entity ids, defaults, delays).
        actions: Stored as ``self.actions``.
        notify_enabled: Whether to create a Notifier; forced off when neither
            profiles nor an action are configured.
        notify_action: Notify action passed to the Notifier.
        notify_targets: Notify targets passed to the Notifier.
        notify_profiles: Notify profiles passed to the Notifier.
        sunrise_earliest: Stored sunrise cut-off.
        sunrise_latest: Stored sunrise cut-off.
        sunset_earliest: Stored sunset cut-off.
        sunset_latest: Stored sunset cut-off.
        rate_limit: Period and call count for the rate limiter.
        calendar_config: Calendar list and no-event mode.
        transitions: Transition condition configuration keyed by state.
        calendar_occupancy_override_states: Falls back to
            ``DEFAULT_CALENDAR_OCCUPANCY_OVERRIDE_STATES`` when None.
    """
    occupancy = occupancy or {}
    rate_limit = rate_limit or {}

    self.hass: HomeAssistant = hass
    self.app_health_tracker: AppHealthTracker = AppHealthTracker(hass)
    if notify_enabled and not notify_profiles and not notify_action:
        _LOGGER.warning("AUTOARM Notification disabled - no config")
        notify_enabled = False
    if notify_enabled:
        self.notifier: Notifier | None = Notifier(
            notify_profiles, hass, self.app_health_tracker, notify_action, notify_targets
        )
    else:
        self.notifier = None
    self.local_tz = dt_util.get_time_zone(self.hass.config.time_zone)
    calendar_config = calendar_config or {}
    self.calendar_configs: list[ConfigType] = calendar_config.get(CONF_CALENDARS, []) or []
    self.calendars: list[TrackedCalendar] = []
    self.calendar_no_event_mode: str | None = calendar_config.get(CONF_CALENDAR_NO_EVENT, NO_CAL_EVENT_MODE_AUTO)
    self.calendar_occupancy_override_states: list[str] = (
        calendar_occupancy_override_states
        if calendar_occupancy_override_states is not None
        else DEFAULT_CALENDAR_OCCUPANCY_OVERRIDE_STATES
    )
    self.alarm_panel: str = alarm_panel
    self.sunrise_earliest: dt.time | None = sunrise_earliest
    self.sunrise_latest: dt.time | None = sunrise_latest
    self.sunset_earliest: dt.time | None = sunset_earliest
    self.sunset_latest: dt.time | None = sunset_latest
    self.occupants: list[str] = occupancy.get(CONF_ENTITY_ID, [])
    self.occupied_defaults: dict[str, AlarmControlPanelState] = occupancy.get(
        CONF_OCCUPANCY_DEFAULT, {CONF_DAY: AlarmControlPanelState.ARMED_HOME}
    )
    self.occupied_delay: dict[str, dt.timedelta] = occupancy.get(CONF_DELAY_TIME, {})
    self.buttons: ConfigType = buttons or {}

    self.actions: list[str] = actions or []
    self.unsubscribes: list[Callable[[], None]] = []
    # Defined up front so shutdown() is safe even if initialization never
    # reached the point where the stop listener is registered.
    self.stop_listener: Callable[[], None] | None = None
    self.pre_pending_state: AlarmControlPanelState | None = None
    self.button_device: dict[str, str] = {}
    self.arming_in_progress: asyncio.Event = asyncio.Event()

    self.rate_limiter: Limiter = Limiter(
        window=rate_limit.get(CONF_RATE_LIMIT_PERIOD, dt.timedelta(seconds=60)),
        max_calls=rate_limit.get(CONF_RATE_LIMIT_CALLS, 5),
    )

    self.hass_api: HomeAssistantAPI = HomeAssistantAPI(hass)
    self.transitions: dict[AlarmControlPanelState, ConditionCheckerType] = {}
    self.transition_config: dict[str, dict[str, list[ConfigType]]] = transitions or {}

    self.interventions: list[Intervention] = []
    self.intervention_ttl: int = 60

490 

async def initialize(self) -> None:
    """Async initialization

    Runs every ``initialize_*`` step in a fixed order, then computes the
    armed state once with ``ChangeSource.STARTUP`` as the source.
    """
    # Logged before initialize_calendar() runs, so the count reflects self.calendars at this point
    _LOGGER.info("AUTOARM occupied=%s, state=%s, calendars=%s", self.is_occupied(), self.armed_state(), len(self.calendars))

    self.initialize_alarm_panel()
    await self.initialize_calendar()
    await self.initialize_logic()
    self.initialize_diurnal()
    self.initialize_occupancy()
    self.initialize_buttons()
    self.initialize_integration()
    self.initialize_housekeeping()
    self.initialize_home_assistant()
    await self.reset_armed_state(source=ChangeSource.STARTUP)

    _LOGGER.info("AUTOARM Initialized, state: %s", self.armed_state())

507 

def initialize_home_assistant(self) -> None:
    """Register the stop listener, mark the app initialized, and expose the ``reset_state`` service."""
    hass = self.hass
    self.stop_listener: Callable[[], None] | None = hass.bus.async_listen_once(
        EVENT_HOMEASSISTANT_STOP, self.async_shutdown
    )
    self.app_health_tracker.app_initialized()
    hass.states.async_set(f"sensor.{DOMAIN}_last_calculation", "unavailable", attributes={})

    hass.services.async_register(
        DOMAIN,
        "reset_state",
        self.reset_service,
        supports_response=SupportsResponse.OPTIONAL,
    )

521 

async def reset_service(self, _call: ServiceCall) -> ServiceResponse:
    """Service handler: record an ACTION intervention, recalculate the armed state, and report the result."""
    intervention = self.record_intervention(source=ChangeSource.ACTION, state=None)
    outcome = await self.reset_armed_state(intervention=intervention)
    return {"change": outcome if outcome else "NO_CHANGE"}

525 

def initialize_integration(self) -> None:
    """Seed the last-intervention sensor state and listen for mobile app notification actions."""
    self.hass.states.async_set(f"sensor.{DOMAIN}_last_intervention", "unavailable", attributes={})

    remove_listener = self.hass.bus.async_listen("mobile_app_notification_action", self.on_mobile_action)
    self.unsubscribes.append(remove_listener)

530 

def initialize_alarm_panel(self) -> None:
    """Start tracking state changes of the configured alarm panel

    See https://www.home-assistant.io/integrations/alarm_control_panel/

    Works even when the control panel has not started yet; the listener
    receives events once it does
    """
    remove_listener = async_track_state_change_event(self.hass, [self.alarm_panel], self.on_panel_change)
    self.unsubscribes.append(remove_listener)
    _LOGGER.debug("AUTOARM Auto-arming %s", self.alarm_panel)

540 

def initialize_housekeeping(self) -> None:
    """Schedule ``housekeeping`` to run once an hour, on the hour.

    ``second=0`` is given explicitly because ``async_track_time_change``
    treats an unspecified field as "match every value"; with only
    ``minute=0`` the job would fire on every second of minute zero.
    """
    self.unsubscribes.append(
        async_track_time_change(
            self.hass,
            action=self.housekeeping,
            minute=0,
            second=0,
        )
    )

549 

def initialize_diurnal(self) -> None:
    """Track sunrise and sunset, plus the configured latest cut-off times when set."""
    # events API expects a function, however underlying HassJob is fine with coroutines
    self.unsubscribes.append(async_track_sunrise(self.hass, self.on_sunrise, None))  # type: ignore
    self.unsubscribes.append(async_track_sunset(self.hass, self.on_sunset, None))  # type: ignore

    latest_cutoffs = (
        (self.sunrise_latest, self.on_sunrise_latest),
        (self.sunset_latest, self.on_sunset_latest),
    )
    for cutoff, handler in latest_cutoffs:
        if not cutoff:
            continue
        self.unsubscribes.append(
            async_track_time_change(
                self.hass,
                handler,
                hour=cutoff.hour,
                minute=cutoff.minute,
                second=cutoff.second,
            )
        )

574 

def initialize_occupancy(self) -> None:
    """Track state changes of the occupant entities, when any are configured"""
    if not self.occupants:
        _LOGGER.info("AUTOARM Occupancy not configured")
        return
    _LOGGER.info("AUTOARM Occupancy determined by %s", ",".join(self.occupants))
    remove_listener = async_track_state_change_event(self.hass, self.occupants, self.on_occupancy_change)
    self.unsubscribes.append(remove_listener)

582 

def initialize_buttons(self) -> None:
    """Wire up the (optional) physical buttons that control alarm state"""

    def setup_button(state_name: str, button_entity: str, cb: Callable[..., Coroutine[Any, Any, None]]) -> None:
        # Remember the entity for this use; only track it when one is actually given
        self.button_device[state_name] = button_entity
        if not button_entity:
            return
        self.unsubscribes.append(async_track_state_change_event(self.hass, [button_entity], cb))

        _LOGGER.debug(
            "AUTOARM Configured %s button for %s",
            state_name,
            button_entity,
        )

    for button_use, button_config in self.buttons.items():
        delay: dt.timedelta | None = button_config.get(CONF_DELAY_TIME)
        for entity_id in button_config[CONF_ENTITY_ID]:
            if button_use == ATTR_RESET:
                handler = partial(self.on_reset_button, delay)
                setup_button(ATTR_RESET, entity_id, handler)
            else:
                # Any non-reset use must name a valid alarm state
                handler = partial(self.on_alarm_state_button, AlarmControlPanelState(button_use), delay)
                setup_button(button_use, entity_id, handler)

606 

async def initialize_calendar(self) -> None:
    """Set up tracking for each configured calendar (optional)

    Returns early when no calendars are configured. If the calendar platform
    is missing or cannot be accessed, an initialization error is recorded
    and no calendars are tracked.
    """
    stage: str = "calendar"
    self.hass.states.async_set(f"sensor.{DOMAIN}_last_calendar_event", "unavailable", attributes={})
    if not self.calendar_configs:
        return
    try:
        platforms: list[entity_platform.EntityPlatform] = entity_platform.async_get_platforms(self.hass, CALENDAR_DOMAIN)
        if not platforms:
            self.app_health_tracker.record_initialization_error(stage)
            _LOGGER.error("AUTOARM Calendar platform not available from Home Assistant")
            return
        platform: entity_platform.EntityPlatform = platforms[0]
    except Exception as _e:
        self.app_health_tracker.record_initialization_error(stage)
        _LOGGER.exception("AUTOARM Unable to access calendar platform")
        return
    for calendar_config in self.calendar_configs:
        tracked = TrackedCalendar(self.hass, calendar_config, self.calendar_no_event_mode, self, self.app_health_tracker)
        await tracked.initialize(platform)
        self.calendars.append(tracked)

631 

async def initialize_logic(self) -> None:
    """Build a condition checker for every configured state transition.

    Transitions missing from the configuration are filled in from
    ``DEFAULT_TRANSITIONS``. Each transition's conditions are built once with
    strict validation and, if that succeeds, built again without it; the
    second result is stored in ``self.transitions``. Any failure is reported
    through ``hass_api.raise_issue`` and the transition gets no checker.
    """
    stage: str = "logic"
    # Fill in defaults for any state that has no transition configured
    for state_str, raw_condition in DEFAULT_TRANSITIONS.items():
        if state_str not in self.transition_config:
            _LOGGER.info("AUTOARM Defaulting transition condition for %s", state_str)
            self.transition_config[state_str] = {CONF_CONDITIONS: cv.CONDITIONS_SCHEMA(raw_condition)}

    for state_str, transition_config in self.transition_config.items():
        # Non-empty by the end of the iteration means an issue is raised for this state
        error: str = ""
        condition_config = transition_config.get(CONF_CONDITIONS)
        if condition_config is None:
            error = "Empty conditions"
            _LOGGER.warning(f"AUTOARM Found no conditions for {state_str} transition")
        else:
            try:
                # Raises ValueError when state_str is not a valid alarm state
                state = AlarmControlPanelState(state_str)
                cond: ConditionCheckerType | None = await self.hass_api.build_condition(
                    condition_config, strict=True, validate=True, name=state_str
                )

                if cond:
                    # re-run without strict wrapper
                    cond = await self.hass_api.build_condition(condition_config, name=state_str)
                if cond:
                    _LOGGER.debug(f"AUTOARM Validated transition logic for {state_str}")
                    self.transitions[state] = cond
                else:
                    _LOGGER.warning(f"AUTOARM Failed to validate transition logic for {state_str}")
                    error = "Condition validation failed"
            except ValueError as ve:
                self.app_health_tracker.record_initialization_error(stage)
                error = f"Invalid state {ve}"
                _LOGGER.error(f"AUTOARM Invalid state in {state_str} transition - {ve}")
            except vol.Invalid as vi:
                self.app_health_tracker.record_initialization_error(stage)
                _LOGGER.error(f"AUTOARM Transition {state_str} conditions fails Home Assistant schema check {vi}")
                error = f"Schema error {vi}"
            except ConditionError as ce:
                # NOTE(review): unlike the other handlers, no initialization error is recorded here - confirm intended
                _LOGGER.error(f"AUTOARM Transition {state_str} conditions fails Home Assistant condition check {ce}")
                # Use the most specific message the exception offers
                if hasattr(ce, "message"):
                    error = ce.message  # type: ignore[attr-defined,unused-ignore]
                elif hasattr(ce, "error") and hasattr(ce.error, "message"):  # type: ignore[attr-defined,unused-ignore]
                    error = ce.error.message  # type: ignore[attr-defined,unused-ignore]
                else:
                    error = str(ce)
            except Exception as e:
                self.app_health_tracker.record_initialization_error(stage)
                _LOGGER.exception("AUTOARM Disabling transition %s with error validating %s", state_str, condition_config)
                error = f"Unknown exception {e}"
        if error:
            _LOGGER.warning(f"AUTOARM raising report issue for {error} on {state_str}")
            self.hass_api.raise_issue(
                f"transition_condition_{state_str}",
                is_fixable=False,
                issue_key="transition_condition",
                issue_map={"state": state_str, "error": error},
                severity=ir.IssueSeverity.ERROR,
            )

690 

async def async_shutdown(self, _event: Event) -> None:
    """Handle the Home Assistant stop event by shutting the armer down."""
    _LOGGER.info("AUTOARM shut down event received")
    # Cleared before shutdown() so that the stop listener is not called there
    self.stop_listener = None
    self.shutdown()

695 

def shutdown(self) -> None:
    """Shut down calendars and remove every registered listener.

    Safe to call on an instance whose initialization did not get as far as
    registering the stop listener.
    """
    _LOGGER.info("AUTOARM shutting down")
    for calendar in self.calendars:
        calendar.shutdown()
    while self.unsubscribes:
        unlisten(self.unsubscribes.pop())
    # The attribute may not exist yet if the stop listener was never registered
    unlisten(getattr(self, "stop_listener", None))
    self.stop_listener = None
    _LOGGER.info("AUTOARM shut down")

705 

def active_calendar_event(self) -> TrackedCalendarEvent | None:
    """Return the first active event across all tracked calendars, or None when there is none."""
    # TODO: consider sorting events to LIFO
    collected: list[TrackedCalendarEvent] = [event for cal in self.calendars for event in cal.active_events()]
    return collected[0] if collected else None

714 

def has_active_calendar_event(self) -> bool:
    """Return True as soon as any tracked calendar reports an active event."""
    for cal in self.calendars:
        if cal.has_active_event():
            return True
    return False

717 

def is_occupied(self) -> bool | None:
    """Ternary - True if any occupant entity is home, False if none are, None if no occupants are defined"""
    if not self.occupants:
        return None
    for person in self.occupants:
        if safe_state(self.hass.states.get(person)) == STATE_HOME:
            return True
    return False

723 

def at_home(self) -> list[str] | None:
    """List the occupant entities whose state is home; None if no occupants are defined"""
    if not self.occupants:
        return None
    present: list[str] = []
    for person in self.occupants:
        if safe_state(self.hass.states.get(person)) == STATE_HOME:
            present.append(person)
    return present

728 

def not_home(self) -> list[str] | None:
    """List the occupant entities whose state is not home; None if no occupants are defined"""
    if not self.occupants:
        return None
    absent: list[str] = []
    for person in self.occupants:
        if safe_state(self.hass.states.get(person)) != STATE_HOME:
            absent.append(person)
    return absent

733 

def is_unoccupied(self) -> bool | None:
    """Ternary - False if any occupant entity is home, True if none are, None if no occupants are defined"""
    if not self.occupants:
        return None
    for person in self.occupants:
        if not (safe_state(self.hass.states.get(person)) != STATE_HOME):
            return False
    return True

739 

def is_night(self) -> bool:
    """Return True when the ``sun.sun`` entity reports the sun below the horizon."""
    sun_state = safe_state(self.hass.states.get("sun.sun"))
    return sun_state == STATE_BELOW_HORIZON

742 

def armed_state(self) -> AlarmControlPanelState:
    """Return the alarm panel's state as an enum, falling back to PENDING when none is available."""
    panel_state: AlarmControlPanelState | None = alarm_state_as_enum(
        safe_state(self.hass.states.get(self.alarm_panel))
    )
    if panel_state is not None:
        return panel_state
    _LOGGER.warning("AUTOARM No alarm state available - treating as PENDING")
    return AlarmControlPanelState.PENDING

750 

def current_state(self) -> AlarmStateWithAttributes:
    """Return the panel state, the source of its last change, and its attributes.

    State falls back to PENDING and source to UNKNOWN when they cannot be
    determined. The source is taken from the ``changed_by`` attribute, using
    the text after its first underscore.
    """
    panel: State | None = self.hass.states.get(self.alarm_panel)
    if not panel:
        return AlarmStateWithAttributes(
            state=AlarmControlPanelState.PENDING,
            source=ChangeSource.UNKNOWN,
            attributes={},
        )

    alarm_state: AlarmControlPanelState | None = None
    if panel.state is not None:
        alarm_state = alarm_state_as_enum(panel.state)

    source: ChangeSource | None = None
    if panel.attributes and panel.attributes.get(ATTR_CHANGED_BY):
        source = change_source_as_enum(panel.attributes[ATTR_CHANGED_BY].split("_", 1)[-1])

    return AlarmStateWithAttributes(
        state=alarm_state or AlarmControlPanelState.PENDING,
        source=source or ChangeSource.UNKNOWN,
        attributes=panel.attributes,
    )

765 

766 def _extract_event(self, event: Event[EventStateChangedData]) -> tuple[str | None, str | None, str | None, dict[str, str]]: 

767 entity_id = old = new = None 

768 new_attributes: dict[str, str] = {} 

769 if event and event.data: 

770 entity_id = event.data.get("entity_id") 

771 old_obj = event.data.get("old_state") 

772 if old_obj: 

773 old = old_obj.state 

774 new_obj = event.data.get("new_state") 

775 if new_obj: 

776 new = new_obj.state 

777 new_attributes = new_obj.attributes 

778 return entity_id, old, new, new_attributes 

779 

async def pending_state(self, source: ChangeSource | None, change_context: dict[str, Any] | None = None) -> None:
    """Remember the current armed state, then arm the panel to PENDING.

    Args:
        source: What triggered the change; passed on to ``arm``.
        change_context: Optional context from the caller. It is copied, not
            modified; the copy is extended with the source, the caller's own
            ``caller`` value (as ``original_caller``) and the state held
            before going to pending.
    """
    self.pre_pending_state = self.armed_state()
    # Copy so the caller's dict is left untouched
    context: dict[str, Any] = dict(change_context or {})
    context.update({
        "source": str(source),
        "original_caller": context.get("caller"),
        "caller": "pending_state",
        "pre_pending_state": self.pre_pending_state,
    })
    await self.arm(
        AlarmControlPanelState.PENDING,
        source=source,
        change_context=context,
    )

794 

@callback
async def delayed_reset_armed_state(
    self, triggered_at: dt.datetime, requested_at: dt.datetime | None, **kwargs: Any
) -> None:
    """Run a previously scheduled `reset_armed_state`, unless an intervention superseded it.

    Args:
    ----
        triggered_at: Time at which the scheduled job fired.
        requested_at: Time at which the reset was requested; interventions
            recorded after this cancel the reset.
        **kwargs: Forwarded unchanged to `reset_armed_state`.

    """
    # The message previously said "delayed_arm", which made the two delayed jobs
    # indistinguishable in the debug log.
    _LOGGER.debug("AUTOARM delayed_reset_armed_state at %s, requested_at: %s", triggered_at, requested_at)
    if self.is_intervention_since_request(requested_at):
        return
    await self.reset_armed_state(**kwargs)

803 

async def reset_armed_state(
    self, intervention: Intervention | None = None, source: ChangeSource | None = None
) -> str | None:
    """Logic to automatically work out appropriate current armed state

    Calendar configuration is consulted first, then recent interventions, and
    finally `determine_state`. Whatever the outcome, the
    `sensor.<DOMAIN>_last_calculation` state is rewritten with the decision.

    Args:
    ----
        intervention: The intervention that requested this reset, if any. When
            given, earlier interventions do not block the reset.
        source: Source of the reset; defaults to the intervention's source.

    Returns:
    -------
        The existing state when the reset is ignored, otherwise whatever `arm`
        returned, or the unchanged/PENDING/None computed state when no arming
        was attempted.

    """
    state: AlarmControlPanelState | None = None
    existing_state: AlarmControlPanelState | None = None
    must_change_state: bool = False
    last_state_intervention: Intervention | None = None
    active_calendar_event: TrackedCalendarEvent | None = None

    if source is None and intervention is not None:
        source = intervention.source
    _LOGGER.debug(
        "AUTOARM reset_armed_state(intervention=%s,source=%s)",
        intervention,
        source,
    )
    reset_decision: str = "no_change"
    try:
        existing_state = self.armed_state()
        state = existing_state
        if self.calendars:
            active_calendar_event = self.active_calendar_event()
            if active_calendar_event:
                cal_state: AlarmControlPanelState = active_calendar_event.arming_state
                if (
                    source == ChangeSource.OCCUPANCY
                    and cal_state is not None
                    and active_calendar_event.is_recurring()
                    and str(cal_state) in self.calendar_occupancy_override_states
                ):
                    _LOGGER.debug("AUTOARM Allowing occupancy reset for recurring overridable calendar event %s", cal_state)
                else:
                    _LOGGER.debug("AUTOARM Ignoring reset while calendar event active")
                    reset_decision = "ignore_for_active_calendar_event"
                    return existing_state
            # NOTE(review): an allowed occupancy override appears to fall through to the
            # no-event-mode checks below as well - confirm that is intended.
            if self.calendar_no_event_mode == NO_CAL_EVENT_MODE_MANUAL:
                _LOGGER.debug(
                    "AUTOARM Ignoring reset while calendar configured, no active event, and default mode is manual"
                )
                reset_decision = "ignore_for_calendar_manual_default"
                return existing_state
            if self.calendar_no_event_mode in AlarmControlPanelState:
                # TODO: may be dupe logic with on_cal event
                _LOGGER.debug("AUTOARM Applying fixed reset on end of calendar event, %s", self.calendar_no_event_mode)
                reset_decision = "reset_on_calendar_event_end"
                armed = await self.arm(
                    alarm_state_as_enum(self.calendar_no_event_mode),
                    source=ChangeSource.CALENDAR,
                    change_context={
                        "reset_decision": reset_decision,
                        "calendar_no_event_mode": self.calendar_no_event_mode,
                        "caller": "reset_armed_state",
                    },
                )
                if armed is not None:
                    # Record the applied state so the sensor written in `finally`
                    # reports this change instead of always showing "no change".
                    state = armed
                return armed
            if self.calendar_no_event_mode == NO_CAL_EVENT_MODE_AUTO:
                _LOGGER.debug("AUTOARM Applying reset while calendar configured, no active event, and default mode is auto")
            else:
                _LOGGER.warning("AUTOARM Unexpected state for calendar no event mode: %s", self.calendar_no_event_mode)

        # TODO: expose as config ( for manual disarm override ) and condition logic
        must_change_state = existing_state is None or existing_state == AlarmControlPanelState.PENDING
        if intervention or source in (ChangeSource.CALENDAR, ChangeSource.OCCUPANCY) or must_change_state:
            _LOGGER.debug("AUTOARM Ignoring previous interventions")
        else:
            last_state_intervention = self.last_state_intervention()
            if last_state_intervention:
                _LOGGER.debug(
                    "AUTOARM Ignoring automated reset for %s set by %s at %s",
                    last_state_intervention.state,
                    last_state_intervention.source,
                    last_state_intervention.created_at,
                )
                reset_decision = "ignore_after_manual_intervention"
                return existing_state
        state = self.determine_state()
        if state is not None and state != AlarmControlPanelState.PENDING and state != existing_state:
            reset_decision = "change_state"
            state = await self.arm(
                state, source=source, change_context={"reset_decision": reset_decision, "caller": "reset_armed_state"}
            )

    finally:
        # Runs on every exit path, including the early returns above.
        self.hass.states.async_set(
            f"sensor.{DOMAIN}_last_calculation",
            str(state is not None and state != existing_state),
            attributes={
                "new_state": str(state),
                "old_state": str(existing_state),
                "source": str(source),
                "active_calendar_event": deobjectify(active_calendar_event.event) if active_calendar_event else None,
                "occupied": self.is_occupied(),
                "night": self.is_night(),
                "must_change_state": str(must_change_state),
                "last_state_intervention": deobjectify(last_state_intervention),
                "intervention": intervention.as_dict() if intervention else None,
                "time": dt_util.now().isoformat(),
                "reset_decision": reset_decision,
            },
        )

    return state

906 

907 def is_intervention_since_request(self, requested_at: dt.datetime | None) -> bool: 

908 if requested_at is not None and self.has_intervention_since(requested_at): 

909 _LOGGER.debug( 

910 "AUTOARM Cancelling delayed operation since subsequent manual action", 

911 ) 

912 return True 

913 return False 

914 

def determine_state(self) -> AlarmControlPanelState | None:
    """Return the state of the first configured transition whose condition holds.

    The transitions are evaluated in the iteration order of `self.transitions`
    against a snapshot of occupancy, night, current panel state and calendar
    information. None is returned when no condition matches.
    """
    calendar_entry: TrackedCalendarEvent | None = self.active_calendar_event()
    snapshot: ConditionVariables = ConditionVariables(
        occupied=self.is_occupied(),
        unoccupied=self.is_unoccupied(),
        night=self.is_night(),
        state=self.armed_state(),
        calendar_event=calendar_entry.event if calendar_entry else None,
        occupied_defaults=self.occupied_defaults,
        at_home=self.at_home(),
        not_home=self.not_home(),
    )
    for candidate, checker in self.transitions.items():
        if not self.hass_api.evaluate_condition(checker, snapshot):
            continue
        _LOGGER.debug("AUTOARM Computed state as %s from condition", candidate)
        # First match wins; a None key is passed through as "no state".
        return None if candidate is None else AlarmControlPanelState(candidate)
    return None

937 

@callback
async def delayed_arm(self, triggered_at: dt.datetime, requested_at: dt.datetime | None, **kwargs: Any) -> None:
    """Run a previously scheduled `arm`, unless an intervention happened since it was requested.

    Extra keyword arguments are forwarded unchanged to `arm`.
    """
    _LOGGER.debug("AUTOARM delayed_arm at %s, requested_at: %s", triggered_at, requested_at)
    superseded = self.is_intervention_since_request(requested_at)
    if not superseded:
        await self.arm(**kwargs)

944 

async def arm(
    self,
    arming_state: AlarmControlPanelState | None,
    source: ChangeSource | None = None,
    change_context: dict[str, Any] | None = None,
) -> AlarmControlPanelState | None:
    """Change alarm panel state

    Writes the new state onto the panel entity, stamping `changed_by` as
    "<DOMAIN>.<source>", then notifies (when a notifier and source are
    present) and fires a "change" event.

    Args:
    ----
        arming_state: State to apply; None is a no-op.
        source: Source of the change, for example calendar or button.
        change_context: Detailed context for the reason arm triggered,
            included in the fired event.

    Returns:
    -------
        The new arming state when the panel was changed. None when nothing was
        requested, the panel is already in that state, another arm is in
        progress, the rate limiter triggered, or the change failed.

    """
    _LOGGER.debug("AUTOARM arm(arming_state=%s,source=%s,change_context=%s)", arming_state, source, change_context)
    if arming_state is None:
        return None
    # Read once: no await happens between this check and the state write below, so
    # the second read and its "already in state" branch could never be taken.
    existing_state: AlarmControlPanelState | None = self.armed_state()
    if existing_state == arming_state:
        return None
    if self.arming_in_progress.is_set():
        _LOGGER.warning("AUTOARM arming already in progress, skipping for %s", source)
        return None
    if self.rate_limiter.triggered():
        _LOGGER.debug("AUTOARM Rate limit triggered by %s, skipping arm", source)
        return None
    try:
        self.arming_in_progress.set()
        attrs: dict[str, str] = {}
        panel_state: State | None = self.hass.states.get(self.alarm_panel)
        if panel_state:
            # Carry the panel's existing attributes over to the new state.
            attrs.update(panel_state.attributes)
        attrs[ATTR_CHANGED_BY] = f"{DOMAIN}.{source}"
        self.hass.states.async_set(entity_id=self.alarm_panel, new_state=str(arming_state), attributes=attrs)

        _LOGGER.info("AUTOARM Setting %s from %s to %s for %s", self.alarm_panel, existing_state, arming_state, source)
        if self.notifier and source and arming_state:
            await self.notifier.notify(source=source, from_state=existing_state, to_state=arming_state)

        self.hass_api.fire_event(
            event_name="change",
            event_data={
                "panel": self.alarm_panel,
                # State object as it was before this change was written.
                "panel_state": panel_state,
                "original_state": existing_state,
                "new_state": arming_state,
                "change_source": source,
                "occupied": self.is_occupied(),
                "night": self.is_night(),
                "context": change_context or {},
            },
        )
        return arming_state
    except Exception as e:
        # Boundary handler: keep the traceback in the log and count the failure.
        _LOGGER.exception("AUTOARM Failed to arm: %s", e)
        self.app_health_tracker.record_runtime_error()
    finally:
        self.arming_in_progress.clear()
    return None

1012 

def schedule_state(
    self,
    trigger_time: dt.datetime,
    state: AlarmControlPanelState | None,
    intervention: Intervention | None,
    source: ChangeSource | None = None,
) -> None:
    """Schedule a delayed arm, or a delayed reset when no state is given.

    The unsubscribe handle of the scheduled job is appended to
    `self.unsubscribes`.

    Args:
    ----
        trigger_time: Point in time at which the job should run.
        state: State to arm to; None schedules `delayed_reset_armed_state`.
        intervention: Intervention that requested the change, if any.
        source: Source of the change; defaults to the intervention's source.

    """
    # The previous one-liner parsed as `(source or intervention.source) if intervention
    # else None`, which discarded an explicit source whenever no intervention was given.
    if source is None and intervention is not None:
        source = intervention.source

    job: Callable[[dt.datetime], Coroutine[Any, Any, None] | None]
    if state is None:
        _LOGGER.debug("AUTOARM Delayed reset, triggered at: %s, source%s", trigger_time, source)
        job = partial(self.delayed_reset_armed_state, intervention=intervention, source=source, requested_at=dt_util.now())
    else:
        _LOGGER.debug("AUTOARM Delayed arm %s, triggered at: %s, source%s", state, trigger_time, source)

        job = partial(self.delayed_arm, arming_state=state, source=source, requested_at=dt_util.now())

    self.unsubscribes.append(
        async_track_point_in_time(
            self.hass,
            job,
            trigger_time,
        )
    )

1038 

def record_intervention(self, source: ChangeSource, state: AlarmControlPanelState | None) -> Intervention:
    """Create an intervention stamped with the current time, store it and publish it.

    The new entry is appended to `self.interventions` and mirrored into the
    `sensor.<DOMAIN>_last_intervention` state, then returned.
    """
    entry = Intervention(dt_util.now(), source, state)
    self.interventions.append(entry)
    sensor_id = f"sensor.{DOMAIN}_last_intervention"
    self.hass.states.async_set(sensor_id, source, attributes=entry.as_dict())
    return entry

1045 

def has_intervention_since(self, cutoff: dt.datetime) -> bool:
    """Report whether any recorded intervention was created strictly after `cutoff`."""
    for entry in self.interventions or ():
        if entry.created_at > cutoff:
            return True
    return False

1051 

def last_state_intervention(self) -> Intervention | None:
    """Return the most recently listed intervention that carries a state, if any."""
    latest: Intervention | None = None
    for entry in self.interventions:
        if entry.state is not None:
            # Keep overwriting so the last qualifying entry in list order wins.
            latest = entry
    return latest

1057 

@callback
async def on_sunrise(self, *args: Any) -> None:  # noqa: ARG002
    """Handle sunrise: reset now, or defer the reset to the configured earliest time.

    The reset is deferred only when `sunrise_earliest` is set and the current
    local time is still before it.
    """
    _LOGGER.debug("AUTOARM Sunrise")
    now = dt_util.now()
    earliest = self.sunrise_earliest
    if earliest and now.time() < earliest:
        _LOGGER.debug("AUTOARM Rescheduling delayed sunrise action to %s", earliest)
        deferred_until = dt.datetime.combine(now.date(), earliest, tzinfo=dt_util.DEFAULT_TIME_ZONE)
        self.schedule_state(
            deferred_until,
            intervention=None,
            state=None,
            source=ChangeSource.SUNRISE,
        )
        return
    await self.reset_armed_state(source=ChangeSource.SUNRISE)

1072 

@callback
async def on_sunrise_latest(self, *args: Any) -> None:  # noqa: ARG002
    """Handle the latest-sunrise cutoff by resetting the armed state with source SUNRISE."""
    _LOGGER.debug("AUTOARM Sunrise latest cutoff reached")
    trigger = ChangeSource.SUNRISE
    await self.reset_armed_state(source=trigger)

1077 

@callback
async def on_sunset(self, *args: Any) -> None:  # noqa: ARG002
    """Handle sunset: reset now, or defer the reset to the configured earliest time.

    The reset is deferred only when `sunset_earliest` is set and the current
    local time is still before it.
    """
    _LOGGER.debug("AUTOARM Sunset")
    now = dt_util.now()
    earliest = self.sunset_earliest
    if earliest and now.time() < earliest:
        _LOGGER.debug("AUTOARM Rescheduling delayed sunset action to %s", earliest)
        deferred_until = dt.datetime.combine(now.date(), earliest, tzinfo=dt_util.DEFAULT_TIME_ZONE)
        self.schedule_state(
            deferred_until,
            intervention=None,
            state=None,
            source=ChangeSource.SUNSET,
        )
        return
    await self.reset_armed_state(source=ChangeSource.SUNSET)

1092 

@callback
async def on_sunset_latest(self, *args: Any) -> None:  # noqa: ARG002
    """Handle the latest-sunset cutoff by resetting the armed state with source SUNSET."""
    _LOGGER.debug("AUTOARM Sunset latest cutoff reached")
    trigger = ChangeSource.SUNSET
    await self.reset_armed_state(source=trigger)

1097 

1098 @callback 

1099 async def on_mobile_action(self, event: Event) -> None: 

1100 _LOGGER.debug("AUTOARM Mobile Action: %s", event) 

1101 source: ChangeSource = ChangeSource.MOBILE 

1102 

1103 match event.data.get("action"): 

1104 case "ALARM_PANEL_DISARM": 

1105 self.record_intervention(source=source, state=AlarmControlPanelState.DISARMED) 

1106 await self.arm( 

1107 AlarmControlPanelState.DISARMED, 

1108 source=source, 

1109 change_context={"caller": "on_mobile_action", "event_data": event.data, "event_type": event.event_type}, 

1110 ) 

1111 case "ALARM_PANEL_RESET": 

1112 await self.reset_armed_state(intervention=self.record_intervention(source=ChangeSource.BUTTON, state=None)) 

1113 case "ALARM_PANEL_AWAY": 

1114 self.record_intervention(source=source, state=AlarmControlPanelState.ARMED_AWAY) 

1115 await self.arm( 

1116 AlarmControlPanelState.ARMED_AWAY, 

1117 source=source, 

1118 change_context={"caller": "on_mobile_action", "event_data": event.data, "event_type": event.event_type}, 

1119 ) 

1120 case _: 

1121 _LOGGER.debug("AUTOARM Ignoring mobile action: %s", event.data) 

1122 

@callback
async def on_alarm_state_button(self, state: AlarmControlPanelState, delay: dt.timedelta | None, event: Event) -> None:
    """Handle a button press that asks for a specific alarm state.

    The press is recorded as a BUTTON intervention. Without a delay the panel
    is armed straight away; with a delay the change is scheduled and, when a
    notifier is configured, announced.
    """
    _LOGGER.debug("AUTOARM Alarm %s Button: %s", state, event)
    intervention = self.record_intervention(source=ChangeSource.BUTTON, state=state)

    if not delay:
        context = {
            "caller": "on_alarm_state_button",
            "event_data": event.data,
            "delay": str(delay),
            "event_type": event.event_type,
        }
        await self.arm(state, source=ChangeSource.BUTTON, change_context=context)
        return

    self.schedule_state(dt_util.now() + delay, state, intervention, source=ChangeSource.BUTTON)
    if not self.notifier:
        return
    await self.notifier.notify(
        ChangeSource.BUTTON,
        from_state=self.armed_state(),
        to_state=state,
        message=f"Alarm will be set to {state} in {delay}",
        title=f"Arm set to {state} process starting",
    )

1148 

@callback
async def on_reset_button(self, delay: dt.timedelta | None, event: Event) -> None:
    """Handle a press of the reset button.

    The press is recorded once as a BUTTON intervention. With a delay the
    reset is scheduled and, when a notifier is configured, announced;
    otherwise the armed state is reset immediately.
    """
    _LOGGER.debug("AUTOARM Reset Button: %s", event)
    intervention = self.record_intervention(source=ChangeSource.BUTTON, state=None)
    if delay:
        self.schedule_state(dt_util.now() + delay, None, intervention, ChangeSource.BUTTON)
        if self.notifier:
            await self.notifier.notify(
                ChangeSource.BUTTON,
                message=f"Alarm will be reset in {delay}",
                title="Alarm reset wait initiated",
            )
    else:
        # Reuse the intervention recorded above; recording a second one logged the
        # same button press twice.
        await self.reset_armed_state(intervention=intervention)

1163 

@callback
async def on_occupancy_change(self, event: Event[EventStateChangedData]) -> None:
    """Listen for person state events

    Events whose old and new state are equal are logged and ignored. A new
    state listed in `occupied_delay` schedules a delayed reset; any other
    change resets the armed state immediately.

    Args:
    ----
        event (Event[EventStateChangedData]): state change event

    """
    entity_id, old, new, new_attributes = self._extract_event(event)
    if old == new:
        _LOGGER.debug(
            "AUTOARM Occupancy Non-state Change: %s, state:%s->%s, event: %s, attrs:%s",
            entity_id,
            old,
            new,
            event,
            new_attributes,
        )
        return

    _LOGGER.debug(
        "AUTOARM Occupancy state Change: %s, state:%s->%s, event: %s, attrs:%s", entity_id, old, new, event, new_attributes
    )
    if new not in self.occupied_delay:
        await self.reset_armed_state(source=ChangeSource.OCCUPANCY)
        return

    run_at = dt_util.now() + self.occupied_delay[new]
    self.schedule_state(run_at, state=None, intervention=None, source=ChangeSource.OCCUPANCY)

1193 

@callback
async def on_panel_change(self, event: Event[EventStateChangedData]) -> None:
    """Alarm Control Panel has been changed outside of AutoArm

    Changes whose `changed_by` attribute starts with "<DOMAIN>." are ignored.
    Every other change is recorded as an ALARM_PANEL intervention; a state in
    ZOMBIE_STATES then triggers a reset, and a real state change is passed to
    the notifier when one is configured.
    """
    entity_id, old, new, new_attributes = self._extract_event(event)

    changed_by = new_attributes.get(ATTR_CHANGED_BY) if new_attributes else None
    if changed_by and changed_by.startswith(f"{DOMAIN}."):
        _LOGGER.debug(
            "AUTOARM Panel Change Ignored: %s,%s: %s-->%s",
            entity_id,
            event.event_type,
            old,
            new,
        )
        return

    new_state: AlarmControlPanelState | None = alarm_state_as_enum(new)
    old_state: AlarmControlPanelState | None = alarm_state_as_enum(old)

    _LOGGER.info(
        "AUTOARM Panel Change: %s,%s: %s-->%s",
        entity_id,
        event.event_type,
        old,
        new,
    )
    self.record_intervention(ChangeSource.ALARM_PANEL, new_state)

    if new in ZOMBIE_STATES:
        _LOGGER.warning("AUTOARM Dezombifying %s ...", new)
        await self.reset_armed_state(source=ChangeSource.ZOMBIFICATION)
        return
    if new == old:
        _LOGGER.debug("AUTOARM panel change leaves state unchanged at %s", new)
        return
    if self.notifier:
        await self.notifier.notify(ChangeSource.ALARM_PANEL, old_state, new_state)

1228 

@callback
async def housekeeping(self, triggered_at: dt.datetime) -> None:
    """Drop interventions older than `intervention_ttl` minutes and prune every calendar."""
    _LOGGER.debug("AUTOARM Housekeeping starting, triggered at %s", triggered_at)
    now = dt_util.now()

    retained = []
    for entry in self.interventions:
        expires_at = entry.created_at + dt.timedelta(minutes=self.intervention_ttl)
        if now < expires_at:
            retained.append(entry)
    self.interventions = retained

    for cal in self.calendars:
        await cal.prune_events()
    _LOGGER.debug("AUTOARM Housekeeping finished")