Skip to content

Commit

Permalink
Implement unix socket for syslog (#333)
Browse files Browse the repository at this point in the history
* Implement unix socket for syslog

* Add test case

* Refactor cleanup logic back into syslog module

---------

Co-authored-by: Martin Helmich <[email protected]>
  • Loading branch information
jkroepke and martin-helmich authored Jul 15, 2023
1 parent 69e2807 commit 950e9e4
Show file tree
Hide file tree
Showing 6 changed files with 101 additions and 26 deletions.
2 changes: 1 addition & 1 deletion example-config.hcl
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
listen {
port = 4040

// "metrics_endpoint" can be used to configure an alternative metrics URL
// path. Default value is "/metrics".
//
Expand Down
34 changes: 34 additions & 0 deletions features/steps/steps.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import logging
import logging.handlers
import os
import subprocess
import time

Expand Down Expand Up @@ -80,6 +82,21 @@ def step_impl(context, port):
time.sleep(.5)


@when(u'the following HTTP request is logged to syslog on socket {socket}')
@when(u'the following HTTP requests are logged to syslog on socket {socket}')
def step_impl(context, socket: str):
log = logging.getLogger('test')
log_handler = logging.handlers.SysLogHandler(
address=socket, facility=logging.handlers.SysLogHandler.LOG_USER)
log.addHandler(log_handler)

lines = [l for l in context.text.split("\n") if l != ""]
for l in lines:
log.info(l)

time.sleep(.5)


@then(u'the exporter should report value {val} for metric {metric}')
@then(u'the exporter should on "{endpoint}" report value {val} for metric {metric}')
def step_impl(context, val, metric, endpoint='/metrics'):
Expand All @@ -105,3 +122,20 @@ def step_impl(context):
rc = context.process.poll()
if rc is not None:
raise AssertionError(f"The process has exited with return code {rc}")

def current_process(context) -> subprocess.Popen:
return context.process

@when("the exporter is stopped")
def step_impl(context):
p = current_process(context)
p.terminate()
p.wait(10)

if p.poll() is None:
raise AssertionError("The process is still running")

@then("the socket {socket} should not exist")
def step_impl(context, socket):
if os.path.exists(socket):
raise AssertionError(f"The socket {socket} exists")
10 changes: 10 additions & 0 deletions features/syslog.feature
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,13 @@ Feature: Can read log entries from syslog
172.17.0.1 - - [23/Jun/2016:16:04:20 +0000] "GET / HTTP/1.1" 200 612 "-" "curl/7.29.0" "-"
"""
Then the exporter should report value 1 for metric nginx_http_response_count_total{method="GET",status="200"}

Scenario: Read from syslog running on a UNIX socket
Given a running exporter listening with configuration file "test-config-syslog-unix.yaml"
When the following HTTP request is logged to syslog on socket /tmp/syslog.sock
"""
172.17.0.1 - - [23/Jun/2016:16:04:20 +0000] "GET / HTTP/1.1" 200 612 "-" "curl/7.29.0" "-"
"""
Then the exporter should report value 1 for metric nginx_http_response_count_total{method="GET",status="200"}
When the exporter is stopped
Then the socket /tmp/syslog.sock should not exist
11 changes: 11 additions & 0 deletions features/test-config-syslog-unix.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
listen:
port: 4040

namespaces:
- name: nginx
source:
syslog:
listen_address: unix:///tmp/syslog.sock
tags:
- ""
format: "$remote_addr - $remote_user [$time_local] \"$request\" $status $body_bytes_sent \"$http_referer\" \"$http_user_agent\" \"$http_x_forwarded_for\""
18 changes: 15 additions & 3 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func main() {

fmt.Printf("starting listener for namespace %s\n", namespace.Name)
go func(ns *config.NamespaceConfig) {
processNamespace(ns, &(nsMetrics.Collection))
processNamespace(ns, &(nsMetrics.Collection), stopChan, &stopHandlers)
}(namespace)
}

Expand Down Expand Up @@ -190,7 +190,7 @@ func setupConsul(cfg *config.Config, stopChan <-chan bool, stopHandlers *sync.Wa
stopHandlers.Add(1)
}

func processNamespace(nsCfg *config.NamespaceConfig, metrics *metrics.Collection) error {
func processNamespace(nsCfg *config.NamespaceConfig, metrics *metrics.Collection, stopChan <-chan bool, stopHandlers *sync.WaitGroup) error {
var followers []tail.Follower

parser := parser.NewParser(nsCfg)
Expand All @@ -212,11 +212,23 @@ func processNamespace(nsCfg *config.NamespaceConfig, metrics *metrics.Collection
slCfg := nsCfg.SourceData.Syslog

fmt.Printf("running Syslog server on address %s\n", slCfg.ListenAddress)
channel, server, err := syslog.Listen(slCfg.ListenAddress, slCfg.Format)
channel, server, closeServer, err := syslog.Listen(slCfg.ListenAddress, slCfg.Format)
if err != nil {
panic(err)
}

stopHandlers.Add(1)

go func() {
<-stopChan

if err := closeServer(); err != nil {
fmt.Printf("error while closing syslog server: %s\n", err.Error())
}

stopHandlers.Done()
}()

for _, f := range slCfg.Tags {
t, err := tail.NewSyslogFollower(f, server, channel)
if err != nil {
Expand Down
52 changes: 30 additions & 22 deletions pkg/syslog/syslog.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,42 +3,43 @@ package syslog
import (
"fmt"
"net/url"
"os"

"gopkg.in/mcuadros/go-syslog.v2"
"gopkg.in/mcuadros/go-syslog.v2/format"
)

func openListener(s *syslog.Server, c string) error {
func openListener(s *syslog.Server, c string) (func() error, error) {
u, err := url.Parse(c)
if err != nil {
return err
return nil, err
}

switch u.Scheme {
case "tcp":
err := s.ListenTCP(u.Host)
if err != nil {
return err
}
return nil, s.ListenTCP(u.Host)

case "udp":
err := s.ListenUDP(u.Host)
if err != nil {
return err
}
return nil, s.ListenUDP(u.Host)

case "unix":
return fmt.Errorf("Not implemented")
socketPath := u.Host + u.Path

if err := s.ListenUnixgram(socketPath); err != nil {
return nil, err
}

return func() error {
return os.Remove(socketPath)
}, nil

default:
return fmt.Errorf("syslog server should be in format unix/tcp/udp://127.0.0.1:5533")
return nil, fmt.Errorf("syslog server should be in format unix/tcp/udp://127.0.0.1:5533")
}

return nil
}

// Listen opens up a new syslog server on either a TCP or UDP port
func Listen(conn string, formatSpec string) (syslog.LogPartsChannel, *syslog.Server, error) {
func Listen(conn string, formatSpec string) (syslog.LogPartsChannel, *syslog.Server, func() error, error) {
channel := make(syslog.LogPartsChannel)
handler := syslog.NewChannelHandler(channel)

Expand All @@ -58,22 +59,29 @@ func Listen(conn string, formatSpec string) (syslog.LogPartsChannel, *syslog.Ser
case "":
format = syslog.Automatic
default:
return nil, nil, fmt.Errorf("unknown syslog format: '%s'", format)
return nil, nil, nil, fmt.Errorf("unknown syslog format: '%s'", format)
}

//RFC3164 or RFC5424 or RFC6587. nginx works on RFC3164
server.SetFormat(format)
server.SetHandler(handler)

err := openListener(server, conn)
closeListener, err := openListener(server, conn)
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}

err = server.Boot()
if err != nil {
return nil, nil, err
if err = server.Boot(); err != nil {
return nil, nil, nil, err
}

stopFn := func() error {
if err := server.Kill(); err != nil {
return fmt.Errorf("failed to kill syslog server: %w", err)
}

return closeListener()
}

return channel, server, nil
return channel, server, stopFn, nil
}

0 comments on commit 950e9e4

Please sign in to comment.