Skip to content

Commit

Permalink
chore: migrate to AWS SDK v2, likely fixes TylerBrock#51
Browse files Browse the repository at this point in the history
  • Loading branch information
a-h committed Sep 14, 2022
1 parent a6b1843 commit ef7e22d
Show file tree
Hide file tree
Showing 8 changed files with 170 additions and 152 deletions.
150 changes: 71 additions & 79 deletions blade/blade.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package blade

import (
"context"
"encoding/json"
"errors"
"fmt"
Expand All @@ -9,10 +10,9 @@ import (

"github.com/TylerBrock/colorjson"
"github.com/TylerBrock/saw/config"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials/stscreds"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/cloudwatchlogs"
awsconfig "github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs"
"github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs/types"
"github.com/fatih/color"
)

Expand All @@ -21,58 +21,47 @@ type Blade struct {
config *config.Configuration
aws *config.AWSConfiguration
output *config.OutputConfiguration
cwl *cloudwatchlogs.CloudWatchLogs
cwl *cloudwatchlogs.Client
}

// NewBlade creates a new Blade with CloudWatchLogs instance from provided config
func NewBlade(
ctx context.Context,
config *config.Configuration,
awsConfig *config.AWSConfiguration,
outputConfig *config.OutputConfiguration,
) *Blade {
) (*Blade, error) {
blade := Blade{}
awsCfg := aws.Config{}

if awsConfig.Region != "" {
awsCfg.Region = &awsConfig.Region
}

awsSessionOpts := session.Options{
Config: awsCfg,
AssumeRoleTokenProvider: stscreds.StdinTokenProvider,
SharedConfigState: session.SharedConfigEnable,
}

var opts []func(*awsconfig.LoadOptions) error
if awsConfig.Profile != "" {
awsSessionOpts.Profile = awsConfig.Profile
opts = append(opts, awsconfig.WithSharedConfigProfile(awsConfig.Profile))
}
if awsConfig.Region != "" {
opts = append(opts, awsconfig.WithRegion(awsConfig.Region))
}
awsCfg, err := awsconfig.LoadDefaultConfig(ctx, opts...)

sess := session.Must(session.NewSessionWithOptions(awsSessionOpts))

blade.cwl = cloudwatchlogs.New(sess)
blade.cwl = cloudwatchlogs.NewFromConfig(awsCfg)
blade.config = config
blade.output = outputConfig

return &blade
return &blade, err
}

// GetLogGroups gets the log groups from AWS given the blade configuration
func (b *Blade) GetLogGroups() []*cloudwatchlogs.LogGroup {
func (b *Blade) GetLogGroups(ctx context.Context) (groups []types.LogGroup, err error) {
input := b.config.DescribeLogGroupsInput()
groups := make([]*cloudwatchlogs.LogGroup, 0)
b.cwl.DescribeLogGroupsPages(input, func(
out *cloudwatchlogs.DescribeLogGroupsOutput,
lastPage bool,
) bool {
for _, group := range out.LogGroups {
if b.config.Fuzzy && !groupNameMatches(*group.LogGroupName, b.config.Group) {
continue
}
groups = append(groups, group)
logGroupsPaginator := cloudwatchlogs.NewDescribeLogGroupsPaginator(b.cwl, input)
var page *cloudwatchlogs.DescribeLogGroupsOutput
for logGroupsPaginator.HasMorePages() {
page, err = logGroupsPaginator.NextPage(ctx)
if err != nil {
return
}
return !lastPage
})
return groups
groups = append(groups, page.LogGroups...)
}
return
}

func groupNameMatches(s, substr string) bool {
Expand Down Expand Up @@ -117,46 +106,50 @@ func filterGroupNames(groups []*cloudwatchlogs.LogGroup, group string) (op []str
}

// GetLogStreams gets the log streams from AWS given the blade configuration
func (b *Blade) GetLogStreams() (streams []*cloudwatchlogs.LogStream, err error) {
func (b *Blade) GetLogStreams(ctx context.Context) (streams []types.LogStream, err error) {
if err := b.ResolveFuzzyGroupName(); err != nil {
return nil, err
}
input := b.config.DescribeLogStreamsInput()
b.cwl.DescribeLogStreamsPages(input, func(
out *cloudwatchlogs.DescribeLogStreamsOutput,
lastPage bool,
) bool {
for _, stream := range out.LogStreams {
streams = append(streams, stream)
logStreamsPaginator := cloudwatchlogs.NewDescribeLogStreamsPaginator(b.cwl, input)
var page *cloudwatchlogs.DescribeLogStreamsOutput
for logStreamsPaginator.HasMorePages() {
page, err = logStreamsPaginator.NextPage(ctx)
if err != nil {
return
}
return !lastPage
})
return streams, err
streams = append(streams, page.LogStreams...)
}
return
}

// GetEvents gets events from AWS given the blade configuration
func (b *Blade) GetEvents() (err error) {
func (b *Blade) GetEvents(ctx context.Context) (err error) {
if err := b.ResolveFuzzyGroupName(); err != nil {
return err
}
formatter := b.output.Formatter()
input := b.config.FilterLogEventsInput()

handlePage := func(page *cloudwatchlogs.FilterLogEventsOutput, lastPage bool) bool {
logEventsPaginator := cloudwatchlogs.NewFilterLogEventsPaginator(b.cwl, input)
var page *cloudwatchlogs.FilterLogEventsOutput
for logEventsPaginator.HasMorePages() {
page, err = logEventsPaginator.NextPage(ctx)
if err != nil {
return
}
for _, event := range page.Events {
if b.output.Pretty {
fmt.Println(strings.TrimRight(formatEvent(formatter, event), "\n"))
} else {
fmt.Println(strings.TrimRight(*event.Message, "\n"))
}
}
return !lastPage
}
return b.cwl.FilterLogEventsPages(input, handlePage)
return
}

// StreamEvents continuously prints log events to the console
func (b *Blade) StreamEvents() (err error) {
func (b *Blade) StreamEvents(ctx context.Context) (err error) {
var lastSeenTime *int64
var seenEventIDs map[string]bool
formatter := b.output.Formatter()
Expand All @@ -177,46 +170,45 @@ func (b *Blade) StreamEvents() (err error) {
}
}

handlePage := func(page *cloudwatchlogs.FilterLogEventsOutput, lastPage bool) bool {
for _, event := range page.Events {
updateLastSeenTime(event.Timestamp)
if _, seen := seenEventIDs[*event.EventId]; !seen {
var message string
if b.output.Raw {
message = *event.Message
} else {
message = formatEvent(formatter, event)
for {
logEventsPaginator := cloudwatchlogs.NewFilterLogEventsPaginator(b.cwl, input)
var page *cloudwatchlogs.FilterLogEventsOutput
for logEventsPaginator.HasMorePages() {
page, err = logEventsPaginator.NextPage(ctx)
if err != nil {
return
}
for _, event := range page.Events {
updateLastSeenTime(event.Timestamp)
if _, seen := seenEventIDs[*event.EventId]; !seen {
var message string
if b.output.Raw {
message = *event.Message
} else {
message = formatEvent(formatter, event)
}
message = strings.TrimRight(message, "\n")
fmt.Println(message)
addSeenEventIDs(event.EventId)
}
message = strings.TrimRight(message, "\n")
fmt.Println(message)
addSeenEventIDs(event.EventId)
}
}
return !lastPage
}

for {
err = b.cwl.FilterLogEventsPages(input, handlePage)
if err != nil {
return
}
if lastSeenTime != nil {
input.SetStartTime(*lastSeenTime)
input.StartTime = lastSeenTime
}
time.Sleep(1 * time.Second)
}
}

// formatEvent returns a CloudWatch log event as a formatted string using the provided formatter
func formatEvent(formatter *colorjson.Formatter, event *cloudwatchlogs.FilteredLogEvent) string {
func formatEvent(formatter *colorjson.Formatter, event types.FilteredLogEvent) string {
red := color.New(color.FgRed).SprintFunc()
white := color.New(color.FgWhite).SprintFunc()

str := aws.StringValue(event.Message)
bytes := []byte(str)
date := aws.MillisecondsTimeValue(event.Timestamp)
dateStr := date.Format(time.RFC3339)
streamStr := aws.StringValue(event.LogStreamName)
str := *event.Message
bytes := []byte(*event.Message)
dateStr := time.UnixMilli(*event.Timestamp).Format(time.RFC3339)
streamStr := *event.LogStreamName
jl := map[string]interface{}{}

if err := json.Unmarshal(bytes, &jl); err != nil {
Expand Down
11 changes: 7 additions & 4 deletions cmd/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,14 @@ var getCommand = &cobra.Command{
},
RunE: func(cmd *cobra.Command, args []string) (err error) {
getConfig.Group = args[0]
b := blade.NewBlade(&getConfig, &awsConfig, &getOutputConfig)
b, err := blade.NewBlade(cmd.Context(), &getConfig, &awsConfig, &getOutputConfig)
if err != nil {
return
}
if getConfig.Prefix != "" {
streams, err := b.GetLogStreams()
streams, err := b.GetLogStreams(cmd.Context())
if err != nil {
return err
return fmt.Errorf("failed to get log streams: %w", err)
}
if len(streams) == 0 {
fmt.Printf("No streams found in %s with prefix %s\n", getConfig.Group, getConfig.Prefix)
Expand All @@ -38,7 +41,7 @@ var getCommand = &cobra.Command{
}
getConfig.Streams = streams
}
return b.GetEvents()
return b.GetEvents(cmd.Context())
},
}

Expand Down
14 changes: 10 additions & 4 deletions cmd/groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,25 @@ var groupsCommand = &cobra.Command{
Use: "groups",
Short: "List log groups",
Long: "",
RunE: func(cmd *cobra.Command, args []string) error {
RunE: func(cmd *cobra.Command, args []string) (err error) {
if groupsConfig.Fuzzy {
if len(args) < 1 {
return errors.New("listing groups with fuzzy search requires log group argument")
}
groupsConfig.Group = args[0]
}
b := blade.NewBlade(&groupsConfig, &awsConfig, nil)
logGroups := b.GetLogGroups()
b, err := blade.NewBlade(cmd.Context(), &groupsConfig, &awsConfig, nil)
if err != nil {
return
}
logGroups, err := b.GetLogGroups(cmd.Context())
if err != nil {
return fmt.Errorf("failed to get log groups: %w", err)
}
for _, group := range logGroups {
fmt.Println(*group.LogGroupName)
}
return nil
return
},
}

Expand Down
9 changes: 6 additions & 3 deletions cmd/streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,14 @@ var streamsCommand = &cobra.Command{
},
RunE: func(cmd *cobra.Command, args []string) (err error) {
streamsConfig.Group = args[0]
b := blade.NewBlade(&streamsConfig, &awsConfig, nil)
b, err := blade.NewBlade(cmd.Context(), &streamsConfig, &awsConfig, nil)
if err != nil {
return
}

logStreams, err := b.GetLogStreams()
logStreams, err := b.GetLogStreams(cmd.Context())
if err != nil {
return err
return fmt.Errorf("failed to get log streams: %w", err)
}
for _, stream := range logStreams {
fmt.Println(*stream.LogStreamName)
Expand Down
12 changes: 7 additions & 5 deletions cmd/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,14 @@ var watchCommand = &cobra.Command{
},
RunE: func(cmd *cobra.Command, args []string) (err error) {
watchConfig.Group = args[0]
b := blade.NewBlade(&watchConfig, &awsConfig, &watchOutputConfig)
b, err := blade.NewBlade(cmd.Context(), &watchConfig, &awsConfig, &watchOutputConfig)
if err != nil {
return
}
if watchConfig.Prefix != "" {
streams, err := b.GetLogStreams()
streams, err := b.GetLogStreams(cmd.Context())
if err != nil {
return err
return fmt.Errorf("failed to get log streams: %w", err)
}
if len(streams) == 0 {
fmt.Printf("No streams found in %s with prefix %s\n", watchConfig.Group, watchConfig.Prefix)
Expand All @@ -39,8 +42,7 @@ var watchCommand = &cobra.Command{
}
watchConfig.Streams = streams
}
b.StreamEvents()
return
return b.StreamEvents(cmd.Context())
},
}

Expand Down
Loading

0 comments on commit ef7e22d

Please sign in to comment.