Skip to content

Commit

Permalink
Add synchronous jobs
Browse files Browse the repository at this point in the history
We needed a possibility to execute some jobs / queries synchronously every time
/metrics is requested. So I added the infrastructure to execute jobs with an
interval <= 0 synchronously when the http-handler is called.

interval: '0s' # an interval <= 0 will make the queries synchronous

Some typos fixed by Felix Hillingshaeuser, thanks!
  • Loading branch information
Alexander Sosna authored and mbanck-ntap committed Jul 17, 2024
1 parent 0001728 commit be16a1f
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 10 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ jobs:
# each job needs a unique name, it's used for logging and as a default label
- name: "example"
# interval defined the pause between the runs of this job
# set to 0 to make the queries synchronous
interval: '5m'
# cron_schedule when to execute the job in the standard CRON syntax
# if specified, the interval is ignored
Expand Down
2 changes: 2 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,8 @@ func (c *cronConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
type Job struct {
log log.Logger
conns []*connection
Trigger chan bool // used to trigger execution
Done chan bool // used to tell state
Name string `yaml:"name"` // name of this job
KeepAlive bool `yaml:"keepalive"` // keep connection between runs?
Interval time.Duration `yaml:"interval"` // interval at which this job is run
Expand Down
35 changes: 35 additions & 0 deletions handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package main

import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"net/http"
)

// handlerFunc can be used as handler for http.HandleFunc()
// all synchronous jobs will be triggered and waited for,
// then the promhttp handler is executed
func (ex *Exporter) handlerFunc(w http.ResponseWriter, req *http.Request) {
// pull all triggers on jobs with interval 0
for _, job := range ex.jobs {
// if job is nil or is async then continue to next job
if job == nil || job.Interval > 0 {
continue
}
job.Trigger <- true
}

// wait for all sync jobs to finish
for _, job := range ex.jobs {
if job == nil || job.Interval > 0 {
continue
}
<-job.Done
}

// get the prometheus handler
handler := promhttp.HandlerFor(prometheus.DefaultGatherer, promhttp.HandlerOpts{})

// execute the ServeHTTP function
handler.ServeHTTP(w, req)
}
37 changes: 30 additions & 7 deletions job.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,16 @@ func (j *Job) Init(logger log.Logger, queries map[string]string) error {
}

func (j *Job) updateConnections() {
// if the interval is not set > 0, create needed channels
if j.Interval <= 0 {
if j.Trigger == nil {
j.Trigger = make(chan bool)
}

if j.Done == nil {
j.Done = make(chan bool)
}
}
// if there are no connection URLs for this job it can't be run
if j.Connections == nil {
level.Error(j.log).Log("msg", "no connections for job", "job_name", j.Name)
Expand Down Expand Up @@ -405,13 +415,26 @@ func (j *Job) markFailed(conn *connection) {

// Run the job queries with exponential backoff, implements the cron.Job interface
func (j *Job) Run() {
bo := backoff.NewExponentialBackOff()
bo.MaxElapsedTime = j.Interval
if bo.MaxElapsedTime == 0 {
bo.MaxElapsedTime = time.Minute
}
if err := backoff.Retry(j.runOnce, bo); err != nil {
level.Error(j.log).Log("msg", "Failed to run", "err", err)
// if the interval is 0 or lower, wait to be triggered
if j.Interval <= 0 {
// wait for trigger
<-j.Trigger
if err := j.runOnce(); err != nil {
level.Error(j.log).Log("msg", "Failed to run", "err", err)
}

// send true into done channel
j.Done <- true
} else {
bo := backoff.NewExponentialBackOff()
bo.MaxElapsedTime = j.Interval
level.Info(j.log).Log("msg", "In Run()")
if bo.MaxElapsedTime == 0 {
bo.MaxElapsedTime = time.Minute
}
if err := backoff.Retry(j.runOnce, bo); err != nil {
level.Error(j.log).Log("msg", "Failed to run", "err", err)
}
}
}

Expand Down
5 changes: 2 additions & 3 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/prometheus/common/version"
)

Expand Down Expand Up @@ -62,8 +61,8 @@ func main() {
}
prometheus.MustRegister(exporter)

// setup and start webserver
http.Handle(*metricsPath, promhttp.Handler())
// setup and start webserver with custom function
http.HandleFunc(*metricsPath, exporter.handlerFunc)
http.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) { http.Error(w, "OK", http.StatusOK) })
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte(`<html>
Expand Down

0 comments on commit be16a1f

Please sign in to comment.