Expose mutation metrics to prometheus #38

Open · wants to merge 1 commit into master
77 changes: 75 additions & 2 deletions exporter/exporter.go
@@ -26,6 +26,7 @@ type Exporter struct {
	asyncMetricsURI string
	eventsURI       string
	partsURI        string
	mutationsURI    string
	client          *http.Client

	scrapeFailures prometheus.Counter
@@ -52,12 +53,17 @@ func NewExporter(uri url.URL, insecure bool, user, password string) *Exporter {
	partsURI := uri
	q.Set("query", "select database, table, sum(bytes) as bytes, count() as parts, sum(rows) as rows from system.parts where active = 1 group by database, table")
	partsURI.RawQuery = q.Encode()

	mutationsURI := uri
	q.Set("query", "select database, table, count() as mutations, sum(parts_to_do) as parts_to_do from system.mutations where is_done = 0 group by database, table")
	mutationsURI.RawQuery = q.Encode()

	return &Exporter{
		metricsURI:      metricsURI.String(),
		asyncMetricsURI: asyncMetricsURI.String(),
		eventsURI:       eventsURI.String(),
		partsURI:        partsURI.String(),
		mutationsURI:    mutationsURI.String(),
		scrapeFailures: prometheus.NewCounter(prometheus.CounterOpts{
			Namespace: namespace,
			Name:      "exporter_scrape_failures_total",
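Note: ClickHouse's HTTP interface returns results in TabSeparated format by default, so each row of the new mutations query arrives as four tab-separated columns (database, table, mutations, parts_to_do); a hypothetical row would read "default", "hits", "2", "5" separated by tabs. The parseMutationsResponse helper added further down relies on exactly this shape.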
@@ -172,6 +178,29 @@ func (e *Exporter) collect(ch chan<- prometheus.Metric) error {
		newRowsMetric.Collect(ch)
	}

	mutations, err := e.parseMutationsResponse(e.mutationsURI)
	if err != nil {
		return fmt.Errorf("Error scraping clickhouse url %v: %v", e.mutationsURI, err)
	}

	for _, mut := range mutations {
		newCountMetric := prometheus.NewGaugeVec(prometheus.GaugeOpts{
			Namespace: namespace,
			Name:      "table_mutations_count",
			Help:      "Number of pending mutations of the table",
		}, []string{"database", "table"}).WithLabelValues(mut.database, mut.table)
		newCountMetric.Set(float64(mut.mutations))
		newCountMetric.Collect(ch)

		newPartsMetric := prometheus.NewGaugeVec(prometheus.GaugeOpts{
			Namespace: namespace,
			Name:      "table_mutations_parts",
			Help:      "Number of parts still to be processed by pending mutations of the table",
		}, []string{"database", "table"}).WithLabelValues(mut.database, mut.table)
		newPartsMetric.Set(float64(mut.partsToDo))
		newPartsMetric.Collect(ch)
	}

	return nil
}
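Once scraped, this exposes two per-table gauges on /metrics. Assuming the exporter's metric namespace is clickhouse, a scrape would include series along the lines of clickhouse_table_mutations_count{database="default",table="hits"} 2 and clickhouse_table_mutations_parts{database="default",table="hits"} 5 (label and sample values here are purely illustrative).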

@@ -197,7 +226,7 @@ func (e *Exporter) handleResponse(uri string) ([]byte, error) {
		}
		return nil, fmt.Errorf("Status %s (%d): %s", resp.Status, resp.StatusCode, data)
	}

	return data, nil
}

@@ -285,6 +314,50 @@ func (e *Exporter) parsePartsResponse(uri string) ([]partsResult, error) {
	return results, nil
}

type mutationsResult struct {
	database  string
	table     string
	mutations int
	partsToDo int
}

func (e *Exporter) parseMutationsResponse(uri string) ([]mutationsResult, error) {
	data, err := e.handleResponse(uri)
	if err != nil {
		return nil, err
	}

	// Parsing results
	lines := strings.Split(string(data), "\n")
	var results = make([]mutationsResult, 0)

	for i, line := range lines {
		parts := strings.Fields(line)
		if len(parts) == 0 {
			continue
		}
		if len(parts) != 4 {
			return nil, fmt.Errorf("parseMutationsResponse: unexpected line %d: %s", i, line)
		}
		database := strings.TrimSpace(parts[0])
		table := strings.TrimSpace(parts[1])

		mutations, err := strconv.Atoi(strings.TrimSpace(parts[2]))
		if err != nil {
			return nil, err
		}

		partsToDo, err := strconv.Atoi(strings.TrimSpace(parts[3]))
		if err != nil {
			return nil, err
		}

		results = append(results, mutationsResult{database, table, mutations, partsToDo})
	}

	return results, nil
}

// Collect fetches the stats from configured clickhouse location and delivers them
// as Prometheus metrics. It implements prometheus.Collector.
func (e *Exporter) Collect(ch chan<- prometheus.Metric) {
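For reviewers who want to poke at the parser locally, here is a minimal test sketch, assuming the file's package is exporter; it uses net/http/httptest as a stand-in for ClickHouse and feeds parseMutationsResponse one hypothetical TabSeparated row (the sample values are illustrative, not part of this PR):

package exporter

import (
	"net/http"
	"net/http/httptest"
	"net/url"
	"testing"
)

func TestParseMutationsResponse(t *testing.T) {
	// Hypothetical ClickHouse response: one row in the default TabSeparated
	// format with the four columns selected by the mutations query.
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.Write([]byte("default\thits\t2\t5\n"))
	}))
	defer srv.Close()

	u, _ := url.Parse(srv.URL)
	e := NewExporter(*u, false, "", "")

	results, err := e.parseMutationsResponse(srv.URL)
	if err != nil {
		t.Fatalf("parseMutationsResponse returned error: %v", err)
	}
	if len(results) != 1 || results[0].mutations != 2 || results[0].partsToDo != 5 {
		t.Fatalf("unexpected results: %+v", results)
	}
}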