kubernetes aws_loadbalancer 源码

  • 2022-09-18
  • 浏览 (181)

kubernetes aws_loadbalancer 代码

文件路径:/staging/src/k8s.io/legacy-cloud-providers/aws/aws_loadbalancer.go

//go:build !providerless
// +build !providerless

/*
Copyright 2014 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package aws

import (
	"crypto/sha1"
	"encoding/hex"
	"fmt"
	"reflect"
	"regexp"
	"strconv"
	"strings"
	"time"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/awserr"
	"github.com/aws/aws-sdk-go/service/ec2"
	"github.com/aws/aws-sdk-go/service/elb"
	"github.com/aws/aws-sdk-go/service/elbv2"
	v1 "k8s.io/api/core/v1"
	"k8s.io/klog/v2"

	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/sets"
)

const (
	// ProxyProtocolPolicyName is the tag named used for the proxy protocol
	// policy
	ProxyProtocolPolicyName = "k8s-proxyprotocol-enabled"

	// SSLNegotiationPolicyNameFormat is a format string used for the SSL
	// negotiation policy tag name
	SSLNegotiationPolicyNameFormat = "k8s-SSLNegotiationPolicy-%s"

	lbAttrLoadBalancingCrossZoneEnabled = "load_balancing.cross_zone.enabled"
	lbAttrAccessLogsS3Enabled           = "access_logs.s3.enabled"
	lbAttrAccessLogsS3Bucket            = "access_logs.s3.bucket"
	lbAttrAccessLogsS3Prefix            = "access_logs.s3.prefix"

	// defaultEC2InstanceCacheMaxAge is the max age for the EC2 instance cache
	defaultEC2InstanceCacheMaxAge = 10 * time.Minute
)

var (
	// Defaults for ELB Healthcheck
	defaultElbHCHealthyThreshold   = int64(2)
	defaultElbHCUnhealthyThreshold = int64(6)
	defaultElbHCTimeout            = int64(5)
	defaultElbHCInterval           = int64(10)
	defaultNlbHealthCheckInterval  = int64(30)
	defaultNlbHealthCheckTimeout   = int64(10)
	defaultNlbHealthCheckThreshold = int64(3)
	defaultHealthCheckPort         = "traffic-port"
	defaultHealthCheckPath         = "/"

	// Defaults for ELB Target operations
	defaultRegisterTargetsChunkSize   = 100
	defaultDeregisterTargetsChunkSize = 100
)

func isNLB(annotations map[string]string) bool {
	if annotations[ServiceAnnotationLoadBalancerType] == "nlb" {
		return true
	}
	return false
}

func isLBExternal(annotations map[string]string) bool {
	if val := annotations[ServiceAnnotationLoadBalancerType]; val == "nlb-ip" || val == "external" {
		return true
	}
	return false
}

type healthCheckConfig struct {
	Port               string
	Path               string
	Protocol           string
	Interval           int64
	Timeout            int64
	HealthyThreshold   int64
	UnhealthyThreshold int64
}

type nlbPortMapping struct {
	FrontendPort     int64
	FrontendProtocol string

	TrafficPort     int64
	TrafficProtocol string

	SSLCertificateARN string
	SSLPolicy         string
	HealthCheckConfig healthCheckConfig
}

// getKeyValuePropertiesFromAnnotation converts the comma separated list of key-value
// pairs from the specified annotation and returns it as a map.
func getKeyValuePropertiesFromAnnotation(annotations map[string]string, annotation string) map[string]string {
	additionalTags := make(map[string]string)
	if additionalTagsList, ok := annotations[annotation]; ok {
		additionalTagsList = strings.TrimSpace(additionalTagsList)

		// Break up list of "Key1=Val,Key2=Val2"
		tagList := strings.Split(additionalTagsList, ",")

		// Break up "Key=Val"
		for _, tagSet := range tagList {
			tag := strings.Split(strings.TrimSpace(tagSet), "=")

			// Accept "Key=val" or "Key=" or just "Key"
			if len(tag) >= 2 && len(tag[0]) != 0 {
				// There is a key and a value, so save it
				additionalTags[tag[0]] = tag[1]
			} else if len(tag) == 1 && len(tag[0]) != 0 {
				// Just "Key"
				additionalTags[tag[0]] = ""
			}
		}
	}

	return additionalTags
}

// ensureLoadBalancerv2 ensures a v2 load balancer is created
func (c *Cloud) ensureLoadBalancerv2(namespacedName types.NamespacedName, loadBalancerName string, mappings []nlbPortMapping, instanceIDs, subnetIDs []string, internalELB bool, annotations map[string]string) (*elbv2.LoadBalancer, error) {
	loadBalancer, err := c.describeLoadBalancerv2(loadBalancerName)
	if err != nil {
		return nil, err
	}

	dirty := false

	// Get additional tags set by the user
	tags := getKeyValuePropertiesFromAnnotation(annotations, ServiceAnnotationLoadBalancerAdditionalTags)
	// Add default tags
	tags[TagNameKubernetesService] = namespacedName.String()
	tags = c.tagging.buildTags(ResourceLifecycleOwned, tags)

	if loadBalancer == nil {
		// Create the LB
		createRequest := &elbv2.CreateLoadBalancerInput{
			Type: aws.String(elbv2.LoadBalancerTypeEnumNetwork),
			Name: aws.String(loadBalancerName),
		}
		if internalELB {
			createRequest.Scheme = aws.String("internal")
		}

		var allocationIDs []string
		if eipList, present := annotations[ServiceAnnotationLoadBalancerEIPAllocations]; present {
			allocationIDs = strings.Split(eipList, ",")
			if len(allocationIDs) != len(subnetIDs) {
				return nil, fmt.Errorf("error creating load balancer: Must have same number of EIP AllocationIDs (%d) and SubnetIDs (%d)", len(allocationIDs), len(subnetIDs))
			}
		}

		// We are supposed to specify one subnet per AZ.
		// TODO: What happens if we have more than one subnet per AZ?
		createRequest.SubnetMappings = createSubnetMappings(subnetIDs, allocationIDs)

		for k, v := range tags {
			createRequest.Tags = append(createRequest.Tags, &elbv2.Tag{
				Key: aws.String(k), Value: aws.String(v),
			})
		}

		klog.Infof("Creating load balancer for %v with name: %s", namespacedName, loadBalancerName)
		createResponse, err := c.elbv2.CreateLoadBalancer(createRequest)
		if err != nil {
			return nil, fmt.Errorf("error creating load balancer: %q", err)
		}

		loadBalancer = createResponse.LoadBalancers[0]
		for i := range mappings {
			// It is easier to keep track of updates by having possibly
			// duplicate target groups where the backend port is the same
			_, err := c.createListenerV2(createResponse.LoadBalancers[0].LoadBalancerArn, mappings[i], namespacedName, instanceIDs, *createResponse.LoadBalancers[0].VpcId, tags)
			if err != nil {
				return nil, fmt.Errorf("error creating listener: %q", err)
			}
		}
		if err := c.reconcileLBAttributes(aws.StringValue(loadBalancer.LoadBalancerArn), annotations); err != nil {
			return nil, err
		}
	} else {
		// TODO: Sync internal vs non-internal

		// sync mappings
		{
			listenerDescriptions, err := c.elbv2.DescribeListeners(
				&elbv2.DescribeListenersInput{
					LoadBalancerArn: loadBalancer.LoadBalancerArn,
				},
			)
			if err != nil {
				return nil, fmt.Errorf("error describing listeners: %q", err)
			}

			// actual maps FrontendPort to an elbv2.Listener
			actual := map[int64]map[string]*elbv2.Listener{}
			for _, listener := range listenerDescriptions.Listeners {
				if actual[*listener.Port] == nil {
					actual[*listener.Port] = map[string]*elbv2.Listener{}
				}
				actual[*listener.Port][*listener.Protocol] = listener
			}

			actualTargetGroups, err := c.elbv2.DescribeTargetGroups(
				&elbv2.DescribeTargetGroupsInput{
					LoadBalancerArn: loadBalancer.LoadBalancerArn,
				},
			)
			if err != nil {
				return nil, fmt.Errorf("error listing target groups: %q", err)
			}

			nodePortTargetGroup := map[int64]*elbv2.TargetGroup{}
			for _, targetGroup := range actualTargetGroups.TargetGroups {
				nodePortTargetGroup[*targetGroup.Port] = targetGroup
			}

			// Handle additions/modifications
			for _, mapping := range mappings {
				frontendPort := mapping.FrontendPort
				frontendProtocol := mapping.FrontendProtocol
				nodePort := mapping.TrafficPort
				// modifications
				if listener, ok := actual[frontendPort][frontendProtocol]; ok {
					listenerNeedsModification := false

					if aws.StringValue(listener.Protocol) != mapping.FrontendProtocol {
						listenerNeedsModification = true
					}
					switch mapping.FrontendProtocol {
					case elbv2.ProtocolEnumTls:
						{
							if aws.StringValue(listener.SslPolicy) != mapping.SSLPolicy {
								listenerNeedsModification = true
							}
							if len(listener.Certificates) == 0 || aws.StringValue(listener.Certificates[0].CertificateArn) != mapping.SSLCertificateARN {
								listenerNeedsModification = true
							}
						}
					case elbv2.ProtocolEnumTcp:
						{
							if aws.StringValue(listener.SslPolicy) != "" {
								listenerNeedsModification = true
							}
							if len(listener.Certificates) != 0 {
								listenerNeedsModification = true
							}
						}
					}

					// recreate targetGroup if trafficPort, protocol or HealthCheckProtocol changed
					healthCheckModified := false
					targetGroupRecreated := false
					targetGroup, ok := nodePortTargetGroup[nodePort]

					if targetGroup != nil && (!strings.EqualFold(mapping.HealthCheckConfig.Protocol, aws.StringValue(targetGroup.HealthCheckProtocol)) ||
						mapping.HealthCheckConfig.Interval != aws.Int64Value(targetGroup.HealthCheckIntervalSeconds)) {
						healthCheckModified = true
					}

					if !ok || aws.StringValue(targetGroup.Protocol) != mapping.TrafficProtocol || healthCheckModified {
						// create new target group
						targetGroup, err = c.ensureTargetGroup(
							nil,
							namespacedName,
							mapping,
							instanceIDs,
							*loadBalancer.VpcId,
							tags,
						)
						if err != nil {
							return nil, err
						}
						targetGroupRecreated = true
						listenerNeedsModification = true
					}

					if listenerNeedsModification {
						modifyListenerInput := &elbv2.ModifyListenerInput{
							ListenerArn: listener.ListenerArn,
							Port:        aws.Int64(frontendPort),
							Protocol:    aws.String(mapping.FrontendProtocol),
							DefaultActions: []*elbv2.Action{{
								TargetGroupArn: targetGroup.TargetGroupArn,
								Type:           aws.String("forward"),
							}},
						}
						if mapping.FrontendProtocol == elbv2.ProtocolEnumTls {
							if mapping.SSLPolicy != "" {
								modifyListenerInput.SslPolicy = aws.String(mapping.SSLPolicy)
							}
							modifyListenerInput.Certificates = []*elbv2.Certificate{
								{
									CertificateArn: aws.String(mapping.SSLCertificateARN),
								},
							}
						}
						if _, err := c.elbv2.ModifyListener(modifyListenerInput); err != nil {
							return nil, fmt.Errorf("error updating load balancer listener: %q", err)
						}
					}

					// Delete old targetGroup if needed
					if targetGroupRecreated {
						if _, err := c.elbv2.DeleteTargetGroup(&elbv2.DeleteTargetGroupInput{
							TargetGroupArn: listener.DefaultActions[0].TargetGroupArn,
						}); err != nil {
							return nil, fmt.Errorf("error deleting old target group: %q", err)
						}
					} else {
						// Run ensureTargetGroup to make sure instances in service are up-to-date
						_, err = c.ensureTargetGroup(
							targetGroup,
							namespacedName,
							mapping,
							instanceIDs,
							*loadBalancer.VpcId,
							tags,
						)
						if err != nil {
							return nil, err
						}
					}
					dirty = true
					continue
				}

				// Additions
				_, err := c.createListenerV2(loadBalancer.LoadBalancerArn, mapping, namespacedName, instanceIDs, *loadBalancer.VpcId, tags)
				if err != nil {
					return nil, err
				}
				dirty = true
			}

			frontEndPorts := map[int64]map[string]bool{}
			for i := range mappings {
				if frontEndPorts[mappings[i].FrontendPort] == nil {
					frontEndPorts[mappings[i].FrontendPort] = map[string]bool{}
				}
				frontEndPorts[mappings[i].FrontendPort][mappings[i].FrontendProtocol] = true
			}

			// handle deletions
			for port := range actual {
				for protocol := range actual[port] {
					if _, ok := frontEndPorts[port][protocol]; !ok {
						err := c.deleteListenerV2(actual[port][protocol])
						if err != nil {
							return nil, err
						}
						dirty = true
					}
				}
			}
		}
		if err := c.reconcileLBAttributes(aws.StringValue(loadBalancer.LoadBalancerArn), annotations); err != nil {
			return nil, err
		}

		// Subnets cannot be modified on NLBs
		if dirty {
			loadBalancers, err := c.elbv2.DescribeLoadBalancers(
				&elbv2.DescribeLoadBalancersInput{
					LoadBalancerArns: []*string{
						loadBalancer.LoadBalancerArn,
					},
				},
			)
			if err != nil {
				return nil, fmt.Errorf("error retrieving load balancer after update: %q", err)
			}
			loadBalancer = loadBalancers.LoadBalancers[0]
		}
	}
	return loadBalancer, nil
}

func (c *Cloud) reconcileLBAttributes(loadBalancerArn string, annotations map[string]string) error {
	desiredLoadBalancerAttributes := map[string]string{}

	desiredLoadBalancerAttributes[lbAttrLoadBalancingCrossZoneEnabled] = "false"
	crossZoneLoadBalancingEnabledAnnotation := annotations[ServiceAnnotationLoadBalancerCrossZoneLoadBalancingEnabled]
	if crossZoneLoadBalancingEnabledAnnotation != "" {
		crossZoneEnabled, err := strconv.ParseBool(crossZoneLoadBalancingEnabledAnnotation)
		if err != nil {
			return fmt.Errorf("error parsing service annotation: %s=%s",
				ServiceAnnotationLoadBalancerCrossZoneLoadBalancingEnabled,
				crossZoneLoadBalancingEnabledAnnotation,
			)
		}

		if crossZoneEnabled {
			desiredLoadBalancerAttributes[lbAttrLoadBalancingCrossZoneEnabled] = "true"
		}
	}

	desiredLoadBalancerAttributes[lbAttrAccessLogsS3Enabled] = "false"
	accessLogsS3EnabledAnnotation := annotations[ServiceAnnotationLoadBalancerAccessLogEnabled]
	if accessLogsS3EnabledAnnotation != "" {
		accessLogsS3Enabled, err := strconv.ParseBool(accessLogsS3EnabledAnnotation)
		if err != nil {
			return fmt.Errorf("error parsing service annotation: %s=%s",
				ServiceAnnotationLoadBalancerAccessLogEnabled,
				accessLogsS3EnabledAnnotation,
			)
		}

		if accessLogsS3Enabled {
			desiredLoadBalancerAttributes[lbAttrAccessLogsS3Enabled] = "true"
		}
	}

	desiredLoadBalancerAttributes[lbAttrAccessLogsS3Bucket] = annotations[ServiceAnnotationLoadBalancerAccessLogS3BucketName]
	desiredLoadBalancerAttributes[lbAttrAccessLogsS3Prefix] = annotations[ServiceAnnotationLoadBalancerAccessLogS3BucketPrefix]

	currentLoadBalancerAttributes := map[string]string{}
	describeAttributesOutput, err := c.elbv2.DescribeLoadBalancerAttributes(&elbv2.DescribeLoadBalancerAttributesInput{
		LoadBalancerArn: aws.String(loadBalancerArn),
	})
	if err != nil {
		return fmt.Errorf("unable to retrieve load balancer attributes during attribute sync: %q", err)
	}
	for _, attr := range describeAttributesOutput.Attributes {
		currentLoadBalancerAttributes[aws.StringValue(attr.Key)] = aws.StringValue(attr.Value)
	}

	var changedAttributes []*elbv2.LoadBalancerAttribute
	if desiredLoadBalancerAttributes[lbAttrLoadBalancingCrossZoneEnabled] != currentLoadBalancerAttributes[lbAttrLoadBalancingCrossZoneEnabled] {
		changedAttributes = append(changedAttributes, &elbv2.LoadBalancerAttribute{
			Key:   aws.String(lbAttrLoadBalancingCrossZoneEnabled),
			Value: aws.String(desiredLoadBalancerAttributes[lbAttrLoadBalancingCrossZoneEnabled]),
		})
	}
	if desiredLoadBalancerAttributes[lbAttrAccessLogsS3Enabled] != currentLoadBalancerAttributes[lbAttrAccessLogsS3Enabled] {
		changedAttributes = append(changedAttributes, &elbv2.LoadBalancerAttribute{
			Key:   aws.String(lbAttrAccessLogsS3Enabled),
			Value: aws.String(desiredLoadBalancerAttributes[lbAttrAccessLogsS3Enabled]),
		})
	}

	// ELBV2 API forbids us to set bucket to an empty bucket, so we keep it unchanged if AccessLogsS3Enabled==false.
	if desiredLoadBalancerAttributes[lbAttrAccessLogsS3Enabled] == "true" {
		if desiredLoadBalancerAttributes[lbAttrAccessLogsS3Bucket] != currentLoadBalancerAttributes[lbAttrAccessLogsS3Bucket] {
			changedAttributes = append(changedAttributes, &elbv2.LoadBalancerAttribute{
				Key:   aws.String(lbAttrAccessLogsS3Bucket),
				Value: aws.String(desiredLoadBalancerAttributes[lbAttrAccessLogsS3Bucket]),
			})
		}
		if desiredLoadBalancerAttributes[lbAttrAccessLogsS3Prefix] != currentLoadBalancerAttributes[lbAttrAccessLogsS3Prefix] {
			changedAttributes = append(changedAttributes, &elbv2.LoadBalancerAttribute{
				Key:   aws.String(lbAttrAccessLogsS3Prefix),
				Value: aws.String(desiredLoadBalancerAttributes[lbAttrAccessLogsS3Prefix]),
			})
		}
	}

	if len(changedAttributes) > 0 {
		klog.V(2).Infof("updating load-balancer attributes for %q", loadBalancerArn)

		_, err = c.elbv2.ModifyLoadBalancerAttributes(&elbv2.ModifyLoadBalancerAttributesInput{
			LoadBalancerArn: aws.String(loadBalancerArn),
			Attributes:      changedAttributes,
		})
		if err != nil {
			return fmt.Errorf("unable to update load balancer attributes during attribute sync: %q", err)
		}
	}
	return nil
}

var invalidELBV2NameRegex = regexp.MustCompile("[^[:alnum:]]")

// buildTargetGroupName will build unique name for targetGroup of service & port.
// the name is in format k8s-{namespace:8}-{name:8}-{uuid:10} (chosen to benefit most common use cases).
// Note: nodePort & targetProtocol & targetType are included since they cannot be modified on existing targetGroup.
func (c *Cloud) buildTargetGroupName(serviceName types.NamespacedName, servicePort int64, nodePort int64, targetProtocol string, targetType string, mapping nlbPortMapping) string {
	hasher := sha1.New()
	_, _ = hasher.Write([]byte(c.tagging.clusterID()))
	_, _ = hasher.Write([]byte(serviceName.Namespace))
	_, _ = hasher.Write([]byte(serviceName.Name))
	_, _ = hasher.Write([]byte(strconv.FormatInt(servicePort, 10)))
	_, _ = hasher.Write([]byte(strconv.FormatInt(nodePort, 10)))
	_, _ = hasher.Write([]byte(targetProtocol))
	_, _ = hasher.Write([]byte(targetType))
	_, _ = hasher.Write([]byte(mapping.HealthCheckConfig.Protocol))
	_, _ = hasher.Write([]byte(strconv.FormatInt(mapping.HealthCheckConfig.Interval, 10)))
	tgUUID := hex.EncodeToString(hasher.Sum(nil))

	sanitizedNamespace := invalidELBV2NameRegex.ReplaceAllString(serviceName.Namespace, "")
	sanitizedServiceName := invalidELBV2NameRegex.ReplaceAllString(serviceName.Name, "")
	return fmt.Sprintf("k8s-%.8s-%.8s-%.10s", sanitizedNamespace, sanitizedServiceName, tgUUID)
}

func (c *Cloud) createListenerV2(loadBalancerArn *string, mapping nlbPortMapping, namespacedName types.NamespacedName, instanceIDs []string, vpcID string, tags map[string]string) (listener *elbv2.Listener, err error) {
	target, err := c.ensureTargetGroup(
		nil,
		namespacedName,
		mapping,
		instanceIDs,
		vpcID,
		tags,
	)
	if err != nil {
		return nil, err
	}

	createListernerInput := &elbv2.CreateListenerInput{
		LoadBalancerArn: loadBalancerArn,
		Port:            aws.Int64(mapping.FrontendPort),
		Protocol:        aws.String(mapping.FrontendProtocol),
		DefaultActions: []*elbv2.Action{{
			TargetGroupArn: target.TargetGroupArn,
			Type:           aws.String(elbv2.ActionTypeEnumForward),
		}},
	}
	if mapping.FrontendProtocol == "TLS" {
		if mapping.SSLPolicy != "" {
			createListernerInput.SslPolicy = aws.String(mapping.SSLPolicy)
		}
		createListernerInput.Certificates = []*elbv2.Certificate{
			{
				CertificateArn: aws.String(mapping.SSLCertificateARN),
			},
		}
	}

	klog.Infof("Creating load balancer listener for %v", namespacedName)
	createListenerOutput, err := c.elbv2.CreateListener(createListernerInput)
	if err != nil {
		return nil, fmt.Errorf("error creating load balancer listener: %q", err)
	}
	return createListenerOutput.Listeners[0], nil
}

// cleans up listener and corresponding target group
func (c *Cloud) deleteListenerV2(listener *elbv2.Listener) error {
	_, err := c.elbv2.DeleteListener(&elbv2.DeleteListenerInput{ListenerArn: listener.ListenerArn})
	if err != nil {
		return fmt.Errorf("error deleting load balancer listener: %q", err)
	}
	_, err = c.elbv2.DeleteTargetGroup(&elbv2.DeleteTargetGroupInput{TargetGroupArn: listener.DefaultActions[0].TargetGroupArn})
	if err != nil {
		return fmt.Errorf("error deleting load balancer target group: %q", err)
	}
	return nil
}

// ensureTargetGroup creates a target group with a set of instances.
func (c *Cloud) ensureTargetGroup(targetGroup *elbv2.TargetGroup, serviceName types.NamespacedName, mapping nlbPortMapping, instances []string, vpcID string, tags map[string]string) (*elbv2.TargetGroup, error) {
	dirty := false
	expectedTargets := c.computeTargetGroupExpectedTargets(instances, mapping.TrafficPort)
	if targetGroup == nil {
		targetType := "instance"
		name := c.buildTargetGroupName(serviceName, mapping.FrontendPort, mapping.TrafficPort, mapping.TrafficProtocol, targetType, mapping)
		klog.Infof("Creating load balancer target group for %v with name: %s", serviceName, name)
		input := &elbv2.CreateTargetGroupInput{
			VpcId:                      aws.String(vpcID),
			Name:                       aws.String(name),
			Port:                       aws.Int64(mapping.TrafficPort),
			Protocol:                   aws.String(mapping.TrafficProtocol),
			TargetType:                 aws.String(targetType),
			HealthCheckIntervalSeconds: aws.Int64(mapping.HealthCheckConfig.Interval),
			HealthCheckPort:            aws.String(mapping.HealthCheckConfig.Port),
			HealthCheckProtocol:        aws.String(mapping.HealthCheckConfig.Protocol),
			HealthyThresholdCount:      aws.Int64(mapping.HealthCheckConfig.HealthyThreshold),
			UnhealthyThresholdCount:    aws.Int64(mapping.HealthCheckConfig.UnhealthyThreshold),
			// HealthCheckTimeoutSeconds:  Currently not configurable, 6 seconds for HTTP, 10 for TCP/HTTPS
		}

		if mapping.HealthCheckConfig.Protocol != elbv2.ProtocolEnumTcp {
			input.HealthCheckPath = aws.String(mapping.HealthCheckConfig.Path)
		}

		result, err := c.elbv2.CreateTargetGroup(input)
		if err != nil {
			return nil, fmt.Errorf("error creating load balancer target group: %q", err)
		}
		if len(result.TargetGroups) != 1 {
			return nil, fmt.Errorf("expected only one target group on CreateTargetGroup, got %d groups", len(result.TargetGroups))
		}

		if len(tags) != 0 {
			targetGroupTags := make([]*elbv2.Tag, 0, len(tags))
			for k, v := range tags {
				targetGroupTags = append(targetGroupTags, &elbv2.Tag{
					Key: aws.String(k), Value: aws.String(v),
				})
			}
			tgArn := aws.StringValue(result.TargetGroups[0].TargetGroupArn)
			if _, err := c.elbv2.AddTags(&elbv2.AddTagsInput{
				ResourceArns: []*string{aws.String(tgArn)},
				Tags:         targetGroupTags,
			}); err != nil {
				return nil, fmt.Errorf("error adding tags for targetGroup %s due to %q", tgArn, err)
			}
		}

		tg := result.TargetGroups[0]
		tgARN := aws.StringValue(tg.TargetGroupArn)
		if err := c.ensureTargetGroupTargets(tgARN, expectedTargets, nil); err != nil {
			return nil, err
		}
		return tg, nil
	}

	// handle instances in service
	{
		tgARN := aws.StringValue(targetGroup.TargetGroupArn)
		actualTargets, err := c.obtainTargetGroupActualTargets(tgARN)
		if err != nil {
			return nil, err
		}
		if err := c.ensureTargetGroupTargets(tgARN, expectedTargets, actualTargets); err != nil {
			return nil, err
		}
	}

	// ensure the health check is correct
	{
		dirtyHealthCheck := false

		input := &elbv2.ModifyTargetGroupInput{
			TargetGroupArn: targetGroup.TargetGroupArn,
		}
		if mapping.HealthCheckConfig.Port != aws.StringValue(targetGroup.HealthCheckPort) {
			input.HealthCheckPort = aws.String(mapping.HealthCheckConfig.Port)
			dirtyHealthCheck = true
		}
		if mapping.HealthCheckConfig.HealthyThreshold != aws.Int64Value(targetGroup.HealthyThresholdCount) {
			dirtyHealthCheck = true
			input.HealthyThresholdCount = aws.Int64(mapping.HealthCheckConfig.HealthyThreshold)
			input.UnhealthyThresholdCount = aws.Int64(mapping.HealthCheckConfig.UnhealthyThreshold)
		}
		if !strings.EqualFold(mapping.HealthCheckConfig.Protocol, elbv2.ProtocolEnumTcp) {
			if mapping.HealthCheckConfig.Path != aws.StringValue(input.HealthCheckPath) {
				input.HealthCheckPath = aws.String(mapping.HealthCheckConfig.Path)
				dirtyHealthCheck = true
			}
		}

		if dirtyHealthCheck {
			_, err := c.elbv2.ModifyTargetGroup(input)
			if err != nil {
				return nil, fmt.Errorf("error modifying target group health check: %q", err)
			}

			dirty = true
		}
	}

	if dirty {
		result, err := c.elbv2.DescribeTargetGroups(&elbv2.DescribeTargetGroupsInput{
			TargetGroupArns: []*string{targetGroup.TargetGroupArn},
		})
		if err != nil {
			return nil, fmt.Errorf("error retrieving target group after creation/update: %q", err)
		}
		targetGroup = result.TargetGroups[0]
	}

	return targetGroup, nil
}

func (c *Cloud) ensureTargetGroupTargets(tgARN string, expectedTargets []*elbv2.TargetDescription, actualTargets []*elbv2.TargetDescription) error {
	targetsToRegister, targetsToDeregister := c.diffTargetGroupTargets(expectedTargets, actualTargets)
	if len(targetsToRegister) > 0 {
		targetsToRegisterChunks := c.chunkTargetDescriptions(targetsToRegister, defaultRegisterTargetsChunkSize)
		for _, targetsChunk := range targetsToRegisterChunks {
			req := &elbv2.RegisterTargetsInput{
				TargetGroupArn: aws.String(tgARN),
				Targets:        targetsChunk,
			}
			if _, err := c.elbv2.RegisterTargets(req); err != nil {
				return fmt.Errorf("error trying to register targets in target group: %q", err)
			}
		}
	}
	if len(targetsToDeregister) > 0 {
		targetsToDeregisterChunks := c.chunkTargetDescriptions(targetsToDeregister, defaultDeregisterTargetsChunkSize)
		for _, targetsChunk := range targetsToDeregisterChunks {
			req := &elbv2.DeregisterTargetsInput{
				TargetGroupArn: aws.String(tgARN),
				Targets:        targetsChunk,
			}
			if _, err := c.elbv2.DeregisterTargets(req); err != nil {
				return fmt.Errorf("error trying to deregister targets in target group: %q", err)
			}
		}
	}
	return nil
}

func (c *Cloud) computeTargetGroupExpectedTargets(instanceIDs []string, port int64) []*elbv2.TargetDescription {
	expectedTargets := make([]*elbv2.TargetDescription, 0, len(instanceIDs))
	for _, instanceID := range instanceIDs {
		expectedTargets = append(expectedTargets, &elbv2.TargetDescription{
			Id:   aws.String(instanceID),
			Port: aws.Int64(port),
		})
	}
	return expectedTargets
}

func (c *Cloud) obtainTargetGroupActualTargets(tgARN string) ([]*elbv2.TargetDescription, error) {
	req := &elbv2.DescribeTargetHealthInput{
		TargetGroupArn: aws.String(tgARN),
	}
	resp, err := c.elbv2.DescribeTargetHealth(req)
	if err != nil {
		return nil, fmt.Errorf("error describing target group health: %q", err)
	}
	actualTargets := make([]*elbv2.TargetDescription, 0, len(resp.TargetHealthDescriptions))
	for _, targetDesc := range resp.TargetHealthDescriptions {
		if targetDesc.TargetHealth.Reason != nil && aws.StringValue(targetDesc.TargetHealth.Reason) == elbv2.TargetHealthReasonEnumTargetDeregistrationInProgress {
			continue
		}
		actualTargets = append(actualTargets, targetDesc.Target)
	}
	return actualTargets, nil
}

// diffTargetGroupTargets computes the targets to register and targets to deregister based on existingTargets and desired instances.
func (c *Cloud) diffTargetGroupTargets(expectedTargets []*elbv2.TargetDescription, actualTargets []*elbv2.TargetDescription) (targetsToRegister []*elbv2.TargetDescription, targetsToDeregister []*elbv2.TargetDescription) {
	expectedTargetsByUID := make(map[string]*elbv2.TargetDescription, len(expectedTargets))
	for _, target := range expectedTargets {
		targetUID := fmt.Sprintf("%v:%v", aws.StringValue(target.Id), aws.Int64Value(target.Port))
		expectedTargetsByUID[targetUID] = target
	}
	actualTargetsByUID := make(map[string]*elbv2.TargetDescription, len(actualTargets))
	for _, target := range actualTargets {
		targetUID := fmt.Sprintf("%v:%v", aws.StringValue(target.Id), aws.Int64Value(target.Port))
		actualTargetsByUID[targetUID] = target
	}

	expectedTargetsUIDs := sets.StringKeySet(expectedTargetsByUID)
	actualTargetsUIDs := sets.StringKeySet(actualTargetsByUID)
	for _, targetUID := range expectedTargetsUIDs.Difference(actualTargetsUIDs).List() {
		targetsToRegister = append(targetsToRegister, expectedTargetsByUID[targetUID])
	}
	for _, targetUID := range actualTargetsUIDs.Difference(expectedTargetsUIDs).List() {
		targetsToDeregister = append(targetsToDeregister, actualTargetsByUID[targetUID])
	}
	return targetsToRegister, targetsToDeregister
}

// chunkTargetDescriptions will split slice of TargetDescription into chunks
func (c *Cloud) chunkTargetDescriptions(targets []*elbv2.TargetDescription, chunkSize int) [][]*elbv2.TargetDescription {
	var chunks [][]*elbv2.TargetDescription
	for i := 0; i < len(targets); i += chunkSize {
		end := i + chunkSize
		if end > len(targets) {
			end = len(targets)
		}
		chunks = append(chunks, targets[i:end])
	}
	return chunks
}

// updateInstanceSecurityGroupsForNLB will adjust securityGroup's settings to allow inbound traffic into instances from clientCIDRs and portMappings.
// TIP: if either instances or clientCIDRs or portMappings are nil, then the securityGroup rules for lbName are cleared.
func (c *Cloud) updateInstanceSecurityGroupsForNLB(lbName string, instances map[InstanceID]*ec2.Instance, subnetCIDRs []string, clientCIDRs []string, portMappings []nlbPortMapping) error {
	if c.cfg.Global.DisableSecurityGroupIngress {
		return nil
	}

	clusterSGs, err := c.getTaggedSecurityGroups()
	if err != nil {
		return fmt.Errorf("error querying for tagged security groups: %q", err)
	}
	// scan instances for groups we want to open
	desiredSGIDs := sets.String{}
	for _, instance := range instances {
		sg, err := findSecurityGroupForInstance(instance, clusterSGs)
		if err != nil {
			return err
		}
		if sg == nil {
			klog.Warningf("Ignoring instance without security group: %s", aws.StringValue(instance.InstanceId))
			continue
		}
		desiredSGIDs.Insert(aws.StringValue(sg.GroupId))
	}

	// TODO(@M00nF1sh): do we really needs to support SG without cluster tag at current version?
	// findSecurityGroupForInstance might return SG that are not tagged.
	{
		for sgID := range desiredSGIDs.Difference(sets.StringKeySet(clusterSGs)) {
			sg, err := c.findSecurityGroup(sgID)
			if err != nil {
				return fmt.Errorf("error finding instance group: %q", err)
			}
			clusterSGs[sgID] = sg
		}
	}

	{
		clientPorts := sets.Int64{}
		clientProtocol := "tcp"
		healthCheckPorts := sets.Int64{}
		for _, port := range portMappings {
			clientPorts.Insert(port.TrafficPort)
			hcPort := port.TrafficPort
			if port.HealthCheckConfig.Port != defaultHealthCheckPort {
				var err error
				if hcPort, err = strconv.ParseInt(port.HealthCheckConfig.Port, 10, 0); err != nil {
					return fmt.Errorf("Invalid health check port %v", port.HealthCheckConfig.Port)
				}
			}
			healthCheckPorts.Insert(hcPort)
			if port.TrafficProtocol == string(v1.ProtocolUDP) {
				clientProtocol = "udp"
			}
		}
		clientRuleAnnotation := fmt.Sprintf("%s=%s", NLBClientRuleDescription, lbName)
		healthRuleAnnotation := fmt.Sprintf("%s=%s", NLBHealthCheckRuleDescription, lbName)
		for sgID, sg := range clusterSGs {
			sgPerms := NewIPPermissionSet(sg.IpPermissions...).Ungroup()
			if desiredSGIDs.Has(sgID) {
				// If the client rule is 1) all addresses 2) tcp and 3) has same ports as the healthcheck,
				// then the health rules are a subset of the client rule and are not needed.
				if len(clientCIDRs) != 1 || clientCIDRs[0] != "0.0.0.0/0" || clientProtocol != "tcp" || !healthCheckPorts.Equal(clientPorts) {
					if err := c.updateInstanceSecurityGroupForNLBTraffic(sgID, sgPerms, healthRuleAnnotation, "tcp", healthCheckPorts, subnetCIDRs); err != nil {
						return err
					}
				}
				if err := c.updateInstanceSecurityGroupForNLBTraffic(sgID, sgPerms, clientRuleAnnotation, clientProtocol, clientPorts, clientCIDRs); err != nil {
					return err
				}
			} else {
				if err := c.updateInstanceSecurityGroupForNLBTraffic(sgID, sgPerms, healthRuleAnnotation, "tcp", nil, nil); err != nil {
					return err
				}
				if err := c.updateInstanceSecurityGroupForNLBTraffic(sgID, sgPerms, clientRuleAnnotation, clientProtocol, nil, nil); err != nil {
					return err
				}
			}
			if !sgPerms.Equal(NewIPPermissionSet(sg.IpPermissions...).Ungroup()) {
				if err := c.updateInstanceSecurityGroupForNLBMTU(sgID, sgPerms); err != nil {
					return err
				}
			}
		}
	}
	return nil
}

// updateInstanceSecurityGroupForNLBTraffic will manage permissions set(identified by ruleDesc) on securityGroup to match desired set(allow protocol traffic from ports/cidr).
// Note: sgPerms will be updated to reflect the current permission set on SG after update.
func (c *Cloud) updateInstanceSecurityGroupForNLBTraffic(sgID string, sgPerms IPPermissionSet, ruleDesc string, protocol string, ports sets.Int64, cidrs []string) error {
	desiredPerms := NewIPPermissionSet()
	for port := range ports {
		for _, cidr := range cidrs {
			desiredPerms.Insert(&ec2.IpPermission{
				IpProtocol: aws.String(protocol),
				FromPort:   aws.Int64(port),
				ToPort:     aws.Int64(port),
				IpRanges: []*ec2.IpRange{
					{
						CidrIp:      aws.String(cidr),
						Description: aws.String(ruleDesc),
					},
				},
			})
		}
	}

	permsToGrant := desiredPerms.Difference(sgPerms)
	permsToRevoke := sgPerms.Difference(desiredPerms)
	permsToRevoke.DeleteIf(IPPermissionNotMatch{IPPermissionMatchDesc{ruleDesc}})
	if len(permsToRevoke) > 0 {
		permsToRevokeList := permsToRevoke.List()
		changed, err := c.removeSecurityGroupIngress(sgID, permsToRevokeList)
		if err != nil {
			klog.Warningf("Error remove traffic permission from security group: %q", err)
			return err
		}
		if !changed {
			klog.Warning("Revoking ingress was not needed; concurrent change? groupId=", sgID)
		}
		sgPerms.Delete(permsToRevokeList...)
	}
	if len(permsToGrant) > 0 {
		permsToGrantList := permsToGrant.List()
		changed, err := c.addSecurityGroupIngress(sgID, permsToGrantList)
		if err != nil {
			klog.Warningf("Error add traffic permission to security group: %q", err)
			return err
		}
		if !changed {
			klog.Warning("Allowing ingress was not needed; concurrent change? groupId=", sgID)
		}
		sgPerms.Insert(permsToGrantList...)
	}
	return nil
}

// Note: sgPerms will be updated to reflect the current permission set on SG after update.
func (c *Cloud) updateInstanceSecurityGroupForNLBMTU(sgID string, sgPerms IPPermissionSet) error {
	desiredPerms := NewIPPermissionSet()
	for _, perm := range sgPerms {
		for _, ipRange := range perm.IpRanges {
			if strings.Contains(aws.StringValue(ipRange.Description), NLBClientRuleDescription) {
				desiredPerms.Insert(&ec2.IpPermission{
					IpProtocol: aws.String("icmp"),
					FromPort:   aws.Int64(3),
					ToPort:     aws.Int64(4),
					IpRanges: []*ec2.IpRange{
						{
							CidrIp:      ipRange.CidrIp,
							Description: aws.String(NLBMtuDiscoveryRuleDescription),
						},
					},
				})
			}
		}
	}

	permsToGrant := desiredPerms.Difference(sgPerms)
	permsToRevoke := sgPerms.Difference(desiredPerms)
	permsToRevoke.DeleteIf(IPPermissionNotMatch{IPPermissionMatchDesc{NLBMtuDiscoveryRuleDescription}})
	if len(permsToRevoke) > 0 {
		permsToRevokeList := permsToRevoke.List()
		changed, err := c.removeSecurityGroupIngress(sgID, permsToRevokeList)
		if err != nil {
			klog.Warningf("Error remove MTU permission from security group: %q", err)
			return err
		}
		if !changed {
			klog.Warning("Revoking ingress was not needed; concurrent change? groupId=", sgID)
		}

		sgPerms.Delete(permsToRevokeList...)
	}
	if len(permsToGrant) > 0 {
		permsToGrantList := permsToGrant.List()
		changed, err := c.addSecurityGroupIngress(sgID, permsToGrantList)
		if err != nil {
			klog.Warningf("Error add MTU permission to security group: %q", err)
			return err
		}
		if !changed {
			klog.Warning("Allowing ingress was not needed; concurrent change? groupId=", sgID)
		}
		sgPerms.Insert(permsToGrantList...)
	}
	return nil
}

func (c *Cloud) ensureLoadBalancer(namespacedName types.NamespacedName, loadBalancerName string, listeners []*elb.Listener, subnetIDs []string, securityGroupIDs []string, internalELB, proxyProtocol bool, loadBalancerAttributes *elb.LoadBalancerAttributes, annotations map[string]string) (*elb.LoadBalancerDescription, error) {
	loadBalancer, err := c.describeLoadBalancer(loadBalancerName)
	if err != nil {
		return nil, err
	}

	dirty := false

	if loadBalancer == nil {
		createRequest := &elb.CreateLoadBalancerInput{}
		createRequest.LoadBalancerName = aws.String(loadBalancerName)

		createRequest.Listeners = listeners

		if internalELB {
			createRequest.Scheme = aws.String("internal")
		}

		// We are supposed to specify one subnet per AZ.
		// TODO: What happens if we have more than one subnet per AZ?
		if subnetIDs == nil {
			createRequest.Subnets = nil
		} else {
			createRequest.Subnets = aws.StringSlice(subnetIDs)
		}

		if securityGroupIDs == nil {
			createRequest.SecurityGroups = nil
		} else {
			createRequest.SecurityGroups = aws.StringSlice(securityGroupIDs)
		}

		// Get additional tags set by the user
		tags := getKeyValuePropertiesFromAnnotation(annotations, ServiceAnnotationLoadBalancerAdditionalTags)

		// Add default tags
		tags[TagNameKubernetesService] = namespacedName.String()
		tags = c.tagging.buildTags(ResourceLifecycleOwned, tags)

		for k, v := range tags {
			createRequest.Tags = append(createRequest.Tags, &elb.Tag{
				Key: aws.String(k), Value: aws.String(v),
			})
		}

		klog.Infof("Creating load balancer for %v with name: %s", namespacedName, loadBalancerName)
		_, err := c.elb.CreateLoadBalancer(createRequest)
		if err != nil {
			return nil, err
		}

		if proxyProtocol {
			err = c.createProxyProtocolPolicy(loadBalancerName)
			if err != nil {
				return nil, err
			}

			for _, listener := range listeners {
				klog.V(2).Infof("Adjusting AWS loadbalancer proxy protocol on node port %d. Setting to true", *listener.InstancePort)
				err := c.setBackendPolicies(loadBalancerName, *listener.InstancePort, []*string{aws.String(ProxyProtocolPolicyName)})
				if err != nil {
					return nil, err
				}
			}
		}

		dirty = true
	} else {
		// TODO: Sync internal vs non-internal

		{
			// Sync subnets
			expected := sets.NewString(subnetIDs...)
			actual := stringSetFromPointers(loadBalancer.Subnets)

			additions := expected.Difference(actual)
			removals := actual.Difference(expected)

			if removals.Len() != 0 {
				request := &elb.DetachLoadBalancerFromSubnetsInput{}
				request.LoadBalancerName = aws.String(loadBalancerName)
				request.Subnets = stringSetToPointers(removals)
				klog.V(2).Info("Detaching load balancer from removed subnets")
				_, err := c.elb.DetachLoadBalancerFromSubnets(request)
				if err != nil {
					return nil, fmt.Errorf("error detaching AWS loadbalancer from subnets: %q", err)
				}
				dirty = true
			}

			if additions.Len() != 0 {
				request := &elb.AttachLoadBalancerToSubnetsInput{}
				request.LoadBalancerName = aws.String(loadBalancerName)
				request.Subnets = stringSetToPointers(additions)
				klog.V(2).Info("Attaching load balancer to added subnets")
				_, err := c.elb.AttachLoadBalancerToSubnets(request)
				if err != nil {
					return nil, fmt.Errorf("error attaching AWS loadbalancer to subnets: %q", err)
				}
				dirty = true
			}
		}

		{
			// Sync security groups
			expected := sets.NewString(securityGroupIDs...)
			actual := stringSetFromPointers(loadBalancer.SecurityGroups)

			if !expected.Equal(actual) {
				// This call just replaces the security groups, unlike e.g. subnets (!)
				request := &elb.ApplySecurityGroupsToLoadBalancerInput{}
				request.LoadBalancerName = aws.String(loadBalancerName)
				if securityGroupIDs == nil {
					request.SecurityGroups = nil
				} else {
					request.SecurityGroups = aws.StringSlice(securityGroupIDs)
				}
				klog.V(2).Info("Applying updated security groups to load balancer")
				_, err := c.elb.ApplySecurityGroupsToLoadBalancer(request)
				if err != nil {
					return nil, fmt.Errorf("error applying AWS loadbalancer security groups: %q", err)
				}
				dirty = true
			}
		}

		{
			additions, removals := syncElbListeners(loadBalancerName, listeners, loadBalancer.ListenerDescriptions)

			if len(removals) != 0 {
				request := &elb.DeleteLoadBalancerListenersInput{}
				request.LoadBalancerName = aws.String(loadBalancerName)
				request.LoadBalancerPorts = removals
				klog.V(2).Info("Deleting removed load balancer listeners")
				if _, err := c.elb.DeleteLoadBalancerListeners(request); err != nil {
					return nil, fmt.Errorf("error deleting AWS loadbalancer listeners: %q", err)
				}
				dirty = true
			}

			if len(additions) != 0 {
				request := &elb.CreateLoadBalancerListenersInput{}
				request.LoadBalancerName = aws.String(loadBalancerName)
				request.Listeners = additions
				klog.V(2).Info("Creating added load balancer listeners")
				if _, err := c.elb.CreateLoadBalancerListeners(request); err != nil {
					return nil, fmt.Errorf("error creating AWS loadbalancer listeners: %q", err)
				}
				dirty = true
			}
		}

		{
			// Sync proxy protocol state for new and existing listeners

			proxyPolicies := make([]*string, 0)
			if proxyProtocol {
				// Ensure the backend policy exists

				// NOTE The documentation for the AWS API indicates we could get an HTTP 400
				// back if a policy of the same name already exists. However, the aws-sdk does not
				// seem to return an error to us in these cases. Therefore, this will issue an API
				// request every time.
				err := c.createProxyProtocolPolicy(loadBalancerName)
				if err != nil {
					return nil, err
				}

				proxyPolicies = append(proxyPolicies, aws.String(ProxyProtocolPolicyName))
			}

			foundBackends := make(map[int64]bool)
			proxyProtocolBackends := make(map[int64]bool)
			for _, backendListener := range loadBalancer.BackendServerDescriptions {
				foundBackends[*backendListener.InstancePort] = false
				proxyProtocolBackends[*backendListener.InstancePort] = proxyProtocolEnabled(backendListener)
			}

			for _, listener := range listeners {
				setPolicy := false
				instancePort := *listener.InstancePort

				if currentState, ok := proxyProtocolBackends[instancePort]; !ok {
					// This is a new ELB backend so we only need to worry about
					// potentially adding a policy and not removing an
					// existing one
					setPolicy = proxyProtocol
				} else {
					foundBackends[instancePort] = true
					// This is an existing ELB backend so we need to determine
					// if the state changed
					setPolicy = (currentState != proxyProtocol)
				}

				if setPolicy {
					klog.V(2).Infof("Adjusting AWS loadbalancer proxy protocol on node port %d. Setting to %t", instancePort, proxyProtocol)
					err := c.setBackendPolicies(loadBalancerName, instancePort, proxyPolicies)
					if err != nil {
						return nil, err
					}
					dirty = true
				}
			}

			// We now need to figure out if any backend policies need removed
			// because these old policies will stick around even if there is no
			// corresponding listener anymore
			for instancePort, found := range foundBackends {
				if !found {
					klog.V(2).Infof("Adjusting AWS loadbalancer proxy protocol on node port %d. Setting to false", instancePort)
					err := c.setBackendPolicies(loadBalancerName, instancePort, []*string{})
					if err != nil {
						return nil, err
					}
					dirty = true
				}
			}
		}

		{
			// Add additional tags
			klog.V(2).Infof("Creating additional load balancer tags for %s", loadBalancerName)
			tags := getKeyValuePropertiesFromAnnotation(annotations, ServiceAnnotationLoadBalancerAdditionalTags)
			if len(tags) > 0 {
				err := c.addLoadBalancerTags(loadBalancerName, tags)
				if err != nil {
					return nil, fmt.Errorf("unable to create additional load balancer tags: %v", err)
				}
			}
		}
	}

	// Whether the ELB was new or existing, sync attributes regardless. This accounts for things
	// that cannot be specified at the time of creation and can only be modified after the fact,
	// e.g. idle connection timeout.
	{
		describeAttributesRequest := &elb.DescribeLoadBalancerAttributesInput{}
		describeAttributesRequest.LoadBalancerName = aws.String(loadBalancerName)
		describeAttributesOutput, err := c.elb.DescribeLoadBalancerAttributes(describeAttributesRequest)
		if err != nil {
			klog.Warning("Unable to retrieve load balancer attributes during attribute sync")
			return nil, err
		}

		foundAttributes := &describeAttributesOutput.LoadBalancerAttributes

		// Update attributes if they're dirty
		if !reflect.DeepEqual(loadBalancerAttributes, foundAttributes) {
			klog.V(2).Infof("Updating load-balancer attributes for %q", loadBalancerName)

			modifyAttributesRequest := &elb.ModifyLoadBalancerAttributesInput{}
			modifyAttributesRequest.LoadBalancerName = aws.String(loadBalancerName)
			modifyAttributesRequest.LoadBalancerAttributes = loadBalancerAttributes
			_, err = c.elb.ModifyLoadBalancerAttributes(modifyAttributesRequest)
			if err != nil {
				return nil, fmt.Errorf("Unable to update load balancer attributes during attribute sync: %q", err)
			}
			dirty = true
		}
	}

	if dirty {
		loadBalancer, err = c.describeLoadBalancer(loadBalancerName)
		if err != nil {
			klog.Warning("Unable to retrieve load balancer after creation/update")
			return nil, err
		}
	}

	return loadBalancer, nil
}

// syncElbListeners computes a plan to reconcile the desired vs actual state of the listeners on an ELB
// NOTE: there exists an O(nlgn) implementation for this function. However, as the default limit of
//
//	listeners per elb is 100, this implementation is reduced from O(m*n) => O(n).
func syncElbListeners(loadBalancerName string, listeners []*elb.Listener, listenerDescriptions []*elb.ListenerDescription) ([]*elb.Listener, []*int64) {
	foundSet := make(map[int]bool)
	removals := []*int64{}
	additions := []*elb.Listener{}

	for _, listenerDescription := range listenerDescriptions {
		actual := listenerDescription.Listener
		if actual == nil {
			klog.Warning("Ignoring empty listener in AWS loadbalancer: ", loadBalancerName)
			continue
		}

		found := false
		for i, expected := range listeners {
			if expected == nil {
				klog.Warning("Ignoring empty desired listener for loadbalancer: ", loadBalancerName)
				continue
			}
			if elbListenersAreEqual(actual, expected) {
				// The current listener on the actual
				// elb is in the set of desired listeners.
				foundSet[i] = true
				found = true
				break
			}
		}
		if !found {
			removals = append(removals, actual.LoadBalancerPort)
		}
	}

	for i := range listeners {
		if !foundSet[i] {
			additions = append(additions, listeners[i])
		}
	}

	return additions, removals
}

func elbListenersAreEqual(actual, expected *elb.Listener) bool {
	if !elbProtocolsAreEqual(actual.Protocol, expected.Protocol) {
		return false
	}
	if !elbProtocolsAreEqual(actual.InstanceProtocol, expected.InstanceProtocol) {
		return false
	}
	if aws.Int64Value(actual.InstancePort) != aws.Int64Value(expected.InstancePort) {
		return false
	}
	if aws.Int64Value(actual.LoadBalancerPort) != aws.Int64Value(expected.LoadBalancerPort) {
		return false
	}
	if !awsArnEquals(actual.SSLCertificateId, expected.SSLCertificateId) {
		return false
	}
	return true
}

func createSubnetMappings(subnetIDs []string, allocationIDs []string) []*elbv2.SubnetMapping {
	response := []*elbv2.SubnetMapping{}

	for index, id := range subnetIDs {
		sm := &elbv2.SubnetMapping{SubnetId: aws.String(id)}
		if len(allocationIDs) > 0 {
			sm.AllocationId = aws.String(allocationIDs[index])
		}
		response = append(response, sm)
	}

	return response
}

// elbProtocolsAreEqual checks if two ELB protocol strings are considered the same
// Comparison is case insensitive
func elbProtocolsAreEqual(l, r *string) bool {
	if l == nil || r == nil {
		return l == r
	}
	return strings.EqualFold(aws.StringValue(l), aws.StringValue(r))
}

// awsArnEquals checks if two ARN strings are considered the same
// Comparison is case insensitive
func awsArnEquals(l, r *string) bool {
	if l == nil || r == nil {
		return l == r
	}
	return strings.EqualFold(aws.StringValue(l), aws.StringValue(r))
}

// getExpectedHealthCheck returns an elb.Healthcheck for the provided target
// and using either sensible defaults or overrides via Service annotations
func (c *Cloud) getExpectedHealthCheck(target string, annotations map[string]string) (*elb.HealthCheck, error) {
	healthcheck := &elb.HealthCheck{Target: &target}
	getOrDefault := func(annotation string, defaultValue int64) (*int64, error) {
		i64 := defaultValue
		var err error
		if s, ok := annotations[annotation]; ok {
			i64, err = strconv.ParseInt(s, 10, 0)
			if err != nil {
				return nil, fmt.Errorf("failed parsing health check annotation value: %v", err)
			}
		}
		return &i64, nil
	}
	var err error
	healthcheck.HealthyThreshold, err = getOrDefault(ServiceAnnotationLoadBalancerHCHealthyThreshold, defaultElbHCHealthyThreshold)
	if err != nil {
		return nil, err
	}
	healthcheck.UnhealthyThreshold, err = getOrDefault(ServiceAnnotationLoadBalancerHCUnhealthyThreshold, defaultElbHCUnhealthyThreshold)
	if err != nil {
		return nil, err
	}
	healthcheck.Timeout, err = getOrDefault(ServiceAnnotationLoadBalancerHCTimeout, defaultElbHCTimeout)
	if err != nil {
		return nil, err
	}
	healthcheck.Interval, err = getOrDefault(ServiceAnnotationLoadBalancerHCInterval, defaultElbHCInterval)
	if err != nil {
		return nil, err
	}
	if err = healthcheck.Validate(); err != nil {
		return nil, fmt.Errorf("some of the load balancer health check parameters are invalid: %v", err)
	}
	return healthcheck, nil
}

// Makes sure that the health check for an ELB matches the configured health check node port
func (c *Cloud) ensureLoadBalancerHealthCheck(loadBalancer *elb.LoadBalancerDescription, protocol string, port int32, path string, annotations map[string]string) error {
	name := aws.StringValue(loadBalancer.LoadBalancerName)

	actual := loadBalancer.HealthCheck
	// Override healthcheck protocol, port and path based on annotations
	if s, ok := annotations[ServiceAnnotationLoadBalancerHealthCheckProtocol]; ok {
		protocol = s
	}
	if s, ok := annotations[ServiceAnnotationLoadBalancerHealthCheckPort]; ok && s != defaultHealthCheckPort {
		p, err := strconv.ParseInt(s, 10, 0)
		if err != nil {
			return err
		}
		port = int32(p)
	}
	switch strings.ToUpper(protocol) {
	case "HTTP", "HTTPS":
		if path == "" {
			path = defaultHealthCheckPath
		}
		if s := annotations[ServiceAnnotationLoadBalancerHealthCheckPath]; s != "" {
			path = s
		}
	default:
		path = ""
	}
	expectedTarget := protocol + ":" + strconv.FormatInt(int64(port), 10) + path
	expected, err := c.getExpectedHealthCheck(expectedTarget, annotations)
	if err != nil {
		return fmt.Errorf("cannot update health check for load balancer %q: %q", name, err)
	}

	// comparing attributes 1 by 1 to avoid breakage in case a new field is
	// added to the HC which breaks the equality
	if aws.StringValue(expected.Target) == aws.StringValue(actual.Target) &&
		aws.Int64Value(expected.HealthyThreshold) == aws.Int64Value(actual.HealthyThreshold) &&
		aws.Int64Value(expected.UnhealthyThreshold) == aws.Int64Value(actual.UnhealthyThreshold) &&
		aws.Int64Value(expected.Interval) == aws.Int64Value(actual.Interval) &&
		aws.Int64Value(expected.Timeout) == aws.Int64Value(actual.Timeout) {
		return nil
	}

	request := &elb.ConfigureHealthCheckInput{}
	request.HealthCheck = expected
	request.LoadBalancerName = loadBalancer.LoadBalancerName

	_, err = c.elb.ConfigureHealthCheck(request)
	if err != nil {
		return fmt.Errorf("error configuring load balancer health check for %q: %q", name, err)
	}

	return nil
}

// Makes sure that exactly the specified hosts are registered as instances with the load balancer
func (c *Cloud) ensureLoadBalancerInstances(loadBalancerName string, lbInstances []*elb.Instance, instanceIDs map[InstanceID]*ec2.Instance) error {
	expected := sets.NewString()
	for id := range instanceIDs {
		expected.Insert(string(id))
	}

	actual := sets.NewString()
	for _, lbInstance := range lbInstances {
		actual.Insert(aws.StringValue(lbInstance.InstanceId))
	}

	additions := expected.Difference(actual)
	removals := actual.Difference(expected)

	addInstances := []*elb.Instance{}
	for _, instanceID := range additions.List() {
		addInstance := &elb.Instance{}
		addInstance.InstanceId = aws.String(instanceID)
		addInstances = append(addInstances, addInstance)
	}

	removeInstances := []*elb.Instance{}
	for _, instanceID := range removals.List() {
		removeInstance := &elb.Instance{}
		removeInstance.InstanceId = aws.String(instanceID)
		removeInstances = append(removeInstances, removeInstance)
	}

	if len(addInstances) > 0 {
		registerRequest := &elb.RegisterInstancesWithLoadBalancerInput{}
		registerRequest.Instances = addInstances
		registerRequest.LoadBalancerName = aws.String(loadBalancerName)
		_, err := c.elb.RegisterInstancesWithLoadBalancer(registerRequest)
		if err != nil {
			return err
		}
		klog.V(1).Infof("Instances added to load-balancer %s", loadBalancerName)
	}

	if len(removeInstances) > 0 {
		deregisterRequest := &elb.DeregisterInstancesFromLoadBalancerInput{}
		deregisterRequest.Instances = removeInstances
		deregisterRequest.LoadBalancerName = aws.String(loadBalancerName)
		_, err := c.elb.DeregisterInstancesFromLoadBalancer(deregisterRequest)
		if err != nil {
			return err
		}
		klog.V(1).Infof("Instances removed from load-balancer %s", loadBalancerName)
	}

	return nil
}

func (c *Cloud) getLoadBalancerTLSPorts(loadBalancer *elb.LoadBalancerDescription) []int64 {
	ports := []int64{}

	for _, listenerDescription := range loadBalancer.ListenerDescriptions {
		protocol := aws.StringValue(listenerDescription.Listener.Protocol)
		if protocol == "SSL" || protocol == "HTTPS" {
			ports = append(ports, aws.Int64Value(listenerDescription.Listener.LoadBalancerPort))
		}
	}
	return ports
}

func (c *Cloud) ensureSSLNegotiationPolicy(loadBalancer *elb.LoadBalancerDescription, policyName string) error {
	klog.V(2).Info("Describing load balancer policies on load balancer")
	result, err := c.elb.DescribeLoadBalancerPolicies(&elb.DescribeLoadBalancerPoliciesInput{
		LoadBalancerName: loadBalancer.LoadBalancerName,
		PolicyNames: []*string{
			aws.String(fmt.Sprintf(SSLNegotiationPolicyNameFormat, policyName)),
		},
	})
	if err != nil {
		if aerr, ok := err.(awserr.Error); ok {
			switch aerr.Code() {
			case elb.ErrCodePolicyNotFoundException:
			default:
				return fmt.Errorf("error describing security policies on load balancer: %q", err)
			}
		}
	}

	if len(result.PolicyDescriptions) > 0 {
		return nil
	}

	klog.V(2).Infof("Creating SSL negotiation policy '%s' on load balancer", fmt.Sprintf(SSLNegotiationPolicyNameFormat, policyName))
	// there is an upper limit of 98 policies on an ELB, we're pretty safe from
	// running into it
	_, err = c.elb.CreateLoadBalancerPolicy(&elb.CreateLoadBalancerPolicyInput{
		LoadBalancerName: loadBalancer.LoadBalancerName,
		PolicyName:       aws.String(fmt.Sprintf(SSLNegotiationPolicyNameFormat, policyName)),
		PolicyTypeName:   aws.String("SSLNegotiationPolicyType"),
		PolicyAttributes: []*elb.PolicyAttribute{
			{
				AttributeName:  aws.String("Reference-Security-Policy"),
				AttributeValue: aws.String(policyName),
			},
		},
	})
	if err != nil {
		return fmt.Errorf("error creating security policy on load balancer: %q", err)
	}
	return nil
}

func (c *Cloud) setSSLNegotiationPolicy(loadBalancerName, sslPolicyName string, port int64) error {
	policyName := fmt.Sprintf(SSLNegotiationPolicyNameFormat, sslPolicyName)
	request := &elb.SetLoadBalancerPoliciesOfListenerInput{
		LoadBalancerName: aws.String(loadBalancerName),
		LoadBalancerPort: aws.Int64(port),
		PolicyNames: []*string{
			aws.String(policyName),
		},
	}
	klog.V(2).Infof("Setting SSL negotiation policy '%s' on load balancer", policyName)
	_, err := c.elb.SetLoadBalancerPoliciesOfListener(request)
	if err != nil {
		return fmt.Errorf("error setting SSL negotiation policy '%s' on load balancer: %q", policyName, err)
	}
	return nil
}

func (c *Cloud) createProxyProtocolPolicy(loadBalancerName string) error {
	request := &elb.CreateLoadBalancerPolicyInput{
		LoadBalancerName: aws.String(loadBalancerName),
		PolicyName:       aws.String(ProxyProtocolPolicyName),
		PolicyTypeName:   aws.String("ProxyProtocolPolicyType"),
		PolicyAttributes: []*elb.PolicyAttribute{
			{
				AttributeName:  aws.String("ProxyProtocol"),
				AttributeValue: aws.String("true"),
			},
		},
	}
	klog.V(2).Info("Creating proxy protocol policy on load balancer")
	_, err := c.elb.CreateLoadBalancerPolicy(request)
	if err != nil {
		return fmt.Errorf("error creating proxy protocol policy on load balancer: %q", err)
	}

	return nil
}

func (c *Cloud) setBackendPolicies(loadBalancerName string, instancePort int64, policies []*string) error {
	request := &elb.SetLoadBalancerPoliciesForBackendServerInput{
		InstancePort:     aws.Int64(instancePort),
		LoadBalancerName: aws.String(loadBalancerName),
		PolicyNames:      policies,
	}
	if len(policies) > 0 {
		klog.V(2).Infof("Adding AWS loadbalancer backend policies on node port %d", instancePort)
	} else {
		klog.V(2).Infof("Removing AWS loadbalancer backend policies on node port %d", instancePort)
	}
	_, err := c.elb.SetLoadBalancerPoliciesForBackendServer(request)
	if err != nil {
		return fmt.Errorf("error adjusting AWS loadbalancer backend policies: %q", err)
	}

	return nil
}

func proxyProtocolEnabled(backend *elb.BackendServerDescription) bool {
	for _, policy := range backend.PolicyNames {
		if aws.StringValue(policy) == ProxyProtocolPolicyName {
			return true
		}
	}

	return false
}

// findInstancesForELB gets the EC2 instances corresponding to the Nodes, for setting up an ELB
// We ignore Nodes (with a log message) where the instanceid cannot be determined from the provider,
// and we ignore instances which are not found
func (c *Cloud) findInstancesForELB(nodes []*v1.Node, annotations map[string]string) (map[InstanceID]*ec2.Instance, error) {

	targetNodes := filterTargetNodes(nodes, annotations)

	// Map to instance ids ignoring Nodes where we cannot find the id (but logging)
	instanceIDs := mapToAWSInstanceIDsTolerant(targetNodes)

	cacheCriteria := cacheCriteria{
		MaxAge:       defaultEC2InstanceCacheMaxAge,
		HasInstances: instanceIDs, // Refresh if any of the instance ids are missing
	}
	snapshot, err := c.instanceCache.describeAllInstancesCached(cacheCriteria)
	if err != nil {
		return nil, err
	}

	instances := snapshot.FindInstances(instanceIDs)
	// We ignore instances that cannot be found

	return instances, nil
}

// filterTargetNodes uses node labels to filter the nodes that should be targeted by the ELB,
// checking if all the labels provided in an annotation are present in the nodes
func filterTargetNodes(nodes []*v1.Node, annotations map[string]string) []*v1.Node {

	targetNodeLabels := getKeyValuePropertiesFromAnnotation(annotations, ServiceAnnotationLoadBalancerTargetNodeLabels)

	if len(targetNodeLabels) == 0 {
		return nodes
	}

	targetNodes := make([]*v1.Node, 0, len(nodes))

	for _, node := range nodes {
		if node.Labels != nil && len(node.Labels) > 0 {
			allFiltersMatch := true

			for targetLabelKey, targetLabelValue := range targetNodeLabels {
				if nodeLabelValue, ok := node.Labels[targetLabelKey]; !ok || (nodeLabelValue != targetLabelValue && targetLabelValue != "") {
					allFiltersMatch = false
					break
				}
			}

			if allFiltersMatch {
				targetNodes = append(targetNodes, node)
			}
		}
	}

	return targetNodes
}

相关信息

kubernetes 源码目录

相关文章

kubernetes aws 源码

kubernetes aws_assumerole_provider 源码

kubernetes aws_assumerole_provider_test 源码

kubernetes aws_fakes 源码

kubernetes aws_instancegroups 源码

kubernetes aws_loadbalancer_test 源码

kubernetes aws_metrics 源码

kubernetes aws_routes 源码

kubernetes aws_test 源码

kubernetes aws_utils 源码

0  赞