Follow up work on docker_log input (#6008)

This commit is contained in:
Daniel Nelson 2019-06-20 11:54:12 -07:00 committed by GitHub
parent 29c3d42e7e
commit a0c739eec7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 752 additions and 577 deletions

2
Gopkg.lock generated
View File

@ -326,6 +326,7 @@
"api/types/versions", "api/types/versions",
"api/types/volume", "api/types/volume",
"client", "client",
"pkg/stdcopy",
] ]
pruneopts = "" pruneopts = ""
revision = "ed7b6428c133e7c59404251a09b7d6b02fa83cc2" revision = "ed7b6428c133e7c59404251a09b7d6b02fa83cc2"
@ -1591,6 +1592,7 @@
"github.com/docker/docker/api/types/registry", "github.com/docker/docker/api/types/registry",
"github.com/docker/docker/api/types/swarm", "github.com/docker/docker/api/types/swarm",
"github.com/docker/docker/client", "github.com/docker/docker/client",
"github.com/docker/docker/pkg/stdcopy",
"github.com/docker/libnetwork/ipvs", "github.com/docker/libnetwork/ipvs",
"github.com/eclipse/paho.mqtt.golang", "github.com/eclipse/paho.mqtt.golang",
"github.com/ericchiang/k8s", "github.com/ericchiang/k8s",

36
internal/docker/docker.go Normal file
View File

@ -0,0 +1,36 @@
package docker
import "strings"
// Adapts some of the logic from the actual Docker library's image parsing
// routines:
// https://github.com/docker/distribution/blob/release/2.7/reference/normalize.go
func ParseImage(image string) (string, string) {
domain := ""
remainder := ""
i := strings.IndexRune(image, '/')
if i == -1 || (!strings.ContainsAny(image[:i], ".:") && image[:i] != "localhost") {
remainder = image
} else {
domain, remainder = image[:i], image[i+1:]
}
imageName := ""
imageVersion := "unknown"
i = strings.LastIndex(remainder, ":")
if i > -1 {
imageVersion = remainder[i+1:]
imageName = remainder[:i]
} else {
imageName = remainder
}
if domain != "" {
imageName = domain + "/" + imageName
}
return imageName, imageVersion
}

View File

@ -0,0 +1,59 @@
package docker_test
import (
"testing"
"github.com/influxdata/telegraf/internal/docker"
"github.com/stretchr/testify/require"
)
func TestParseImage(t *testing.T) {
tests := []struct {
image string
parsedName string
parsedVersion string
}{
{
image: "postgres",
parsedName: "postgres",
parsedVersion: "unknown",
},
{
image: "postgres:latest",
parsedName: "postgres",
parsedVersion: "latest",
},
{
image: "coreos/etcd",
parsedName: "coreos/etcd",
parsedVersion: "unknown",
},
{
image: "coreos/etcd:latest",
parsedName: "coreos/etcd",
parsedVersion: "latest",
},
{
image: "quay.io/postgres",
parsedName: "quay.io/postgres",
parsedVersion: "unknown",
},
{
image: "quay.io:4443/coreos/etcd",
parsedName: "quay.io:4443/coreos/etcd",
parsedVersion: "unknown",
},
{
image: "quay.io:4443/coreos/etcd:latest",
parsedName: "quay.io:4443/coreos/etcd",
parsedVersion: "latest",
},
}
for _, tt := range tests {
t.Run("parse name "+tt.image, func(t *testing.T) {
imageName, imageVersion := docker.ParseImage(tt.image)
require.Equal(t, tt.parsedName, imageName)
require.Equal(t, tt.parsedVersion, imageVersion)
})
}
}

View File

