Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(local-ic): stream logs #1269

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
148 changes: 148 additions & 0 deletions local-interchain/interchain/handlers/log_stream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
package handlers

import (
"bufio"
"bytes"
"fmt"
"io"
"log"
"net/http"
"os"
"time"

"go.uber.org/zap"
)

const tailLines = 25

type LogStream struct {
fName string
authKey string
logger *zap.Logger
}

func NewLogSteam(logger *zap.Logger, file string, authKey string) *LogStream {
return &LogStream{
fName: file,
authKey: authKey,
logger: logger,
}
}

func (ls *LogStream) StreamLogs(w http.ResponseWriter, r *http.Request) {
// Set headers to keep the connection open for SSE (Server-Sent Events)
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")

// ensure ?auth_key=<authKey> is provided
if r.URL.Query().Get("auth_key") != ls.authKey {
http.Error(w, "Unauthorized, incorrect or no ?auth_key= provided", http.StatusUnauthorized)
return
}

// Flush ensures data is sent to the client immediately
flusher, ok := w.(http.Flusher)
if !ok {
http.Error(w, "Streaming unsupported", http.StatusInternalServerError)
return
}

// Open the log file
file, err := os.Open(ls.fName)
if err != nil {
http.Error(w, "Unable to open log file", http.StatusInternalServerError)
return
}
defer file.Close()

// Seek to the end of the file to read only new log entries
file.Seek(0, io.SeekEnd)

// Read new lines from the log file
reader := bufio.NewReader(file)

// print last out to the user on request (i.e. new connections)
tail := TailFile(ls.logger, ls.fName, tailLines)
for _, line := range tail {
fmt.Fprintf(w, "%s\n", line)
}
flusher.Flush()

for {
select {
// In case client closes the connection, break out of loop
case <-r.Context().Done():
return
default:
// Try to read a line
line, err := reader.ReadString('\n')
if err == nil {
// Send the log line to the client
fmt.Fprintf(w, "%s\n", line)
flusher.Flush() // Send to client immediately
} else {
// If no new log is available, wait for a short period before retrying
time.Sleep(100 * time.Millisecond)
}
}
}
}

func TailFile(logger *zap.Logger, logFile string, lines uint64) []string {
// read the last n lines of a file
file, err := os.Open(logFile)
if err != nil {
log.Fatal(err)
}
defer file.Close()

totalLines, err := lineCounter(file)
if err != nil {
log.Fatal(err)
}

if lines > uint64(totalLines) {
lines = uint64(totalLines)
}

file.Seek(0, io.SeekStart)
reader := bufio.NewReader(file)

var logs []string
for i := 0; i < int(totalLines)-int(lines); i++ {
_, _, err := reader.ReadLine()
if err != nil {
logger.Fatal("error reading log file", zap.Error(err))
}
}

for {
line, _, err := reader.ReadLine()
if err == io.EOF {
break
}
logs = append(logs, string(line))
}

return logs
}

func lineCounter(r io.Reader) (int, error) {
buf := make([]byte, 32*1024)
count := 0
lineSep := []byte{'\n'}

for {
c, err := r.Read(buf)
count += bytes.Count(buf[:c], lineSep)

switch {
case err == io.EOF:
return count, nil

case err != nil:
return count, err
}
}
}
32 changes: 19 additions & 13 deletions local-interchain/interchain/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,21 +74,27 @@ func DumpChainsInfoToLogs(configDir string, config *types.Config, chains []ibc.C
}

// == Zap Logger ==
func getLoggerConfig() zap.Config {
config := zap.NewDevelopmentConfig()
func InitLogger(logFile *os.File) (*zap.Logger, error) {
// config := zap.NewDevelopmentConfig()
// config.EncoderConfig.EncodeLevel = zapcore.CapitalColorLevelEncoder
// config.EncoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder
// logger, err := config.Build()
// if err != nil {
// return nil, err
// }

config.EncoderConfig.EncodeLevel = zapcore.CapitalColorLevelEncoder
config.EncoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder
// Production logger that saves logs to file and console.
pe := zap.NewProductionEncoderConfig()

return config
}
fileEncoder := zapcore.NewJSONEncoder(pe)
consoleEncoder := zapcore.NewConsoleEncoder(pe)

func InitLogger() (*zap.Logger, error) {
config := getLoggerConfig()
logger, err := config.Build()
if err != nil {
return nil, err
}
level := zap.InfoLevel

core := zapcore.NewTee(
zapcore.NewCore(fileEncoder, zapcore.AddSync(logFile), level),
zapcore.NewCore(consoleEncoder, zapcore.AddSync(os.Stdout), level),
)

return logger, nil
return zap.New(core), nil
}
31 changes: 21 additions & 10 deletions local-interchain/interchain/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/strangelove-ventures/interchaintest/v8"
"github.com/strangelove-ventures/interchaintest/v8/chain/cosmos"
"github.com/strangelove-ventures/interchaintest/v8/ibc"
"go.uber.org/zap"

"github.com/strangelove-ventures/interchaintest/local-interchain/interchain/handlers"
)
Expand All @@ -23,22 +24,32 @@ type Route struct {
Methods []string `json:"methods" yaml:"methods"`
}

