Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 32 additions & 3 deletions homeassistant/components/backup/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
issue_registry as ir,
start,
)
from homeassistant.helpers.debounce import Debouncer
from homeassistant.helpers.json import json_bytes
from homeassistant.util import dt as dt_util, json as json_util
from homeassistant.util.async_iterator import AsyncIteratorReader
Expand Down Expand Up @@ -78,6 +79,8 @@
validate_password_stream,
)

UPLOAD_PROGRESS_DEBOUNCE_SECONDS = 1


@dataclass(frozen=True, kw_only=True, slots=True)
class NewBackup:
Expand Down Expand Up @@ -590,23 +593,49 @@ async def upload_backup_to_agent(agent_id: str) -> None:
)
agent = self.backup_agents[agent_id]

latest_uploaded_bytes = 0

@callback
def on_upload_progress(*, bytes_uploaded: int, **kwargs: Any) -> None:
"""Handle upload progress."""
def _emit_upload_progress() -> None:
"""Emit the latest upload progress event."""
self.async_on_backup_event(
UploadBackupEvent(
manager_state=self.state,
agent_id=agent_id,
uploaded_bytes=bytes_uploaded,
uploaded_bytes=latest_uploaded_bytes,
total_bytes=_backup.size,
)
)

upload_progress_debouncer: Debouncer[None] = Debouncer(
self.hass,
LOGGER,
cooldown=UPLOAD_PROGRESS_DEBOUNCE_SECONDS,
immediate=True,
function=_emit_upload_progress,
)

@callback
def on_upload_progress(*, bytes_uploaded: int, **kwargs: Any) -> None:
"""Handle upload progress."""
nonlocal latest_uploaded_bytes
latest_uploaded_bytes = bytes_uploaded
upload_progress_debouncer.async_schedule_call()

await agent.async_upload_backup(
open_stream=open_stream_func,
backup=_backup,
on_progress=on_upload_progress,
)
upload_progress_debouncer.async_cancel()
self.async_on_backup_event(
UploadBackupEvent(
manager_state=self.state,
agent_id=agent_id,
uploaded_bytes=_backup.size,
total_bytes=_backup.size,
)
)
if streamer:
await streamer.wait()

