Optimize and Expand RDR Acceptance Tests (#10230)
- Consolidate subscription and appset workload tests into single test cases
- Introduce tests for CephFS-based subscription and appset workloads

Signed-off-by: Sidhant Agrawal <[email protected]>
sidhant-agrawal authored Sep 30, 2024
1 parent a380c7b commit 92040eb
Showing 5 changed files with 354 additions and 339 deletions.
59 changes: 59 additions & 0 deletions ocs_ci/helpers/dr_helpers.py
@@ -6,6 +6,7 @@
import logging
import tempfile
import time
from datetime import datetime

from ocs_ci.framework import config
from ocs_ci.ocs import constants, ocp
@@ -963,6 +964,64 @@ def get_all_drpolicy():
return drpolicy_list


def verify_last_group_sync_time(
drpc_obj, scheduling_interval, initial_last_group_sync_time=None
):
"""
Verifies that the lastGroupSyncTime for a given DRPC object is within the expected range.
Args:
drpc_obj (obj): DRPC object
scheduling_interval (int): The scheduling interval in minutes
initial_last_group_sync_time (str): Previous lastGroupSyncTime value (optional).
Returns:
str: Current lastGroupSyncTime
Raises:
AssertionError: If the lastGroupSyncTime is outside the expected range
(greater than or equal to three times the scheduling interval)
"""
restore_index = config.cur_index
config.switch_acm_ctx()
if initial_last_group_sync_time:
for last_group_sync_time in TimeoutSampler(
(3 * scheduling_interval * 60), 15, drpc_obj.get_last_group_sync_time
):
if last_group_sync_time:
if last_group_sync_time != initial_last_group_sync_time:
logger.info(
f"Verified: Current lastGroupSyncTime {last_group_sync_time} is different from "
f"previous value {initial_last_group_sync_time}"
)
break
logger.info(
"The value of lastGroupSyncTime in drpc is not updated. Retrying..."
)
else:
last_group_sync_time = drpc_obj.get_last_group_sync_time()

# Verify lastGroupSyncTime
time_format = "%Y-%m-%dT%H:%M:%SZ"
last_group_sync_time_formatted = datetime.strptime(
last_group_sync_time, time_format
)
current_time = datetime.strptime(
datetime.utcnow().strftime(time_format), time_format
)
time_since_last_sync = (
current_time - last_group_sync_time_formatted
).total_seconds() / 60
logger.info(f"Time in minutes since the last sync {time_since_last_sync}")
assert (
time_since_last_sync < 3 * scheduling_interval
), "The syncing of volumes is exceeding three times the scheduled snapshot interval"
logger.info("Verified lastGroupSyncTime value within expected range")
config.switch_ctx(restore_index)
return last_group_sync_time


def get_all_drclusters():
"""
Get all DRClusters
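For reference, the core of the new helper is simple timestamp arithmetic on the DRPC status field. A self-contained sketch of that check, using a made-up scheduling interval and a sync time assumed to be seven minutes old:

# Standalone illustration of the timestamp check inside verify_last_group_sync_time.
# The interval and the "7 minutes ago" sync time are made-up example values.
from datetime import datetime, timedelta

time_format = "%Y-%m-%dT%H:%M:%SZ"
scheduling_interval = 5  # minutes

last_group_sync_time = (datetime.utcnow() - timedelta(minutes=7)).strftime(time_format)
last_sync = datetime.strptime(last_group_sync_time, time_format)
current_time = datetime.strptime(datetime.utcnow().strftime(time_format), time_format)

time_since_last_sync = (current_time - last_sync).total_seconds() / 60
print(f"Time in minutes since the last sync: {time_since_last_sync}")
assert time_since_last_sync < 3 * scheduling_interval, (
    "The syncing of volumes is exceeding three times the scheduled snapshot interval"
)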
12 changes: 12 additions & 0 deletions ocs_ci/ocs/resources/drpc.py
@@ -89,6 +89,18 @@ def wait_for_progression_status(self, status):
result=True
), f"Progression status is not expected current status {self.get_progression_status()} expected status {status}"

def get_last_group_sync_time(self):
"""
Fetch lastGroupSyncTime from DRPC
Returns:
str: lastGroupSyncTime
"""
last_group_sync_time = self.get().get("status").get("lastGroupSyncTime")
logger.info(f"Current lastGroupSyncTime is {last_group_sync_time}.")
return last_group_sync_time


def get_drpc_name(namespace, switch_ctx=None):
"""
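A minimal sketch of fetching the raw value directly from the hub; the workload namespace and the DRPC constructor arguments are assumptions for illustration, and the context-switch calls mirror those used in dr_helpers above:

# Hypothetical standalone use of the new DRPC.get_last_group_sync_time method.
from ocs_ci.framework import config
from ocs_ci.ocs.resources.drpc import DRPC

restore_index = config.cur_index
config.switch_acm_ctx()  # DRPC resources live on the ACM hub cluster
try:
    drpc_obj = DRPC(namespace="busybox-workloads-1")  # illustrative namespace; constructor args assumed
    last_sync = drpc_obj.get_last_group_sync_time()
    print(f"lastGroupSyncTime reported by DRPC: {last_sync}")
finally:
    config.switch_ctx(restore_index)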
138 changes: 62 additions & 76 deletions tests/functional/disaster-recovery/regional-dr/test_failover.py
@@ -4,8 +4,8 @@
import pytest

from ocs_ci.framework import config
from ocs_ci.framework.testlib import acceptance, tier1
from ocs_ci.framework.pytest_customization.marks import turquoise_squad
from ocs_ci.framework.testlib import acceptance, tier1
from ocs_ci.helpers import dr_helpers
from ocs_ci.helpers.dr_helpers import (
wait_for_replication_destinations_creation,
@@ -42,55 +42,41 @@ class TestFailover:
"""

@pytest.mark.parametrize(
argnames=["primary_cluster_down", "pvc_interface", "workload_type"],
argnames=["primary_cluster_down", "pvc_interface"],
argvalues=[
pytest.param(
*[False, constants.CEPHBLOCKPOOL, constants.SUBSCRIPTION],
*[False, constants.CEPHBLOCKPOOL],
marks=pytest.mark.polarion_id(polarion_id_primary_up),
id="primary_up-rbd-subscription",
id="primary_up-rbd",
),
pytest.param(
*[True, constants.CEPHBLOCKPOOL, constants.SUBSCRIPTION],
*[True, constants.CEPHBLOCKPOOL],
marks=pytest.mark.polarion_id(polarion_id_primary_down),
id="primary_down-rbd-subscription",
),
pytest.param(
*[False, constants.CEPHBLOCKPOOL, constants.APPLICATION_SET],
marks=pytest.mark.polarion_id("OCS-5006"),
id="primary_up-rbd-appset",
id="primary_down-rbd",
),
pytest.param(
*[True, constants.CEPHBLOCKPOOL, constants.APPLICATION_SET],
marks=pytest.mark.polarion_id("OCS-5008"),
id="primary_down-rbd-appset",
),
pytest.param(
*[False, constants.CEPHFILESYSTEM, constants.SUBSCRIPTION],
*[False, constants.CEPHFILESYSTEM],
marks=pytest.mark.polarion_id("OCS-4729"),
id="primary_up-cephfs-subscription",
id="primary_up-cephfs",
),
pytest.param(
*[True, constants.CEPHFILESYSTEM, constants.SUBSCRIPTION],
*[True, constants.CEPHFILESYSTEM],
marks=pytest.mark.polarion_id("OCS-4726"),
id="primary_down-cephfs-subscription",
id="primary_down-cephfs",
),
],
)
def test_failover(
self,
primary_cluster_down,
pvc_interface,
workload_type,
setup_acm_ui,
dr_workload,
nodes_multicluster,
node_restart_teardown,
):
"""
Tests to verify application failover between managed clusters
There are two test cases:
1) Failover to secondary cluster when primary cluster is UP
2) Failover to secondary cluster when primary cluster is DOWN
Tests to verify application failover between managed clusters when the primary cluster is either UP or DOWN.
This test can also be run from the ACM UI;
pass the yaml conf/ocsci/dr_ui.yaml to trigger it.
@@ -99,35 +85,30 @@ def test_failover(
if config.RUN.get("rdr_failover_via_ui"):
acm_obj = AcmAddClusters()

if workload_type == constants.SUBSCRIPTION:
rdr_workload = dr_workload(
num_of_subscription=1, pvc_interface=pvc_interface
)[0]
else:
rdr_workload = dr_workload(
num_of_subscription=0, num_of_appset=1, pvc_interface=pvc_interface
)[0]
workloads = dr_workload(
num_of_subscription=1, num_of_appset=1, pvc_interface=pvc_interface
)

primary_cluster_name = dr_helpers.get_current_primary_cluster_name(
rdr_workload.workload_namespace, workload_type
workloads[0].workload_namespace, workloads[0].workload_type
)
config.switch_to_cluster_by_name(primary_cluster_name)
primary_cluster_index = config.cur_index
primary_cluster_nodes = get_node_objs()
secondary_cluster_name = dr_helpers.get_current_secondary_cluster_name(
rdr_workload.workload_namespace, workload_type
workloads[0].workload_namespace, workloads[0].workload_type
)

if pvc_interface == constants.CEPHFILESYSTEM:
# Verify the creation of ReplicationDestination resources on secondary cluster
config.switch_to_cluster_by_name(secondary_cluster_name)
wait_for_replication_destinations_creation(
rdr_workload.workload_pvc_count, rdr_workload.workload_namespace
)
config.switch_to_cluster_by_name(primary_cluster_name)
for wl in workloads:
wait_for_replication_destinations_creation(
wl.workload_pvc_count, wl.workload_namespace
)

scheduling_interval = dr_helpers.get_scheduling_interval(
rdr_workload.workload_namespace, workload_type
workloads[0].workload_namespace, workloads[0].workload_type
)
wait_time = 2 * scheduling_interval # Time in minutes
logger.info(f"Waiting for {wait_time} minutes to run IOs")
Expand All @@ -140,6 +121,7 @@ def test_failover(

# Stop primary cluster nodes
if primary_cluster_down:
config.switch_to_cluster_by_name(primary_cluster_name)
logger.info(f"Stopping nodes of primary cluster: {primary_cluster_name}")
nodes_multicluster[primary_cluster_index].stop_nodes(primary_cluster_nodes)

Expand All @@ -154,36 +136,39 @@ def test_failover(
elif config.RUN.get("rdr_failover_via_ui"):
check_cluster_status_on_acm_console(acm_obj)

if config.RUN.get("rdr_failover_via_ui"):
# Failover via ACM UI
failover_relocate_ui(
acm_obj,
scheduling_interval=scheduling_interval,
workload_to_move=f"{rdr_workload.workload_name}-1",
policy_name=rdr_workload.dr_policy_name,
failover_or_preferred_cluster=secondary_cluster_name,
)
else:
# Failover action via CLI
dr_helpers.failover(
secondary_cluster_name,
rdr_workload.workload_namespace,
workload_type,
rdr_workload.appset_placement_name
if workload_type != constants.SUBSCRIPTION
else None,
)
for wl in workloads:
if config.RUN.get("rdr_failover_via_ui"):
# Failover via ACM UI
failover_relocate_ui(
acm_obj,
scheduling_interval=scheduling_interval,
workload_to_move=f"{wl.workload_name}-1",
policy_name=wl.dr_policy_name,
failover_or_preferred_cluster=secondary_cluster_name,
)
else:
# Failover action via CLI
dr_helpers.failover(
secondary_cluster_name,
wl.workload_namespace,
wl.workload_type,
wl.appset_placement_name
if wl.workload_type == constants.APPLICATION_SET
else None,
)

# Verify resources creation on secondary cluster (failoverCluster)
config.switch_to_cluster_by_name(secondary_cluster_name)
dr_helpers.wait_for_all_resources_creation(
rdr_workload.workload_pvc_count,
rdr_workload.workload_pod_count,
rdr_workload.workload_namespace,
)
for wl in workloads:
dr_helpers.wait_for_all_resources_creation(
wl.workload_pvc_count,
wl.workload_pod_count,
wl.workload_namespace,
)

# Verify resources deletion from primary cluster
config.switch_to_cluster_by_name(primary_cluster_name)

# Start nodes if cluster is down
if primary_cluster_down:
logger.info(
Expand All @@ -202,25 +187,26 @@ def test_failover(
), "Not all the pods reached running state"
logger.info("Checking for Ceph Health OK")
ceph_health_check()
dr_helpers.wait_for_all_resources_deletion(rdr_workload.workload_namespace)

for wl in workloads:
dr_helpers.wait_for_all_resources_deletion(wl.workload_namespace)

if pvc_interface == constants.CEPHFILESYSTEM:
config.switch_to_cluster_by_name(secondary_cluster_name)
# Verify the deletion of ReplicationDestination resources on secondary cluster
wait_for_replication_destinations_deletion(rdr_workload.workload_namespace)
config.switch_to_cluster_by_name(primary_cluster_name)
# Verify the creation of ReplicationDestination resources on primary cluster(current secondary)
wait_for_replication_destinations_creation(
rdr_workload.workload_pvc_count, rdr_workload.workload_namespace
)
for wl in workloads:
# Verify the deletion of ReplicationDestination resources on secondary cluster
config.switch_to_cluster_by_name(secondary_cluster_name)
wait_for_replication_destinations_deletion(wl.workload_namespace)
# Verify the creation of ReplicationDestination resources on primary cluster
config.switch_to_cluster_by_name(primary_cluster_name)
wait_for_replication_destinations_creation(
wl.workload_pvc_count, wl.workload_namespace
)

if pvc_interface == constants.CEPHBLOCKPOOL:
dr_helpers.wait_for_mirroring_status_ok(
replaying_images=rdr_workload.workload_pvc_count
replaying_images=sum([wl.workload_pvc_count for wl in workloads])
)

if config.RUN.get("rdr_failover_via_ui"):
config.switch_acm_ctx()
verify_failover_relocate_status_ui(acm_obj)

# TODO: Add data integrity checks
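Purely as a sketch of how the new lastGroupSyncTime helper could plug into the consolidated flow above (it is not called in this file; the DRPC constructor arguments and the wiring are assumptions, not taken from this commit):

# Hypothetical wiring of verify_last_group_sync_time around the failover loop.
from ocs_ci.framework import config
from ocs_ci.helpers import dr_helpers
from ocs_ci.ocs import constants
from ocs_ci.ocs.resources.drpc import DRPC


def failover_with_sync_time_checks(workloads, scheduling_interval, secondary_cluster_name):
    config.switch_acm_ctx()
    # One DRPC per protected workload namespace; constructor args are assumed.
    drpc_objs = [DRPC(namespace=wl.workload_namespace) for wl in workloads]

    # Baseline lastGroupSyncTime per workload before failover.
    initial_times = [
        dr_helpers.verify_last_group_sync_time(drpc, scheduling_interval)
        for drpc in drpc_objs
    ]

    for wl in workloads:
        dr_helpers.failover(
            secondary_cluster_name,
            wl.workload_namespace,
            wl.workload_type,
            wl.appset_placement_name
            if wl.workload_type == constants.APPLICATION_SET
            else None,
        )

    # After failover, the timestamp should advance and stay within 3x the interval.
    for drpc, initial in zip(drpc_objs, initial_times):
        dr_helpers.verify_last_group_sync_time(drpc, scheduling_interval, initial)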
