Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a "local interface IP mode" to the AWS cloud provider. #518

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 45 additions & 1 deletion CLOUDPROVIDERS.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,51 @@ All configuration is in a stanza named after the backend, and takes simple key v

aws
---
### TODO
#### Overview

The AWS cloud provider uses the IP addresses of incoming metric datagrams
to look up the tags of EC2 instances with matching private IP addresses and
decorates the associated metrics with said tags. A "local IP mode" is also
provided that can be used to look up tags of the local EC2 instance on which
`gostatsd` is running.

Tags are queried by executing a
[`DescribeInstances`](https://docs.aws.amazon.com/sdk-for-go/api/service/ec2/#EC2.DescribeInstances)
operation against the EC2 API. Unless local IP mode applies, instances are located by
setting the `private-ip-address` filter of the `DescribeInstances` request to the values
of the incoming IP addresses. In local IP mode, instances are located by setting
the `instance-id` filter of the `DescribeInstances` request to the instance ID returned
by the [EC2 instance identity document](https://docs.aws.amazon.com/sdk-for-go/api/aws/ec2metadata/#EC2InstanceIdentityDocument),
which is read from the EC2 metadata service.

#### Important details

Like the k8s provider, `ignore-host` must be set to `false` for the cloud
provider to work at all! This is because it works based off the
source IP address of incoming metrics, and these are dropped if
`ignore-host=true`.

#### Example with defaults

```$toml
cloud-provider = 'aws'

[aws]
max_retries = 3
client_timeout = 9000000000
max_instances_batch = 32
local_ip_mode = "deny"
local_ip_whitelist = "127.0.0.1 localhost 172.17.0.1"
```

The configuration settings are as follows:
- `max_retries`: The maximum number of times that a request will be retried by the AWS client before failure
- `client_timeout`: The timeout (in nanoseconds) for the HTTP client passed to the AWS client
- `max_instances_batch`: The maximum number of instances that can be requested at a time by the Cloud Provider pipeline handler
- `local_ip_mode`: If set to `deny`, will never try to look up tags of the local instance, even if datagrams
are sent with local interface addresses. If set to `allow`, will look up tags of the local instance if datagrams
are sent with addresses on the local interface IP whitelist. Any other value is treated as `deny`.
- `local_ip_whitelist`: A space-separated list of local interface IPs that should trigger
the lookup of tags on the local instance.

k8s
---
Expand Down
251 changes: 211 additions & 40 deletions pkg/cloudproviders/aws/aws.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"net"
"net/http"
"strings"
"sync/atomic"
"time"

Expand All @@ -24,11 +25,38 @@ import (
"github.com/atlassian/gostatsd/pkg/stats"
)

// LocalIPMode selects whether source addresses found on the local IP
// whitelist are resolved to the instance this process is running on.
type LocalIPMode int

const (
	// Deny turns local instance lookups off. It is the zero value.
	Deny LocalIPMode = iota
	// Allow turns local instance lookups on for whitelisted addresses.
	Allow
)

// NewLocalIPMode converts a configuration string into a LocalIPMode.
//
// The input is lowercased before comparison. "allow" yields Allow; every
// other value, including "deny" and unrecognised input, yields Deny.
func NewLocalIPMode(str string) LocalIPMode {
	if strings.ToLower(str) == "allow" {
		return Allow
	}
	return Deny
}

// defaultLocalIPWhitelist holds the source addresses that are considered
// local when the configuration does not provide its own whitelist.
var defaultLocalIPWhitelist = []string{
	"127.0.0.1",
	"localhost",
	"172.17.0.1", // docker gateway
}

// ProviderName is the name of AWS cloud provider.
const ProviderName = "aws"

// Fallback values applied when the corresponding configuration key is unset.
const (
	// defaultClientTimeout is the default timeout of the HTTP client.
	defaultClientTimeout = 9 * time.Second
	// defaultMaxInstancesBatch is the default maximum batch size for instance lookups.
	defaultMaxInstancesBatch = 32
	// defaultLocalIPMode keeps local instance lookups off by default.
	defaultLocalIPMode = Deny
)

// Provider represents an AWS provider.
Expand All @@ -41,9 +69,11 @@ type Provider struct {

logger logrus.FieldLogger

Metadata *ec2metadata.EC2Metadata
Ec2 *ec2.EC2
MaxInstances int
Metadata *ec2metadata.EC2Metadata
Ec2 *ec2.EC2
MaxInstances int
localIPMode LocalIPMode
localIPWhitelist []string
}

func (p *Provider) EstimatedTags() int {
Expand Down Expand Up @@ -74,11 +104,21 @@ func (p *Provider) RunMetrics(ctx context.Context, statser stats.Statser) {
// map is returned even in case of errors because it may contain partial data.
func (p *Provider) Instance(ctx context.Context, IP ...gostatsd.Source) (map[gostatsd.Source]*gostatsd.Instance, error) {
instances := make(map[gostatsd.Source]*gostatsd.Instance, len(IP))
values := make([]*string, len(IP))
for i, ip := range IP {
ips := make([]*string, len(IP))
errors := make([]error, 0, 2)
n := 0
lookupLocal := false
for _, ip := range IP {
instances[ip] = nil // initialize map. Used for lookups to see if info for IP was requested
values[i] = aws.String(string(ip))
if !p.isLocalIP(ip) {
ips[n] = aws.String(string(ip))
n = n + 1
} else {
lookupLocal = true
}
}

values := ips[:n]
input := &ec2.DescribeInstancesInput{
Filters: []*ec2.Filter{
{
Expand All @@ -89,7 +129,7 @@ func (p *Provider) Instance(ctx context.Context, IP ...gostatsd.Source) (map[gos
}

atomic.AddUint64(&p.describeInstanceCount, 1)
atomic.AddUint64(&p.describeInstanceInstances, uint64(len(IP)))
atomic.AddUint64(&p.describeInstanceInstances, uint64(len(values)))
instancesFound := uint64(0)
pages := uint64(0)

Expand All @@ -104,37 +144,12 @@ func (p *Provider) Instance(ctx context.Context, IP ...gostatsd.Source) (map[gos
continue
}
instancesFound++
region, err := azToRegion(aws.StringValue(instance.Placement.AvailabilityZone))
if err != nil {
p.logger.Errorf("Error getting instance region: %v", err)
}
tags := make(gostatsd.Tags, len(instance.Tags)+1)
for idx, tag := range instance.Tags {
tags[idx] = fmt.Sprintf("%s:%s",
gostatsd.NormalizeTagKey(aws.StringValue(tag.Key)),
aws.StringValue(tag.Value))
}
tags[len(tags)-1] = "region:" + region
instances[ip] = &gostatsd.Instance{
ID: gostatsd.Source(aws.StringValue(instance.InstanceId)),
Tags: tags,
}
p.logger.WithFields(logrus.Fields{
"instance": instance.InstanceId,
"ip": ip,
"tags": tags,
}).Debug("Added tags")
instances[ip] = p.gostatsdInstanceFromInstance(ip, instance)
}
}
return true
})

for ip, instance := range instances {
if instance == nil {
p.logger.WithField("ip", ip).Debug("No results looking up instance")
}
}

atomic.AddUint64(&p.describeInstancePages, pages)
atomic.AddUint64(&p.describeInstanceFound, instancesFound)

Expand All @@ -143,11 +158,35 @@ func (p *Provider) Instance(ctx context.Context, IP ...gostatsd.Source) (map[gos

// Avoid spamming logs if instance id is not visible yet due to eventual consistency.
// https://docs.aws.amazon.com/AWSEC2/latest/APIReference/errors-overview.html#CommonErrors
if awsErr, ok := err.(awserr.Error); ok && awsErr.Code() == "InvalidInstanceID.NotFound" {
return instances, nil
if !isEventualConsistencyErr(err) {
errors = append(errors, fmt.Errorf("error listing AWS instances: %v", err))
}
}

var localInstance *gostatsd.Instance

if lookupLocal {
localInstance, err = p.instanceFromMetadata(ctx)
if err != nil && !isEventualConsistencyErr(err) {
errors = append(errors, fmt.Errorf("error inspecting local instance: %v", err))
}
}

for ip, instance := range instances {
if instance == nil {
if localInstance != nil && p.isLocalIP(ip) {
p.logger.WithField("ip", ip).Debugf("Using local instance for IP %v", ip)
instances[ip] = localInstance
} else {
p.logger.WithField("ip", ip).Debug("No results looking up instance")
}
}
return instances, fmt.Errorf("error listing AWS instances: %v", err)
}

if len(errors) > 0 {
return instances, multiError(errors)
}

return instances, nil
}

Expand Down Expand Up @@ -187,6 +226,93 @@ func (p *Provider) Name() string {
return ProviderName
}

// instanceFromMetadata looks up the instance this process is running on.
//
// The instance ID is taken from the instance identity document returned by
// the metadata client and is then used as an "instance-id" filter for a
// DescribeInstances call. Only the first page of results is inspected, and
// within it only the first instance of the first reservation is used.
//
// It returns (nil, nil) when the call succeeds but yields no instance, and
// (nil, err) when either the identity document or the DescribeInstances call
// fails.
func (p *Provider) instanceFromMetadata(ctx context.Context) (*gostatsd.Instance, error) {
	identityDoc, err := p.Metadata.GetInstanceIdentityDocument()
	if err != nil {
		return nil, err
	}

	input := &ec2.DescribeInstancesInput{
		Filters: []*ec2.Filter{
			{
				Name:   aws.String("instance-id"),
				Values: []*string{&identityDoc.InstanceID},
			},
		},
	}

	atomic.AddUint64(&p.describeInstanceCount, 1)
	atomic.AddUint64(&p.describeInstanceInstances, 1)

	var cachedInstance *gostatsd.Instance

	p.logger.Debugf("Looking up instance for local instance ID %v", identityDoc.InstanceID)
	err = p.Ec2.DescribeInstancesPagesWithContext(ctx, input, func(page *ec2.DescribeInstancesOutput, lastPage bool) bool {
		reservationCount := len(page.Reservations)
		if reservationCount > 0 {
			if reservationCount > 1 {
				p.logger.WithFields(logrus.Fields{
					"instance": identityDoc.InstanceID,
				}).Warnf("Found more than one reservation for local instance ID %v. Using first.", identityDoc.InstanceID)
			}

			reservation := page.Reservations[0]
			instanceCount := len(reservation.Instances)
			if instanceCount > 0 {
				if instanceCount > 1 {
					// ReservationId is a *string; dereference it so the log
					// shows the ID rather than a pointer address.
					reservationID := aws.StringValue(reservation.ReservationId)
					p.logger.WithFields(logrus.Fields{
						"instance":      identityDoc.InstanceID,
						"reservationId": reservationID,
					}).Warnf("Found more than one instance for local instance ID %v and reservation ID %v. Using first.", identityDoc.InstanceID, reservationID)
				}

				cachedInstance = p.gostatsdInstanceFromInstance(gostatsd.Source(identityDoc.PrivateIP), reservation.Instances[0])
			}
		}
		// Stop paging after the first page.
		return false
	})

	atomic.AddUint64(&p.describeInstancePages, 1)
	if cachedInstance != nil {
		atomic.AddUint64(&p.describeInstanceFound, 1)
	}

	if err != nil {
		atomic.AddUint64(&p.describeInstanceErrors, 1)

		return nil, err
	}

	return cachedInstance, nil
}

// gostatsdInstanceFromInstance converts an EC2 instance description into a
// gostatsd.Instance.
//
// The resulting tags are the instance's EC2 tags rendered as "key:value"
// (keys normalised with gostatsd.NormalizeTagKey) followed by a trailing
// "region:<region>" tag derived from the availability zone. If the region
// cannot be derived, the error is logged and the region tag is still emitted
// with whatever azToRegion returned. ip is only used for logging.
func (p *Provider) gostatsdInstanceFromInstance(ip gostatsd.Source, instance *ec2.Instance) *gostatsd.Instance {
	// Placement is a pointer; guard it so a response without placement data
	// goes through the normal azToRegion error path instead of panicking.
	var az string
	if instance.Placement != nil {
		az = aws.StringValue(instance.Placement.AvailabilityZone)
	}
	region, err := azToRegion(az)
	if err != nil {
		p.logger.Errorf("Error getting instance region: %v", err)
	}

	// One slot per EC2 tag plus one for the region tag.
	tags := make(gostatsd.Tags, len(instance.Tags)+1)
	for idx, tag := range instance.Tags {
		tags[idx] = fmt.Sprintf("%s:%s",
			gostatsd.NormalizeTagKey(aws.StringValue(tag.Key)),
			aws.StringValue(tag.Value))
	}
	tags[len(tags)-1] = "region:" + region

	// InstanceId is a *string; dereference it once so the log field carries
	// the ID itself rather than a pointer.
	instanceID := aws.StringValue(instance.InstanceId)

	p.logger.WithFields(logrus.Fields{
		"instance": instanceID,
		"ip":       ip,
		"tags":     tags,
	}).Debug("Added tags")

	return &gostatsd.Instance{
		ID:   gostatsd.Source(instanceID),
		Tags: tags,
	}
}

// Derives the region from a valid az name.
// Returns an error if the az is known invalid (empty).
func azToRegion(az string) (string, error) {
Expand All @@ -197,12 +323,48 @@ func azToRegion(az string) (string, error) {
return region, nil
}

// isEventualConsistencyErr reports whether err is an AWS error whose code is
// "InvalidInstanceID.NotFound". Any other error, including nil, yields false.
func isEventualConsistencyErr(err error) bool {
	awsErr, isAWSErr := err.(awserr.Error)
	if !isAWSErr {
		return false
	}
	return awsErr.Code() == "InvalidInstanceID.NotFound"
}

// multiError folds several errors into a single error whose message is
// "<n> errors occurred" followed by each error's message, joined with ", ".
//
// Note that the parameter name shadows the standard errors package inside
// this function.
func multiError(errors []error) error {
	errs := make([]string, 0, len(errors)+1)
	errs = append(errs, fmt.Sprintf("%d errors occurred", len(errors)))
	for _, err := range errors {
		errs = append(errs, err.Error())
	}
	// The joined text is passed as an argument, never as the format string:
	// error messages may contain '%', which fmt would otherwise treat as a
	// formatting verb and corrupt.
	return fmt.Errorf("%s", strings.Join(errs, ", "))
}

// isLocalIP reports whether ip should be resolved to the local instance.
// It is always false when the provider's mode is Deny; otherwise it is true
// exactly when ip appears in the provider's local IP whitelist.
func (p *Provider) isLocalIP(ip gostatsd.Source) bool {
	return p.localIPMode != Deny && contains(p.localIPWhitelist, string(ip))
}

// contains reports whether e is equal to at least one element of s.
// A nil or empty slice never contains anything.
func contains(s []string, e string) bool {
	for i := 0; i < len(s); i++ {
		if s[i] == e {
			return true
		}
	}
	return false
}

// NewProviderFromViper returns a new aws provider.
func NewProviderFromViper(v *viper.Viper, logger logrus.FieldLogger, _ string) (gostatsd.CloudProvider, error) {
a := util.GetSubViper(v, "aws")
a.SetDefault("max_retries", 3)
a.SetDefault("client_timeout", defaultClientTimeout)
a.SetDefault("max_instances_batch", defaultMaxInstancesBatch)
a.SetDefault("local_ip_mode", defaultLocalIPMode)
a.SetDefault("local_ip_whitelist", defaultLocalIPWhitelist)
httpTimeout := a.GetDuration("client_timeout")
if httpTimeout <= 0 {
return nil, errors.New("client timeout must be positive")
Expand All @@ -211,6 +373,13 @@ func NewProviderFromViper(v *viper.Viper, logger logrus.FieldLogger, _ string) (
if maxInstances <= 0 {
return nil, errors.New("max number of instances per batch must be positive")
}
localIPMode := NewLocalIPMode(a.GetString("local_ip_mode"))

var localIPWhitelist []string

if localIPMode == Allow {
localIPWhitelist = a.GetStringSlice("local_ip_whitelist")
}

// This is the main config without credentials.
transport := &http.Transport{
Expand Down Expand Up @@ -254,9 +423,11 @@ func NewProviderFromViper(v *viper.Viper, logger logrus.FieldLogger, _ string) (
return nil, fmt.Errorf("error creating a new EC2 session: %v", err)
}
return &Provider{
Metadata: metadata,
Ec2: ec2.New(ec2Session),
MaxInstances: maxInstances,
logger: logger,
Metadata: metadata,
Ec2: ec2.New(ec2Session),
MaxInstances: maxInstances,
logger: logger,
localIPMode: localIPMode,
localIPWhitelist: localIPWhitelist,
}, nil
}