use dictionary instead of list for Controller.state['backups'] #175

Open · wants to merge 1 commit into master
24 changes: 22 additions & 2 deletions myhoard/backup_stream.py
@@ -75,6 +75,26 @@ class BaseBackupFailureReason(str, enum.Enum):
xtrabackup_error = "xtrabackup_error"


class BaseBackup(TypedDict, total=False):
Contributor Author comment: Just moved this from myhoard.Controller and added the rest of the expected fields. The total=False is mainly there in case a basebackup.json file does not contain all keys (ideally that shouldn't happen, but just in case). A minimal sketch of loading such a file follows the field list below.

binlog_index: Optional[int]
binlog_name: Optional[str]
binlog_position: Optional[int]
backup_reason: Optional["BackupStream.BackupReason"]
compressed_size: float
encryption_key: bool
end_size: Optional[int]
end_ts: float
gtid: str
gtid_executed: Dict[str, List[List[int]]]
initiated_at: float
lsn_info: Optional[Dict[str, float]]
normalized_backup_time: Optional[str]
number_of_files: int
start_size: Optional[int]
start_ts: float
uploaded_from: int
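
A minimal sketch of why total=False helps here: a basebackup.json missing some keys still type-checks as BaseBackup, and callers fall back explicitly. The trimmed class, loader function, and sample payload below are illustrative assumptions, not part of this diff.

```python
import json
from typing import TypedDict


class BaseBackup(TypedDict, total=False):
    # Trimmed to a few fields for the sketch; the full definition is in the diff above.
    end_ts: float
    gtid: str
    compressed_size: float


def load_basebackup_info(raw: bytes) -> BaseBackup:
    # With total=False, a JSON document that lacks some keys still satisfies BaseBackup.
    info: BaseBackup = json.loads(raw.decode("utf-8"))
    return info


info = load_basebackup_info(b'{"end_ts": 1700000000.0, "gtid": "uuid:1-10"}')
# compressed_size was absent from the file, so read it with an explicit fallback.
size = info.get("compressed_size", 0.0)
```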


class BackupStream(threading.Thread):
"""Handles creating a single consistent backup stream. 'stream' here refers to uninterrupted sequence
of backup data that can be used to restore the system to a consistent state. It includes the basebackup,
@@ -121,7 +141,7 @@ class State(TypedDict):
backup_errors: int
basebackup_errors: int
basebackup_file_metadata: Optional[Dict]
basebackup_info: Dict
basebackup_info: BaseBackup
broken_info: Dict
closed_info: Dict
completed_info: Dict
@@ -1007,7 +1027,7 @@ def _take_basebackup(self) -> None:
self.log.info("Last basebackup GTID %r, truncating GTID executed %r accordingly", last_gtid, gtid_executed)
truncate_gtid_executed(gtid_executed, last_gtid)

info = {
info: BaseBackup = {
"binlog_index": int(binlog_info["file_name"].split(".")[-1]) if binlog_info else None,
"binlog_name": binlog_info["file_name"] if binlog_info else None,
"binlog_position": binlog_info["file_position"] if binlog_info else None,
147 changes: 63 additions & 84 deletions myhoard/controller.py
@@ -1,5 +1,5 @@
# Copyright (c) 2019 Aiven, Helsinki, Finland. https://aiven.io/
from .backup_stream import BackupStream, RemoteBinlogInfo
from .backup_stream import BackupStream, BaseBackup, RemoteBinlogInfo
from .binlog_scanner import BinlogScanner
from .errors import BadRequest, UnknownBackupSite
from .restore_coordinator import BinlogStream, RestoreCoordinator
@@ -46,10 +46,6 @@
ERR_BACKUP_IN_PROGRESS = 4085


class BaseBackup(TypedDict):
end_ts: float


class Backup(TypedDict):
basebackup_info: BaseBackup
closed_at: Optional[float]
@@ -83,12 +79,12 @@ class RestoreOptions(TypedDict):
target_time_approximate_ok: bool


def sort_completed_backups(backups: List[Backup]) -> List[Backup]:
def sort_completed_backups(backups: Dict[str, Backup]) -> List[Backup]:
def key(backup):
assert backup["completed_at"] is not None
return backup["completed_at"]

return sorted((backup for backup in backups if backup["completed_at"]), key=key)
return sorted((backup for backup in backups.values() if backup["completed_at"]), key=key)
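
A self-contained usage sketch of the new signature (Backup trimmed to the two fields the function touches; sample stream ids are placeholders), showing that the sort now runs over the dict's values and skips backups that have not completed:

```python
from typing import Dict, List, Optional, TypedDict


class Backup(TypedDict, total=False):
    # Trimmed for the sketch; the real Backup carries more fields.
    stream_id: str
    completed_at: Optional[float]


def sort_completed_backups(backups: Dict[str, Backup]) -> List[Backup]:
    def key(backup: Backup) -> float:
        assert backup["completed_at"] is not None
        return backup["completed_at"]

    # Iterate the dict's values, drop incomplete backups, then order by completion time.
    return sorted((backup for backup in backups.values() if backup["completed_at"]), key=key)


backups: Dict[str, Backup] = {
    "stream-b": {"stream_id": "stream-b", "completed_at": 200.0},
    "stream-a": {"stream_id": "stream-a", "completed_at": 100.0},
    "stream-c": {"stream_id": "stream-c", "completed_at": None},  # still running, filtered out
}
assert [b["stream_id"] for b in sort_completed_backups(backups)] == ["stream-a", "stream-b"]
```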


class Controller(threading.Thread):
@@ -118,7 +114,7 @@ class Mode(str, enum.Enum):

class State(TypedDict):
backup_request: Optional[BackupRequest]
backups: List[Backup]
backups: Dict[str, Backup]
backups_fetched_at: int
binlogs_purged_at: int
errors: int
@@ -203,7 +199,7 @@ def __init__(
self.site_transfers: Dict[str, BaseTransfer] = {}
self.state: Controller.State = {
"backup_request": None,
"backups": [],
"backups": {},
"backups_fetched_at": 0,
"binlogs_purged_at": 0,
"errors": 0,
@@ -298,27 +294,25 @@ def restore_backup(
# Could consider allowing restore request also when mode is `restore`
raise ValueError(f"Current mode is {self.mode}, restore only allowed while in idle mode")

for backup in list(self.state["backups"]):
if backup["stream_id"] != stream_id or backup["site"] != site:
continue
if not backup["basebackup_info"]:
raise ValueError(f"Backup {backup!r} cannot be restored")

if backup.get("broken_at"):
raise ValueError(f"Cannot restore a broken backup: {backup!r}")

if target_time:
if target_time < backup["basebackup_info"]["end_ts"]:
raise ValueError(f"Requested target time {target_time} predates backup completion: {backup!r}")
# Caller must make sure they pick a backup that contains the requested target time. If this backup
# has been closed (will not get any further updates) at a time that is before the requested target
# time it is not possible to satisfy the request
if backup["closed_at"] and target_time > backup["closed_at"]:
raise ValueError(f"Requested target time {target_time} is after backup close: {backup!r}")
break
else:
backup = self.state["backups"].get(stream_id)
if not backup or backup["site"] != site:
raise ValueError(f"Requested backup {stream_id!r} for site {site!r} not found")

if not backup["basebackup_info"]:
raise ValueError(f"Backup {backup!r} cannot be restored")

if backup.get("broken_at"):
raise ValueError(f"Cannot restore a broken backup: {backup!r}")

if target_time:
if target_time < backup["basebackup_info"]["end_ts"]:
raise ValueError(f"Requested target time {target_time} predates backup completion: {backup!r}")
# Caller must make sure they pick a backup that contains the requested target time. If this backup
# has been closed (will not get any further updates) at a time that is before the requested target
# time it is not possible to satisfy the request
if backup["closed_at"] and target_time > backup["closed_at"]:
raise ValueError(f"Requested target time {target_time} is after backup close: {backup!r}")

self.log.info(
"Restoring backup stream %r, target time %r%s",
stream_id,
@@ -563,38 +557,28 @@ def collect_binlogs_to_purge(
break
return binlogs_to_purge, bool(only_binlogs_without_gtids or only_binlogs_that_are_too_new)

@staticmethod
def get_backup_list(backup_sites: Dict[str, BackupSiteInfo], *, seen_basebackup_infos=None, site_transfers=None):
if seen_basebackup_infos is None:
seen_basebackup_infos = {}
if site_transfers is None:
site_transfers = {}
backups = []
for site_name, site_config in backup_sites.items():
file_storage = site_transfers.get(site_name)
def get_backups(self) -> Dict[str, Backup]:
Contributor Author comment: Removed the previous arguments 🤷‍♀️; we were just passing in class attributes that this method can now access directly, which was quite odd. A short usage sketch of the new method follows this hunk.

backups = {}
for site_name, site_config in self.backup_sites.items():
file_storage = self.site_transfers.get(site_name)
if file_storage is None:
file_storage = get_transfer(site_config["object_storage"])
site_transfers[site_name] = file_storage
self.site_transfers[site_name] = file_storage
streams = list(file_storage.list_prefixes(site_name))
for site_and_stream_id in streams:
basebackup_compressed_size = None
basebackup_info = {}
basebackup_info: BaseBackup = self.seen_basebackup_infos.get(site_and_stream_id, {})
broken_info = {}
closed_info = {}
completed_info = {}
preserved_info = {}
for info in file_storage.list_iter(site_and_stream_id):
file_name = info["name"].rsplit("/", 1)[-1]
if file_name == "basebackup.xbstream":
basebackup_compressed_size = info["size"]
elif file_name == "basebackup.json":
if file_name == "basebackup.json" and not basebackup_info:
# The basebackup info json contents never change after creation so we can use cached
# value if available to avoid re-fetching the same content over and over again
basebackup_info = seen_basebackup_infos.get(site_and_stream_id)
if basebackup_info is None:
info_str, _ = file_storage.get_contents_to_string(info["name"])
basebackup_info = json.loads(info_str.decode("utf-8"))
seen_basebackup_infos[site_and_stream_id] = basebackup_info
info_str, _ = file_storage.get_contents_to_string(info["name"])
basebackup_info = json.loads(info_str.decode("utf-8"))
self.seen_basebackup_infos[site_and_stream_id] = basebackup_info
elif file_name == "broken.json":
broken_info = parse_fs_metadata(info["metadata"])
elif file_name == "closed.json":
@@ -604,25 +588,23 @@ def get_backup_list(backup_sites: Dict[str, BackupSiteInfo], *, seen_basebackup_
elif file_name == "preserved.json":
preserved_info = parse_fs_metadata(info["metadata"])

if basebackup_info and basebackup_compressed_size:
basebackup_info = dict(basebackup_info, compressed_size=basebackup_compressed_size)
resumable = basebackup_info and basebackup_compressed_size
resumable = basebackup_info and basebackup_info["compressed_size"]
completed = resumable and completed_info
closed = completed and closed_info

preserve_until = preserved_info.get("preserve_until")
backups.append(
{
"basebackup_info": basebackup_info,
"broken_at": broken_info.get("broken_at"),
"closed_at": closed_info["closed_at"] if closed else None,
"completed_at": completed_info["completed_at"] if completed else None,
"preserve_until": preserve_until,
"recovery_site": site_config.get("recovery_only", False),
"stream_id": site_and_stream_id.rsplit("/", 1)[-1],
"resumable": bool(resumable),
"site": site_name,
}

stream_id = site_and_stream_id.rsplit("/", 1)[-1]
backups[stream_id] = Backup(
basebackup_info=basebackup_info,
broken_at=broken_info.get("broken_at"),
closed_at=closed_info["closed_at"] if closed else None,
completed_at=completed_info["completed_at"] if completed else None,
preserve_until=preserve_until,
recovery_site=site_config.get("recovery_only", False),
stream_id=stream_id,
resumable=bool(resumable),
site=site_name,
)
return backups
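
A short usage sketch of what keying backups by stream_id buys callers: a lookup becomes a single dict access instead of a scan of the old list for a matching field. The trimmed Backup type and the stream id values below are placeholders.

```python
from typing import Dict, Optional, TypedDict


class Backup(TypedDict, total=False):
    # Trimmed for the sketch.
    stream_id: str
    site: str


def find_backup(backups: Dict[str, Backup], stream_id: str) -> Optional[Backup]:
    # O(1) dict lookup; the old list-based state required iterating every backup.
    return backups.get(stream_id)


backups: Dict[str, Backup] = {"1234_0": {"stream_id": "1234_0", "site": "default"}}
assert find_backup(backups, "1234_0") is not None
assert find_backup(backups, "missing") is None
```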

@@ -1110,13 +1092,14 @@ def _get_upload_backup_site(self):
def _get_site_for_stream_id(self, stream_id: str):
backup = self.get_backup_by_stream_id(stream_id)
if not backup:
KeyError(f"Stream {stream_id} not found in backups")
raise KeyError(f"Stream {stream_id} not found in backups")

return backup["site"]

def get_backup_by_stream_id(self, stream_id: str):
for backup in self.state["backups"]:
if backup["stream_id"] == stream_id:
return backup
with self.lock:
if stream_id in self.state["backups"]:
return self.state["backups"][stream_id]

return None

@@ -1290,7 +1273,7 @@ def _process_removed_binlogs(self, binlogs):
stream.remove_binlogs(binlogs)

def _purge_old_backups(self):
purgeable = [backup for backup in self.state["backups"] if backup["completed_at"]]
purgeable = sort_completed_backups(self.state["backups"])
broken_backups_count = sum(backup["broken_at"] is not None for backup in purgeable)
# do not consider broken backups for the count, they will still be purged
# but we should only purge when the count of non-broken backups has exceeded the limit.
@@ -1310,7 +1293,6 @@ def _purge_old_backups(self):
# For simplicity only ever drop one backup here. This function
# is called repeatedly so if there are for any reason more backups
# to drop they will be dropped soon enough
purgeable = sort_completed_backups(purgeable)
backup = purgeable[0]

if not backup["closed_at"]:
@@ -1342,11 +1324,11 @@ def _purge_old_backups(self):
# lock the controller, this way other requests do not access backups till backup is purged
with self.lock:
self.state_manager.update_state(stream_to_be_purged=None)
current_backups = [
current_backup
for current_backup in self.state["backups"]
if current_backup["stream_id"] != backup["stream_id"]
]
current_backups = {
stream_id: current_backup
for stream_id, current_backup in self.state["backups"].items()
if stream_id != backup["stream_id"]
}
self.state_manager.update_state(backups=current_backups)
owned_stream_ids = [sid for sid in self.state["owned_stream_ids"] if sid != backup["stream_id"]]
self.state_manager.update_state(owned_stream_ids=owned_stream_ids)
@@ -1450,21 +1432,19 @@ def _purge_old_binlogs(self, *, mysql_maybe_not_running=False):
self.stats.gauge_float("myhoard.binlog.time_since_any_purged", current_time - last_purge)
self.stats.gauge_float("myhoard.binlog.time_since_could_have_purged", current_time - last_could_have_purged)

def _refresh_backups_list(self, force_refresh: bool = False):
def _refresh_backups_list(self, force_refresh: bool = False) -> Optional[Dict[str, Backup]]:
interval = self.backup_refresh_interval_base
if self.mode == self.Mode.active:
interval *= self.BACKUP_REFRESH_ACTIVE_MULTIPLIER

if force_refresh is False and time.time() - self.state["backups_fetched_at"] < interval:
return None

backups = self.get_backup_list(
self.backup_sites, seen_basebackup_infos=self.seen_basebackup_infos, site_transfers=self.site_transfers
)
new_backups_ids = {backup["stream_id"] for backup in backups}
for backup in self.state["backups"]:
if backup["stream_id"] not in new_backups_ids:
self._delete_backup_stream_state(backup["stream_id"])
backups = self.get_backups()

stream_ids_to_delete = set(self.state["backups"]) - set(backups)
for stream_id in stream_ids_to_delete:
self._delete_backup_stream_state(stream_id)

with self.lock:
self.state_manager.update_state(backups=backups, backups_fetched_at=time.time())
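
The stale-state cleanup above leans on the fact that iterating a dict yields its keys, so set(self.state["backups"]) - set(backups) is a set difference of stream ids. A tiny illustration with placeholder values:

```python
# Placeholder strings stand in for Backup dicts; only the keys matter here.
old_backups = {"stream-1": "backup-1", "stream-2": "backup-2"}
new_backups = {"stream-2": "backup-2", "stream-3": "backup-3"}

# set() over a dict uses its keys, so this yields exactly the stream ids that disappeared.
stream_ids_to_delete = set(old_backups) - set(new_backups)
assert stream_ids_to_delete == {"stream-1"}
```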
@@ -1518,8 +1498,7 @@ def _refresh_backups_list_and_streams(self):
# Keep any streams that are in basebackup phase because those haven't necessarily
# yet uploaded any files so the remote backup directory might not exist
new_streams = basebackup_streams
for backup in backups:
stream_id = backup["stream_id"]
for stream_id, backup in backups.items():
site_info = self.backup_sites.get(backup["site"])
# We do not create backup streams for recovery sites. Those are only used for restoring
# basic backup data, never to stream any changes. Also, if config is updated not to
2 changes: 1 addition & 1 deletion myhoard/web_server.py
@@ -76,7 +76,7 @@ async def backup_list(self, _request):
}
with self.controller.lock:
if self.controller.state["backups_fetched_at"]:
response["backups"] = self.controller.state["backups"]
response["backups"] = list(self.controller.state["backups"].values())
Contributor Author comment: Kept the response as a list here; I don't see a reason to change the response type. A minimal sketch of the dict-to-list conversion follows below the diff.

return json_response(response)

async def backup_preserve(self, request):
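
As noted in the comment above, the HTTP response keeps its list shape even though the internal state is now a dict. A minimal standalone sketch (payload fields and values are placeholders, not actual myhoard data):

```python
import json

# Internal state is keyed by stream_id after this change.
state_backups = {
    "stream-1": {"stream_id": "stream-1", "site": "default", "completed_at": 100.0},
    "stream-2": {"stream_id": "stream-2", "site": "default", "completed_at": 200.0},
}

# Converting to a list preserves the wire format, so existing API clients are unaffected.
response = {"backups": list(state_backups.values())}
print(json.dumps(response, indent=2))
```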