Skip to content

Commit

Permalink
Migrate to official docker client
Browse files Browse the repository at this point in the history
Logs and Exec are commented
  • Loading branch information
stokito committed Nov 27, 2020
1 parent 117c3bc commit 87b6ba3
Show file tree
Hide file tree
Showing 6 changed files with 135 additions and 73 deletions.
47 changes: 29 additions & 18 deletions connector/collector/docker.go
Original file line number Diff line number Diff line change
@@ -1,23 +1,27 @@
package collector

import (
"context"
"encoding/json"
"github.com/bcicen/ctop/models"
api "github.com/fsouza/go-dockerclient"
"github.com/docker/docker/api/types"
"github.com/docker/docker/client"
"io"
)

// Docker collector
type Docker struct {
models.Metrics
id string
client *api.Client
client *client.Client
running bool
stream chan models.Metrics
done chan bool
lastCpu float64
lastSysCpu float64
}

func NewDocker(client *api.Client, id string) *Docker {
func NewDocker(client *client.Client, id string) *Docker {
return &Docker{
Metrics: models.Metrics{},
id: id,
Expand All @@ -28,17 +32,24 @@ func NewDocker(client *api.Client, id string) *Docker {
func (c *Docker) Start() {
c.done = make(chan bool)
c.stream = make(chan models.Metrics)
stats := make(chan *api.Stats)
stats := make(chan *types.StatsJSON)

go func() {
opts := api.StatsOptions{
ID: c.id,
Stats: stats,
Stream: true,
Done: c.done,
ctx := context.Background()
ss, err := c.client.ContainerStats(ctx, c.id, true)
if err == nil {
c.running = false
}
decoder := json.NewDecoder(ss.Body)
cStats := new(types.StatsJSON)

for err := decoder.Decode(cStats); err != io.EOF; err = decoder.Decode(cStats) {
if err != nil {
break
}
stats <- cStats
cStats = new(types.StatsJSON)
}
c.client.Stats(opts)
c.running = false
}()

go func() {
Expand Down Expand Up @@ -75,10 +86,10 @@ func (c *Docker) Stop() {
c.done <- true
}

func (c *Docker) ReadCPU(stats *api.Stats) {
func (c *Docker) ReadCPU(stats *types.StatsJSON) {
ncpus := uint8(len(stats.CPUStats.CPUUsage.PercpuUsage))
total := float64(stats.CPUStats.CPUUsage.TotalUsage)
system := float64(stats.CPUStats.SystemCPUUsage)
system := float64(stats.CPUStats.SystemUsage)

cpudiff := total - c.lastCpu
syscpudiff := system - c.lastSysCpu
Expand All @@ -90,13 +101,13 @@ func (c *Docker) ReadCPU(stats *api.Stats) {
c.Pids = int(stats.PidsStats.Current)
}

func (c *Docker) ReadMem(stats *api.Stats) {
c.MemUsage = int64(stats.MemoryStats.Usage - stats.MemoryStats.Stats.Cache)
func (c *Docker) ReadMem(stats *types.StatsJSON) {
c.MemUsage = int64(stats.MemoryStats.Usage - stats.MemoryStats.Stats["cache"])
c.MemLimit = int64(stats.MemoryStats.Limit)
c.MemPercent = percent(float64(c.MemUsage), float64(c.MemLimit))
}

func (c *Docker) ReadNet(stats *api.Stats) {
func (c *Docker) ReadNet(stats *types.StatsJSON) {
var rx, tx int64
for _, network := range stats.Networks {
rx += int64(network.RxBytes)
Expand All @@ -105,9 +116,9 @@ func (c *Docker) ReadNet(stats *api.Stats) {
c.NetRx, c.NetTx = rx, tx
}

func (c *Docker) ReadIO(stats *api.Stats) {
func (c *Docker) ReadIO(stats *types.StatsJSON) {
var read, write int64
for _, blk := range stats.BlkioStats.IOServiceBytesRecursive {
for _, blk := range stats.BlkioStats.IoServiceBytesRecursive {
if blk.Op == "Read" {
read += int64(blk.Value)
}
Expand Down
16 changes: 6 additions & 10 deletions connector/collector/docker_logs.go
Original file line number Diff line number Diff line change
@@ -1,23 +1,19 @@
package collector

import (
"bufio"
"context"
"io"
"strings"
"github.com/docker/docker/client"
"time"

"github.com/bcicen/ctop/models"
api "github.com/fsouza/go-dockerclient"
)

type DockerLogs struct {
id string
client *api.Client
client *client.Client
done chan bool
}

func NewDockerLogs(id string, client *api.Client) *DockerLogs {
func NewDockerLogs(id string, client *client.Client) *DockerLogs {
return &DockerLogs{
id: id,
client: client,
Expand All @@ -26,9 +22,9 @@ func NewDockerLogs(id string, client *api.Client) *DockerLogs {
}

func (l *DockerLogs) Stream() chan models.Log {
r, w := io.Pipe()
//r, w := io.Pipe()
logCh := make(chan models.Log)
ctx, cancel := context.WithCancel(context.Background())
/*ctx, cancel := context.WithCancel(context.Background())
opts := api.LogsOptions{
Context: ctx,
Expand Down Expand Up @@ -65,7 +61,7 @@ func (l *DockerLogs) Stream() chan models.Log {
go func() {
<-l.done
cancel()
}()
}()*/

log.Infof("log reader started for container: %s", l.id)
return logCh
Expand Down
51 changes: 32 additions & 19 deletions connector/docker.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
package connector

import (
"context"
"fmt"
"github.com/op/go-logging"
"strings"
"sync"

"github.com/bcicen/ctop/connector/collector"
"github.com/bcicen/ctop/connector/manager"
"github.com/bcicen/ctop/container"
api "github.com/fsouza/go-dockerclient"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/network"
"github.com/docker/docker/client"
"github.com/docker/go-connections/nat"
"github.com/op/go-logging"
"strings"
"sync"
"time"
)

func init() { enabled["docker"] = NewDocker }
Expand All @@ -29,7 +33,7 @@ type StatusUpdate struct {
}

type Docker struct {
client *api.Client
client *client.Client
containers map[string]*container.Container
needsRefresh chan string // container IDs requiring refresh
statuses chan StatusUpdate
Expand All @@ -39,7 +43,8 @@ type Docker struct {

func NewDocker() (Connector, error) {
// init docker client
client, err := api.NewClientFromEnv()
ctx := context.Background()
client, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
if err != nil {
return nil, err
}
Expand All @@ -53,7 +58,7 @@ func NewDocker() (Connector, error) {
}

// query info as pre-flight healthcheck
info, err := client.Info()
info, err := client.Info(ctx)
if err != nil {
return nil, err
}
Expand All @@ -77,8 +82,10 @@ func (cm *Docker) Wait() struct{} { return <-cm.closed }
// Docker events watcher
func (cm *Docker) watchEvents() {
log.Info("docker event listener starting")
events := make(chan *api.APIEvents)
cm.client.AddEventListener(events)
ctx := context.Background()

eventsOpts := types.EventsOptions{}
events, _ := cm.client.Events(ctx, eventsOpts)

for e := range events {
if e.Type != "container" {
Expand Down Expand Up @@ -130,7 +137,7 @@ func (cm *Docker) watchEvents() {
close(cm.closed)
}

func portsFormat(ports map[api.Port][]api.PortBinding) string {
func portsFormat(ports nat.PortMap) string {
var exposed []string
var published []string

Expand All @@ -148,7 +155,7 @@ func portsFormat(ports map[api.Port][]api.PortBinding) string {
return strings.Join(append(exposed, published...), "\n")
}

func ipsFormat(networks map[string]api.ContainerNetwork) string {
func ipsFormat(networks map[string]*network.EndpointSettings) string {
var ips []string

for k, v := range networks {
Expand All @@ -173,18 +180,23 @@ func (cm *Docker) refresh(c *container.Container) {
c.SetMeta("image", insp.Config.Image)
c.SetMeta("IPs", ipsFormat(insp.NetworkSettings.Networks))
c.SetMeta("ports", portsFormat(insp.NetworkSettings.Ports))
c.SetMeta("created", insp.Created.Format("Mon Jan 2 15:04:05 2006"))
c.SetMeta("health", insp.State.Health.Status)
if created, err := time.Parse(time.RFC3339, insp.Created); err == nil {
c.SetMeta("created", created.Format("Mon Jan 2 15:04:05 2006"))
}
if insp.State.Health != nil {
c.SetMeta("health", insp.State.Health.Status)
}
for _, env := range insp.Config.Env {
c.SetMeta("[ENV-VAR]", env)
}
c.SetState(insp.State.Status)
}

func (cm *Docker) inspect(id string) (insp *api.Container, found bool, failed bool) {
c, err := cm.client.InspectContainer(id)
func (cm *Docker) inspect(id string) (insp types.ContainerJSON, found bool, error bool) {
ctx := context.Background()
c, err := cm.client.ContainerInspect(ctx, id)
if err != nil {
if _, notFound := err.(*api.NoSuchContainer); notFound {
if client.IsErrNotFound(err) {
return c, false, false
}
// other error e.g. connection failed
Expand All @@ -196,8 +208,9 @@ func (cm *Docker) inspect(id string) (insp *api.Container, found bool, failed bo

// Mark all container IDs for refresh
func (cm *Docker) refreshAll() {
opts := api.ListContainersOptions{All: true}
allContainers, err := cm.client.ListContainers(opts)
ctx := context.Background()
opts := types.ContainerListOptions{All: true}
allContainers, err := cm.client.ContainerList(ctx, opts)
if err != nil {
log.Errorf("%s (%T)", err.Error(), err)
return
Expand Down
Loading

0 comments on commit 87b6ba3

Please sign in to comment.