Skip to content

Commit

Permalink
Merge pull request #588 from nilo19/chore/fix-log
Browse files Browse the repository at this point in the history
fix: support sharing the primary slb when there are both external and internal load balancers in the cluster
  • Loading branch information
k8s-ci-robot authored Apr 19, 2021
2 parents 94849ca + 982e796 commit 8d3bdbd
Show file tree
Hide file tree
Showing 14 changed files with 394 additions and 115 deletions.
37 changes: 37 additions & 0 deletions pkg/provider/azure_backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package provider

import (
"errors"
"fmt"
"net/http"
"regexp"
"strings"
Expand All @@ -29,6 +30,7 @@ import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
cloudprovider "k8s.io/cloud-provider"
"k8s.io/klog/v2"
Expand Down Expand Up @@ -256,6 +258,41 @@ func (az *Cloud) CreateOrUpdateLB(service *v1.Service, lb network.LoadBalancer)
return rerr.Error()
}

// ListAgentPoolLBs invokes az.LoadBalancerClient.List and filter out
// those that are not managed by cloud provider azure or not associated to a managed VMSet.
func (az *Cloud) ListAgentPoolLBs(service *v1.Service, nodes []*v1.Node, clusterName string) ([]network.LoadBalancer, error) {
allLBs, err := az.ListLB(service)
if err != nil {
return nil, err
}

if allLBs == nil {
klog.Warningf("ListAgentPoolLBs: no LBs found")
return nil, nil
}

agentPoolLBs := make([]network.LoadBalancer, 0)
agentPoolVMSetNames, err := az.VMSet.GetAgentPoolVMSetNames(nodes)
if err != nil {
return nil, fmt.Errorf("ListAgentPoolLBs: failed to get agent pool vmSet names: %w", err)
}
agentPoolVMSetNamesSet := sets.NewString()
if agentPoolVMSetNames != nil && len(*agentPoolVMSetNames) > 0 {
for _, vmSetName := range *agentPoolVMSetNames {
agentPoolVMSetNamesSet.Insert(strings.ToLower(vmSetName))
}
}

for _, lb := range allLBs {
vmSetNameFromLBName := az.mapLoadBalancerNameToVMSet(to.String(lb.Name), clusterName)
if agentPoolVMSetNamesSet.Has(strings.ToLower(vmSetNameFromLBName)) {
agentPoolLBs = append(agentPoolLBs, lb)
}
}

return agentPoolLBs, nil
}

// ListLB invokes az.LoadBalancerClient.List with exponential backoff retry
func (az *Cloud) ListLB(service *v1.Service) ([]network.LoadBalancer, error) {
ctx, cancel := getContextWithCancel()
Expand Down
36 changes: 30 additions & 6 deletions pkg/provider/azure_backoff_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,13 +292,15 @@ func TestCreateOrUpdateLB(t *testing.T) {
}
}