Expand Down
18 changes: 9 additions & 9 deletions tests/components/backup/snapshots/test_websocket.ambr
Original file line number Diff line number Diff line change
Expand Up @@ -5619,10 +5619,10 @@
# name: test_generate[None].6
dict({
'event': dict({
'agent_id': 'backup.local',
'manager_state': 'create_backup',
'reason': None,
'stage': None,
'state': 'completed',
'total_bytes': 10240,
'uploaded_bytes': 10240,
}),
'id': 1,
'type': 'event',
Expand Down Expand Up @@ -5694,10 +5694,10 @@
# name: test_generate[data1].6
dict({
'event': dict({
'agent_id': 'backup.local',
'manager_state': 'create_backup',
'reason': None,
'stage': None,
'state': 'completed',
'total_bytes': 10240,
'uploaded_bytes': 10240,
}),
'id': 1,
'type': 'event',
Expand Down Expand Up @@ -5769,10 +5769,10 @@
# name: test_generate[data2].6
dict({
'event': dict({
'agent_id': 'backup.local',
'manager_state': 'create_backup',
'reason': None,
'stage': None,
'state': 'completed',
'total_bytes': 10240,
'uploaded_bytes': 10240,
}),
'id': 1,
'type': 'event',
Expand Down
141 changes: 126 additions & 15 deletions tests/components/backup/test_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import asyncio
from collections.abc import Callable, Generator
from dataclasses import replace
from datetime import timedelta
from io import StringIO
import json
from pathlib import Path
Expand Down Expand Up @@ -47,12 +48,14 @@
ReceiveBackupStage,
ReceiveBackupState,
RestoreBackupState,
UploadBackupEvent,
WrittenBackup,
)
from homeassistant.const import EVENT_HOMEASSISTANT_START, EVENT_HOMEASSISTANT_STARTED
from homeassistant.core import CoreState, HomeAssistant
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers import issue_registry as ir
from homeassistant.util import dt as dt_util

from .common import (
LOCAL_AGENT_ID,
Expand All @@ -65,6 +68,7 @@
setup_backup_platform,
)

from tests.common import async_fire_time_changed
from tests.typing import ClientSessionGenerator, WebSocketGenerator

_EXPECTED_FILES = [
Expand Down Expand Up @@ -596,7 +600,10 @@ async def test_initiate_backup(
"state": CreateBackupState.IN_PROGRESS,
}

# Consume any upload progress events before the final state event
result = await ws_client.receive_json()
while "uploaded_bytes" in result["event"]:
result = await ws_client.receive_json()
assert result["event"] == {
"manager_state": BackupManagerState.CREATE_BACKUP,
"reason": None,
Expand Down Expand Up @@ -843,7 +850,10 @@ async def test_initiate_backup_with_agent_error(
"state": CreateBackupState.IN_PROGRESS,
}

# Consume any upload progress events before the final state event
result = await ws_client.receive_json()
while "uploaded_bytes" in result["event"]:
result = await ws_client.receive_json()
assert result["event"] == {
"manager_state": BackupManagerState.CREATE_BACKUP,
"reason": "upload_failed",
Expand Down Expand Up @@ -1401,7 +1411,10 @@ async def test_initiate_backup_non_agent_upload_error(
"state": CreateBackupState.IN_PROGRESS,
}

# Consume any upload progress events before the final state event
result = await ws_client.receive_json()
while "uploaded_bytes" in result["event"]:
result = await ws_client.receive_json()
assert result["event"] == {
"manager_state": BackupManagerState.CREATE_BACKUP,
"reason": "upload_failed",
Expand Down Expand Up @@ -1594,7 +1607,10 @@ async def test_initiate_backup_file_error_upload_to_agents(
"state": CreateBackupState.IN_PROGRESS,
}

# Consume any upload progress events before the final state event
result = await ws_client.receive_json()
while "uploaded_bytes" in result["event"]:
result = await ws_client.receive_json()
assert result["event"] == {
"manager_state": BackupManagerState.CREATE_BACKUP,
"reason": "upload_failed",
Expand Down Expand Up @@ -2709,7 +2725,10 @@ async def test_receive_backup_file_read_error(
"state": ReceiveBackupState.IN_PROGRESS,
}

# Consume any upload progress events before the final state event
result = await ws_client.receive_json()
while "uploaded_bytes" in result["event"]:
result = await ws_client.receive_json()
assert result["event"] == {
"manager_state": BackupManagerState.RECEIVE_BACKUP,
"reason": final_state_reason,
Expand Down Expand Up @@ -3526,7 +3545,10 @@ async def test_initiate_backup_per_agent_encryption(
"state": CreateBackupState.IN_PROGRESS,
}

# Consume any upload progress events before the final state event
result = await ws_client.receive_json()
while "uploaded_bytes" in result["event"]:
result = await ws_client.receive_json()
assert result["event"] == {
"manager_state": BackupManagerState.CREATE_BACKUP,
"reason": None,
Expand Down Expand Up @@ -3761,25 +3783,114 @@ async def upload_with_progress(**kwargs: Any) -> None:
result = await ws_client.receive_json()
assert result["event"]["stage"] == CreateBackupStage.UPLOAD_TO_AGENTS

# Upload progress events for the remote agent
# Collect all upload progress events until the final state event
progress_events = []
result = await ws_client.receive_json()
assert result["event"] == {
"manager_state": BackupManagerState.CREATE_BACKUP,
"agent_id": "test.remote",
"uploaded_bytes": 500,
"total_bytes": ANY,
}
while "uploaded_bytes" in result["event"]:
progress_events.append(result["event"])
result = await ws_client.receive_json()

result = await ws_client.receive_json()
assert result["event"] == {
"manager_state": BackupManagerState.CREATE_BACKUP,
"agent_id": "test.remote",
"uploaded_bytes": 1000,
"total_bytes": ANY,
}
# Verify progress events from the remote agent (500 from agent + final from manager)
remote_progress = [e for e in progress_events if e["agent_id"] == "test.remote"]
assert len(remote_progress) == 2
assert remote_progress[0]["uploaded_bytes"] == 500
assert remote_progress[1]["uploaded_bytes"] == remote_progress[1]["total_bytes"]

# Verify progress event from the local agent (final from manager)
local_progress = [e for e in progress_events if e["agent_id"] == LOCAL_AGENT_ID]
assert len(local_progress) == 1
assert local_progress[0]["uploaded_bytes"] == local_progress[0]["total_bytes"]

result = await ws_client.receive_json()
assert result["event"]["state"] == CreateBackupState.COMPLETED

result = await ws_client.receive_json()
assert result["event"] == {"manager_state": BackupManagerState.IDLE}


async def test_upload_progress_debounced(
hass: HomeAssistant,
hass_ws_client: WebSocketGenerator,
generate_backup_id: MagicMock,
) -> None:
"""Test that rapid upload progress events are debounced.

Verify that when the on_progress callback is called multiple times during
the debounce cooldown period, only the latest event is fired.
"""
agent_ids = ["test.remote"]
mock_agents = await setup_backup_integration(hass, remote_agents=["test.remote"])
manager = hass.data[DATA_MANAGER]

remote_agent = mock_agents["test.remote"]

progress_done = asyncio.Event()
upload_done = asyncio.Event()

async def upload_with_progress(**kwargs: Any) -> None:
"""Upload and report progress."""
on_progress = kwargs["on_progress"]
# First call fires immediately
on_progress(bytes_uploaded=100)
# These two are buffered during cooldown; 1000 should replace 500
on_progress(bytes_uploaded=500)
on_progress(bytes_uploaded=1000)
progress_done.set()
await upload_done.wait()

remote_agent.async_upload_backup.side_effect = upload_with_progress

# Subscribe directly to collect all events
events: list[Any] = []
manager.async_subscribe_events(events.append)

ws_client = await hass_ws_client(hass)

with patch("pathlib.Path.open", mock_open(read_data=b"test")):
await ws_client.send_json_auto_id(
{"type": "backup/generate", "agent_ids": agent_ids}
)
result = await ws_client.receive_json()
assert result["success"] is True

# Wait for upload to reach the sync point (progress reported, upload paused)
await progress_done.wait()

# At this point the debouncer's cooldown timer is pending.
# The first event (100 bytes) fired immediately, 500 and 1000 are buffered.
remote_events = [
e
for e in events
if isinstance(e, UploadBackupEvent) and e.agent_id == "test.remote"
]
assert len(remote_events) == 1
assert remote_events[0].uploaded_bytes == 100

# Advance time past the cooldown to trigger the debouncer timer.
# This fires the coalesced event: 500 was replaced by 1000.
async_fire_time_changed(hass, dt_util.utcnow() + timedelta(seconds=2))

remote_events = [
e
for e in events
if isinstance(e, UploadBackupEvent) and e.agent_id == "test.remote"
]
assert len(remote_events) == 2
assert remote_events[0].uploaded_bytes == 100
assert remote_events[1].uploaded_bytes == 1000

# Let the upload finish
upload_done.set()
# Fire pending timers so the backup task can complete
async_fire_time_changed(
hass, dt_util.utcnow() + timedelta(seconds=10), fire_all=True
)
await hass.async_block_till_done()

# Check the final 100% progress event is sent, that is sent for every agent
remote_events = [
e
for e in events
if isinstance(e, UploadBackupEvent) and e.agent_id == "test.remote"
]
assert len(remote_events) == 3
assert remote_events[2].uploaded_bytes == remote_events[2].total_bytes
Loading
Loading