From 0e14403d86693e6ee9e5e209d22fe35b32712bae Mon Sep 17 00:00:00 2001 From: Niels de Vos Date: Tue, 12 Dec 2023 09:43:23 +0100 Subject: [PATCH] replication: send VolumeReplication operations to the leading controller Just like NetworkFence operations should be sent to a preferred Controller Server, VolumeReplication operations should be sent there as well. Updates: #422 Signed-off-by: Niels de Vos --- .../volumereplication_controller.go | 34 +++++++++---------- 1 file changed, 16 insertions(+), 18 deletions(-) diff --git a/controllers/replication.storage/volumereplication_controller.go b/controllers/replication.storage/volumereplication_controller.go index fc15f6c49..442cc4da6 100644 --- a/controllers/replication.storage/volumereplication_controller.go +++ b/controllers/replication.storage/volumereplication_controller.go @@ -174,7 +174,7 @@ func (r *VolumeReplicationReconciler) Reconcile(ctx context.Context, req ctrl.Re logger.Info("Replication handle", "ReplicationHandleName", replicationHandle) } - replicationClient, err := r.getReplicationClient(vrcObj.Spec.Provisioner) + replicationClient, err := r.getReplicationClient(ctx, vrcObj.Spec.Provisioner) if err != nil { logger.Error(err, "Failed to get ReplicationClient") @@ -453,27 +453,25 @@ func getInfoReconcileInterval(parameters map[string]string, logger logr.Logger) return scheduleTime / 2 } -func (r *VolumeReplicationReconciler) getReplicationClient(driverName string) (grpcClient.VolumeReplication, error) { - conns := r.Connpool.GetByNodeID(driverName, "") - - // Iterate through the connections and find the one that matches the driver name - // provided in the VolumeReplication spec; so that corresponding - // operations can be performed. - for _, v := range conns { - for _, cap := range v.Capabilities { - // validate if VOLUME_REPLICATION capability is supported by the driver. - if cap.GetVolumeReplication() == nil { - continue - } +func (r *VolumeReplicationReconciler) getReplicationClient(ctx context.Context, driverName string) (grpcClient.VolumeReplication, error) { + conn, err := r.Connpool.GetLeaderByDriver(ctx, r.Client, driverName) + if err != nil { + return nil, fmt.Errorf("no leader for the ControllerService of driver %q", driverName) + } - // validate of VOLUME_REPLICATION capability is enabled by the storage driver. - if cap.GetVolumeReplication().GetType() == identity.Capability_VolumeReplication_VOLUME_REPLICATION { - return grpcClient.NewReplicationClient(v.Client, r.Timeout), nil - } + for _, cap := range conn.Capabilities { + // validate if VOLUME_REPLICATION capability is supported by the driver. + if cap.GetVolumeReplication() == nil { + continue + } + + // validate of VOLUME_REPLICATION capability is enabled by the storage driver. + if cap.GetVolumeReplication().GetType() == identity.Capability_VolumeReplication_VOLUME_REPLICATION { + return grpcClient.NewReplicationClient(conn.Client, r.Timeout), nil } } - return nil, fmt.Errorf("no connections for driver: %s", driverName) + return nil, fmt.Errorf("leading CSIAddonsNode %q for driver %q does not support VolumeReplication", conn.Name, driverName) }