@ -20,6 +20,7 @@ import (
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/filter" "github.com/influxdata/telegraf/filter"
"github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/internal/docker"
tlsint "github.com/influxdata/telegraf/internal/tls" tlsint "github.com/influxdata/telegraf/internal/tls"
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
) )
@ -361,44 +362,12 @@ func (d *Docker) gatherInfo(acc telegraf.Accumulator) error {
return nil return nil
} }
func parseImage(image string) (string, string) {
// Adapts some of the logic from the actual Docker library's image parsing
// routines:
// https://github.com/docker/distribution/blob/release/2.7/reference/normalize.go
domain := ""
remainder := ""
i := strings.IndexRune(image, '/')
if i == -1 || (!strings.ContainsAny(image[:i], ".:") && image[:i] != "localhost") {
remainder = image
} else {
domain, remainder = image[:i], image[i+1:]
}
imageName := ""
imageVersion := "unknown"
i = strings.LastIndex(remainder, ":")
if i > -1 {
imageVersion = remainder[i+1:]
imageName = remainder[:i]
} else {
imageName = remainder
}
if domain != "" {
imageName = domain + "/" + imageName
}
return imageName, imageVersion
}
func (d *Docker) gatherContainer( func (d *Docker) gatherContainer(
container types.Container, container types.Container,
acc telegraf.Accumulator, acc telegraf.Accumulator,
) error { ) error {
var v *types.StatsJSON var v *types.StatsJSON
// Parse container name // Parse container name
var cname string var cname string
for _, name := range container.Names { for _, name := range container.Names {
@ -414,7 +383,7 @@ func (d *Docker) gatherContainer(
return nil return nil
} }
imageName, imageVersion := parseImage(container.Image) imageName, imageVersion := docker.ParseImage(container.Image)
tags := map[string]string{ tags := map[string]string{
"engine_host": d.engineHost, "engine_host": d.engineHost,

View File

@ -9,10 +9,9 @@ import (
"testing" "testing"
"time" "time"
"github.com/influxdata/telegraf/testutil"
"github.com/docker/docker/api/types" "github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/swarm" "github.com/docker/docker/api/types/swarm"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
@ -945,54 +944,3 @@ func TestContainerName(t *testing.T) {
}) })
} }
} }
func TestParseImage(t *testing.T) {
tests := []struct {
image string
parsedName string
parsedVersion string
}{
{
image: "postgres",
parsedName: "postgres",
parsedVersion: "unknown",
},
{
image: "postgres:latest",
parsedName: "postgres",
parsedVersion: "latest",
},
{
image: "coreos/etcd",
parsedName: "coreos/etcd",
parsedVersion: "unknown",
},
{
image: "coreos/etcd:latest",
parsedName: "coreos/etcd",
parsedVersion: "latest",
},
{
image: "quay.io/postgres",
parsedName: "quay.io/postgres",
parsedVersion: "unknown",
},
{
image: "quay.io:4443/coreos/etcd",
parsedName: "quay.io:4443/coreos/etcd",
parsedVersion: "unknown",
},
{
image: "quay.io:4443/coreos/etcd:latest",
parsedName: "quay.io:4443/coreos/etcd",
parsedVersion: "latest",
},
}
for _, tt := range tests {
t.Run("parse name "+tt.image, func(t *testing.T) {
imageName, imageVersion := parseImage(tt.image)
require.Equal(t, tt.parsedName, imageName)
require.Equal(t, tt.parsedVersion, imageVersion)
})
}
}

View File

