Skip to content

Commit

Permalink
batch update kmesh metrics to prometheus
Browse files Browse the repository at this point in the history
Signed-off-by: LiZhenCheng9527 <[email protected]>
  • Loading branch information
LiZhenCheng9527 committed Aug 31, 2024
1 parent 835d85e commit 423688c
Show file tree
Hide file tree
Showing 2 changed files with 471 additions and 48 deletions.
209 changes: 184 additions & 25 deletions pkg/controller/telemetry/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,20 @@ var osStartTime time.Time

type MetricController struct {
workloadCache cache.WorkloadCache
metricCache metricInfoCache
}

type metricInfoCache struct {
WorkloadConnOpened map[workloadMetricLabels]float64
WorkloadConnClosed map[workloadMetricLabels]float64
WorkloadConnSentBytes map[workloadMetricLabels]float64
WorkloadConnReceivedBytes map[workloadMetricLabels]float64
WorkloadConnFailed map[workloadMetricLabels]float64
ServiceConnOpened map[serviceMetricLabels]float64
ServiceConnClosed map[serviceMetricLabels]float64
ServiceConnSentBytes map[serviceMetricLabels]float64
ServiceConnReceivedBytes map[serviceMetricLabels]float64
ServiceConnFailed map[serviceMetricLabels]float64
}

