Add docker log plugin (#4773)
This commit is contained in:
		
							parent
							
								
									9f3c1c6ec7
								
							
						
					
					
						commit
						f8bef14860
					
				|  | @ -32,6 +32,7 @@ import ( | |||
| 	_ "github.com/influxdata/telegraf/plugins/inputs/dmcache" | ||||
| 	_ "github.com/influxdata/telegraf/plugins/inputs/dns_query" | ||||
| 	_ "github.com/influxdata/telegraf/plugins/inputs/docker" | ||||
| 	_ "github.com/influxdata/telegraf/plugins/inputs/docker_log" | ||||
| 	_ "github.com/influxdata/telegraf/plugins/inputs/dovecot" | ||||
| 	_ "github.com/influxdata/telegraf/plugins/inputs/ecs" | ||||
| 	_ "github.com/influxdata/telegraf/plugins/inputs/elasticsearch" | ||||
|  |  | |||
|  | @ -0,0 +1,60 @@ | |||
| # Docker Log Input Plugin | ||||
| 
 | ||||
| The docker log plugin uses the Docker Engine API to get logs on running | ||||
| docker containers. | ||||
| 
 | ||||
| The docker plugin uses the [Official Docker Client](https://github.com/moby/moby/tree/master/client) | ||||
| to gather logs from the [Engine API](https://docs.docker.com/engine/api/v1.24/). | ||||
| Note: This plugin works only for containers with the `local` or  `json-file` or `journald` logging driver. | ||||
| ### Configuration: | ||||
| 
 | ||||
| ```toml | ||||
| # Read metrics about docker containers | ||||
| [[inputs.docker_log]] | ||||
|   ## Docker Endpoint | ||||
|   ##   To use TCP, set endpoint = "tcp://[ip]:[port]" | ||||
|   ##   To use environment variables (ie, docker-machine), set endpoint = "ENV" | ||||
|   endpoint = "unix:///var/run/docker.sock" | ||||
| 
 | ||||
|   ## Containers to include and exclude. Collect all if empty. Globs accepted. | ||||
|   container_name_include = [] | ||||
|   container_name_exclude = [] | ||||
| 
 | ||||
|   ## Container states to include and exclude. Globs accepted. | ||||
|   ## When empty only containers in the "running" state will be captured. | ||||
|   # container_state_include = [] | ||||
|   # container_state_exclude = [] | ||||
| 
 | ||||
|   ## docker labels to include and exclude as tags.  Globs accepted. | ||||
|   ## Note that an empty array for both will include all labels as tags | ||||
|   docker_label_include = [] | ||||
|   docker_label_exclude = [] | ||||
| 
 | ||||
|   ## Optional TLS Config | ||||
|   # tls_ca = "/etc/telegraf/ca.pem" | ||||
|   # tls_cert = "/etc/telegraf/cert.pem" | ||||
|   # tls_key = "/etc/telegraf/key.pem" | ||||
|   ## Use TLS but skip chain & host verification | ||||
|   # insecure_skip_verify = false | ||||
| ``` | ||||
| 
 | ||||
| #### Environment Configuration | ||||
| 
 | ||||
| When using the `"ENV"` endpoint, the connection is configured using the | ||||
| [cli Docker environment variables](https://godoc.org/github.com/moby/moby/client#NewEnvClient). | ||||
| 
 | ||||
| ### Metrics: | ||||
| 
 | ||||
| - docker_log | ||||
|   - tags: | ||||
|     - container_id | ||||
|     - container_name | ||||
|     - stream | ||||
|   - fields: | ||||
|     - message | ||||
| ### Example Output: | ||||
| 
 | ||||
| ``` | ||||
| docker_log,com.docker.compose.config-hash=e19e13df8fd01ba2d7c1628158fca45cc91afbbe9661b2d30550547eb53a861e,com.docker.compose.container-number=1,com.docker.compose.oneoff=False,com.docker.compose.project=distribution,com.docker.compose.service=influxdb,com.docker.compose.version=1.21.2,containerId=fce475bbfa4c8380ff858d5d767f78622ca6de955b525477624c2b7896a5b8e4,containerName=aicon-influxdb,host=prash-laptop,logType=stderr log=" [httpd] 172.23.0.2 - aicon_admin [13/Apr/2019:08:35:53 +0000] \"POST /query?db=&q=SHOW+SUBSCRIPTIONS HTTP/1.1\" 200 232 \"-\" \"KapacitorInfluxDBClient\" 2661bc9c-5dc7-11e9-82f8-0242ac170007 1360\n" 1555144553541000000 | ||||
| docker_log,com.docker.compose.config-hash=fd91b3b096c7ab346971c681b88fe1357c609dcc6850e4ea5b1287ad28a57e5d,com.docker.compose.container-number=1,com.docker.compose.oneoff=False,com.docker.compose.project=distribution,com.docker.compose.service=kapacitor,com.docker.compose.version=1.21.2,containerId=6514d1cf6d19e7ecfedf894941f0a2ea21b8aac5e6f48e64f19dbc9bb2805a25,containerName=aicon-kapacitor,host=prash-laptop,logType=stderr log=" ts=2019-04-13T08:36:00.019Z lvl=info msg=\"http request\" service=http host=172.23.0.7 username=- start=2019-04-13T08:36:00.013835165Z method=POST uri=/write?consistency=&db=_internal&precision=ns&rp=monitor protocol=HTTP/1.1 status=204 referer=- user-agent=InfluxDBClient request-id=2a3eb481-5dc7-11e9-825b-000000000000 duration=5.814404ms\n" 1555144560024000000 | ||||
| ``` | ||||
|  | @ -0,0 +1,63 @@ | |||
| package docker_log | ||||
| 
 | ||||
| import ( | ||||
| 	"context" | ||||
| 	"crypto/tls" | ||||
| 	"io" | ||||
| 	"net/http" | ||||
| 
 | ||||
| 	"github.com/docker/docker/api/types" | ||||
| 	docker "github.com/docker/docker/client" | ||||
| ) | ||||
| 
 | ||||
| /*This file is inherited from telegraf docker input plugin*/ | ||||
| var ( | ||||
| 	version        = "1.24" | ||||
| 	defaultHeaders = map[string]string{"User-Agent": "engine-api-cli-1.0"} | ||||
| ) | ||||
| 
 | ||||
| type Client interface { | ||||
| 	ContainerList(ctx context.Context, options types.ContainerListOptions) ([]types.Container, error) | ||||
| 	ContainerLogs(ctx context.Context, containerID string, options types.ContainerLogsOptions) (io.ReadCloser, error) | ||||
| 	ContainerInspect(ctx context.Context, containerID string) (types.ContainerJSON, error) | ||||
| } | ||||
| 
 | ||||
| func NewEnvClient() (Client, error) { | ||||
| 	client, err := docker.NewClientWithOpts(docker.FromEnv) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	return &SocketClient{client}, nil | ||||
| } | ||||
| 
 | ||||
| func NewClient(host string, tlsConfig *tls.Config) (Client, error) { | ||||
| 	transport := &http.Transport{ | ||||
| 		TLSClientConfig: tlsConfig, | ||||
| 	} | ||||
| 	httpClient := &http.Client{Transport: transport} | ||||
| 	client, err := docker.NewClientWithOpts( | ||||
| 		docker.WithHTTPHeaders(defaultHeaders), | ||||
| 		docker.WithHTTPClient(httpClient), | ||||
| 		docker.WithVersion(version), | ||||
| 		docker.WithHost(host)) | ||||
| 
 | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	return &SocketClient{client}, nil | ||||
| } | ||||
| 
 | ||||
| type SocketClient struct { | ||||
| 	client *docker.Client | ||||
| } | ||||
| 
 | ||||
| func (c *SocketClient) ContainerList(ctx context.Context, options types.ContainerListOptions) ([]types.Container, error) { | ||||
| 	return c.client.ContainerList(ctx, options) | ||||
| } | ||||
| 
 | ||||
| func (c *SocketClient) ContainerLogs(ctx context.Context, containerID string, options types.ContainerLogsOptions) (io.ReadCloser, error) { | ||||
| 	return c.client.ContainerLogs(ctx, containerID, options) | ||||
| } | ||||
| func (c *SocketClient) ContainerInspect(ctx context.Context, containerID string) (types.ContainerJSON, error) { | ||||
| 	return c.client.ContainerInspect(ctx, containerID) | ||||
| } | ||||
|  | @ -0,0 +1,472 @@ | |||
| package docker_log | ||||
| 
 | ||||
| import ( | ||||
| 	"context" | ||||
| 	"crypto/tls" | ||||
| 	"encoding/binary" | ||||
| 	"errors" | ||||
| 	"fmt" | ||||
| 	"github.com/docker/docker/api/types" | ||||
| 	"github.com/docker/docker/api/types/filters" | ||||
| 	"github.com/influxdata/telegraf" | ||||
| 	"github.com/influxdata/telegraf/filter" | ||||
| 	"github.com/influxdata/telegraf/internal" | ||||
| 	tlsint "github.com/influxdata/telegraf/internal/tls" | ||||
| 	"github.com/influxdata/telegraf/plugins/inputs" | ||||
| 	"io" | ||||
| 	"strings" | ||||
| 	"sync" | ||||
| 	"time" | ||||
| ) | ||||
| 
 | ||||
| type StdType byte | ||||
| 
 | ||||
| const ( | ||||
| 	Stdin StdType = iota | ||||
| 	Stdout | ||||
| 	Stderr | ||||
| 	Systemerr | ||||
| 
 | ||||
| 	stdWriterPrefixLen = 8 | ||||
| 	stdWriterFdIndex   = 0 | ||||
| 	stdWriterSizeIndex = 4 | ||||
| 
 | ||||
| 	startingBufLen = 32*1024 + stdWriterPrefixLen + 1 | ||||
| 
 | ||||
| 	ERR_PREFIX      = "E! [inputs.docker_log]" | ||||
| 	defaultEndpoint = "unix:///var/run/docker.sock" | ||||
| 	logBytesMax     = 1000 | ||||
| ) | ||||
| 
 | ||||
| type DockerLogs struct { | ||||
| 	Endpoint string | ||||
| 
 | ||||
| 	Timeout internal.Duration | ||||
| 
 | ||||
| 	LabelInclude []string `toml:"docker_label_include"` | ||||
| 	LabelExclude []string `toml:"docker_label_exclude"` | ||||
| 
 | ||||
| 	ContainerInclude []string `toml:"container_name_include"` | ||||
| 	ContainerExclude []string `toml:"container_name_exclude"` | ||||
| 
 | ||||
| 	ContainerStateInclude []string `toml:"container_state_include"` | ||||
| 	ContainerStateExclude []string `toml:"container_state_exclude"` | ||||
| 
 | ||||
| 	tlsint.ClientConfig | ||||
| 
 | ||||
| 	newEnvClient func() (Client, error) | ||||
| 	newClient    func(string, *tls.Config) (Client, error) | ||||
| 
 | ||||
| 	client          Client | ||||
| 	filtersCreated  bool | ||||
| 	labelFilter     filter.Filter | ||||
| 	containerFilter filter.Filter | ||||
| 	stateFilter     filter.Filter | ||||
| 	opts            types.ContainerListOptions | ||||
| 	wg              sync.WaitGroup | ||||
| 	mu              sync.Mutex | ||||
| 	containerList   map[string]io.ReadCloser | ||||
| } | ||||
| 
 | ||||
| var ( | ||||
| 	containerStates = []string{"created", "restarting", "running", "removing", "paused", "exited", "dead"} | ||||
| ) | ||||
| 
 | ||||
| var sampleConfig = ` | ||||
|   ## Docker Endpoint | ||||
|   ## To use TCP, set endpoint = "tcp://[ip]:[port]" | ||||
|   ## To use environment variables (ie, docker-machine), set endpoint = "ENV" | ||||
|   endpoint = "unix:///var/run/docker.sock" | ||||
|   ## Containers to include and exclude. Globs accepted. | ||||
|   ## Note that an empty array for both will include all containers | ||||
|   container_name_include = [] | ||||
|   container_name_exclude = [] | ||||
|   ## Container states to include and exclude. Globs accepted. | ||||
|   ## When empty only containers in the "running" state will be captured. | ||||
|   # container_state_include = [] | ||||
|   # container_state_exclude = [] | ||||
| 
 | ||||
|   ## docker labels to include and exclude as tags.  Globs accepted. | ||||
|   ## Note that an empty array for both will include all labels as tags | ||||
|   docker_label_include = [] | ||||
|   docker_label_exclude = [] | ||||
| 
 | ||||
|   ## Optional TLS Config | ||||
|   # tls_ca = "/etc/telegraf/ca.pem" | ||||
|   # tls_cert = "/etc/telegraf/cert.pem" | ||||
|   # tls_key = "/etc/telegraf/key.pem" | ||||
|   ## Use TLS but skip chain & host verification | ||||
|   # insecure_skip_verify = false | ||||
| ` | ||||
| 
 | ||||
| func (d *DockerLogs) Description() string { | ||||
| 	return "Plugin to get docker logs" | ||||
| } | ||||
| 
 | ||||
| func (d *DockerLogs) SampleConfig() string { | ||||
| 	return sampleConfig | ||||
| } | ||||
| 
 | ||||
| func (d *DockerLogs) Gather(acc telegraf.Accumulator) error { | ||||
| 	/*Check to see if any new containers have been created since last time*/ | ||||
| 	return d.containerListUpdate(acc) | ||||
| } | ||||
| 
 | ||||
| /*Following few functions have been inherited from telegraf docker input plugin*/ | ||||
| func (d *DockerLogs) createContainerFilters() error { | ||||
| 	filter, err := filter.NewIncludeExcludeFilter(d.ContainerInclude, d.ContainerExclude) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	d.containerFilter = filter | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| func (d *DockerLogs) createLabelFilters() error { | ||||
| 	filter, err := filter.NewIncludeExcludeFilter(d.LabelInclude, d.LabelExclude) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	d.labelFilter = filter | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| func (d *DockerLogs) createContainerStateFilters() error { | ||||
| 	if len(d.ContainerStateInclude) == 0 && len(d.ContainerStateExclude) == 0 { | ||||
| 		d.ContainerStateInclude = []string{"running"} | ||||
| 	} | ||||
| 	filter, err := filter.NewIncludeExcludeFilter(d.ContainerStateInclude, d.ContainerStateExclude) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	d.stateFilter = filter | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| func (d *DockerLogs) addToContainerList(containerId string, logReader io.ReadCloser) error { | ||||
| 	d.mu.Lock() | ||||
| 	defer d.mu.Unlock() | ||||
| 	d.containerList[containerId] = logReader | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| func (d *DockerLogs) removeFromContainerList(containerId string) error { | ||||
| 	d.mu.Lock() | ||||
| 	defer d.mu.Unlock() | ||||
| 	delete(d.containerList, containerId) | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| func (d *DockerLogs) containerInContainerList(containerId string) bool { | ||||
| 	if _, ok := d.containerList[containerId]; ok { | ||||
| 		return true | ||||
| 	} | ||||
| 	return false | ||||
| } | ||||
| 
 | ||||
| func (d *DockerLogs) stopAllReaders() error { | ||||
| 	for _, container := range d.containerList { | ||||
| 		container.Close() | ||||
| 	} | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| func (d *DockerLogs) containerListUpdate(acc telegraf.Accumulator) error { | ||||
| 	ctx, cancel := context.WithTimeout(context.Background(), d.Timeout.Duration) | ||||
| 	defer cancel() | ||||
| 	if d.client == nil { | ||||
| 		return errors.New(fmt.Sprintf("%s : Dock client is null", ERR_PREFIX)) | ||||
| 	} | ||||
| 	containers, err := d.client.ContainerList(ctx, d.opts) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	for _, container := range containers { | ||||
| 		if d.containerInContainerList(container.ID) { | ||||
| 			continue | ||||
| 		} | ||||
| 		d.wg.Add(1) | ||||
| 		/*Start a new goroutine for every new container that has logs to collect*/ | ||||
| 		go func(c types.Container) { | ||||
| 			defer d.wg.Done() | ||||
| 			logOptions := types.ContainerLogsOptions{ | ||||
| 				ShowStdout: true, | ||||
| 				ShowStderr: true, | ||||
| 				Timestamps: false, | ||||
| 				Details:    true, | ||||
| 				Follow:     true, | ||||
| 				Tail:       "0", | ||||
| 			} | ||||
| 			logReader, err := d.client.ContainerLogs(context.Background(), c.ID, logOptions) | ||||
| 			if err != nil { | ||||
| 				acc.AddError(err) | ||||
| 				return | ||||
| 			} | ||||
| 			d.addToContainerList(c.ID, logReader) | ||||
| 			err = d.tailContainerLogs(c, logReader, acc) | ||||
| 			if err != nil { | ||||
| 				acc.AddError(err) | ||||
| 			} | ||||
| 			d.removeFromContainerList(c.ID) | ||||
| 			return | ||||
| 		}(container) | ||||
| 	} | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| func (d *DockerLogs) tailContainerLogs( | ||||
| 	container types.Container, logReader io.ReadCloser, | ||||
| 	acc telegraf.Accumulator, | ||||
| ) error { | ||||
| 	c, err := d.client.ContainerInspect(context.Background(), container.ID) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	/* Parse container name */ | ||||
| 	var cname string | ||||
| 	for _, name := range container.Names { | ||||
| 		trimmedName := strings.TrimPrefix(name, "/") | ||||
| 		match := d.containerFilter.Match(trimmedName) | ||||
| 		if match { | ||||
| 			cname = trimmedName | ||||
| 			break | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	if cname == "" { | ||||
| 		return errors.New(fmt.Sprintf("%s : container name is null", ERR_PREFIX)) | ||||
| 	} | ||||
| 	imageName, imageVersion := parseImage(container.Image) | ||||
| 	tags := map[string]string{ | ||||
| 		"container_name":    cname, | ||||
| 		"container_image":   imageName, | ||||
| 		"container_version": imageVersion, | ||||
| 	} | ||||
| 	fields := map[string]interface{}{} | ||||
| 	fields["container_id"] = container.ID | ||||
| 	// Add labels to tags
 | ||||
| 	for k, label := range container.Labels { | ||||
| 		if d.labelFilter.Match(k) { | ||||
| 			tags[k] = label | ||||
| 		} | ||||
| 	} | ||||
| 	if c.Config.Tty { | ||||
| 		err = pushTtyLogs(acc, tags, fields, logReader) | ||||
| 	} else { | ||||
| 		_, err = pushLogs(acc, tags, fields, logReader) | ||||
| 	} | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	return nil | ||||
| } | ||||
| func pushTtyLogs(acc telegraf.Accumulator, tags map[string]string, fields map[string]interface{}, src io.Reader) (err error) { | ||||
| 	tags["logType"] = "unknown" //in tty mode we wont be able to differentiate b/w stdout and stderr hence unknown
 | ||||
| 	data := make([]byte, logBytesMax) | ||||
| 	for { | ||||
| 		num, err := src.Read(data) | ||||
| 		if num > 0 { | ||||
| 			fields["message"] = data[1:num] | ||||
| 			acc.AddFields("docker_log", fields, tags) | ||||
| 		} | ||||
| 		if err == io.EOF { | ||||
| 			fields["message"] = data[1:num] | ||||
| 			acc.AddFields("docker_log", fields, tags) | ||||
| 			return nil | ||||
| 		} | ||||
| 		if err != nil { | ||||
| 			return err | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| /* Inspired from https://github.com/moby/moby/blob/master/pkg/stdcopy/stdcopy.go */ | ||||
| func pushLogs(acc telegraf.Accumulator, tags map[string]string, fields map[string]interface{}, src io.Reader) (written int64, err error) { | ||||
| 	var ( | ||||
| 		buf       = make([]byte, startingBufLen) | ||||
| 		bufLen    = len(buf) | ||||
| 		nr        int | ||||
| 		er        error | ||||
| 		frameSize int | ||||
| 	) | ||||
| 	for { | ||||
| 		// Make sure we have at least a full header
 | ||||
| 		for nr < stdWriterPrefixLen { | ||||
| 			var nr2 int | ||||
| 			nr2, er = src.Read(buf[nr:]) | ||||
| 			nr += nr2 | ||||
| 			if er == io.EOF { | ||||
| 				if nr < stdWriterPrefixLen { | ||||
| 					return written, nil | ||||
| 				} | ||||
| 				break | ||||
| 			} | ||||
| 			if er != nil { | ||||
| 				return 0, er | ||||
| 			} | ||||
| 		} | ||||
| 		stream := StdType(buf[stdWriterFdIndex]) | ||||
| 		// Check the first byte to know where to write
 | ||||
| 		var logType string | ||||
| 		switch stream { | ||||
| 		case Stdin: | ||||
| 			logType = "stdin" | ||||
| 			break | ||||
| 		case Stdout: | ||||
| 			logType = "stdout" | ||||
| 			break | ||||
| 		case Stderr: | ||||
| 			logType = "stderr" | ||||
| 			break | ||||
| 		case Systemerr: | ||||
| 			fallthrough | ||||
| 		default: | ||||
| 			return 0, fmt.Errorf("Unrecognized input header: %d", buf[stdWriterFdIndex]) | ||||
| 		} | ||||
| 		// Retrieve the size of the frame
 | ||||
| 		frameSize = int(binary.BigEndian.Uint32(buf[stdWriterSizeIndex : stdWriterSizeIndex+4])) | ||||
| 
 | ||||
| 		// Check if the buffer is big enough to read the frame.
 | ||||
| 		// Extend it if necessary.
 | ||||
| 		if frameSize+stdWriterPrefixLen > bufLen { | ||||
| 			buf = append(buf, make([]byte, frameSize+stdWriterPrefixLen-bufLen+1)...) | ||||
| 			bufLen = len(buf) | ||||
| 		} | ||||
| 
 | ||||
| 		// While the amount of bytes read is less than the size of the frame + header, we keep reading
 | ||||
| 		for nr < frameSize+stdWriterPrefixLen { | ||||
| 			var nr2 int | ||||
| 			nr2, er = src.Read(buf[nr:]) | ||||
| 			nr += nr2 | ||||
| 			if er == io.EOF { | ||||
| 				if nr < frameSize+stdWriterPrefixLen { | ||||
| 					return written, nil | ||||
| 				} | ||||
| 				break | ||||
| 			} | ||||
| 			if er != nil { | ||||
| 				return 0, er | ||||
| 			} | ||||
| 		} | ||||
| 
 | ||||
| 		// we might have an error from the source mixed up in our multiplexed
 | ||||
| 		// stream. if we do, return it.
 | ||||
| 		if stream == Systemerr { | ||||
| 			return written, fmt.Errorf("error from daemon in stream: %s", string(buf[stdWriterPrefixLen:frameSize+stdWriterPrefixLen])) | ||||
| 		} | ||||
| 
 | ||||
| 		tags["stream"] = logType | ||||
| 		fields["message"] = buf[stdWriterPrefixLen+1 : frameSize+stdWriterPrefixLen] | ||||
| 		acc.AddFields("docker_log", fields, tags) | ||||
| 		written += int64(frameSize) | ||||
| 
 | ||||
| 		// Move the rest of the buffer to the beginning
 | ||||
| 		copy(buf, buf[frameSize+stdWriterPrefixLen:]) | ||||
| 		// Move the index
 | ||||
| 		nr -= frameSize + stdWriterPrefixLen | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| func (d *DockerLogs) Start(acc telegraf.Accumulator) error { | ||||
| 	var c Client | ||||
| 	var err error | ||||
| 	if d.Endpoint == "ENV" { | ||||
| 		c, err = d.newEnvClient() | ||||
| 	} else { | ||||
| 		tlsConfig, err := d.ClientConfig.TLSConfig() | ||||
| 		if err != nil { | ||||
| 			return err | ||||
| 		} | ||||
| 		c, err = d.newClient(d.Endpoint, tlsConfig) | ||||
| 	} | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	d.client = c | ||||
| 	// Create label filters if not already created
 | ||||
| 	if !d.filtersCreated { | ||||
| 		err := d.createLabelFilters() | ||||
| 		if err != nil { | ||||
| 			return err | ||||
| 		} | ||||
| 		err = d.createContainerFilters() | ||||
| 		if err != nil { | ||||
| 			return err | ||||
| 		} | ||||
| 		err = d.createContainerStateFilters() | ||||
| 		if err != nil { | ||||
| 			return err | ||||
| 		} | ||||
| 		d.filtersCreated = true | ||||
| 	} | ||||
| 	filterArgs := filters.NewArgs() | ||||
| 	for _, state := range containerStates { | ||||
| 		if d.stateFilter.Match(state) { | ||||
| 			filterArgs.Add("status", state) | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	// All container states were excluded
 | ||||
| 	if filterArgs.Len() == 0 { | ||||
| 		return nil | ||||
| 	} | ||||
| 
 | ||||
| 	d.opts = types.ContainerListOptions{ | ||||
| 		Filters: filterArgs, | ||||
| 	} | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| /* Inspired from https://github.com/influxdata/telegraf/blob/master/plugins/inputs/docker/docker.go */ | ||||
| func parseImage(image string) (string, string) { | ||||
| 	// Adapts some of the logic from the actual Docker library's image parsing
 | ||||
| 	// routines:
 | ||||
| 	// https://github.com/docker/distribution/blob/release/2.7/reference/normalize.go
 | ||||
| 	domain := "" | ||||
| 	remainder := "" | ||||
| 
 | ||||
| 	i := strings.IndexRune(image, '/') | ||||
| 
 | ||||
| 	if i == -1 || (!strings.ContainsAny(image[:i], ".:") && image[:i] != "localhost") { | ||||
| 		remainder = image | ||||
| 	} else { | ||||
| 		domain, remainder = image[:i], image[i+1:] | ||||
| 	} | ||||
| 
 | ||||
| 	imageName := "" | ||||
| 	imageVersion := "unknown" | ||||
| 
 | ||||
| 	i = strings.LastIndex(remainder, ":") | ||||
| 	if i > -1 { | ||||
| 		imageVersion = remainder[i+1:] | ||||
| 		imageName = remainder[:i] | ||||
| 	} else { | ||||
| 		imageName = remainder | ||||
| 	} | ||||
| 
 | ||||
| 	if domain != "" { | ||||
| 		imageName = domain + "/" + imageName | ||||
| 	} | ||||
| 
 | ||||
| 	return imageName, imageVersion | ||||
| } | ||||
| 
 | ||||
| func (d *DockerLogs) Stop() { | ||||
| 	d.mu.Lock() | ||||
| 	d.stopAllReaders() | ||||
| 	d.mu.Unlock() | ||||
| 	d.wg.Wait() | ||||
| } | ||||
| 
 | ||||
| func init() { | ||||
| 	inputs.Add("docker_log", func() telegraf.Input { | ||||
| 		return &DockerLogs{ | ||||
| 			Timeout:        internal.Duration{Duration: time.Second * 5}, | ||||
| 			Endpoint:       defaultEndpoint, | ||||
| 			newEnvClient:   NewEnvClient, | ||||
| 			newClient:      NewClient, | ||||
| 			filtersCreated: false, | ||||
| 			containerList:  make(map[string]io.ReadCloser), | ||||
| 		} | ||||
| 	}) | ||||
| } | ||||
		Loading…
	
		Reference in New Issue