diff --git a/controllers/replication.storage/volumereplication_controller.go b/controllers/replication.storage/volumereplication_controller.go index fc15f6c49..442cc4da6 100644 --- a/controllers/replication.storage/volumereplication_controller.go +++ b/controllers/replication.storage/volumereplication_controller.go @@ -174,7 +174,7 @@ func (r *VolumeReplicationReconciler) Reconcile(ctx context.Context, req ctrl.Re logger.Info("Replication handle", "ReplicationHandleName", replicationHandle) } - replicationClient, err := r.getReplicationClient(vrcObj.Spec.Provisioner) + replicationClient, err := r.getReplicationClient(ctx, vrcObj.Spec.Provisioner) if err != nil { logger.Error(err, "Failed to get ReplicationClient") @@ -453,27 +453,25 @@ func getInfoReconcileInterval(parameters map[string]string, logger logr.Logger) return scheduleTime / 2 } -func (r *VolumeReplicationReconciler) getReplicationClient(driverName string) (grpcClient.VolumeReplication, error) { - conns := r.Connpool.GetByNodeID(driverName, "") - - // Iterate through the connections and find the one that matches the driver name - // provided in the VolumeReplication spec; so that corresponding - // operations can be performed. - for _, v := range conns { - for _, cap := range v.Capabilities { - // validate if VOLUME_REPLICATION capability is supported by the driver. - if cap.GetVolumeReplication() == nil { - continue - } +func (r *VolumeReplicationReconciler) getReplicationClient(ctx context.Context, driverName string) (grpcClient.VolumeReplication, error) { + conn, err := r.Connpool.GetLeaderByDriver(ctx, r.Client, driverName) + if err != nil { + return nil, fmt.Errorf("no leader for the ControllerService of driver %q", driverName) + } - // validate of VOLUME_REPLICATION capability is enabled by the storage driver. - if cap.GetVolumeReplication().GetType() == identity.Capability_VolumeReplication_VOLUME_REPLICATION { - return grpcClient.NewReplicationClient(v.Client, r.Timeout), nil - } + for _, cap := range conn.Capabilities { + // validate if VOLUME_REPLICATION capability is supported by the driver. + if cap.GetVolumeReplication() == nil { + continue + } + + // validate of VOLUME_REPLICATION capability is enabled by the storage driver. + if cap.GetVolumeReplication().GetType() == identity.Capability_VolumeReplication_VOLUME_REPLICATION { + return grpcClient.NewReplicationClient(conn.Client, r.Timeout), nil } } - return nil, fmt.Errorf("no connections for driver: %s", driverName) + return nil, fmt.Errorf("leading CSIAddonsNode %q for driver %q does not support VolumeReplication", conn.Name, driverName) }