func TestListLB(t *testing.T) {
func TestListAgentPoolLBs(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

tests := []struct {
clientErr *retry.Error
expectedErr error
existingLBs, expectedLBs []network.LoadBalancer
callTimes int
clientErr *retry.Error
expectedErr error
}{
{
clientErr: &retry.Error{HTTPStatusCode: http.StatusInternalServerError},
Expand All @@ -308,15 +310,37 @@ func TestListLB(t *testing.T) {
clientErr: &retry.Error{HTTPStatusCode: http.StatusNotFound},
expectedErr: nil,
},
{
existingLBs: []network.LoadBalancer{
{Name: to.StringPtr("kubernetes")},
{Name: to.StringPtr("kubernetes-internal")},
{Name: to.StringPtr("vmas-1")},
{Name: to.StringPtr("vmas-1-internal")},
{Name: to.StringPtr("unmanaged")},
{Name: to.StringPtr("unmanaged-internal")},
},
expectedLBs: []network.LoadBalancer{
{Name: to.StringPtr("kubernetes")},
{Name: to.StringPtr("kubernetes-internal")},
{Name: to.StringPtr("vmas-1")},
{Name: to.StringPtr("vmas-1-internal")},
},
callTimes: 1,
},
}
for _, test := range tests {
az := GetTestCloud(ctrl)

mockLBClient := az.LoadBalancerClient.(*mockloadbalancerclient.MockInterface)
mockLBClient.EXPECT().List(gomock.Any(), az.ResourceGroup).Return(nil, test.clientErr)
mockLBClient.EXPECT().List(gomock.Any(), az.ResourceGroup).Return(test.existingLBs, test.clientErr)
mockVMSet := NewMockVMSet(ctrl)
mockVMSet.EXPECT().GetAgentPoolVMSetNames(gomock.Any()).Return(&[]string{"vmas-0", "vmas-1"}, nil).Times(test.callTimes)
mockVMSet.EXPECT().GetPrimaryVMSetName().Return("vmas-0").AnyTimes()
az.VMSet = mockVMSet

lbs, err := az.ListLB(&v1.Service{})
lbs, err := az.ListAgentPoolLBs(&v1.Service{}, []*v1.Node{}, "kubernetes")
assert.Equal(t, test.expectedErr, err)
assert.Empty(t, lbs)
assert.Equal(t, test.expectedLBs, lbs)
}
}

Expand Down
197 changes: 127 additions & 70 deletions pkg/provider/azure_loadbalancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func (az *Cloud) EnsureLoadBalancer(ctx context.Context, clusterName string, ser

// UpdateLoadBalancer updates hosts under the specified load balancer.
func (az *Cloud) UpdateLoadBalancer(ctx context.Context, clusterName string, service *v1.Service, nodes []*v1.Node) error {
if !az.shouldUpdateLoadBalancer(clusterName, service) {
if !az.shouldUpdateLoadBalancer(clusterName, service, nodes) {
klog.V(2).Infof("UpdateLoadBalancer: skipping service %s because it is either being deleted or does not exist anymore", service.Name)
return nil
}
Expand Down Expand Up @@ -435,15 +435,64 @@ func (az *Cloud) cleanOrphanedLoadBalancer(lb *network.LoadBalancer, service *v1
return nil
}

// safeDeleteLoadBalancer deletes the load balancer after decoupling it from the vmSet
func (az *Cloud) safeDeleteLoadBalancer(lb network.LoadBalancer, clusterName, vmSetName string, service *v1.Service) error {
lbBackendPoolID := az.getBackendPoolID(to.String(lb.Name), az.getLoadBalancerResourceGroup(), getBackendPoolName(clusterName, service))
err := az.VMSet.EnsureBackendPoolDeleted(service, lbBackendPoolID, vmSetName, lb.BackendAddressPools)
if err != nil {
return fmt.Errorf("deleteDedicatedLoadBalancer: failed to EnsureBackendPoolDeleted: %w", err)
}

klog.V(2).Infof("deleteDedicatedLoadBalancer: deleting LB %s because the corresponding vmSet is supposed to be in the primary SLB", to.String(lb.Name))
err = az.DeleteLB(service, to.String(lb.Name))
if err != nil {
return fmt.Errorf("deleteDedicatedLoadBalancer : failed to DeleteLB: %w", err)
}
_ = az.lbCache.Delete(to.String(lb.Name))

return nil
}

func extractBackendIPConfigurationIDsFromLB(lb network.LoadBalancer, lbBackendPoolName string) []string {
result := make([]string, 0)
if lb.LoadBalancerPropertiesFormat != nil &&
lb.BackendAddressPools != nil {
for i := 0; i < len(*lb.BackendAddressPools); i++ {
backendPool := (*lb.BackendAddressPools)[i]
if strings.EqualFold(to.String(backendPool.Name), lbBackendPoolName) {
if backendPool.BackendAddressPoolPropertiesFormat != nil &&
backendPool.BackendIPConfigurations != nil {
for _, ipConfiguration := range *backendPool.BackendIPConfigurations {
if ipConfiguration.ID != nil {
result = append(result, to.String(ipConfiguration.ID))
}
}
}
}
}
}

return result
}

// reconcileSharedLoadBalancer deletes the dedicated SLBs of the non-primary vmSets. There are
// two scenarios where this operation is needed:
// 1. Using multiple slbs and the vmSet is supposed to share the primary slb.
// 2. When migrating from multiple slbs to single slb mode.
// It also ensures those vmSets are joint the backend pools of the primary SLBs.
// It runs only once everytime the cloud controller manager restarts.
func (az *Cloud) reconcileSharedLoadBalancer(service *v1.Service, clusterName string, nodes []*v1.Node) ([]network.LoadBalancer, error) {
var (
primarySLB network.LoadBalancer
changed bool
ipConfigIDsToBeAddedToPrimarySLB []string
existingLBs []network.LoadBalancer
err error
primarySLBs, existingLBs []network.LoadBalancer
changed bool
err error
)

// skip this operation when wantLb=false
if nodes == nil {
return nil, nil
}

// only run once since the controller manager rebooted
if az.isSharedLoadBalancerSynced {
return nil, nil
Expand All @@ -454,84 +503,92 @@ func (az *Cloud) reconcileSharedLoadBalancer(service *v1.Service, clusterName st
}
}()

// skip if the cluster doesn't enable the multiple slbs mode
useMultipleSLBs := az.useStandardLoadBalancer() && az.EnableMultipleStandardLoadBalancers
if !useMultipleSLBs {
// skip if the cluster is using basic LB
if !az.useStandardLoadBalancer() {
return nil, nil
}

existingLBs, err = az.ListLB(service)
existingLBs, err = az.ListAgentPoolLBs(service, nodes, clusterName)
if err != nil {
return nil, fmt.Errorf("reconcileSharedLoadBalancer: failed to list LB: %w", err)
}

lbBackendPoolName := getBackendPoolName(clusterName, service)
lbNamesToBeDeleted := sets.NewString()
// loop 1: delete unwanted LBs
for _, lb := range existingLBs {
lbNamePrefix := strings.TrimSuffix(to.String(lb.Name), consts.InternalLoadBalancerNameSuffix)

// skip the internal or external primary load balancer
if strings.EqualFold(lbNamePrefix, clusterName) {
primarySLBs = append(primarySLBs, lb)
continue
}

for i := len(existingLBs) - 1; i >= 0; i-- {
lb := existingLBs[i]

// skip the primary load balancer
if strings.EqualFold(to.String(lb.Name), clusterName) {
primarySLB = lb
// skip if the multiple slbs mode is enabled and
// the vmSet is supposed to have dedicated SLBs
vmSetName := strings.ToLower(az.mapLoadBalancerNameToVMSet(to.String(lb.Name), clusterName))
if az.EnableMultipleStandardLoadBalancers && !az.getVMSetNamesSharingPrimarySLB().Has(vmSetName) {
continue
}

// For non-primary load balancer, the lb name is the name of the VMSet.
// If the VMSet name is in az.NodePoolsWithoutDedicatedSLB, we should
// decouple the VMSet from the lb and delete the lb. Then we should
// add the VMSet to the backend pool of the primary slb.
vmSetName := strings.ToLower(to.String(lb.Name))
lbBackendPoolID := az.getBackendPoolID(to.String(lb.Name), az.getLoadBalancerResourceGroup(), getBackendPoolName(clusterName, service))
if az.getVMSetNamesSharingPrimarySLB().Has(vmSetName) {
err = az.VMSet.EnsureBackendPoolDeleted(service, lbBackendPoolID, vmSetName, lb.BackendAddressPools)
if err != nil {
return nil, fmt.Errorf("reconcileSharedLoadBalancer: failed to EnsureBackendPoolDeleted: %w", err)
}
// decouple the VMSet from the lb and delete the lb. Then adding the VMSet
// to the backend pool of the primary slb.
err = az.safeDeleteLoadBalancer(lb, clusterName, vmSetName, service)
if err != nil {
return nil, err
}

klog.V(2).Infof("reconcileSharedLoadBalancer: deleting LB %s because the corresponding vmSet is sharing the primary SLB", to.String(lb.Name))
err = az.DeleteLB(service, to.String(lb.Name))
if err != nil {
return nil, fmt.Errorf("reconcileSharedLoadBalancer: failed to DeleteLB: %w", err)
}
_ = az.lbCache.Delete(to.String(lb.Name))
// remove the deleted lb from the list and construct a new primary
// lb, so that getServiceLoadBalancer doesn't have to call list api again
lbNamesToBeDeleted.Insert(to.String(lb.Name))
changed = true
}

primaryLBBackendPoolID := az.getBackendPoolID(clusterName, az.getLoadBalancerResourceGroup(), getBackendPoolName(clusterName, service))
err = az.VMSet.EnsureHostsInPool(service, nodes, primaryLBBackendPoolID, vmSetName, false)
if err != nil {
return nil, fmt.Errorf("reconcileSharedLoadBalancer: failed to EnsureHostsInPool: %w", err)
}

if lb.LoadBalancerPropertiesFormat != nil &&
lb.BackendAddressPools != nil {
for i := 0; i < len(*lb.BackendAddressPools); i++ {
backendPool := (*lb.BackendAddressPools)[i]
if strings.EqualFold(to.String(backendPool.Name), lbBackendPoolName) {
if backendPool.BackendAddressPoolPropertiesFormat != nil &&
backendPool.BackendIPConfigurations != nil {
for _, ipConfiguration := range *backendPool.BackendIPConfigurations {
if ipConfiguration.ID != nil {
ipConfigIDsToBeAddedToPrimarySLB = append(ipConfigIDsToBeAddedToPrimarySLB, to.String(ipConfiguration.ID))
}
}
}
}
}
}
if !changed {
klog.V(4).Infof("reconcileSharedLoadBalancer: no changes made, return now")
return existingLBs, nil
}

// remove the deleted lb from the list and construct a new primary
// lb, so that getServiceLoadBalancer doesn't have to call list api again
existingLBs = append(existingLBs[:i], existingLBs[i+1:]...)
changed = true
ipConfigIDsToBeAddedToPrimarySLB := sets.NewString()
// loop2: add nodes to the backend pool of the primary SLBs
for i := len(existingLBs) - 1; i >= 0; i-- {
lb := existingLBs[i]
if !lbNamesToBeDeleted.Has(to.String(lb.Name)) {
continue
}

vmSetName := strings.ToLower(az.mapLoadBalancerNameToVMSet(to.String(lb.Name), clusterName))
isInternalLB := strings.HasSuffix(to.String(lb.Name), consts.InternalLoadBalancerNameSuffix)
primarySLBName := clusterName
if isInternalLB {
primarySLBName = fmt.Sprintf("%s%s", clusterName, consts.InternalLoadBalancerNameSuffix)
}
primaryLBBackendPoolID := az.getBackendPoolID(primarySLBName, az.getLoadBalancerResourceGroup(), getBackendPoolName(clusterName, service))

klog.V(2).Infof("reconcileSharedLoadBalancer: binding the vmSet %s to the backend pool %s", vmSetName, primaryLBBackendPoolID)
err = az.VMSet.EnsureHostsInPool(service, nodes, primaryLBBackendPoolID, vmSetName, false)
if err != nil {
return nil, fmt.Errorf("reconcileSharedLoadBalancer: failed to EnsureHostsInPool: %w", err)
}

for _, id := range extractBackendIPConfigurationIDsFromLB(lb, lbBackendPoolName) {
ipConfigIDsToBeAddedToPrimarySLB.Insert(id)
}

// remove the deleted LB from the list
existingLBs = append(existingLBs[:i], existingLBs[i+1:]...)
}

if changed {
for _, primarySLB := range primarySLBs {
if primarySLB.LoadBalancerPropertiesFormat != nil &&
primarySLB.BackendAddressPools != nil {
for i := 0; i < len(*primarySLB.BackendAddressPools); i++ {
if strings.EqualFold(to.String((*primarySLB.BackendAddressPools)[i].Name), lbBackendPoolName) {
backendPoolIPConfigs := (*primarySLB.BackendAddressPools)[i].BackendIPConfigurations
for _, id := range ipConfigIDsToBeAddedToPrimarySLB {
for _, id := range ipConfigIDsToBeAddedToPrimarySLB.List() {
*backendPoolIPConfigs = append(*backendPoolIPConfigs, network.InterfaceIPConfiguration{
ID: to.StringPtr(id),
})
Expand All @@ -540,16 +597,17 @@ func (az *Cloud) reconcileSharedLoadBalancer(service *v1.Service, clusterName st
}
}
}
}

for i, existingLB := range existingLBs {
if strings.EqualFold(to.String(existingLB.Name), clusterName) {
for i, existingLB := range existingLBs {
for _, primarySLB := range primarySLBs {
if strings.EqualFold(to.String(existingLB.Name), to.String(primarySLB.Name)) {
// Proactively disable the etag to prevent etag mismatch error when putting lb later.
// This could be happen because when we remove the hosts from the lb, the nrp
// would put the lb to remove the backend references as well.
primarySLB.Etag = nil

existingLBs[i] = primarySLB
return existingLBs, nil
}
}
}
Expand Down Expand Up @@ -580,19 +638,18 @@ func (az *Cloud) getServiceLoadBalancer(service *v1.Service, clusterName string,
// check if the service already has a load balancer
for i := range existingLBs {
existingLB := existingLBs[i]
existingLBNamePrefix := strings.TrimSuffix(to.String(existingLB.Name), consts.InternalLoadBalancerNameSuffix)

// for the primary standard load balancer, when enabled multiple slbs
if strings.EqualFold(to.String(existingLB.Name), clusterName) && useMultipleSLBs {
// there are two conditions we need to remove the vmSet from the
// backend pool of the primary SLB when enabling multiple SLBs:
// for the primary standard load balancer (internal or external), when enabled multiple slbs
if strings.EqualFold(existingLBNamePrefix, clusterName) && useMultipleSLBs {
shouldRemoveVMSetFromSLB := func(vmSetName string) bool {
// condition 1: not removing the vmSet from the primary SLB
// not removing the vmSet from the primary SLB
// if it is supposed to share the primary SLB.
if az.getVMSetNamesSharingPrimarySLB().Has(strings.ToLower(vmSetName)) {
return false
}

// condition 2: removing the vmSet from the primary SLB if
// removing the vmSet from the primary SLB if
// it is not the primary vmSet. There are two situations:
// 1. when migrating from single SLB to multiple SLBs, we
// need to remove all non-primary vmSets from the primary SLB;
Expand Down Expand Up @@ -2332,8 +2389,8 @@ func (az *Cloud) getExpectedSecurityRules(wantLb bool, ports []v1.ServicePort, s
return expectedSecurityRules, nil
}

func (az *Cloud) shouldUpdateLoadBalancer(clusterName string, service *v1.Service) bool {
_, _, existsLb, _ := az.getServiceLoadBalancer(service, clusterName, nil, false, []network.LoadBalancer{})
func (az *Cloud) shouldUpdateLoadBalancer(clusterName string, service *v1.Service, nodes []*v1.Node) bool {
_, _, existsLb, _ := az.getServiceLoadBalancer(service, clusterName, nodes, false, []network.LoadBalancer{})
return existsLb && service.ObjectMeta.DeletionTimestamp == nil
}

Expand Down
Loading

0 comments on commit 8d3bdbd

Please sign in to comment.