type RouterConfig struct {
ibc.RelayerExecReporter

Config *ictypes.Config
CosmosChains map[string]*cosmos.CosmosChain
Vals map[string][]*cosmos.ChainNode
Relayer ibc.Relayer
AuthKey string
InstallDir string
LogFile string
Logger *zap.Logger
}

func NewRouter(
ctx context.Context,
ic *interchaintest.Interchain,
config *ictypes.Config,
cosmosChains map[string]*cosmos.CosmosChain,
vals map[string][]*cosmos.ChainNode,
relayer ibc.Relayer,
authKey string,
eRep ibc.RelayerExecReporter,
installDir string,
rc *RouterConfig,
) *mux.Router {
r := mux.NewRouter()

infoH := handlers.NewInfo(config, installDir, ctx, ic, cosmosChains, vals, relayer, eRep)
infoH := handlers.NewInfo(rc.Config, rc.InstallDir, ctx, ic, rc.CosmosChains, rc.Vals, rc.Relayer, rc.RelayerExecReporter)
r.HandleFunc("/info", infoH.GetInfo).Methods(http.MethodGet)

logStream := handlers.NewLogSteam(rc.Logger, rc.LogFile, rc.AuthKey)
r.HandleFunc("/logs", logStream.StreamLogs).Methods(http.MethodGet)

wd, err := os.Getwd()
if err != nil {
panic(err)
Expand All @@ -60,10 +71,10 @@ func NewRouter(
log.Printf("chain_registry_assets.json not found in %s, not exposing endpoint.", wd)
}

actionsH := handlers.NewActions(ctx, ic, cosmosChains, vals, relayer, eRep, authKey)
actionsH := handlers.NewActions(ctx, ic, rc.CosmosChains, rc.Vals, rc.Relayer, rc.RelayerExecReporter, rc.AuthKey)
r.HandleFunc("/", actionsH.PostActions).Methods(http.MethodPost)

uploaderH := handlers.NewUploader(ctx, vals, authKey)
uploaderH := handlers.NewUploader(ctx, rc.Vals, rc.AuthKey)
r.HandleFunc("/upload", uploaderH.PostUpload).Methods(http.MethodPost)

availableRoutes := getAllMethods(*r)
Expand Down
52 changes: 40 additions & 12 deletions local-interchain/interchain/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package interchain
import (
"context"
"fmt"
"log"
"math"
"net/http"
"os"
Expand All @@ -12,7 +11,9 @@ import (
"strings"
"sync"
"syscall"
"time"

"github.com/google/uuid"
"github.com/gorilla/handlers"
"github.com/strangelove-ventures/interchaintest/local-interchain/interchain/router"
"github.com/strangelove-ventures/interchaintest/local-interchain/interchain/types"
Expand Down Expand Up @@ -53,11 +54,27 @@ func StartChain(installDir, chainCfgFile string, ac *types.AppStartConfig) {
}
}()

// very unique file to ensure if multiple start at the same time.
logFile, err := interchaintest.CreateLogFile(fmt.Sprintf("%d-%s.json", time.Now().Unix(), uuid.New()))
if err != nil {
panic(err)
}
defer func() {
if err := logFile.Close(); err != nil {
fmt.Println("Error closing log file: ", err)
}

if err := os.Remove(logFile.Name()); err != nil {
fmt.Println("Error deleting log file: ", err)
}
}()

// Logger for ICTest functions only.
logger, err := InitLogger()
logger, err := InitLogger(logFile)
if err != nil {
panic(err)
}
logger.Info("Log file created", zap.String("file", logFile.Name()))

config := ac.Cfg

Expand Down Expand Up @@ -88,7 +105,7 @@ func StartChain(installDir, chainCfgFile string, ac *types.AppStartConfig) {
}

if err := VerifyIBCPaths(ibcpaths); err != nil {
log.Fatal("VerifyIBCPaths", err)
logger.Fatal("VerifyIBCPaths", zap.Error(err))
}

// Create chain factory for all the chains
Expand All @@ -98,7 +115,7 @@ func StartChain(installDir, chainCfgFile string, ac *types.AppStartConfig) {

chains, err := cf.Chains(testName)
if err != nil {
log.Fatal("cf.Chains", err)
logger.Fatal("cf.Chains", zap.Error(err))
}

for _, chain := range chains {
Expand All @@ -111,7 +128,8 @@ func StartChain(installDir, chainCfgFile string, ac *types.AppStartConfig) {
}

// Base setup
rep := testreporter.NewNopReporter()

rep := testreporter.NewReporter(logFile)
eRep = rep.RelayerExecReporter(&fakeT)

client, network := interchaintest.DockerSetup(fakeT)
Expand Down Expand Up @@ -181,7 +199,7 @@ func StartChain(installDir, chainCfgFile string, ac *types.AppStartConfig) {
SkipPathCreation: false,
})
if err != nil {
log.Fatalf("ic.Build: %v", err)
logger.Fatal("ic.Build", zap.Error(err))
}

if relayer != nil && len(ibcpaths) > 0 {
Expand All @@ -191,12 +209,12 @@ func StartChain(installDir, chainCfgFile string, ac *types.AppStartConfig) {
}

if err := relayer.StartRelayer(ctx, eRep, paths...); err != nil {
log.Fatal("relayer.StartRelayer", err)
logger.Fatal("relayer.StartRelayer", zap.Error(err))
}

defer func() {
if err := relayer.StopRelayer(ctx, eRep); err != nil {
log.Fatal("relayer.StopRelayer", err)
logger.Error("relayer.StopRelayer", zap.Error(err))
}
}()
}
Expand All @@ -215,7 +233,7 @@ func StartChain(installDir, chainCfgFile string, ac *types.AppStartConfig) {
for ibcPath, chain := range icsProviderPaths {
if provider, ok := chain.(*cosmos.CosmosChain); ok {
if err := provider.FinishICSProviderSetup(ctx, relayer, eRep, ibcPath); err != nil {
log.Fatal("FinishICSProviderSetup", err)
logger.Error("FinishICSProviderSetup", zap.Error(err))
}
}
}
Expand All @@ -230,7 +248,17 @@ func StartChain(installDir, chainCfgFile string, ac *types.AppStartConfig) {
}
}

r := router.NewRouter(ctx, ic, config, cosmosChains, vals, relayer, ac.AuthKey, eRep, installDir)
r := router.NewRouter(ctx, ic, &router.RouterConfig{
RelayerExecReporter: eRep,
Config: config,
CosmosChains: cosmosChains,
Vals: vals,
Relayer: relayer,
AuthKey: ac.AuthKey,
InstallDir: installDir,
LogFile: logFile.Name(),
Logger: logger,
})

config.Server = types.RestServer{
Host: ac.Address,
Expand All @@ -256,7 +284,7 @@ func StartChain(installDir, chainCfgFile string, ac *types.AppStartConfig) {
)

if err := http.ListenAndServe(serverAddr, corsHandler(r)); err != nil {
log.Default().Println(err)
logger.Error("http.ListenAndServe", zap.Error(err))
}
}()

Expand All @@ -270,7 +298,7 @@ func StartChain(installDir, chainCfgFile string, ac *types.AppStartConfig) {
// Save to logs.json file for runtime chain information.
DumpChainsInfoToLogs(installDir, config, chains, connections)

log.Println("\nLocal-IC API is running on ", fmt.Sprintf("http://%s:%s", config.Server.Host, config.Server.Port))
logger.Info("Local-IC API is running on ", zap.String("url", fmt.Sprintf("http://%s:%s", config.Server.Host, config.Server.Port)))

if err = testutil.WaitForBlocks(ctx, math.MaxInt, chains[0]); err != nil {
// when the network is stopped / killed (ctrl + c), ignore error
Expand Down
Loading