diff --git a/Gopkg.lock b/Gopkg.lock index 8f3de4211..bcdf6cd07 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -326,6 +326,7 @@ "api/types/versions", "api/types/volume", "client", + "pkg/stdcopy", ] pruneopts = "" revision = "ed7b6428c133e7c59404251a09b7d6b02fa83cc2" @@ -1591,6 +1592,7 @@ "github.com/docker/docker/api/types/registry", "github.com/docker/docker/api/types/swarm", "github.com/docker/docker/client", + "github.com/docker/docker/pkg/stdcopy", "github.com/docker/libnetwork/ipvs", "github.com/eclipse/paho.mqtt.golang", "github.com/ericchiang/k8s", diff --git a/internal/docker/docker.go b/internal/docker/docker.go new file mode 100644 index 000000000..1808944ae --- /dev/null +++ b/internal/docker/docker.go @@ -0,0 +1,36 @@ +package docker + +import "strings" + +// Adapts some of the logic from the actual Docker library's image parsing +// routines: +// https://github.com/docker/distribution/blob/release/2.7/reference/normalize.go +func ParseImage(image string) (string, string) { + domain := "" + remainder := "" + + i := strings.IndexRune(image, '/') + + if i == -1 || (!strings.ContainsAny(image[:i], ".:") && image[:i] != "localhost") { + remainder = image + } else { + domain, remainder = image[:i], image[i+1:] + } + + imageName := "" + imageVersion := "unknown" + + i = strings.LastIndex(remainder, ":") + if i > -1 { + imageVersion = remainder[i+1:] + imageName = remainder[:i] + } else { + imageName = remainder + } + + if domain != "" { + imageName = domain + "/" + imageName + } + + return imageName, imageVersion +} diff --git a/internal/docker/docker_test.go b/internal/docker/docker_test.go new file mode 100644 index 000000000..14591ab87 --- /dev/null +++ b/internal/docker/docker_test.go @@ -0,0 +1,59 @@ +package docker_test + +import ( + "testing" + + "github.com/influxdata/telegraf/internal/docker" + "github.com/stretchr/testify/require" +) + +func TestParseImage(t *testing.T) { + tests := []struct { + image string + parsedName string + parsedVersion string + }{ + { + image: "postgres", + parsedName: "postgres", + parsedVersion: "unknown", + }, + { + image: "postgres:latest", + parsedName: "postgres", + parsedVersion: "latest", + }, + { + image: "coreos/etcd", + parsedName: "coreos/etcd", + parsedVersion: "unknown", + }, + { + image: "coreos/etcd:latest", + parsedName: "coreos/etcd", + parsedVersion: "latest", + }, + { + image: "quay.io/postgres", + parsedName: "quay.io/postgres", + parsedVersion: "unknown", + }, + { + image: "quay.io:4443/coreos/etcd", + parsedName: "quay.io:4443/coreos/etcd", + parsedVersion: "unknown", + }, + { + image: "quay.io:4443/coreos/etcd:latest", + parsedName: "quay.io:4443/coreos/etcd", + parsedVersion: "latest", + }, + } + for _, tt := range tests { + t.Run("parse name "+tt.image, func(t *testing.T) { + imageName, imageVersion := docker.ParseImage(tt.image) + require.Equal(t, tt.parsedName, imageName) + require.Equal(t, tt.parsedVersion, imageVersion) + }) + } +} diff --git a/plugins/inputs/docker/docker.go b/plugins/inputs/docker/docker.go index bc47583b7..c57ed5c48 100644 --- a/plugins/inputs/docker/docker.go +++ b/plugins/inputs/docker/docker.go @@ -20,6 +20,7 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/filter" "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/internal/docker" tlsint "github.com/influxdata/telegraf/internal/tls" "github.com/influxdata/telegraf/plugins/inputs" ) @@ -361,44 +362,12 @@ func (d *Docker) gatherInfo(acc telegraf.Accumulator) error { return nil } -func parseImage(image string) (string, string) { - // Adapts some of the logic from the actual Docker library's image parsing - // routines: - // https://github.com/docker/distribution/blob/release/2.7/reference/normalize.go - domain := "" - remainder := "" - - i := strings.IndexRune(image, '/') - - if i == -1 || (!strings.ContainsAny(image[:i], ".:") && image[:i] != "localhost") { - remainder = image - } else { - domain, remainder = image[:i], image[i+1:] - } - - imageName := "" - imageVersion := "unknown" - - i = strings.LastIndex(remainder, ":") - if i > -1 { - imageVersion = remainder[i+1:] - imageName = remainder[:i] - } else { - imageName = remainder - } - - if domain != "" { - imageName = domain + "/" + imageName - } - - return imageName, imageVersion -} - func (d *Docker) gatherContainer( container types.Container, acc telegraf.Accumulator, ) error { var v *types.StatsJSON + // Parse container name var cname string for _, name := range container.Names { @@ -414,7 +383,7 @@ func (d *Docker) gatherContainer( return nil } - imageName, imageVersion := parseImage(container.Image) + imageName, imageVersion := docker.ParseImage(container.Image) tags := map[string]string{ "engine_host": d.engineHost, diff --git a/plugins/inputs/docker/docker_test.go b/plugins/inputs/docker/docker_test.go index e1a425314..376d3ed0c 100644 --- a/plugins/inputs/docker/docker_test.go +++ b/plugins/inputs/docker/docker_test.go @@ -9,10 +9,9 @@ import ( "testing" "time" - "github.com/influxdata/telegraf/testutil" - "github.com/docker/docker/api/types" "github.com/docker/docker/api/types/swarm" + "github.com/influxdata/telegraf/testutil" "github.com/stretchr/testify/require" ) @@ -945,54 +944,3 @@ func TestContainerName(t *testing.T) { }) } } - -func TestParseImage(t *testing.T) { - tests := []struct { - image string - parsedName string - parsedVersion string - }{ - { - image: "postgres", - parsedName: "postgres", - parsedVersion: "unknown", - }, - { - image: "postgres:latest", - parsedName: "postgres", - parsedVersion: "latest", - }, - { - image: "coreos/etcd", - parsedName: "coreos/etcd", - parsedVersion: "unknown", - }, - { - image: "coreos/etcd:latest", - parsedName: "coreos/etcd", - parsedVersion: "latest", - }, - { - image: "quay.io/postgres", - parsedName: "quay.io/postgres", - parsedVersion: "unknown", - }, - { - image: "quay.io:4443/coreos/etcd", - parsedName: "quay.io:4443/coreos/etcd", - parsedVersion: "unknown", - }, - { - image: "quay.io:4443/coreos/etcd:latest", - parsedName: "quay.io:4443/coreos/etcd", - parsedVersion: "latest", - }, - } - for _, tt := range tests { - t.Run("parse name "+tt.image, func(t *testing.T) { - imageName, imageVersion := parseImage(tt.image) - require.Equal(t, tt.parsedName, imageName) - require.Equal(t, tt.parsedVersion, imageVersion) - }) - } -} diff --git a/plugins/inputs/docker_log/README.md b/plugins/inputs/docker_log/README.md index d04adba33..02f44e14c 100644 --- a/plugins/inputs/docker_log/README.md +++ b/plugins/inputs/docker_log/README.md @@ -3,22 +3,35 @@ The docker log plugin uses the Docker Engine API to get logs on running docker containers. -The docker plugin uses the [Official Docker Client](https://github.com/moby/moby/tree/master/client) -to gather logs from the [Engine API](https://docs.docker.com/engine/api/v1.24/). -Note: This plugin works only for containers with the `local` or `json-file` or `journald` logging driver. -### Configuration: +The docker plugin uses the [Official Docker Client][] to gather logs from the +[Engine API][]. + +**Note:** This plugin works only for containers with the `local` or +`json-file` or `journald` logging driver. + +[Official Docker Client]: https://github.com/moby/moby/tree/master/client +[Engine API]: https://docs.docker.com/engine/api/v1.24/ + +### Configuration ```toml -# Read metrics about docker containers [[inputs.docker_log]] ## Docker Endpoint ## To use TCP, set endpoint = "tcp://[ip]:[port]" ## To use environment variables (ie, docker-machine), set endpoint = "ENV" - endpoint = "unix:///var/run/docker.sock" + # endpoint = "unix:///var/run/docker.sock" - ## Containers to include and exclude. Collect all if empty. Globs accepted. - container_name_include = [] - container_name_exclude = [] + ## When true, container logs are read from the beginning; otherwise + ## reading begins at the end of the log. + # from_beginning = false + + ## Timeout for Docker API calls. + # timeout = "5s" + + ## Containers to include and exclude. Globs accepted. + ## Note that an empty array for both will include all containers + # container_name_include = [] + # container_name_exclude = [] ## Container states to include and exclude. Globs accepted. ## When empty only containers in the "running" state will be captured. @@ -27,8 +40,8 @@ Note: This plugin works only for containers with the `local` or `json-file` or ## docker labels to include and exclude as tags. Globs accepted. ## Note that an empty array for both will include all labels as tags - docker_label_include = [] - docker_label_exclude = [] + # docker_label_include = [] + # docker_label_exclude = [] ## Optional TLS Config # tls_ca = "/etc/telegraf/ca.pem" @@ -41,20 +54,31 @@ Note: This plugin works only for containers with the `local` or `json-file` or #### Environment Configuration When using the `"ENV"` endpoint, the connection is configured using the -[cli Docker environment variables](https://godoc.org/github.com/moby/moby/client#NewEnvClient). +[CLI Docker environment variables][env] -### Metrics: +[env]: https://godoc.org/github.com/moby/moby/client#NewEnvClient + +### Metrics - docker_log - tags: - - container_id + - container_image + - container_version - container_name - - stream + - stream (stdout, stderr, or tty) - fields: + - container_id - message -### Example Output: + +### Example Output ``` -docker_log,com.docker.compose.config-hash=e19e13df8fd01ba2d7c1628158fca45cc91afbbe9661b2d30550547eb53a861e,com.docker.compose.container-number=1,com.docker.compose.oneoff=False,com.docker.compose.project=distribution,com.docker.compose.service=influxdb,com.docker.compose.version=1.21.2,containerId=fce475bbfa4c8380ff858d5d767f78622ca6de955b525477624c2b7896a5b8e4,containerName=aicon-influxdb,host=prash-laptop,logType=stderr log=" [httpd] 172.23.0.2 - aicon_admin [13/Apr/2019:08:35:53 +0000] \"POST /query?db=&q=SHOW+SUBSCRIPTIONS HTTP/1.1\" 200 232 \"-\" \"KapacitorInfluxDBClient\" 2661bc9c-5dc7-11e9-82f8-0242ac170007 1360\n" 1555144553541000000 -docker_log,com.docker.compose.config-hash=fd91b3b096c7ab346971c681b88fe1357c609dcc6850e4ea5b1287ad28a57e5d,com.docker.compose.container-number=1,com.docker.compose.oneoff=False,com.docker.compose.project=distribution,com.docker.compose.service=kapacitor,com.docker.compose.version=1.21.2,containerId=6514d1cf6d19e7ecfedf894941f0a2ea21b8aac5e6f48e64f19dbc9bb2805a25,containerName=aicon-kapacitor,host=prash-laptop,logType=stderr log=" ts=2019-04-13T08:36:00.019Z lvl=info msg=\"http request\" service=http host=172.23.0.7 username=- start=2019-04-13T08:36:00.013835165Z method=POST uri=/write?consistency=&db=_internal&precision=ns&rp=monitor protocol=HTTP/1.1 status=204 referer=- user-agent=InfluxDBClient request-id=2a3eb481-5dc7-11e9-825b-000000000000 duration=5.814404ms\n" 1555144560024000000 +docker_log,container_image=telegraf,container_name=sharp_bell,container_version=alpine,stream=stderr container_id="371ee5d3e58726112f499be62cddef800138ca72bbba635ed2015fbf475b1023",message="2019-06-19T03:11:11Z I! [agent] Config: Interval:10s, Quiet:false, Hostname:\"371ee5d3e587\", Flush Interval:10s" 1560913872000000000 +docker_log,container_image=telegraf,container_name=sharp_bell,container_version=alpine,stream=stderr container_id="371ee5d3e58726112f499be62cddef800138ca72bbba635ed2015fbf475b1023",message="2019-06-19T03:11:11Z I! Tags enabled: host=371ee5d3e587" 1560913872000000000 +docker_log,container_image=telegraf,container_name=sharp_bell,container_version=alpine,stream=stderr container_id="371ee5d3e58726112f499be62cddef800138ca72bbba635ed2015fbf475b1023",message="2019-06-19T03:11:11Z I! Loaded outputs: file" 1560913872000000000 +docker_log,container_image=telegraf,container_name=sharp_bell,container_version=alpine,stream=stderr container_id="371ee5d3e58726112f499be62cddef800138ca72bbba635ed2015fbf475b1023",message="2019-06-19T03:11:11Z I! Loaded processors:" 1560913872000000000 +docker_log,container_image=telegraf,container_name=sharp_bell,container_version=alpine,stream=stderr container_id="371ee5d3e58726112f499be62cddef800138ca72bbba635ed2015fbf475b1023",message="2019-06-19T03:11:11Z I! Loaded aggregators:" 1560913872000000000 +docker_log,container_image=telegraf,container_name=sharp_bell,container_version=alpine,stream=stderr container_id="371ee5d3e58726112f499be62cddef800138ca72bbba635ed2015fbf475b1023",message="2019-06-19T03:11:11Z I! Loaded inputs: net" 1560913872000000000 +docker_log,container_image=telegraf,container_name=sharp_bell,container_version=alpine,stream=stderr container_id="371ee5d3e58726112f499be62cddef800138ca72bbba635ed2015fbf475b1023",message="2019-06-19T03:11:11Z I! Using config file: /etc/telegraf/telegraf.conf" 1560913872000000000 +docker_log,container_image=telegraf,container_name=sharp_bell,container_version=alpine,stream=stderr container_id="371ee5d3e58726112f499be62cddef800138ca72bbba635ed2015fbf475b1023",message="2019-06-19T03:11:11Z I! Starting Telegraf 1.10.4" 1560913872000000000 ``` diff --git a/plugins/inputs/docker_log/docker_log.go b/plugins/inputs/docker_log/docker_log.go new file mode 100644 index 000000000..01a2f83da --- /dev/null +++ b/plugins/inputs/docker_log/docker_log.go @@ -0,0 +1,429 @@ +package docker_log + +import ( + "bufio" + "context" + "crypto/tls" + "io" + "strings" + "sync" + "time" + "unicode" + + "github.com/docker/docker/api/types" + "github.com/docker/docker/api/types/filters" + "github.com/docker/docker/pkg/stdcopy" + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/filter" + "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/internal/docker" + tlsint "github.com/influxdata/telegraf/internal/tls" + "github.com/influxdata/telegraf/plugins/inputs" +) + +var sampleConfig = ` + ## Docker Endpoint + ## To use TCP, set endpoint = "tcp://[ip]:[port]" + ## To use environment variables (ie, docker-machine), set endpoint = "ENV" + # endpoint = "unix:///var/run/docker.sock" + + ## When true, container logs are read from the beginning; otherwise + ## reading begins at the end of the log. + # from_beginning = false + + ## Timeout for Docker API calls. + # timeout = "5s" + + ## Containers to include and exclude. Globs accepted. + ## Note that an empty array for both will include all containers + # container_name_include = [] + # container_name_exclude = [] + + ## Container states to include and exclude. Globs accepted. + ## When empty only containers in the "running" state will be captured. + # container_state_include = [] + # container_state_exclude = [] + + ## docker labels to include and exclude as tags. Globs accepted. + ## Note that an empty array for both will include all labels as tags + # docker_label_include = [] + # docker_label_exclude = [] + + ## Optional TLS Config + # tls_ca = "/etc/telegraf/ca.pem" + # tls_cert = "/etc/telegraf/cert.pem" + # tls_key = "/etc/telegraf/key.pem" + ## Use TLS but skip chain & host verification + # insecure_skip_verify = false +` + +const ( + defaultEndpoint = "unix:///var/run/docker.sock" + + // Maximum bytes of a log line before it will be split, size is mirroring + // docker code: + // https://github.com/moby/moby/blob/master/daemon/logger/copier.go#L21 + maxLineBytes = 16 * 1024 +) + +var ( + containerStates = []string{"created", "restarting", "running", "removing", "paused", "exited", "dead"} +) + +type DockerLogs struct { + Endpoint string `toml:"endpoint"` + FromBeginning bool `toml:"from_beginning"` + Timeout internal.Duration `toml:"timeout"` + LabelInclude []string `toml:"docker_label_include"` + LabelExclude []string `toml:"docker_label_exclude"` + ContainerInclude []string `toml:"container_name_include"` + ContainerExclude []string `toml:"container_name_exclude"` + ContainerStateInclude []string `toml:"container_state_include"` + ContainerStateExclude []string `toml:"container_state_exclude"` + + tlsint.ClientConfig + + newEnvClient func() (Client, error) + newClient func(string, *tls.Config) (Client, error) + + client Client + labelFilter filter.Filter + containerFilter filter.Filter + stateFilter filter.Filter + opts types.ContainerListOptions + wg sync.WaitGroup + mu sync.Mutex + containerList map[string]context.CancelFunc +} + +func (d *DockerLogs) Description() string { + return "Read logging output from the Docker engine" +} + +func (d *DockerLogs) SampleConfig() string { + return sampleConfig +} + +func (d *DockerLogs) Init() error { + var err error + if d.Endpoint == "ENV" { + d.client, err = d.newEnvClient() + if err != nil { + return err + } + } else { + tlsConfig, err := d.ClientConfig.TLSConfig() + if err != nil { + return err + } + d.client, err = d.newClient(d.Endpoint, tlsConfig) + if err != nil { + return err + } + } + + // Create filters + err = d.createLabelFilters() + if err != nil { + return err + } + err = d.createContainerFilters() + if err != nil { + return err + } + err = d.createContainerStateFilters() + if err != nil { + return err + } + + filterArgs := filters.NewArgs() + for _, state := range containerStates { + if d.stateFilter.Match(state) { + filterArgs.Add("status", state) + } + } + + if filterArgs.Len() != 0 { + d.opts = types.ContainerListOptions{ + Filters: filterArgs, + } + } + + return nil +} + +func (d *DockerLogs) addToContainerList(containerID string, cancel context.CancelFunc) error { + d.mu.Lock() + defer d.mu.Unlock() + d.containerList[containerID] = cancel + return nil +} + +func (d *DockerLogs) removeFromContainerList(containerID string) error { + d.mu.Lock() + defer d.mu.Unlock() + delete(d.containerList, containerID) + return nil +} + +func (d *DockerLogs) containerInContainerList(containerID string) bool { + d.mu.Lock() + defer d.mu.Unlock() + _, ok := d.containerList[containerID] + return ok +} + +func (d *DockerLogs) cancelTails() error { + d.mu.Lock() + defer d.mu.Unlock() + for _, cancel := range d.containerList { + cancel() + } + return nil +} + +func (d *DockerLogs) matchedContainerName(names []string) string { + // Check if all container names are filtered; in practice I believe + // this array is always of length 1. + for _, name := range names { + trimmedName := strings.TrimPrefix(name, "/") + match := d.containerFilter.Match(trimmedName) + if match { + return trimmedName + } + } + return "" +} + +func (d *DockerLogs) Gather(acc telegraf.Accumulator) error { + ctx := context.Background() + + ctx, cancel := context.WithTimeout(ctx, d.Timeout.Duration) + defer cancel() + containers, err := d.client.ContainerList(ctx, d.opts) + if err != nil { + return err + } + + for _, container := range containers { + if d.containerInContainerList(container.ID) { + continue + } + + containerName := d.matchedContainerName(container.Names) + if containerName == "" { + continue + } + + ctx, cancel := context.WithCancel(context.Background()) + d.addToContainerList(container.ID, cancel) + + // Start a new goroutine for every new container that has logs to collect + d.wg.Add(1) + go func(container types.Container) { + defer d.wg.Done() + defer d.removeFromContainerList(container.ID) + + err = d.tailContainerLogs(ctx, acc, container, containerName) + if err != nil { + acc.AddError(err) + } + }(container) + } + return nil +} + +func (d *DockerLogs) hasTTY(ctx context.Context, container types.Container) (bool, error) { + ctx, cancel := context.WithTimeout(ctx, d.Timeout.Duration) + defer cancel() + c, err := d.client.ContainerInspect(ctx, container.ID) + if err != nil { + return false, err + } + return c.Config.Tty, nil +} + +func (d *DockerLogs) tailContainerLogs( + ctx context.Context, + acc telegraf.Accumulator, + container types.Container, + containerName string, +) error { + imageName, imageVersion := docker.ParseImage(container.Image) + tags := map[string]string{ + "container_name": containerName, + "container_image": imageName, + "container_version": imageVersion, + } + + // Add matching container labels as tags + for k, label := range container.Labels { + if d.labelFilter.Match(k) { + tags[k] = label + } + } + + hasTTY, err := d.hasTTY(ctx, container) + if err != nil { + return err + } + + tail := "0" + if d.FromBeginning { + tail = "all" + } + + logOptions := types.ContainerLogsOptions{ + ShowStdout: true, + ShowStderr: true, + Timestamps: false, + Details: false, + Follow: true, + Tail: tail, + } + + logReader, err := d.client.ContainerLogs(ctx, container.ID, logOptions) + if err != nil { + return err + } + + // If the container is using a TTY, there is only a single stream + // (stdout), and data is copied directly from the container output stream, + // no extra multiplexing or headers. + // + // If the container is *not* using a TTY, streams for stdout and stderr are + // multiplexed. + if hasTTY { + return tailStream(acc, tags, container.ID, logReader, "tty") + } else { + return tailMultiplexed(acc, tags, container.ID, logReader) + } +} + +func tailStream( + acc telegraf.Accumulator, + baseTags map[string]string, + containerID string, + reader io.ReadCloser, + stream string, +) error { + defer reader.Close() + + tags := make(map[string]string, len(baseTags)+1) + for k, v := range baseTags { + tags[k] = v + } + tags["stream"] = stream + + r := bufio.NewReaderSize(reader, 64*1024) + + var err error + var message string + for { + message, err = r.ReadString('\n') + + // Keep any leading space, but remove whitespace from end of line. + // This preserves space in, for example, stacktraces, while removing + // annoying end of line characters and is similar to how other logging + // plugins such as syslog behave. + message = strings.TrimRightFunc(message, unicode.IsSpace) + + if len(message) != 0 { + acc.AddFields("docker_log", map[string]interface{}{ + "container_id": containerID, + "message": message, + }, tags) + } + + if err != nil { + if err == io.EOF { + return nil + } + return err + } + } +} + +func tailMultiplexed( + acc telegraf.Accumulator, + tags map[string]string, + containerID string, + src io.ReadCloser, +) error { + outReader, outWriter := io.Pipe() + errReader, errWriter := io.Pipe() + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + err := tailStream(acc, tags, containerID, outReader, "stdout") + if err != nil { + acc.AddError(err) + } + }() + + wg.Add(1) + go func() { + defer wg.Done() + err := tailStream(acc, tags, containerID, errReader, "stderr") + if err != nil { + acc.AddError(err) + } + }() + + _, err := stdcopy.StdCopy(outWriter, errWriter, src) + outWriter.Close() + errWriter.Close() + src.Close() + wg.Wait() + return err +} + +func (d *DockerLogs) Stop() { + d.cancelTails() + d.wg.Wait() +} + +// Following few functions have been inherited from telegraf docker input plugin +func (d *DockerLogs) createContainerFilters() error { + filter, err := filter.NewIncludeExcludeFilter(d.ContainerInclude, d.ContainerExclude) + if err != nil { + return err + } + d.containerFilter = filter + return nil +} + +func (d *DockerLogs) createLabelFilters() error { + filter, err := filter.NewIncludeExcludeFilter(d.LabelInclude, d.LabelExclude) + if err != nil { + return err + } + d.labelFilter = filter + return nil +} + +func (d *DockerLogs) createContainerStateFilters() error { + if len(d.ContainerStateInclude) == 0 && len(d.ContainerStateExclude) == 0 { + d.ContainerStateInclude = []string{"running"} + } + filter, err := filter.NewIncludeExcludeFilter(d.ContainerStateInclude, d.ContainerStateExclude) + if err != nil { + return err + } + d.stateFilter = filter + return nil +} + +func init() { + inputs.Add("docker_log", func() telegraf.Input { + return &DockerLogs{ + Timeout: internal.Duration{Duration: time.Second * 5}, + Endpoint: defaultEndpoint, + newEnvClient: NewEnvClient, + newClient: NewClient, + containerList: make(map[string]context.CancelFunc), + } + }) +} diff --git a/plugins/inputs/docker_log/docker_log_test.go b/plugins/inputs/docker_log/docker_log_test.go new file mode 100644 index 000000000..ce61f6135 --- /dev/null +++ b/plugins/inputs/docker_log/docker_log_test.go @@ -0,0 +1,175 @@ +package docker_log + +import ( + "bytes" + "context" + "crypto/tls" + "io" + "testing" + "time" + + "github.com/docker/docker/api/types" + "github.com/docker/docker/api/types/container" + "github.com/docker/docker/pkg/stdcopy" + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/require" +) + +type MockClient struct { + ContainerListF func(ctx context.Context, options types.ContainerListOptions) ([]types.Container, error) + ContainerInspectF func(ctx context.Context, containerID string) (types.ContainerJSON, error) + ContainerLogsF func(ctx context.Context, containerID string, options types.ContainerLogsOptions) (io.ReadCloser, error) +} + +func (c *MockClient) ContainerList( + ctx context.Context, + options types.ContainerListOptions, +) ([]types.Container, error) { + return c.ContainerListF(ctx, options) +} + +func (c *MockClient) ContainerInspect( + ctx context.Context, + containerID string, +) (types.ContainerJSON, error) { + return c.ContainerInspectF(ctx, containerID) +} + +func (c *MockClient) ContainerLogs( + ctx context.Context, + containerID string, + options types.ContainerLogsOptions, +) (io.ReadCloser, error) { + return c.ContainerLogsF(ctx, containerID, options) +} + +type Response struct { + io.Reader +} + +func (r *Response) Close() error { + return nil +} + +func Test(t *testing.T) { + tests := []struct { + name string + client *MockClient + expected []telegraf.Metric + }{ + { + name: "no containers", + client: &MockClient{ + ContainerListF: func(ctx context.Context, options types.ContainerListOptions) ([]types.Container, error) { + return nil, nil + }, + }, + }, + { + name: "one container tty", + client: &MockClient{ + ContainerListF: func(ctx context.Context, options types.ContainerListOptions) ([]types.Container, error) { + return []types.Container{ + { + ID: "deadbeef", + Names: []string{"/telegraf"}, + Image: "influxdata/telegraf:1.11.0", + }, + }, nil + }, + ContainerInspectF: func(ctx context.Context, containerID string) (types.ContainerJSON, error) { + return types.ContainerJSON{ + Config: &container.Config{ + Tty: true, + }, + }, nil + }, + ContainerLogsF: func(ctx context.Context, containerID string, options types.ContainerLogsOptions) (io.ReadCloser, error) { + return &Response{Reader: bytes.NewBuffer([]byte("hello\n"))}, nil + }, + }, + expected: []telegraf.Metric{ + testutil.MustMetric( + "docker_log", + map[string]string{ + "container_name": "telegraf", + "container_image": "influxdata/telegraf", + "container_version": "1.11.0", + "stream": "tty", + }, + map[string]interface{}{ + "container_id": "deadbeef", + "message": "hello", + }, + time.Now(), + ), + }, + }, + { + name: "one container multiplex", + client: &MockClient{ + ContainerListF: func(ctx context.Context, options types.ContainerListOptions) ([]types.Container, error) { + return []types.Container{ + { + ID: "deadbeef", + Names: []string{"/telegraf"}, + Image: "influxdata/telegraf:1.11.0", + }, + }, nil + }, + ContainerInspectF: func(ctx context.Context, containerID string) (types.ContainerJSON, error) { + return types.ContainerJSON{ + Config: &container.Config{ + Tty: false, + }, + }, nil + }, + ContainerLogsF: func(ctx context.Context, containerID string, options types.ContainerLogsOptions) (io.ReadCloser, error) { + var buf bytes.Buffer + w := stdcopy.NewStdWriter(&buf, stdcopy.Stdout) + w.Write([]byte("hello from stdout")) + return &Response{Reader: &buf}, nil + }, + }, + expected: []telegraf.Metric{ + testutil.MustMetric( + "docker_log", + map[string]string{ + "container_name": "telegraf", + "container_image": "influxdata/telegraf", + "container_version": "1.11.0", + "stream": "stdout", + }, + map[string]interface{}{ + "container_id": "deadbeef", + "message": "hello from stdout", + }, + time.Now(), + ), + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var acc testutil.Accumulator + plugin := &DockerLogs{ + Timeout: internal.Duration{Duration: time.Second * 5}, + newClient: func(string, *tls.Config) (Client, error) { return tt.client, nil }, + containerList: make(map[string]context.CancelFunc), + } + + err := plugin.Init() + require.NoError(t, err) + + err = plugin.Gather(&acc) + require.NoError(t, err) + + acc.Wait(len(tt.expected)) + plugin.Stop() + + testutil.RequireMetricsEqual(t, tt.expected, acc.GetTelegrafMetrics(), testutil.IgnoreTime()) + }) + } +} diff --git a/plugins/inputs/docker_log/docker_logs.go b/plugins/inputs/docker_log/docker_logs.go deleted file mode 100644 index 813b868ee..000000000 --- a/plugins/inputs/docker_log/docker_logs.go +++ /dev/null @@ -1,472 +0,0 @@ -package docker_log - -import ( - "context" - "crypto/tls" - "encoding/binary" - "errors" - "fmt" - "github.com/docker/docker/api/types" - "github.com/docker/docker/api/types/filters" - "github.com/influxdata/telegraf" - "github.com/influxdata/telegraf/filter" - "github.com/influxdata/telegraf/internal" - tlsint "github.com/influxdata/telegraf/internal/tls" - "github.com/influxdata/telegraf/plugins/inputs" - "io" - "strings" - "sync" - "time" -) - -type StdType byte - -const ( - Stdin StdType = iota - Stdout - Stderr - Systemerr - - stdWriterPrefixLen = 8 - stdWriterFdIndex = 0 - stdWriterSizeIndex = 4 - - startingBufLen = 32*1024 + stdWriterPrefixLen + 1 - - ERR_PREFIX = "E! [inputs.docker_log]" - defaultEndpoint = "unix:///var/run/docker.sock" - logBytesMax = 1000 -) - -type DockerLogs struct { - Endpoint string - - Timeout internal.Duration - - LabelInclude []string `toml:"docker_label_include"` - LabelExclude []string `toml:"docker_label_exclude"` - - ContainerInclude []string `toml:"container_name_include"` - ContainerExclude []string `toml:"container_name_exclude"` - - ContainerStateInclude []string `toml:"container_state_include"` - ContainerStateExclude []string `toml:"container_state_exclude"` - - tlsint.ClientConfig - - newEnvClient func() (Client, error) - newClient func(string, *tls.Config) (Client, error) - - client Client - filtersCreated bool - labelFilter filter.Filter - containerFilter filter.Filter - stateFilter filter.Filter - opts types.ContainerListOptions - wg sync.WaitGroup - mu sync.Mutex - containerList map[string]io.ReadCloser -} - -var ( - containerStates = []string{"created", "restarting", "running", "removing", "paused", "exited", "dead"} -) - -var sampleConfig = ` - ## Docker Endpoint - ## To use TCP, set endpoint = "tcp://[ip]:[port]" - ## To use environment variables (ie, docker-machine), set endpoint = "ENV" - endpoint = "unix:///var/run/docker.sock" - ## Containers to include and exclude. Globs accepted. - ## Note that an empty array for both will include all containers - container_name_include = [] - container_name_exclude = [] - ## Container states to include and exclude. Globs accepted. - ## When empty only containers in the "running" state will be captured. - # container_state_include = [] - # container_state_exclude = [] - - ## docker labels to include and exclude as tags. Globs accepted. - ## Note that an empty array for both will include all labels as tags - docker_label_include = [] - docker_label_exclude = [] - - ## Optional TLS Config - # tls_ca = "/etc/telegraf/ca.pem" - # tls_cert = "/etc/telegraf/cert.pem" - # tls_key = "/etc/telegraf/key.pem" - ## Use TLS but skip chain & host verification - # insecure_skip_verify = false -` - -func (d *DockerLogs) Description() string { - return "Plugin to get docker logs" -} - -func (d *DockerLogs) SampleConfig() string { - return sampleConfig -} - -func (d *DockerLogs) Gather(acc telegraf.Accumulator) error { - /*Check to see if any new containers have been created since last time*/ - return d.containerListUpdate(acc) -} - -/*Following few functions have been inherited from telegraf docker input plugin*/ -func (d *DockerLogs) createContainerFilters() error { - filter, err := filter.NewIncludeExcludeFilter(d.ContainerInclude, d.ContainerExclude) - if err != nil { - return err - } - d.containerFilter = filter - return nil -} - -func (d *DockerLogs) createLabelFilters() error { - filter, err := filter.NewIncludeExcludeFilter(d.LabelInclude, d.LabelExclude) - if err != nil { - return err - } - d.labelFilter = filter - return nil -} - -func (d *DockerLogs) createContainerStateFilters() error { - if len(d.ContainerStateInclude) == 0 && len(d.ContainerStateExclude) == 0 { - d.ContainerStateInclude = []string{"running"} - } - filter, err := filter.NewIncludeExcludeFilter(d.ContainerStateInclude, d.ContainerStateExclude) - if err != nil { - return err - } - d.stateFilter = filter - return nil -} - -func (d *DockerLogs) addToContainerList(containerId string, logReader io.ReadCloser) error { - d.mu.Lock() - defer d.mu.Unlock() - d.containerList[containerId] = logReader - return nil -} - -func (d *DockerLogs) removeFromContainerList(containerId string) error { - d.mu.Lock() - defer d.mu.Unlock() - delete(d.containerList, containerId) - return nil -} - -func (d *DockerLogs) containerInContainerList(containerId string) bool { - if _, ok := d.containerList[containerId]; ok { - return true - } - return false -} - -func (d *DockerLogs) stopAllReaders() error { - for _, container := range d.containerList { - container.Close() - } - return nil -} - -func (d *DockerLogs) containerListUpdate(acc telegraf.Accumulator) error { - ctx, cancel := context.WithTimeout(context.Background(), d.Timeout.Duration) - defer cancel() - if d.client == nil { - return errors.New(fmt.Sprintf("%s : Dock client is null", ERR_PREFIX)) - } - containers, err := d.client.ContainerList(ctx, d.opts) - if err != nil { - return err - } - for _, container := range containers { - if d.containerInContainerList(container.ID) { - continue - } - d.wg.Add(1) - /*Start a new goroutine for every new container that has logs to collect*/ - go func(c types.Container) { - defer d.wg.Done() - logOptions := types.ContainerLogsOptions{ - ShowStdout: true, - ShowStderr: true, - Timestamps: false, - Details: true, - Follow: true, - Tail: "0", - } - logReader, err := d.client.ContainerLogs(context.Background(), c.ID, logOptions) - if err != nil { - acc.AddError(err) - return - } - d.addToContainerList(c.ID, logReader) - err = d.tailContainerLogs(c, logReader, acc) - if err != nil { - acc.AddError(err) - } - d.removeFromContainerList(c.ID) - return - }(container) - } - return nil -} - -func (d *DockerLogs) tailContainerLogs( - container types.Container, logReader io.ReadCloser, - acc telegraf.Accumulator, -) error { - c, err := d.client.ContainerInspect(context.Background(), container.ID) - if err != nil { - return err - } - /* Parse container name */ - var cname string - for _, name := range container.Names { - trimmedName := strings.TrimPrefix(name, "/") - match := d.containerFilter.Match(trimmedName) - if match { - cname = trimmedName - break - } - } - - if cname == "" { - return errors.New(fmt.Sprintf("%s : container name is null", ERR_PREFIX)) - } - imageName, imageVersion := parseImage(container.Image) - tags := map[string]string{ - "container_name": cname, - "container_image": imageName, - "container_version": imageVersion, - } - fields := map[string]interface{}{} - fields["container_id"] = container.ID - // Add labels to tags - for k, label := range container.Labels { - if d.labelFilter.Match(k) { - tags[k] = label - } - } - if c.Config.Tty { - err = pushTtyLogs(acc, tags, fields, logReader) - } else { - _, err = pushLogs(acc, tags, fields, logReader) - } - if err != nil { - return err - } - return nil -} -func pushTtyLogs(acc telegraf.Accumulator, tags map[string]string, fields map[string]interface{}, src io.Reader) (err error) { - tags["logType"] = "unknown" //in tty mode we wont be able to differentiate b/w stdout and stderr hence unknown - data := make([]byte, logBytesMax) - for { - num, err := src.Read(data) - if num > 0 { - fields["message"] = data[1:num] - acc.AddFields("docker_log", fields, tags) - } - if err == io.EOF { - fields["message"] = data[1:num] - acc.AddFields("docker_log", fields, tags) - return nil - } - if err != nil { - return err - } - } -} - -/* Inspired from https://github.com/moby/moby/blob/master/pkg/stdcopy/stdcopy.go */ -func pushLogs(acc telegraf.Accumulator, tags map[string]string, fields map[string]interface{}, src io.Reader) (written int64, err error) { - var ( - buf = make([]byte, startingBufLen) - bufLen = len(buf) - nr int - er error - frameSize int - ) - for { - // Make sure we have at least a full header - for nr < stdWriterPrefixLen { - var nr2 int - nr2, er = src.Read(buf[nr:]) - nr += nr2 - if er == io.EOF { - if nr < stdWriterPrefixLen { - return written, nil - } - break - } - if er != nil { - return 0, er - } - } - stream := StdType(buf[stdWriterFdIndex]) - // Check the first byte to know where to write - var logType string - switch stream { - case Stdin: - logType = "stdin" - break - case Stdout: - logType = "stdout" - break - case Stderr: - logType = "stderr" - break - case Systemerr: - fallthrough - default: - return 0, fmt.Errorf("Unrecognized input header: %d", buf[stdWriterFdIndex]) - } - // Retrieve the size of the frame - frameSize = int(binary.BigEndian.Uint32(buf[stdWriterSizeIndex : stdWriterSizeIndex+4])) - - // Check if the buffer is big enough to read the frame. - // Extend it if necessary. - if frameSize+stdWriterPrefixLen > bufLen { - buf = append(buf, make([]byte, frameSize+stdWriterPrefixLen-bufLen+1)...) - bufLen = len(buf) - } - - // While the amount of bytes read is less than the size of the frame + header, we keep reading - for nr < frameSize+stdWriterPrefixLen { - var nr2 int - nr2, er = src.Read(buf[nr:]) - nr += nr2 - if er == io.EOF { - if nr < frameSize+stdWriterPrefixLen { - return written, nil - } - break - } - if er != nil { - return 0, er - } - } - - // we might have an error from the source mixed up in our multiplexed - // stream. if we do, return it. - if stream == Systemerr { - return written, fmt.Errorf("error from daemon in stream: %s", string(buf[stdWriterPrefixLen:frameSize+stdWriterPrefixLen])) - } - - tags["stream"] = logType - fields["message"] = buf[stdWriterPrefixLen+1 : frameSize+stdWriterPrefixLen] - acc.AddFields("docker_log", fields, tags) - written += int64(frameSize) - - // Move the rest of the buffer to the beginning - copy(buf, buf[frameSize+stdWriterPrefixLen:]) - // Move the index - nr -= frameSize + stdWriterPrefixLen - } -} - -func (d *DockerLogs) Start(acc telegraf.Accumulator) error { - var c Client - var err error - if d.Endpoint == "ENV" { - c, err = d.newEnvClient() - } else { - tlsConfig, err := d.ClientConfig.TLSConfig() - if err != nil { - return err - } - c, err = d.newClient(d.Endpoint, tlsConfig) - } - if err != nil { - return err - } - d.client = c - // Create label filters if not already created - if !d.filtersCreated { - err := d.createLabelFilters() - if err != nil { - return err - } - err = d.createContainerFilters() - if err != nil { - return err - } - err = d.createContainerStateFilters() - if err != nil { - return err - } - d.filtersCreated = true - } - filterArgs := filters.NewArgs() - for _, state := range containerStates { - if d.stateFilter.Match(state) { - filterArgs.Add("status", state) - } - } - - // All container states were excluded - if filterArgs.Len() == 0 { - return nil - } - - d.opts = types.ContainerListOptions{ - Filters: filterArgs, - } - return nil -} - -/* Inspired from https://github.com/influxdata/telegraf/blob/master/plugins/inputs/docker/docker.go */ -func parseImage(image string) (string, string) { - // Adapts some of the logic from the actual Docker library's image parsing - // routines: - // https://github.com/docker/distribution/blob/release/2.7/reference/normalize.go - domain := "" - remainder := "" - - i := strings.IndexRune(image, '/') - - if i == -1 || (!strings.ContainsAny(image[:i], ".:") && image[:i] != "localhost") { - remainder = image - } else { - domain, remainder = image[:i], image[i+1:] - } - - imageName := "" - imageVersion := "unknown" - - i = strings.LastIndex(remainder, ":") - if i > -1 { - imageVersion = remainder[i+1:] - imageName = remainder[:i] - } else { - imageName = remainder - } - - if domain != "" { - imageName = domain + "/" + imageName - } - - return imageName, imageVersion -} - -func (d *DockerLogs) Stop() { - d.mu.Lock() - d.stopAllReaders() - d.mu.Unlock() - d.wg.Wait() -} - -func init() { - inputs.Add("docker_log", func() telegraf.Input { - return &DockerLogs{ - Timeout: internal.Duration{Duration: time.Second * 5}, - Endpoint: defaultEndpoint, - newEnvClient: NewEnvClient, - newClient: NewClient, - filtersCreated: false, - containerList: make(map[string]io.ReadCloser), - } - }) -} diff --git a/testutil/metric.go b/testutil/metric.go index b92c724f1..0dca9c641 100644 --- a/testutil/metric.go +++ b/testutil/metric.go @@ -123,6 +123,11 @@ func SortMetrics() cmp.Option { return cmpopts.SortSlices(lessFunc) } +// IgnoreTime disables comparison of timestamp. +func IgnoreTime() cmp.Option { + return cmpopts.IgnoreFields(metricDiff{}, "Time") +} + // MetricEqual returns true if the metrics are equal. func MetricEqual(expected, actual telegraf.Metric) bool { var lhs, rhs *metricDiff