Skip to content

Commit

Permalink
chore: migrate to AWS SDK v2, likely fixes TylerBrock#51
Browse files Browse the repository at this point in the history
  • Loading branch information
a-h committed Sep 14, 2022
1 parent 77bbab2 commit 404484c
Show file tree
Hide file tree
Showing 8 changed files with 197 additions and 133 deletions.
155 changes: 71 additions & 84 deletions blade/blade.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,17 @@
package blade

import (
"context"
"encoding/json"
"fmt"
"os"
"strings"
"time"

"github.com/TylerBrock/colorjson"
"github.com/TylerBrock/saw/config"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials/stscreds"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/cloudwatchlogs"
awsconfig "github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs"
"github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs/types"
"github.com/fatih/color"
)

Expand All @@ -21,98 +20,88 @@ type Blade struct {
config *config.Configuration
aws *config.AWSConfiguration
output *config.OutputConfiguration
cwl *cloudwatchlogs.CloudWatchLogs
cwl *cloudwatchlogs.Client
}

// NewBlade creates a new Blade with CloudWatchLogs instance from provided config
func NewBlade(
ctx context.Context,
config *config.Configuration,
awsConfig *config.AWSConfiguration,
outputConfig *config.OutputConfiguration,
) *Blade {
) (*Blade, error) {
blade := Blade{}
awsCfg := aws.Config{}

if awsConfig.Region != "" {
awsCfg.Region = &awsConfig.Region
}

awsSessionOpts := session.Options{
Config: awsCfg,
AssumeRoleTokenProvider: stscreds.StdinTokenProvider,
SharedConfigState: session.SharedConfigEnable,
}

var opts []func(*awsconfig.LoadOptions) error
if awsConfig.Profile != "" {
awsSessionOpts.Profile = awsConfig.Profile
opts = append(opts, awsconfig.WithSharedConfigProfile(awsConfig.Profile))
}
if awsConfig.Region != "" {
opts = append(opts, awsconfig.WithRegion(awsConfig.Region))
}
awsCfg, err := awsconfig.LoadDefaultConfig(ctx, opts...)

sess := session.Must(session.NewSessionWithOptions(awsSessionOpts))

blade.cwl = cloudwatchlogs.New(sess)
blade.cwl = cloudwatchlogs.NewFromConfig(awsCfg)
blade.config = config
blade.output = outputConfig

return &blade
return &blade, err
}

