Skip to content

Commit

Permalink
replication: send VolumeReplication operations to the leading controller
Browse files Browse the repository at this point in the history
Just like NetworkFence operations should be sent to a preferred
Controller Server, VolumeReplication operations should be sent
there as well.

Updates: csi-addons#422
Signed-off-by: Niels de Vos <[email protected]>
  • Loading branch information
nixpanic authored and mergify[bot] committed Dec 15, 2023
1 parent f5192a6 commit 0e14403
Showing 1 changed file with 16 additions and 18 deletions.
34 changes: 16 additions & 18 deletions controllers/replication.storage/volumereplication_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ func (r *VolumeReplicationReconciler) Reconcile(ctx context.Context, req ctrl.Re
logger.Info("Replication handle", "ReplicationHandleName", replicationHandle)
}

replicationClient, err := r.getReplicationClient(vrcObj.Spec.Provisioner)
replicationClient, err := r.getReplicationClient(ctx, vrcObj.Spec.Provisioner)
if err != nil {
logger.Error(err, "Failed to get ReplicationClient")

Expand Down Expand Up @@ -453,27 +453,25 @@ func getInfoReconcileInterval(parameters map[string]string, logger logr.Logger)
return scheduleTime / 2
}

func (r *VolumeReplicationReconciler) getReplicationClient(driverName string) (grpcClient.VolumeReplication, error) {
conns := r.Connpool.GetByNodeID(driverName, "")

// Iterate through the connections and find the one that matches the driver name
// provided in the VolumeReplication spec; so that corresponding
// operations can be performed.
for _, v := range conns {
for _, cap := range v.Capabilities {
// validate if VOLUME_REPLICATION capability is supported by the driver.
if cap.GetVolumeReplication() == nil {
continue
}
func (r *VolumeReplicationReconciler) getReplicationClient(ctx context.Context, driverName string) (grpcClient.VolumeReplication, error) {
conn, err := r.Connpool.GetLeaderByDriver(ctx, r.Client, driverName)
if err != nil {
return nil, fmt.Errorf("no leader for the ControllerService of driver %q", driverName)
}

// validate of VOLUME_REPLICATION capability is enabled by the storage driver.
if cap.GetVolumeReplication().GetType() == identity.Capability_VolumeReplication_VOLUME_REPLICATION {
return grpcClient.NewReplicationClient(v.Client, r.Timeout), nil
}
for _, cap := range conn.Capabilities {
// validate if VOLUME_REPLICATION capability is supported by the driver.
if cap.GetVolumeReplication() == nil {
continue
}

// validate of VOLUME_REPLICATION capability is enabled by the storage driver.
if cap.GetVolumeReplication().GetType() == identity.Capability_VolumeReplication_VOLUME_REPLICATION {
return grpcClient.NewReplicationClient(conn.Client, r.Timeout), nil
}
}

return nil, fmt.Errorf("no connections for driver: %s", driverName)
return nil, fmt.Errorf("leading CSIAddonsNode %q for driver %q does not support VolumeReplication", conn.Name, driverName)

}

Expand Down

0 comments on commit 0e14403

Please sign in to comment.