@ -3,22 +3,35 @@
The docker log plugin uses the Docker Engine API to get logs on running The docker log plugin uses the Docker Engine API to get logs on running
docker containers. docker containers.
The docker plugin uses the [Official Docker Client](https://github.com/moby/moby/tree/master/client) The docker plugin uses the [Official Docker Client][] to gather logs from the
to gather logs from the [Engine API](https://docs.docker.com/engine/api/v1.24/). [Engine API][].
Note: This plugin works only for containers with the `local` or `json-file` or `journald` logging driver.
### Configuration: **Note:** This plugin works only for containers with the `local` or
`json-file` or `journald` logging driver.
[Official Docker Client]: https://github.com/moby/moby/tree/master/client
[Engine API]: https://docs.docker.com/engine/api/v1.24/
### Configuration
```toml ```toml
# Read metrics about docker containers
[[inputs.docker_log]] [[inputs.docker_log]]
## Docker Endpoint ## Docker Endpoint
## To use TCP, set endpoint = "tcp://[ip]:[port]" ## To use TCP, set endpoint = "tcp://[ip]:[port]"
## To use environment variables (ie, docker-machine), set endpoint = "ENV" ## To use environment variables (ie, docker-machine), set endpoint = "ENV"
endpoint = "unix:///var/run/docker.sock" # endpoint = "unix:///var/run/docker.sock"
## Containers to include and exclude. Collect all if empty. Globs accepted. ## When true, container logs are read from the beginning; otherwise
container_name_include = [] ## reading begins at the end of the log.
container_name_exclude = [] # from_beginning = false
## Timeout for Docker API calls.
# timeout = "5s"
## Containers to include and exclude. Globs accepted.
## Note that an empty array for both will include all containers
# container_name_include = []
# container_name_exclude = []
## Container states to include and exclude. Globs accepted. ## Container states to include and exclude. Globs accepted.
## When empty only containers in the "running" state will be captured. ## When empty only containers in the "running" state will be captured.
@ -27,8 +40,8 @@ Note: This plugin works only for containers with the `local` or `json-file` or
## docker labels to include and exclude as tags. Globs accepted. ## docker labels to include and exclude as tags. Globs accepted.
## Note that an empty array for both will include all labels as tags ## Note that an empty array for both will include all labels as tags
docker_label_include = [] # docker_label_include = []
docker_label_exclude = [] # docker_label_exclude = []
## Optional TLS Config ## Optional TLS Config
# tls_ca = "/etc/telegraf/ca.pem" # tls_ca = "/etc/telegraf/ca.pem"
@ -41,20 +54,31 @@ Note: This plugin works only for containers with the `local` or `json-file` or
#### Environment Configuration #### Environment Configuration
When using the `"ENV"` endpoint, the connection is configured using the When using the `"ENV"` endpoint, the connection is configured using the
[cli Docker environment variables](https://godoc.org/github.com/moby/moby/client#NewEnvClient). [CLI Docker environment variables][env]
### Metrics: [env]: https://godoc.org/github.com/moby/moby/client#NewEnvClient
### Metrics
- docker_log - docker_log
- tags: - tags:
- container_id - container_image
- container_version
- container_name - container_name
- stream - stream (stdout, stderr, or tty)
- fields: - fields:
- container_id
- message - message
### Example Output:
### Example Output
``` ```
docker_log,com.docker.compose.config-hash=e19e13df8fd01ba2d7c1628158fca45cc91afbbe9661b2d30550547eb53a861e,com.docker.compose.container-number=1,com.docker.compose.oneoff=False,com.docker.compose.project=distribution,com.docker.compose.service=influxdb,com.docker.compose.version=1.21.2,containerId=fce475bbfa4c8380ff858d5d767f78622ca6de955b525477624c2b7896a5b8e4,containerName=aicon-influxdb,host=prash-laptop,logType=stderr log=" [httpd] 172.23.0.2 - aicon_admin [13/Apr/2019:08:35:53 +0000] \"POST /query?db=&q=SHOW+SUBSCRIPTIONS HTTP/1.1\" 200 232 \"-\" \"KapacitorInfluxDBClient\" 2661bc9c-5dc7-11e9-82f8-0242ac170007 1360\n" 1555144553541000000 docker_log,container_image=telegraf,container_name=sharp_bell,container_version=alpine,stream=stderr container_id="371ee5d3e58726112f499be62cddef800138ca72bbba635ed2015fbf475b1023",message="2019-06-19T03:11:11Z I! [agent] Config: Interval:10s, Quiet:false, Hostname:\"371ee5d3e587\", Flush Interval:10s" 1560913872000000000
docker_log,com.docker.compose.config-hash=fd91b3b096c7ab346971c681b88fe1357c609dcc6850e4ea5b1287ad28a57e5d,com.docker.compose.container-number=1,com.docker.compose.oneoff=False,com.docker.compose.project=distribution,com.docker.compose.service=kapacitor,com.docker.compose.version=1.21.2,containerId=6514d1cf6d19e7ecfedf894941f0a2ea21b8aac5e6f48e64f19dbc9bb2805a25,containerName=aicon-kapacitor,host=prash-laptop,logType=stderr log=" ts=2019-04-13T08:36:00.019Z lvl=info msg=\"http request\" service=http host=172.23.0.7 username=- start=2019-04-13T08:36:00.013835165Z method=POST uri=/write?consistency=&db=_internal&precision=ns&rp=monitor protocol=HTTP/1.1 status=204 referer=- user-agent=InfluxDBClient request-id=2a3eb481-5dc7-11e9-825b-000000000000 duration=5.814404ms\n" 1555144560024000000 docker_log,container_image=telegraf,container_name=sharp_bell,container_version=alpine,stream=stderr container_id="371ee5d3e58726112f499be62cddef800138ca72bbba635ed2015fbf475b1023",message="2019-06-19T03:11:11Z I! Tags enabled: host=371ee5d3e587" 1560913872000000000
docker_log,container_image=telegraf,container_name=sharp_bell,container_version=alpine,stream=stderr container_id="371ee5d3e58726112f499be62cddef800138ca72bbba635ed2015fbf475b1023",message="2019-06-19T03:11:11Z I! Loaded outputs: file" 1560913872000000000
docker_log,container_image=telegraf,container_name=sharp_bell,container_version=alpine,stream=stderr container_id="371ee5d3e58726112f499be62cddef800138ca72bbba635ed2015fbf475b1023",message="2019-06-19T03:11:11Z I! Loaded processors:" 1560913872000000000
docker_log,container_image=telegraf,container_name=sharp_bell,container_version=alpine,stream=stderr container_id="371ee5d3e58726112f499be62cddef800138ca72bbba635ed2015fbf475b1023",message="2019-06-19T03:11:11Z I! Loaded aggregators:" 1560913872000000000
docker_log,container_image=telegraf,container_name=sharp_bell,container_version=alpine,stream=stderr container_id="371ee5d3e58726112f499be62cddef800138ca72bbba635ed2015fbf475b1023",message="2019-06-19T03:11:11Z I! Loaded inputs: net" 1560913872000000000
docker_log,container_image=telegraf,container_name=sharp_bell,container_version=alpine,stream=stderr container_id="371ee5d3e58726112f499be62cddef800138ca72bbba635ed2015fbf475b1023",message="2019-06-19T03:11:11Z I! Using config file: /etc/telegraf/telegraf.conf" 1560913872000000000
docker_log,container_image=telegraf,container_name=sharp_bell,container_version=alpine,stream=stderr container_id="371ee5d3e58726112f499be62cddef800138ca72bbba635ed2015fbf475b1023",message="2019-06-19T03:11:11Z I! Starting Telegraf 1.10.4" 1560913872000000000
``` ```

View File

@ -0,0 +1,429 @@
package docker_log
import (
"bufio"
"context"
"crypto/tls"
"io"
"strings"
"sync"
"time"
"unicode"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/filters"
"github.com/docker/docker/pkg/stdcopy"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/filter"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/internal/docker"
tlsint "github.com/influxdata/telegraf/internal/tls"
"github.com/influxdata/telegraf/plugins/inputs"
)
var sampleConfig = `
## Docker Endpoint
## To use TCP, set endpoint = "tcp://[ip]:[port]"
## To use environment variables (ie, docker-machine), set endpoint = "ENV"
# endpoint = "unix:///var/run/docker.sock"
## When true, container logs are read from the beginning; otherwise
## reading begins at the end of the log.
# from_beginning = false
## Timeout for Docker API calls.
# timeout = "5s"
## Containers to include and exclude. Globs accepted.
## Note that an empty array for both will include all containers
# container_name_include = []
# container_name_exclude = []
## Container states to include and exclude. Globs accepted.
## When empty only containers in the "running" state will be captured.
# container_state_include = []
# container_state_exclude = []
## docker labels to include and exclude as tags. Globs accepted.
## Note that an empty array for both will include all labels as tags
# docker_label_include = []
# docker_label_exclude = []
## Optional TLS Config
# tls_ca = "/etc/telegraf/ca.pem"
# tls_cert = "/etc/telegraf/cert.pem"
# tls_key = "/etc/telegraf/key.pem"
## Use TLS but skip chain & host verification
# insecure_skip_verify = false
`
const (
defaultEndpoint = "unix:///var/run/docker.sock"
// Maximum bytes of a log line before it will be split, size is mirroring
// docker code:
// https://github.com/moby/moby/blob/master/daemon/logger/copier.go#L21
maxLineBytes = 16 * 1024
)
var (
containerStates = []string{"created", "restarting", "running", "removing", "paused", "exited", "dead"}
)
type DockerLogs struct {
Endpoint string `toml:"endpoint"`
FromBeginning bool `toml:"from_beginning"`
Timeout internal.Duration `toml:"timeout"`
LabelInclude []string `toml:"docker_label_include"`
LabelExclude []string `toml:"docker_label_exclude"`
ContainerInclude []string `toml:"container_name_include"`
ContainerExclude []string `toml:"container_name_exclude"`
ContainerStateInclude []string `toml:"container_state_include"`
ContainerStateExclude []string `toml:"container_state_exclude"`
tlsint.ClientConfig
newEnvClient func() (Client, error)
newClient func(string, *tls.Config) (Client, error)
client Client
labelFilter filter.Filter
containerFilter filter.Filter
stateFilter filter.Filter
opts types.ContainerListOptions
wg sync.WaitGroup
mu sync.Mutex
containerList map[string]context.CancelFunc
}
func (d *DockerLogs) Description() string {
return "Read logging output from the Docker engine"
}
func (d *DockerLogs) SampleConfig() string {
return sampleConfig
}
func (d *DockerLogs) Init() error {
var err error
if d.Endpoint == "ENV" {
d.client, err = d.newEnvClient()
if err != nil {
return err
}
} else {
tlsConfig, err := d.ClientConfig.TLSConfig()
if err != nil {
return err
}
d.client, err = d.newClient(d.Endpoint, tlsConfig)
if err != nil {
return err
}
}
// Create filters
err = d.createLabelFilters()
if err != nil {
return err
}
err = d.createContainerFilters()
if err != nil {
return err
}
err = d.createContainerStateFilters()
if err != nil {
return err
}
filterArgs := filters.NewArgs()
for _, state := range containerStates {
if d.stateFilter.Match(state) {
filterArgs.Add("status", state)
}
}
if filterArgs.Len() != 0 {
d.opts = types.ContainerListOptions{
Filters: filterArgs,
}
}
return nil
}
func (d *DockerLogs) addToContainerList(containerID string, cancel context.CancelFunc) error {
d.mu.Lock()
defer d.mu.Unlock()
d.containerList[containerID] = cancel
return nil
}
func (d *DockerLogs) removeFromContainerList(containerID string) error {
d.mu.Lock()
defer d.mu.Unlock()
delete(d.containerList, containerID)
return nil
}
func (d *DockerLogs) containerInContainerList(containerID string) bool {
d.mu.Lock()
defer d.mu.Unlock()
_, ok := d.containerList[containerID]
return ok
}
func (d *DockerLogs) cancelTails() error {
d.mu.Lock()
defer d.mu.Unlock()
for _, cancel := range d.containerList {
cancel()
}
return nil
}
func (d *DockerLogs) matchedContainerName(names []string) string {
// Check if all container names are filtered; in practice I believe
// this array is always of length 1.
for _, name := range names {
trimmedName := strings.TrimPrefix(name, "/")
match := d.containerFilter.Match(trimmedName)
if match {
return trimmedName
}
}
return ""
}
func (d *DockerLogs) Gather(acc telegraf.Accumulator) error {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, d.Timeout.Duration)
defer cancel()
containers, err := d.client.ContainerList(ctx, d.opts)
if err != nil {
return err
}
for _, container := range containers {
if d.containerInContainerList(container.ID) {
continue
}
containerName := d.matchedContainerName(container.Names)
if containerName == "" {
continue
}
ctx, cancel := context.WithCancel(context.Background())
d.addToContainerList(container.ID, cancel)
// Start a new goroutine for every new container that has logs to collect
d.wg.Add(1)
go func(container types.Container) {
defer d.wg.Done()
defer d.removeFromContainerList(container.ID)
err = d.tailContainerLogs(ctx, acc, container, containerName)
if err != nil {
acc.AddError(err)
}
}(container)
}
return nil
}
func (d *DockerLogs) hasTTY(ctx context.Context, container types.Container) (bool, error) {
ctx, cancel := context.WithTimeout(ctx, d.Timeout.Duration)
defer cancel()
c, err := d.client.ContainerInspect(ctx, container.ID)
if err != nil {
return false, err
}
return c.Config.Tty, nil
}
func (d *DockerLogs) tailContainerLogs(
ctx context.Context,
acc telegraf.Accumulator,
container types.Container,
containerName string,
) error {
imageName, imageVersion := docker.ParseImage(container.Image)
tags := map[string]string{
"container_name": containerName,
"container_image": imageName,
"container_version": imageVersion,
}
// Add matching container labels as tags
for k, label := range container.Labels {
if d.labelFilter.Match(k) {
tags[k] = label
}
}
hasTTY, err := d.hasTTY(ctx, container)
if err != nil {
return err
}
tail := "0"
if d.FromBeginning {
tail = "all"
}
logOptions := types.ContainerLogsOptions{
ShowStdout: true,
ShowStderr: true,
Timestamps: false,
Details: false,
Follow: true,
Tail: tail,
}
logReader, err := d.client.ContainerLogs(ctx, container.ID, logOptions)
if err != nil {
return err
}
// If the container is using a TTY, there is only a single stream
// (stdout), and data is copied directly from the container output stream,
// no extra multiplexing or headers.
//
// If the container is *not* using a TTY, streams for stdout and stderr are
// multiplexed.
if hasTTY {
return tailStream(acc, tags, container.ID, logReader, "tty")
} else {
return tailMultiplexed(acc, tags, container.ID, logReader)
}
}
func tailStream(
acc telegraf.Accumulator,
baseTags map[string]string,
containerID string,
reader io.ReadCloser,
stream string,
) error {
defer reader.Close()
tags := make(map[string]string, len(baseTags)+1)
for k, v := range baseTags {
tags[k] = v
}
tags["stream"] = stream
r := bufio.NewReaderSize(reader, 64*1024)
var err error
var message string
for {
message, err = r.ReadString('\n')
// Keep any leading space, but remove whitespace from end of line.
// This preserves space in, for example, stacktraces, while removing
// annoying end of line characters and is similar to how other logging
// plugins such as syslog behave.
message = strings.TrimRightFunc(message, unicode.IsSpace)
if len(message) != 0 {
acc.AddFields("docker_log", map[string]interface{}{
"container_id": containerID,
"message": message,
}, tags)
}
if err != nil {
if err == io.EOF {
return nil
}
return err
}
}
}
func tailMultiplexed(
acc telegraf.Accumulator,
tags map[string]string,
containerID string,
src io.ReadCloser,
) error {
outReader, outWriter := io.Pipe()
errReader, errWriter := io.Pipe()
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
err := tailStream(acc, tags, containerID, outReader, "stdout")
if err != nil {
acc.AddError(err)
}
}()
wg.Add(1)
go func() {
defer wg.Done()
err := tailStream(acc, tags, containerID, errReader, "stderr")
if err != nil {
acc.AddError(err)
}
}()
_, err := stdcopy.StdCopy(outWriter, errWriter, src)
outWriter.Close()
errWriter.Close()
src.Close()
wg.Wait()
return err
}
func (d *DockerLogs) Stop() {
d.cancelTails()
d.wg.Wait()
}
// Following few functions have been inherited from telegraf docker input plugin
func (d *DockerLogs) createContainerFilters() error {
filter, err := filter.NewIncludeExcludeFilter(d.ContainerInclude, d.ContainerExclude)
if err != nil {
return err
}
d.containerFilter = filter
return nil
}
func (d *DockerLogs) createLabelFilters() error {
filter, err := filter.NewIncludeExcludeFilter(d.LabelInclude, d.LabelExclude)
if err != nil {
return err
}
d.labelFilter = filter
return nil
}
func (d *DockerLogs) createContainerStateFilters() error {
if len(d.ContainerStateInclude) == 0 && len(d.ContainerStateExclude) == 0 {
d.ContainerStateInclude = []string{"running"}
}
filter, err := filter.NewIncludeExcludeFilter(d.ContainerStateInclude, d.ContainerStateExclude)
if err != nil {
return err
}
d.stateFilter = filter
return nil
}
func init() {
inputs.Add("docker_log", func() telegraf.Input {
return &DockerLogs{
Timeout: internal.Duration{Duration: time.Second * 5},
Endpoint: defaultEndpoint,
newEnvClient: NewEnvClient,
newClient: NewClient,
containerList: make(map[string]context.CancelFunc),
}
})
}

View File

@ -0,0 +1,175 @@
package docker_log
import (
"bytes"
"context"
"crypto/tls"
"io"
"testing"
"time"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/pkg/stdcopy"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
)
type MockClient struct {
ContainerListF func(ctx context.Context, options types.ContainerListOptions) ([]types.Container, error)
ContainerInspectF func(ctx context.Context, containerID string) (types.ContainerJSON, error)
ContainerLogsF func(ctx context.Context, containerID string, options types.ContainerLogsOptions) (io.ReadCloser, error)
}
func (c *MockClient) ContainerList(
ctx context.Context,
options types.ContainerListOptions,
) ([]types.Container, error) {
return c.ContainerListF(ctx, options)
}
func (c *MockClient) ContainerInspect(
ctx context.Context,
containerID string,
) (types.ContainerJSON, error) {
return c.ContainerInspectF(ctx, containerID)
}
func (c *MockClient) ContainerLogs(
ctx context.Context,
containerID string,
options types.ContainerLogsOptions,
) (io.ReadCloser, error) {
return c.ContainerLogsF(ctx, containerID, options)
}
type Response struct {
io.Reader
}
func (r *Response) Close() error {
return nil
}
func Test(t *testing.T) {
tests := []struct {
name string
client *MockClient
expected []telegraf.Metric
}{
{
name: "no containers",
client: &MockClient{
ContainerListF: func(ctx context.Context, options types.ContainerListOptions) ([]types.Container, error) {
return nil, nil
},
},
},
{
name: "one container tty",
client: &MockClient{
ContainerListF: func(ctx context.Context, options types.ContainerListOptions) ([]types.Container, error) {
return []types.Container{
{
ID: "deadbeef",
Names: []string{"/telegraf"},
Image: "influxdata/telegraf:1.11.0",
},
}, nil
},
ContainerInspectF: func(ctx context.Context, containerID string) (types.ContainerJSON, error) {
return types.ContainerJSON{
Config: &container.Config{
Tty: true,
},
}, nil
},
ContainerLogsF: func(ctx context.Context, containerID string, options types.ContainerLogsOptions) (io.ReadCloser, error) {
return &Response{Reader: bytes.NewBuffer([]byte("hello\n"))}, nil
},
},
expected: []telegraf.Metric{
testutil.MustMetric(
"docker_log",
map[string]string{
"container_name": "telegraf",
"container_image": "influxdata/telegraf",
"container_version": "1.11.0",
"stream": "tty",
},
map[string]interface{}{
"container_id": "deadbeef",
"message": "hello",
},
time.Now(),
),
},
},
{
name: "one container multiplex",
client: &MockClient{
ContainerListF: func(ctx context.Context, options types.ContainerListOptions) ([]types.Container, error) {
return []types.Container{
{
ID: "deadbeef",
Names: []string{"/telegraf"},
Image: "influxdata/telegraf:1.11.0",
},
}, nil
},
ContainerInspectF: func(ctx context.Context, containerID string) (types.ContainerJSON, error) {
return types.ContainerJSON{
Config: &container.Config{
Tty: false,
},
}, nil
},
ContainerLogsF: func(ctx context.Context, containerID string, options types.ContainerLogsOptions) (io.ReadCloser, error) {
var buf bytes.Buffer
w := stdcopy.NewStdWriter(&buf, stdcopy.Stdout)
w.Write([]byte("hello from stdout"))
return &Response{Reader: &buf}, nil
},
},
expected: []telegraf.Metric{
testutil.MustMetric(
"docker_log",
map[string]string{
"container_name": "telegraf",
"container_image": "influxdata/telegraf",
"container_version": "1.11.0",
"stream": "stdout",
},
map[string]interface{}{
"container_id": "deadbeef",
"message": "hello from stdout",
},
time.Now(),
),
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
var acc testutil.Accumulator
plugin := &DockerLogs{
Timeout: internal.Duration{Duration: time.Second * 5},
newClient: func(string, *tls.Config) (Client, error) { return tt.client, nil },
containerList: make(map[string]context.CancelFunc),
}
err := plugin.Init()
require.NoError(t, err)
err = plugin.Gather(&acc)
require.NoError(t, err)
acc.Wait(len(tt.expected))
plugin.Stop()
testutil.RequireMetricsEqual(t, tt.expected, acc.GetTelegrafMetrics(), testutil.IgnoreTime())
})
}
}

View File

@ -1,472 +0,0 @@
package docker_log
import (
"context"
"crypto/tls"
"encoding/binary"
"errors"
"fmt"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/filters"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/filter"
"github.com/influxdata/telegraf/internal"
tlsint "github.com/influxdata/telegraf/internal/tls"
"github.com/influxdata/telegraf/plugins/inputs"
"io"
"strings"
"sync"
"time"
)
type StdType byte
const (
Stdin StdType = iota
Stdout
Stderr
Systemerr
stdWriterPrefixLen = 8
stdWriterFdIndex = 0
stdWriterSizeIndex = 4
startingBufLen = 32*1024 + stdWriterPrefixLen + 1
ERR_PREFIX = "E! [inputs.docker_log]"
defaultEndpoint = "unix:///var/run/docker.sock"
logBytesMax = 1000
)
type DockerLogs struct {
Endpoint string
Timeout internal.Duration
LabelInclude []string `toml:"docker_label_include"`
LabelExclude []string `toml:"docker_label_exclude"`
ContainerInclude []string `toml:"container_name_include"`
ContainerExclude []string `toml:"container_name_exclude"`
ContainerStateInclude []string `toml:"container_state_include"`
ContainerStateExclude []string `toml:"container_state_exclude"`
tlsint.ClientConfig
newEnvClient func() (Client, error)
newClient func(string, *tls.Config) (Client, error)
client Client
filtersCreated bool
labelFilter filter.Filter
containerFilter filter.Filter
stateFilter filter.Filter
opts types.ContainerListOptions
wg sync.WaitGroup
mu sync.Mutex
containerList map[string]io.ReadCloser
}
var (
containerStates = []string{"created", "restarting", "running", "removing", "paused", "exited", "dead"}
)
var sampleConfig = `
## Docker Endpoint
## To use TCP, set endpoint = "tcp://[ip]:[port]"
## To use environment variables (ie, docker-machine), set endpoint = "ENV"
endpoint = "unix:///var/run/docker.sock"
## Containers to include and exclude. Globs accepted.
## Note that an empty array for both will include all containers
container_name_include = []
container_name_exclude = []
## Container states to include and exclude. Globs accepted.
## When empty only containers in the "running" state will be captured.
# container_state_include = []
# container_state_exclude = []
## docker labels to include and exclude as tags. Globs accepted.
## Note that an empty array for both will include all labels as tags
docker_label_include = []
docker_label_exclude = []
## Optional TLS Config
# tls_ca = "/etc/telegraf/ca.pem"
# tls_cert = "/etc/telegraf/cert.pem"
# tls_key = "/etc/telegraf/key.pem"
## Use TLS but skip chain & host verification
# insecure_skip_verify = false
`
func (d *DockerLogs) Description() string {
return "Plugin to get docker logs"
}
func (d *DockerLogs) SampleConfig() string {
return sampleConfig
}
func (d *DockerLogs) Gather(acc telegraf.Accumulator) error {
/*Check to see if any new containers have been created since last time*/
return d.containerListUpdate(acc)
}
/*Following few functions have been inherited from telegraf docker input plugin*/
func (d *DockerLogs) createContainerFilters() error {
filter, err := filter.NewIncludeExcludeFilter(d.ContainerInclude, d.ContainerExclude)
if err != nil {
return err
}
d.containerFilter = filter
return nil
}
func (d *DockerLogs) createLabelFilters() error {
filter, err := filter.NewIncludeExcludeFilter(d.LabelInclude, d.LabelExclude)
if err != nil {
return err
}
d.labelFilter = filter
return nil
}
func (d *DockerLogs) createContainerStateFilters() error {
if len(d.ContainerStateInclude) == 0 && len(d.ContainerStateExclude) == 0 {
d.ContainerStateInclude = []string{"running"}
}
filter, err := filter.NewIncludeExcludeFilter(d.ContainerStateInclude, d.ContainerStateExclude)
if err != nil {
return err
}
d.stateFilter = filter
return nil
}
func (d *DockerLogs) addToContainerList(containerId string, logReader io.ReadCloser) error {
d.mu.Lock()
defer d.mu.Unlock()
d.containerList[containerId] = logReader
return nil
}
func (d *DockerLogs) removeFromContainerList(containerId string) error {
d.mu.Lock()
defer d.mu.Unlock()
delete(d.containerList, containerId)
return nil
}
func (d *DockerLogs) containerInContainerList(containerId string) bool {
if _, ok := d.containerList[containerId]; ok {
return true
}
return false
}
func (d *DockerLogs) stopAllReaders() error {
for _, container := range d.containerList {
container.Close()
}
return nil
}
func (d *DockerLogs) containerListUpdate(acc telegraf.Accumulator) error {
ctx, cancel := context.WithTimeout(context.Background(), d.Timeout.Duration)
defer cancel()
if d.client == nil {
return errors.New(fmt.Sprintf("%s : Dock client is null", ERR_PREFIX))
}
containers, err := d.client.ContainerList(ctx, d.opts)
if err != nil {
return err
}
for _, container := range containers {
if d.containerInContainerList(container.ID) {
continue
}
d.wg.Add(1)
/*Start a new goroutine for every new container that has logs to collect*/
go func(c types.Container) {
defer d.wg.Done()
logOptions := types.ContainerLogsOptions{
ShowStdout: true,
ShowStderr: true,
Timestamps: false,
Details: true,
Follow: true,
Tail: "0",
}
logReader, err := d.client.ContainerLogs(context.Background(), c.ID, logOptions)
if err != nil {
acc.AddError(err)
return
}
d.addToContainerList(c.ID, logReader)
err = d.tailContainerLogs(c, logReader, acc)
if err != nil {
acc.AddError(err)
}
d.removeFromContainerList(c.ID)
return
}(container)
}
return nil
}
func (d *DockerLogs) tailContainerLogs(
container types.Container, logReader io.ReadCloser,
acc telegraf.Accumulator,
) error {
c, err := d.client.ContainerInspect(context.Background(), container.ID)
if err != nil {
return err
}
/* Parse container name */
var cname string
for _, name := range container.Names {
trimmedName := strings.TrimPrefix(name, "/")
match := d.containerFilter.Match(trimmedName)
if match {
cname = trimmedName
break
}
}
if cname == "" {
return errors.New(fmt.Sprintf("%s : container name is null", ERR_PREFIX))
}
imageName, imageVersion := parseImage(container.Image)
tags := map[string]string{
"container_name": cname,
"container_image": imageName,
"container_version": imageVersion,
}
fields := map[string]interface{}{}
fields["container_id"] = container.ID
// Add labels to tags
for k, label := range container.Labels {
if d.labelFilter.Match(k) {
tags[k] = label
}
}
if c.Config.Tty {
err = pushTtyLogs(acc, tags, fields, logReader)
} else {
_, err = pushLogs(acc, tags, fields, logReader)
}
if err != nil {
return err
}
return nil
}
func pushTtyLogs(acc telegraf.Accumulator, tags map[string]string, fields map[string]interface{}, src io.Reader) (err error) {
tags["logType"] = "unknown" //in tty mode we wont be able to differentiate b/w stdout and stderr hence unknown
data := make([]byte, logBytesMax)
for {
num, err := src.Read(data)
if num > 0 {
fields["message"] = data[1:num]
acc.AddFields("docker_log", fields, tags)
}
if err == io.EOF {
fields["message"] = data[1:num]
acc.AddFields("docker_log", fields, tags)
return nil
}
if err != nil {
return err
}
}
}
/* Inspired from https://github.com/moby/moby/blob/master/pkg/stdcopy/stdcopy.go */
func pushLogs(acc telegraf.Accumulator, tags map[string]string, fields map[string]interface{}, src io.Reader) (written int64, err error) {
var (
buf = make([]byte, startingBufLen)
bufLen = len(buf)
nr int
er error
frameSize int
)
for {
// Make sure we have at least a full header
for nr < stdWriterPrefixLen {
var nr2 int
nr2, er = src.Read(buf[nr:])
nr += nr2
if er == io.EOF {
if nr < stdWriterPrefixLen {
return written, nil
}
break
}
if er != nil {
return 0, er
}
}
stream := StdType(buf[stdWriterFdIndex])
// Check the first byte to know where to write
var logType string
switch stream {
case Stdin:
logType = "stdin"
break
case Stdout:
logType = "stdout"
break
case Stderr:
logType = "stderr"
break
case Systemerr:
fallthrough
default:
return 0, fmt.Errorf("Unrecognized input header: %d", buf[stdWriterFdIndex])
}
// Retrieve the size of the frame
frameSize = int(binary.BigEndian.Uint32(buf[stdWriterSizeIndex : stdWriterSizeIndex+4]))
// Check if the buffer is big enough to read the frame.
// Extend it if necessary.
if frameSize+stdWriterPrefixLen > bufLen {
buf = append(buf, make([]byte, frameSize+stdWriterPrefixLen-bufLen+1)...)
bufLen = len(buf)
}
// While the amount of bytes read is less than the size of the frame + header, we keep reading
for nr < frameSize+stdWriterPrefixLen {
var nr2 int
nr2, er = src.Read(buf[nr:])
nr += nr2
if er == io.EOF {
if nr < frameSize+stdWriterPrefixLen {
return written, nil
}
break
}
if er != nil {
return 0, er
}
}
// we might have an error from the source mixed up in our multiplexed
// stream. if we do, return it.
if stream == Systemerr {
return written, fmt.Errorf("error from daemon in stream: %s", string(buf[stdWriterPrefixLen:frameSize+stdWriterPrefixLen]))
}
tags["stream"] = logType
fields["message"] = buf[stdWriterPrefixLen+1 : frameSize+stdWriterPrefixLen]
acc.AddFields("docker_log", fields, tags)
written += int64(frameSize)
// Move the rest of the buffer to the beginning
copy(buf, buf[frameSize+stdWriterPrefixLen:])
// Move the index
nr -= frameSize + stdWriterPrefixLen
}
}
func (d *DockerLogs) Start(acc telegraf.Accumulator) error {
var c Client
var err error
if d.Endpoint == "ENV" {
c, err = d.newEnvClient()
} else {
tlsConfig, err := d.ClientConfig.TLSConfig()
if err != nil {
return err
}
c, err = d.newClient(d.Endpoint, tlsConfig)
}
if err != nil {
return err
}
d.client = c
// Create label filters if not already created
if !d.filtersCreated {
err := d.createLabelFilters()
if err != nil {
return err
}
err = d.createContainerFilters()
if err != nil {
return err
}
err = d.createContainerStateFilters()
if err != nil {
return err
}
d.filtersCreated = true
}
filterArgs := filters.NewArgs()
for _, state := range containerStates {
if d.stateFilter.Match(state) {
filterArgs.Add("status", state)
}
}
// All container states were excluded
if filterArgs.Len() == 0 {
return nil
}
d.opts = types.ContainerListOptions{
Filters: filterArgs,
}
return nil
}
/* Inspired from https://github.com/influxdata/telegraf/blob/master/plugins/inputs/docker/docker.go */
func parseImage(image string) (string, string) {
// Adapts some of the logic from the actual Docker library's image parsing
// routines:
// https://github.com/docker/distribution/blob/release/2.7/reference/normalize.go
domain := ""
remainder := ""
i := strings.IndexRune(image, '/')
if i == -1 || (!strings.ContainsAny(image[:i], ".:") && image[:i] != "localhost") {
remainder = image
} else {
domain, remainder = image[:i], image[i+1:]
}
imageName := ""
imageVersion := "unknown"
i = strings.LastIndex(remainder, ":")
if i > -1 {
imageVersion = remainder[i+1:]
imageName = remainder[:i]
} else {
imageName = remainder
}
if domain != "" {
imageName = domain + "/" + imageName
}
return imageName, imageVersion
}
func (d *DockerLogs) Stop() {
d.mu.Lock()
d.stopAllReaders()
d.mu.Unlock()
d.wg.Wait()
}
func init() {
inputs.Add("docker_log", func() telegraf.Input {
return &DockerLogs{
Timeout: internal.Duration{Duration: time.Second * 5},
Endpoint: defaultEndpoint,
newEnvClient: NewEnvClient,
newClient: NewClient,
filtersCreated: false,
containerList: make(map[string]io.ReadCloser),
}
})
}

View File

@ -123,6 +123,11 @@ func SortMetrics() cmp.Option {
return cmpopts.SortSlices(lessFunc) return cmpopts.SortSlices(lessFunc)
} }
// IgnoreTime disables comparison of timestamp.
func IgnoreTime() cmp.Option {
return cmpopts.IgnoreFields(metricDiff{}, "Time")
}
// MetricEqual returns true if the metrics are equal. // MetricEqual returns true if the metrics are equal.
func MetricEqual(expected, actual telegraf.Metric) bool { func MetricEqual(expected, actual telegraf.Metric) bool {
var lhs, rhs *metricDiff var lhs, rhs *metricDiff