// GetLogGroups gets the log groups from AWS given the blade configuration
func (b *Blade) GetLogGroups() []*cloudwatchlogs.LogGroup {
func (b *Blade) GetLogGroups(ctx context.Context) (groups []types.LogGroup, err error) {
input := b.config.DescribeLogGroupsInput()
groups := make([]*cloudwatchlogs.LogGroup, 0)
b.cwl.DescribeLogGroupsPages(input, func(
out *cloudwatchlogs.DescribeLogGroupsOutput,
lastPage bool,
) bool {
for _, group := range out.LogGroups {
groups = append(groups, group)
logGroupsPaginator := cloudwatchlogs.NewDescribeLogGroupsPaginator(b.cwl, input)
var page *cloudwatchlogs.DescribeLogGroupsOutput
for logGroupsPaginator.HasMorePages() {
page, err = logGroupsPaginator.NextPage(ctx)
if err != nil {
return
}
return !lastPage
})
return groups
groups = append(groups, page.LogGroups...)
}
return
}

// GetLogStreams gets the log streams from AWS given the blade configuration
func (b *Blade) GetLogStreams() []*cloudwatchlogs.LogStream {
func (b *Blade) GetLogStreams(ctx context.Context) (streams []types.LogStream, err error) {
input := b.config.DescribeLogStreamsInput()
streams := make([]*cloudwatchlogs.LogStream, 0)
b.cwl.DescribeLogStreamsPages(input, func(
out *cloudwatchlogs.DescribeLogStreamsOutput,
lastPage bool,
) bool {
for _, stream := range out.LogStreams {
streams = append(streams, stream)
logStreamsPaginator := cloudwatchlogs.NewDescribeLogStreamsPaginator(b.cwl, input)
var page *cloudwatchlogs.DescribeLogStreamsOutput
for logStreamsPaginator.HasMorePages() {
page, err = logStreamsPaginator.NextPage(ctx)
if err != nil {
return
}
return !lastPage
})

return streams
streams = append(streams, page.LogStreams...)
}
return
}

// GetEvents gets events from AWS given the blade configuration
func (b *Blade) GetEvents() {
func (b *Blade) GetEvents(ctx context.Context) (err error) {
formatter := b.output.Formatter()
input := b.config.FilterLogEventsInput()

handlePage := func(page *cloudwatchlogs.FilterLogEventsOutput, lastPage bool) bool {
logEventsPaginator := cloudwatchlogs.NewFilterLogEventsPaginator(b.cwl, input)
var page *cloudwatchlogs.FilterLogEventsOutput
for logEventsPaginator.HasMorePages() {
page, err = logEventsPaginator.NextPage(ctx)
if err != nil {
return
}
for _, event := range page.Events {
if b.output.Pretty {
fmt.Println(formatEvent(formatter, event))
} else {
fmt.Println(*event.Message)
}
}
return !lastPage
}
err := b.cwl.FilterLogEventsPages(input, handlePage)
if err != nil {
fmt.Println("Error", err)
os.Exit(2)
}
return
}

// StreamEvents continuously prints log events to the console
func (b *Blade) StreamEvents() {
func (b *Blade) StreamEvents(ctx context.Context) (err error) {
var lastSeenTime *int64
var seenEventIDs map[string]bool
formatter := b.output.Formatter()
Expand All @@ -133,47 +122,45 @@ func (b *Blade) StreamEvents() {
}
}

handlePage := func(page *cloudwatchlogs.FilterLogEventsOutput, lastPage bool) bool {
for _, event := range page.Events {
updateLastSeenTime(event.Timestamp)
if _, seen := seenEventIDs[*event.EventId]; !seen {
var message string
if b.output.Raw {
message = *event.Message
} else {
message = formatEvent(formatter, event)
for {
logEventsPaginator := cloudwatchlogs.NewFilterLogEventsPaginator(b.cwl, input)
var page *cloudwatchlogs.FilterLogEventsOutput
for logEventsPaginator.HasMorePages() {
page, err = logEventsPaginator.NextPage(ctx)
if err != nil {
return
}
for _, event := range page.Events {
updateLastSeenTime(event.Timestamp)
if _, seen := seenEventIDs[*event.EventId]; !seen {
var message string
if b.output.Raw {
message = *event.Message
} else {
message = formatEvent(formatter, event)
}
message = strings.TrimRight(message, "\n")
fmt.Println(message)
addSeenEventIDs(event.EventId)
}
message = strings.TrimRight(message, "\n")
fmt.Println(message)
addSeenEventIDs(event.EventId)
}
}
return !lastPage
}

for {
err := b.cwl.FilterLogEventsPages(input, handlePage)
if err != nil {
fmt.Println("Error", err)
os.Exit(2)
}
if lastSeenTime != nil {
input.SetStartTime(*lastSeenTime)
input.StartTime = lastSeenTime
}
time.Sleep(1 * time.Second)
}
}

// formatEvent returns a CloudWatch log event as a formatted string using the provided formatter
func formatEvent(formatter *colorjson.Formatter, event *cloudwatchlogs.FilteredLogEvent) string {
func formatEvent(formatter *colorjson.Formatter, event types.FilteredLogEvent) string {
red := color.New(color.FgRed).SprintFunc()
white := color.New(color.FgWhite).SprintFunc()

str := aws.StringValue(event.Message)
bytes := []byte(str)
date := aws.MillisecondsTimeValue(event.Timestamp)
dateStr := date.Format(time.RFC3339)
streamStr := aws.StringValue(event.LogStreamName)
str := *event.Message
bytes := []byte(*event.Message)
dateStr := time.UnixMilli(*event.Timestamp).Format(time.RFC3339)
streamStr := *event.LogStreamName
jl := map[string]interface{}{}

if err := json.Unmarshal(bytes, &jl); err != nil {
Expand Down
14 changes: 10 additions & 4 deletions cmd/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,25 @@ var getCommand = &cobra.Command{
}
return nil
},
Run: func(cmd *cobra.Command, args []string) {
RunE: func(cmd *cobra.Command, args []string) (err error) {
getConfig.Group = args[0]
b := blade.NewBlade(&getConfig, &awsConfig, &getOutputConfig)
b, err := blade.NewBlade(cmd.Context(), &getConfig, &awsConfig, &getOutputConfig)
if err != nil {
return
}
if getConfig.Prefix != "" {
streams := b.GetLogStreams()
streams, err := b.GetLogStreams(cmd.Context())
if err != nil {
return fmt.Errorf("failed to get log streams: %w", err)
}
if len(streams) == 0 {
fmt.Printf("No streams found in %s with prefix %s\n", getConfig.Group, getConfig.Prefix)
fmt.Printf("To view available streams: `saw streams %s`\n", getConfig.Group)
os.Exit(3)
}
getConfig.Streams = streams
}
b.GetEvents()
return b.GetEvents(cmd.Context())
},
}

Expand Down
13 changes: 10 additions & 3 deletions cmd/groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,19 @@ var groupsCommand = &cobra.Command{
Use: "groups",
Short: "List log groups",
Long: "",
Run: func(cmd *cobra.Command, args []string) {
b := blade.NewBlade(&groupsConfig, &awsConfig, nil)
logGroups := b.GetLogGroups()
RunE: func(cmd *cobra.Command, args []string) (err error) {
b, err := blade.NewBlade(cmd.Context(), &groupsConfig, &awsConfig, nil)
if err != nil {
return
}
logGroups, err := b.GetLogGroups(cmd.Context())
if err != nil {
return fmt.Errorf("failed to get log groups: %w", err)
}
for _, group := range logGroups {
fmt.Println(*group.LogGroupName)
}
return
},
}

Expand Down
13 changes: 10 additions & 3 deletions cmd/streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,21 @@ var streamsCommand = &cobra.Command{
}
return nil
},
Run: func(cmd *cobra.Command, args []string) {
RunE: func(cmd *cobra.Command, args []string) (err error) {
streamsConfig.Group = args[0]
b := blade.NewBlade(&streamsConfig, &awsConfig, nil)
b, err := blade.NewBlade(cmd.Context(), &streamsConfig, &awsConfig, nil)
if err != nil {
return
}

logStreams := b.GetLogStreams()
logStreams, err := b.GetLogStreams(cmd.Context())
if err != nil {
return fmt.Errorf("failed to get log streams: %w", err)
}
for _, stream := range logStreams {
fmt.Println(*stream.LogStreamName)
}
return
},
}

Expand Down
14 changes: 10 additions & 4 deletions cmd/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,25 @@ var watchCommand = &cobra.Command{
}
return nil
},
Run: func(cmd *cobra.Command, args []string) {
RunE: func(cmd *cobra.Command, args []string) (err error) {
watchConfig.Group = args[0]
b := blade.NewBlade(&watchConfig, &awsConfig, &watchOutputConfig)
b, err := blade.NewBlade(cmd.Context(), &watchConfig, &awsConfig, &watchOutputConfig)
if err != nil {
return
}
if watchConfig.Prefix != "" {
streams := b.GetLogStreams()
streams, err := b.GetLogStreams(cmd.Context())
if err != nil {
return fmt.Errorf("failed to get log streams: %w", err)
}
if len(streams) == 0 {
fmt.Printf("No streams found in %s with prefix %s\n", watchConfig.Group, watchConfig.Prefix)
fmt.Printf("To view available streams: `saw streams %s`\n", watchConfig.Group)
os.Exit(3)
}
watchConfig.Streams = streams
}
b.StreamEvents()
return b.StreamEvents(cmd.Context())
},
}

Expand Down
Loading

0 comments on commit 404484c

Please sign in to comment.