-
Notifications
You must be signed in to change notification settings - Fork 1
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implements feature to stream pod logs in Loki #2
base: main
Are you sure you want to change the base?
Changes from 3 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,9 +2,7 @@ package runner | |
|
||
import ( | ||
"bufio" | ||
"bytes" | ||
"context" | ||
"io" | ||
|
||
"github.com/go-kit/log" | ||
corev1 "k8s.io/api/core/v1" | ||
|
@@ -48,8 +46,7 @@ func NewRunner(params RunnerParams) (Runner, error) { | |
cs: params.ClientSet}, nil | ||
} | ||
|
||
func (r *runner) Run(ctx context.Context) error { | ||
|
||
func (r *runner) Run(cancelCtx context.Context) error { | ||
err := loki.BringUpPod() | ||
if err != nil { | ||
return err | ||
|
@@ -60,68 +57,61 @@ func (r *runner) Run(ctx context.Context) error { | |
return err | ||
} | ||
|
||
namespaceList, err := r.cs.CoreV1().Namespaces().List(ctx, metav1.ListOptions{}) | ||
// get list of pods from all namespaces | ||
podList, err := r.cs.CoreV1().Pods("").List(cancelCtx, metav1.ListOptions{}) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
for _, ns := range namespaceList.Items { | ||
podList, err := r.cs.CoreV1().Pods(ns.Name).List(ctx, metav1.ListOptions{}) | ||
if err != nil { | ||
return err | ||
} | ||
for _, pod := range podList.Items { | ||
go r.streamPodLogs(cancelCtx, r.cs, pod) | ||
|
||
for _, pod := range podList.Items { | ||
|
||
for _, container := range pod.Spec.Containers { | ||
req := r.cs.CoreV1().Pods(ns.Name).GetLogs(pod.Name, &corev1.PodLogOptions{ | ||
Timestamps: true, | ||
Container: container.Name, | ||
}) | ||
|
||
podLogs, err := req.Stream(ctx) | ||
if err != nil { | ||
return err | ||
} | ||
defer podLogs.Close() | ||
|
||
buf := new(bytes.Buffer) | ||
|
||
_, err = io.Copy(buf, podLogs) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
labels := make(map[string]string) | ||
labels["namespace"] = ns.Name | ||
labels["pod_name"] = pod.Name | ||
labels["container_name"] = container.Name | ||
|
||
err = r.loadLogsToLoki(buf, parser.NewContainerParser(), labels) | ||
if err != nil { | ||
return err | ||
} | ||
} | ||
} | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func (r *runner) loadLogsToLoki(rawLogs *bytes.Buffer, logParser parser.Parser, labels map[string]string) error { | ||
func (r *runner) loadLogsToLoki(logLine string, logParser parser.Parser, labels map[string]string) error { | ||
tm, labelset, err := logParser.Parse(logLine, labels) | ||
if err != nil { | ||
r.logger.Log("Skipping log due to invalid parse", "Error", err.Error()) | ||
return err | ||
} | ||
r.lokiClient.PostLog(logLine, tm, labelset) | ||
|
||
scanner := bufio.NewScanner(rawLogs) | ||
scanner.Split(bufio.ScanLines) | ||
return nil | ||
} | ||
|
||
for scanner.Scan() { | ||
log_line := scanner.Text() | ||
tm, labels, err := logParser.Parse(log_line, labels) | ||
// streamPodLogs will stream the pod logs and load the logs to loki with relevant | ||
// labels, loglines and timestamp | ||
func (r *runner) streamPodLogs(cancelCtx context.Context, cs kubernetes.Interface, pod corev1.Pod) error { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @zawachte I want to know if I can utilize the |
||
for _, container := range pod.Spec.Containers { | ||
podLogOptions := &corev1.PodLogOptions{ | ||
Follow: true, | ||
Timestamps: true, | ||
} | ||
|
||
req := cs.CoreV1().Pods(pod.Namespace).GetLogs(pod.Name, podLogOptions) | ||
stream, err := req.Stream(cancelCtx) | ||
if err != nil { | ||
r.logger.Log("Skipping log due to invalid parse", "Error", err.Error()) | ||
continue | ||
return err | ||
} | ||
r.lokiClient.PostLog(log_line, tm, labels) | ||
} | ||
|
||
reader := bufio.NewScanner(stream) | ||
reader.Split(bufio.ScanLines) | ||
defer stream.Close() | ||
|
||
for reader.Scan() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. because the |
||
labels := make(map[string]string) | ||
labels["namespace"] = pod.Namespace | ||
labels["pod_name"] = pod.Name | ||
labels["container_name"] = container.Name | ||
|
||
logLine := reader.Text() | ||
|
||
// ignore the error and continue reading stream & loading to loki | ||
_ = r.loadLogsToLoki(logLine, parser.NewContainerParser(), labels) | ||
} | ||
} | ||
return nil | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have created this new context with the timeout set to stream-duration value which is used across the methods & functions here.