type connectionDataV4 struct {
Expand Down Expand Up @@ -154,6 +168,22 @@ type serviceMetricLabels struct {
func NewMetric(workloadCache cache.WorkloadCache) *MetricController {
return &MetricController{
workloadCache: workloadCache,
metricCache: newMetricCache(),
}
}

func newMetricCache() metricInfoCache {
return metricInfoCache{
WorkloadConnOpened: map[workloadMetricLabels]float64{},
WorkloadConnClosed: map[workloadMetricLabels]float64{},
WorkloadConnSentBytes: map[workloadMetricLabels]float64{},
WorkloadConnReceivedBytes: map[workloadMetricLabels]float64{},
WorkloadConnFailed: map[workloadMetricLabels]float64{},
ServiceConnOpened: map[serviceMetricLabels]float64{},
ServiceConnClosed: map[serviceMetricLabels]float64{},
ServiceConnSentBytes: map[serviceMetricLabels]float64{},
ServiceConnReceivedBytes: map[serviceMetricLabels]float64{},
ServiceConnFailed: map[serviceMetricLabels]float64{},
}
}

Expand Down Expand Up @@ -181,6 +211,21 @@ func (m *MetricController) Run(ctx context.Context, mapOfTcpInfo *ebpf.Map) {

// Register metrics to Prometheus and start Prometheus server
go RunPrometheusClient(ctx)
go func() {
for {
select {
case <-ctx.Done():
return
default:
// Metrics updated every 3 seconds
time.Sleep(3 * time.Second)
err := m.updatePrometheusMetric()
if err != nil {
log.Errorf("update Kmesh metrics failed: %v", err)
}
}
}
}()

for {
select {
Expand Down Expand Up @@ -231,8 +276,8 @@ func (m *MetricController) Run(ctx context.Context, mapOfTcpInfo *ebpf.Map) {
if data.state == TCP_CLOSTED {
OutputAccesslog(data, accesslog)
}
buildWorkloadMetricsToPrometheus(data, workloadLabels)
buildServiceMetricsToPrometheus(data, serviceLabels)
m.buildWorkloadMetricsToPrometheus(data, workloadLabels)
m.buildServiceMetricsToPrometheus(data, serviceLabels)
}
}
}
Expand Down Expand Up @@ -429,36 +474,150 @@ func buildPrincipal(workload *workloadapi.Workload) string {
return "-"
}

func buildWorkloadMetricsToPrometheus(data requestMetric, labels workloadMetricLabels) {
commonLabels := struct2map(labels)

if data.state == TCP_ESTABLISHED {
tcpConnectionOpenedInWorkload.With(commonLabels).Add(float64(1))
}
if data.state == TCP_CLOSTED {
tcpConnectionClosedInWorkload.With(commonLabels).Add(float64(1))
func (m *MetricController) buildWorkloadMetricsToPrometheus(data requestMetric, labels workloadMetricLabels) {
// commonLabels := struct2map(labels)

// if data.state == TCP_ESTABLISHED {
// tcpConnectionOpenedInWorkload.With(commonLabels).Add(float64(1))
// }
// if data.state == TCP_CLOSTED {
// tcpConnectionClosedInWorkload.With(commonLabels).Add(float64(1))
// }
// if data.success != connection_success {
// tcpConnectionFailedInWorkload.With(commonLabels).Add(float64(1))
// }
// tcpReceivedBytesInWorkload.With(commonLabels).Add(float64(data.receivedBytes))
// tcpSentBytesInWorkload.With(commonLabels).Add(float64(data.sentBytes))
_, ok := m.metricCache.WorkloadConnReceivedBytes[labels]
if ok {
if data.state == TCP_ESTABLISHED {
m.metricCache.WorkloadConnOpened[labels] = m.metricCache.WorkloadConnOpened[labels] + 1
}
if data.state == TCP_CLOSTED {
m.metricCache.WorkloadConnClosed[labels] = m.metricCache.WorkloadConnClosed[labels] + 1
}
if data.success != connection_success {
m.metricCache.WorkloadConnFailed[labels] = m.metricCache.WorkloadConnFailed[labels] + 1
}
m.metricCache.WorkloadConnReceivedBytes[labels] = m.metricCache.WorkloadConnReceivedBytes[labels] + float64(data.receivedBytes)
m.metricCache.WorkloadConnSentBytes[labels] = m.metricCache.WorkloadConnSentBytes[labels] + float64(data.sentBytes)
} else {
if data.state == TCP_ESTABLISHED {
m.metricCache.WorkloadConnOpened[labels] = 1
}
if data.state == TCP_CLOSTED {
m.metricCache.WorkloadConnClosed[labels] = 1
}
if data.success != connection_success {
m.metricCache.WorkloadConnFailed[labels] = 1
}
m.metricCache.WorkloadConnReceivedBytes[labels] = float64(data.receivedBytes)
m.metricCache.WorkloadConnSentBytes[labels] = float64(data.sentBytes)
}
if data.success != connection_success {
tcpConnectionFailedInWorkload.With(commonLabels).Add(float64(1))
}

func (m *MetricController) buildServiceMetricsToPrometheus(data requestMetric, labels serviceMetricLabels) {
// commonLabels := struct2map(labels)

// if data.state == TCP_ESTABLISHED {
// tcpConnectionOpenedInService.With(commonLabels).Add(float64(1))
// }
// if data.state == TCP_CLOSTED {
// tcpConnectionClosedInService.With(commonLabels).Add(float64(1))
// }
// if data.success != uint32(1) {
// tcpConnectionFailedInService.With(commonLabels).Add(float64(1))
// }
// tcpReceivedBytesInService.With(commonLabels).Add(float64(data.receivedBytes))
// tcpSentBytesInService.With(commonLabels).Add(float64(data.sentBytes))
_, ok := m.metricCache.ServiceConnReceivedBytes[labels]
if ok {
if data.state == TCP_ESTABLISHED {
m.metricCache.ServiceConnOpened[labels] = m.metricCache.ServiceConnOpened[labels] + 1
}
if data.state == TCP_CLOSTED {
m.metricCache.ServiceConnClosed[labels] = m.metricCache.ServiceConnClosed[labels] + 1
}
if data.success != connection_success {
m.metricCache.ServiceConnFailed[labels] = m.metricCache.ServiceConnFailed[labels] + 1
}
m.metricCache.ServiceConnReceivedBytes[labels] = m.metricCache.ServiceConnReceivedBytes[labels] + float64(data.receivedBytes)
m.metricCache.ServiceConnSentBytes[labels] = m.metricCache.ServiceConnSentBytes[labels] + float64(data.sentBytes)
} else {
if data.state == TCP_ESTABLISHED {
m.metricCache.ServiceConnOpened[labels] = 1
}
if data.state == TCP_CLOSTED {
m.metricCache.ServiceConnClosed[labels] = 1
}
if data.success != connection_success {
m.metricCache.ServiceConnFailed[labels] = 1
}
m.metricCache.ServiceConnReceivedBytes[labels] = float64(data.receivedBytes)
m.metricCache.ServiceConnSentBytes[labels] = float64(data.sentBytes)
}
tcpReceivedBytesInWorkload.With(commonLabels).Add(float64(data.receivedBytes))
tcpSentBytesInWorkload.With(commonLabels).Add(float64(data.sentBytes))
}

func buildServiceMetricsToPrometheus(data requestMetric, labels serviceMetricLabels) {
commonLabels := struct2map(labels)
func (m *MetricController) updatePrometheusMetric() error {
val := reflect.ValueOf(m.metricCache)
typ := reflect.TypeOf(m.metricCache)

// check if has pointer in struct
if typ.Kind() == reflect.Ptr {
val = val.Elem()
typ = typ.Elem()

if data.state == TCP_ESTABLISHED {
tcpConnectionOpenedInService.With(commonLabels).Add(float64(1))
}
if data.state == TCP_CLOSTED {
tcpConnectionClosedInService.With(commonLabels).Add(float64(1))
}
if data.success != uint32(1) {
tcpConnectionFailedInService.With(commonLabels).Add(float64(1))
num := val.NumField()
for i := 0; i < num; i++ {
sType := typ.Field(i)
sVal := val.Field(i).Interface()
workloadMap, isWorkload := sVal.(map[workloadMetricLabels]float64)
// fmt.Printf("\n ------- %v, \n%v -------- \n", workloadMap, isWorkload)
if isWorkload {
for k, v := range workloadMap {
name := sType.Name
commonLabels := struct2map(k)
// fmt.Printf("name is %v, value is: %v", name, v)
switch name {
case "WorkloadConnOpened":
tcpConnectionOpenedInWorkload.With(commonLabels).Set(float64(v))
case "WorkloadConnClosed":
tcpConnectionClosedInWorkload.With(commonLabels).Set(float64(v))
case "WorkloadConnSentBytes":
tcpSentBytesInWorkload.With(commonLabels).Set(float64(v))
case "WorkloadConnReceivedBytes":
tcpReceivedBytesInWorkload.With(commonLabels).Set(float64(v))
case "WorkloadConnFailed":
tcpConnectionFailedInWorkload.With(commonLabels).Set(float64(v))
}
}
}
serviceMap, isService := sVal.(map[serviceMetricLabels]float64)
// fmt.Printf("\n ------- %v, \n%v -------- \n", serviceMap, isService)
if isService {
for k, v := range serviceMap {
name := sType.Name
commonLabels := struct2map(k)
switch name {
case "ServiceConnOpened":
tcpConnectionOpenedInService.With(commonLabels).Set(float64(v))
case "ServiceConnClosed":
tcpConnectionClosedInService.With(commonLabels).Set(float64(v))
case "ServiceConnSentBytes":
tcpSentBytesInService.With(commonLabels).Set(float64(v))
case "ServiceConnReceivedBytes":
tcpReceivedBytesInService.With(commonLabels).Set(float64(v))
case "ServiceConnFailed":
tcpConnectionFailedInService.With(commonLabels).Set(float64(v))
}
}
}
if !isWorkload && !isService {
return fmt.Errorf("get metricCahce data failed")
}
}
tcpReceivedBytesInService.With(commonLabels).Add(float64(data.receivedBytes))
tcpSentBytesInService.With(commonLabels).Add(float64(data.sentBytes))
return nil
}

func struct2map(labels interface{}) map[string]string {
Expand Down
Loading

0 comments on commit 423688c

Please sign in to comment.