From f8bef1486095138bd3b8b4cfeb8d69e977b4c0ce Mon Sep 17 00:00:00 2001 From: prashanthjbabu Date: Wed, 19 Jun 2019 04:26:55 +0530 Subject: [PATCH] Add docker log plugin (#4773) --- plugins/inputs/all/all.go | 1 + plugins/inputs/docker_log/README.md | 60 +++ plugins/inputs/docker_log/client.go | 63 +++ plugins/inputs/docker_log/docker_logs.go | 472 +++++++++++++++++++++++ 4 files changed, 596 insertions(+) create mode 100644 plugins/inputs/docker_log/README.md create mode 100644 plugins/inputs/docker_log/client.go create mode 100644 plugins/inputs/docker_log/docker_logs.go diff --git a/plugins/inputs/all/all.go b/plugins/inputs/all/all.go index ef032fe47..487f92b1f 100644 --- a/plugins/inputs/all/all.go +++ b/plugins/inputs/all/all.go @@ -32,6 +32,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/inputs/dmcache" _ "github.com/influxdata/telegraf/plugins/inputs/dns_query" _ "github.com/influxdata/telegraf/plugins/inputs/docker" + _ "github.com/influxdata/telegraf/plugins/inputs/docker_log" _ "github.com/influxdata/telegraf/plugins/inputs/dovecot" _ "github.com/influxdata/telegraf/plugins/inputs/ecs" _ "github.com/influxdata/telegraf/plugins/inputs/elasticsearch" diff --git a/plugins/inputs/docker_log/README.md b/plugins/inputs/docker_log/README.md new file mode 100644 index 000000000..d04adba33 --- /dev/null +++ b/plugins/inputs/docker_log/README.md @@ -0,0 +1,60 @@ +# Docker Log Input Plugin + +The docker log plugin uses the Docker Engine API to get logs on running +docker containers. + +The docker plugin uses the [Official Docker Client](https://github.com/moby/moby/tree/master/client) +to gather logs from the [Engine API](https://docs.docker.com/engine/api/v1.24/). +Note: This plugin works only for containers with the `local` or `json-file` or `journald` logging driver. +### Configuration: + +```toml +# Read metrics about docker containers +[[inputs.docker_log]] + ## Docker Endpoint + ## To use TCP, set endpoint = "tcp://[ip]:[port]" + ## To use environment variables (ie, docker-machine), set endpoint = "ENV" + endpoint = "unix:///var/run/docker.sock" + + ## Containers to include and exclude. Collect all if empty. Globs accepted. + container_name_include = [] + container_name_exclude = [] + + ## Container states to include and exclude. Globs accepted. + ## When empty only containers in the "running" state will be captured. + # container_state_include = [] + # container_state_exclude = [] + + ## docker labels to include and exclude as tags. Globs accepted. + ## Note that an empty array for both will include all labels as tags + docker_label_include = [] + docker_label_exclude = [] + + ## Optional TLS Config + # tls_ca = "/etc/telegraf/ca.pem" + # tls_cert = "/etc/telegraf/cert.pem" + # tls_key = "/etc/telegraf/key.pem" + ## Use TLS but skip chain & host verification + # insecure_skip_verify = false +``` + +#### Environment Configuration + +When using the `"ENV"` endpoint, the connection is configured using the +[cli Docker environment variables](https://godoc.org/github.com/moby/moby/client#NewEnvClient). + +### Metrics: + +- docker_log + - tags: + - container_id + - container_name + - stream + - fields: + - message +### Example Output: + +``` +docker_log,com.docker.compose.config-hash=e19e13df8fd01ba2d7c1628158fca45cc91afbbe9661b2d30550547eb53a861e,com.docker.compose.container-number=1,com.docker.compose.oneoff=False,com.docker.compose.project=distribution,com.docker.compose.service=influxdb,com.docker.compose.version=1.21.2,containerId=fce475bbfa4c8380ff858d5d767f78622ca6de955b525477624c2b7896a5b8e4,containerName=aicon-influxdb,host=prash-laptop,logType=stderr log=" [httpd] 172.23.0.2 - aicon_admin [13/Apr/2019:08:35:53 +0000] \"POST /query?db=&q=SHOW+SUBSCRIPTIONS HTTP/1.1\" 200 232 \"-\" \"KapacitorInfluxDBClient\" 2661bc9c-5dc7-11e9-82f8-0242ac170007 1360\n" 1555144553541000000 +docker_log,com.docker.compose.config-hash=fd91b3b096c7ab346971c681b88fe1357c609dcc6850e4ea5b1287ad28a57e5d,com.docker.compose.container-number=1,com.docker.compose.oneoff=False,com.docker.compose.project=distribution,com.docker.compose.service=kapacitor,com.docker.compose.version=1.21.2,containerId=6514d1cf6d19e7ecfedf894941f0a2ea21b8aac5e6f48e64f19dbc9bb2805a25,containerName=aicon-kapacitor,host=prash-laptop,logType=stderr log=" ts=2019-04-13T08:36:00.019Z lvl=info msg=\"http request\" service=http host=172.23.0.7 username=- start=2019-04-13T08:36:00.013835165Z method=POST uri=/write?consistency=&db=_internal&precision=ns&rp=monitor protocol=HTTP/1.1 status=204 referer=- user-agent=InfluxDBClient request-id=2a3eb481-5dc7-11e9-825b-000000000000 duration=5.814404ms\n" 1555144560024000000 +``` diff --git a/plugins/inputs/docker_log/client.go b/plugins/inputs/docker_log/client.go new file mode 100644 index 000000000..7667c6e4d --- /dev/null +++ b/plugins/inputs/docker_log/client.go @@ -0,0 +1,63 @@ +package docker_log + +import ( + "context" + "crypto/tls" + "io" + "net/http" + + "github.com/docker/docker/api/types" + docker "github.com/docker/docker/client" +) + +/*This file is inherited from telegraf docker input plugin*/ +var ( + version = "1.24" + defaultHeaders = map[string]string{"User-Agent": "engine-api-cli-1.0"} +) + +type Client interface { + ContainerList(ctx context.Context, options types.ContainerListOptions) ([]types.Container, error) + ContainerLogs(ctx context.Context, containerID string, options types.ContainerLogsOptions) (io.ReadCloser, error) + ContainerInspect(ctx context.Context, containerID string) (types.ContainerJSON, error) +} + +func NewEnvClient() (Client, error) { + client, err := docker.NewClientWithOpts(docker.FromEnv) + if err != nil { + return nil, err + } + return &SocketClient{client}, nil +} + +func NewClient(host string, tlsConfig *tls.Config) (Client, error) { + transport := &http.Transport{ + TLSClientConfig: tlsConfig, + } + httpClient := &http.Client{Transport: transport} + client, err := docker.NewClientWithOpts( + docker.WithHTTPHeaders(defaultHeaders), + docker.WithHTTPClient(httpClient), + docker.WithVersion(version), + docker.WithHost(host)) + + if err != nil { + return nil, err + } + return &SocketClient{client}, nil +} + +type SocketClient struct { + client *docker.Client +} + +func (c *SocketClient) ContainerList(ctx context.Context, options types.ContainerListOptions) ([]types.Container, error) { + return c.client.ContainerList(ctx, options) +} + +func (c *SocketClient) ContainerLogs(ctx context.Context, containerID string, options types.ContainerLogsOptions) (io.ReadCloser, error) { + return c.client.ContainerLogs(ctx, containerID, options) +} +func (c *SocketClient) ContainerInspect(ctx context.Context, containerID string) (types.ContainerJSON, error) { + return c.client.ContainerInspect(ctx, containerID) +} diff --git a/plugins/inputs/docker_log/docker_logs.go b/plugins/inputs/docker_log/docker_logs.go new file mode 100644 index 000000000..813b868ee --- /dev/null +++ b/plugins/inputs/docker_log/docker_logs.go @@ -0,0 +1,472 @@ +package docker_log + +import ( + "context" + "crypto/tls" + "encoding/binary" + "errors" + "fmt" + "github.com/docker/docker/api/types" + "github.com/docker/docker/api/types/filters" + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/filter" + "github.com/influxdata/telegraf/internal" + tlsint "github.com/influxdata/telegraf/internal/tls" + "github.com/influxdata/telegraf/plugins/inputs" + "io" + "strings" + "sync" + "time" +) + +type StdType byte + +const ( + Stdin StdType = iota + Stdout + Stderr + Systemerr + + stdWriterPrefixLen = 8 + stdWriterFdIndex = 0 + stdWriterSizeIndex = 4 + + startingBufLen = 32*1024 + stdWriterPrefixLen + 1 + + ERR_PREFIX = "E! [inputs.docker_log]" + defaultEndpoint = "unix:///var/run/docker.sock" + logBytesMax = 1000 +) + +type DockerLogs struct { + Endpoint string + + Timeout internal.Duration + + LabelInclude []string `toml:"docker_label_include"` + LabelExclude []string `toml:"docker_label_exclude"` + + ContainerInclude []string `toml:"container_name_include"` + ContainerExclude []string `toml:"container_name_exclude"` + + ContainerStateInclude []string `toml:"container_state_include"` + ContainerStateExclude []string `toml:"container_state_exclude"` + + tlsint.ClientConfig + + newEnvClient func() (Client, error) + newClient func(string, *tls.Config) (Client, error) + + client Client + filtersCreated bool + labelFilter filter.Filter + containerFilter filter.Filter + stateFilter filter.Filter + opts types.ContainerListOptions + wg sync.WaitGroup + mu sync.Mutex + containerList map[string]io.ReadCloser +} + +var ( + containerStates = []string{"created", "restarting", "running", "removing", "paused", "exited", "dead"} +) + +var sampleConfig = ` + ## Docker Endpoint + ## To use TCP, set endpoint = "tcp://[ip]:[port]" + ## To use environment variables (ie, docker-machine), set endpoint = "ENV" + endpoint = "unix:///var/run/docker.sock" + ## Containers to include and exclude. Globs accepted. + ## Note that an empty array for both will include all containers + container_name_include = [] + container_name_exclude = [] + ## Container states to include and exclude. Globs accepted. + ## When empty only containers in the "running" state will be captured. + # container_state_include = [] + # container_state_exclude = [] + + ## docker labels to include and exclude as tags. Globs accepted. + ## Note that an empty array for both will include all labels as tags + docker_label_include = [] + docker_label_exclude = [] + + ## Optional TLS Config + # tls_ca = "/etc/telegraf/ca.pem" + # tls_cert = "/etc/telegraf/cert.pem" + # tls_key = "/etc/telegraf/key.pem" + ## Use TLS but skip chain & host verification + # insecure_skip_verify = false +` + +func (d *DockerLogs) Description() string { + return "Plugin to get docker logs" +} + +func (d *DockerLogs) SampleConfig() string { + return sampleConfig +} + +func (d *DockerLogs) Gather(acc telegraf.Accumulator) error { + /*Check to see if any new containers have been created since last time*/ + return d.containerListUpdate(acc) +} + +/*Following few functions have been inherited from telegraf docker input plugin*/ +func (d *DockerLogs) createContainerFilters() error { + filter, err := filter.NewIncludeExcludeFilter(d.ContainerInclude, d.ContainerExclude) + if err != nil { + return err + } + d.containerFilter = filter + return nil +} + +func (d *DockerLogs) createLabelFilters() error { + filter, err := filter.NewIncludeExcludeFilter(d.LabelInclude, d.LabelExclude) + if err != nil { + return err + } + d.labelFilter = filter + return nil +} + +func (d *DockerLogs) createContainerStateFilters() error { + if len(d.ContainerStateInclude) == 0 && len(d.ContainerStateExclude) == 0 { + d.ContainerStateInclude = []string{"running"} + } + filter, err := filter.NewIncludeExcludeFilter(d.ContainerStateInclude, d.ContainerStateExclude) + if err != nil { + return err + } + d.stateFilter = filter + return nil +} + +func (d *DockerLogs) addToContainerList(containerId string, logReader io.ReadCloser) error { + d.mu.Lock() + defer d.mu.Unlock() + d.containerList[containerId] = logReader + return nil +} + +func (d *DockerLogs) removeFromContainerList(containerId string) error { + d.mu.Lock() + defer d.mu.Unlock() + delete(d.containerList, containerId) + return nil +} + +func (d *DockerLogs) containerInContainerList(containerId string) bool { + if _, ok := d.containerList[containerId]; ok { + return true + } + return false +} + +func (d *DockerLogs) stopAllReaders() error { + for _, container := range d.containerList { + container.Close() + } + return nil +} + +func (d *DockerLogs) containerListUpdate(acc telegraf.Accumulator) error { + ctx, cancel := context.WithTimeout(context.Background(), d.Timeout.Duration) + defer cancel() + if d.client == nil { + return errors.New(fmt.Sprintf("%s : Dock client is null", ERR_PREFIX)) + } + containers, err := d.client.ContainerList(ctx, d.opts) + if err != nil { + return err + } + for _, container := range containers { + if d.containerInContainerList(container.ID) { + continue + } + d.wg.Add(1) + /*Start a new goroutine for every new container that has logs to collect*/ + go func(c types.Container) { + defer d.wg.Done() + logOptions := types.ContainerLogsOptions{ + ShowStdout: true, + ShowStderr: true, + Timestamps: false, + Details: true, + Follow: true, + Tail: "0", + } + logReader, err := d.client.ContainerLogs(context.Background(), c.ID, logOptions) + if err != nil { + acc.AddError(err) + return + } + d.addToContainerList(c.ID, logReader) + err = d.tailContainerLogs(c, logReader, acc) + if err != nil { + acc.AddError(err) + } + d.removeFromContainerList(c.ID) + return + }(container) + } + return nil +} + +func (d *DockerLogs) tailContainerLogs( + container types.Container, logReader io.ReadCloser, + acc telegraf.Accumulator, +) error { + c, err := d.client.ContainerInspect(context.Background(), container.ID) + if err != nil { + return err + } + /* Parse container name */ + var cname string + for _, name := range container.Names { + trimmedName := strings.TrimPrefix(name, "/") + match := d.containerFilter.Match(trimmedName) + if match { + cname = trimmedName + break + } + } + + if cname == "" { + return errors.New(fmt.Sprintf("%s : container name is null", ERR_PREFIX)) + } + imageName, imageVersion := parseImage(container.Image) + tags := map[string]string{ + "container_name": cname, + "container_image": imageName, + "container_version": imageVersion, + } + fields := map[string]interface{}{} + fields["container_id"] = container.ID + // Add labels to tags + for k, label := range container.Labels { + if d.labelFilter.Match(k) { + tags[k] = label + } + } + if c.Config.Tty { + err = pushTtyLogs(acc, tags, fields, logReader) + } else { + _, err = pushLogs(acc, tags, fields, logReader) + } + if err != nil { + return err + } + return nil +} +func pushTtyLogs(acc telegraf.Accumulator, tags map[string]string, fields map[string]interface{}, src io.Reader) (err error) { + tags["logType"] = "unknown" //in tty mode we wont be able to differentiate b/w stdout and stderr hence unknown + data := make([]byte, logBytesMax) + for { + num, err := src.Read(data) + if num > 0 { + fields["message"] = data[1:num] + acc.AddFields("docker_log", fields, tags) + } + if err == io.EOF { + fields["message"] = data[1:num] + acc.AddFields("docker_log", fields, tags) + return nil + } + if err != nil { + return err + } + } +} + +/* Inspired from https://github.com/moby/moby/blob/master/pkg/stdcopy/stdcopy.go */ +func pushLogs(acc telegraf.Accumulator, tags map[string]string, fields map[string]interface{}, src io.Reader) (written int64, err error) { + var ( + buf = make([]byte, startingBufLen) + bufLen = len(buf) + nr int + er error + frameSize int + ) + for { + // Make sure we have at least a full header + for nr < stdWriterPrefixLen { + var nr2 int + nr2, er = src.Read(buf[nr:]) + nr += nr2 + if er == io.EOF { + if nr < stdWriterPrefixLen { + return written, nil + } + break + } + if er != nil { + return 0, er + } + } + stream := StdType(buf[stdWriterFdIndex]) + // Check the first byte to know where to write + var logType string + switch stream { + case Stdin: + logType = "stdin" + break + case Stdout: + logType = "stdout" + break + case Stderr: + logType = "stderr" + break + case Systemerr: + fallthrough + default: + return 0, fmt.Errorf("Unrecognized input header: %d", buf[stdWriterFdIndex]) + } + // Retrieve the size of the frame + frameSize = int(binary.BigEndian.Uint32(buf[stdWriterSizeIndex : stdWriterSizeIndex+4])) + + // Check if the buffer is big enough to read the frame. + // Extend it if necessary. + if frameSize+stdWriterPrefixLen > bufLen { + buf = append(buf, make([]byte, frameSize+stdWriterPrefixLen-bufLen+1)...) + bufLen = len(buf) + } + + // While the amount of bytes read is less than the size of the frame + header, we keep reading + for nr < frameSize+stdWriterPrefixLen { + var nr2 int + nr2, er = src.Read(buf[nr:]) + nr += nr2 + if er == io.EOF { + if nr < frameSize+stdWriterPrefixLen { + return written, nil + } + break + } + if er != nil { + return 0, er + } + } + + // we might have an error from the source mixed up in our multiplexed + // stream. if we do, return it. + if stream == Systemerr { + return written, fmt.Errorf("error from daemon in stream: %s", string(buf[stdWriterPrefixLen:frameSize+stdWriterPrefixLen])) + } + + tags["stream"] = logType + fields["message"] = buf[stdWriterPrefixLen+1 : frameSize+stdWriterPrefixLen] + acc.AddFields("docker_log", fields, tags) + written += int64(frameSize) + + // Move the rest of the buffer to the beginning + copy(buf, buf[frameSize+stdWriterPrefixLen:]) + // Move the index + nr -= frameSize + stdWriterPrefixLen + } +} + +func (d *DockerLogs) Start(acc telegraf.Accumulator) error { + var c Client + var err error + if d.Endpoint == "ENV" { + c, err = d.newEnvClient() + } else { + tlsConfig, err := d.ClientConfig.TLSConfig() + if err != nil { + return err + } + c, err = d.newClient(d.Endpoint, tlsConfig) + } + if err != nil { + return err + } + d.client = c + // Create label filters if not already created + if !d.filtersCreated { + err := d.createLabelFilters() + if err != nil { + return err + } + err = d.createContainerFilters() + if err != nil { + return err + } + err = d.createContainerStateFilters() + if err != nil { + return err + } + d.filtersCreated = true + } + filterArgs := filters.NewArgs() + for _, state := range containerStates { + if d.stateFilter.Match(state) { + filterArgs.Add("status", state) + } + } + + // All container states were excluded + if filterArgs.Len() == 0 { + return nil + } + + d.opts = types.ContainerListOptions{ + Filters: filterArgs, + } + return nil +} + +/* Inspired from https://github.com/influxdata/telegraf/blob/master/plugins/inputs/docker/docker.go */ +func parseImage(image string) (string, string) { + // Adapts some of the logic from the actual Docker library's image parsing + // routines: + // https://github.com/docker/distribution/blob/release/2.7/reference/normalize.go + domain := "" + remainder := "" + + i := strings.IndexRune(image, '/') + + if i == -1 || (!strings.ContainsAny(image[:i], ".:") && image[:i] != "localhost") { + remainder = image + } else { + domain, remainder = image[:i], image[i+1:] + } + + imageName := "" + imageVersion := "unknown" + + i = strings.LastIndex(remainder, ":") + if i > -1 { + imageVersion = remainder[i+1:] + imageName = remainder[:i] + } else { + imageName = remainder + } + + if domain != "" { + imageName = domain + "/" + imageName + } + + return imageName, imageVersion +} + +func (d *DockerLogs) Stop() { + d.mu.Lock() + d.stopAllReaders() + d.mu.Unlock() + d.wg.Wait() +} + +func init() { + inputs.Add("docker_log", func() telegraf.Input { + return &DockerLogs{ + Timeout: internal.Duration{Duration: time.Second * 5}, + Endpoint: defaultEndpoint, + newEnvClient: NewEnvClient, + newClient: NewClient, + filtersCreated: false, + containerList: make(map[string]io.ReadCloser), + } + }) +}