diff --git a/plugins/inputs/docker_log/docker_log.go b/plugins/inputs/docker_log/docker_log.go index 7cb2d94be..cf5960b81 100644 --- a/plugins/inputs/docker_log/docker_log.go +++ b/plugins/inputs/docker_log/docker_log.go @@ -2,8 +2,10 @@ package docker_log import ( "bufio" + "bytes" "context" "crypto/tls" + "fmt" "io" "strings" "sync" @@ -287,7 +289,7 @@ func (d *DockerLogs) tailContainerLogs( logOptions := types.ContainerLogsOptions{ ShowStdout: true, ShowStderr: true, - Timestamps: false, + Timestamps: true, Details: false, Follow: true, Tail: tail, @@ -311,6 +313,30 @@ func (d *DockerLogs) tailContainerLogs( } } +func parseLine(line []byte) (time.Time, string, error) { + parts := bytes.SplitN(line, []byte(" "), 2) + + switch len(parts) { + case 1: + parts = append(parts, []byte("")) + } + + tsString := string(parts[0]) + + // Keep any leading space, but remove whitespace from end of line. + // This preserves space in, for example, stacktraces, while removing + // annoying end of line characters and is similar to how other logging + // plugins such as syslog behave. + message := bytes.TrimRightFunc(parts[1], unicode.IsSpace) + + ts, err := time.Parse(time.RFC3339Nano, tsString) + if err != nil { + return time.Time{}, "", fmt.Errorf("error parsing timestamp %q: %v", tsString, err) + } + + return ts, string(message), nil +} + func tailStream( acc telegraf.Accumulator, baseTags map[string]string, @@ -328,22 +354,19 @@ func tailStream( r := bufio.NewReaderSize(reader, 64*1024) - var err error - var message string for { - message, err = r.ReadString('\n') + line, err := r.ReadBytes('\n') - // Keep any leading space, but remove whitespace from end of line. - // This preserves space in, for example, stacktraces, while removing - // annoying end of line characters and is similar to how other logging - // plugins such as syslog behave. - message = strings.TrimRightFunc(message, unicode.IsSpace) - - if len(message) != 0 { - acc.AddFields("docker_log", map[string]interface{}{ - "container_id": containerID, - "message": message, - }, tags) + if len(line) != 0 { + ts, message, err := parseLine(line) + if err != nil { + acc.AddError(err) + } else { + acc.AddFields("docker_log", map[string]interface{}{ + "container_id": containerID, + "message": message, + }, tags, ts) + } } if err != nil { diff --git a/plugins/inputs/docker_log/docker_log_test.go b/plugins/inputs/docker_log/docker_log_test.go index 11cf0befd..c8903c9d8 100644 --- a/plugins/inputs/docker_log/docker_log_test.go +++ b/plugins/inputs/docker_log/docker_log_test.go @@ -53,6 +53,14 @@ func (r *Response) Close() error { return nil } +func MustParse(layout, value string) time.Time { + tm, err := time.Parse(layout, value) + if err != nil { + panic(err) + } + return tm +} + func Test(t *testing.T) { tests := []struct { name string @@ -87,7 +95,7 @@ func Test(t *testing.T) { }, nil }, ContainerLogsF: func(ctx context.Context, containerID string, options types.ContainerLogsOptions) (io.ReadCloser, error) { - return &Response{Reader: bytes.NewBuffer([]byte("hello\n"))}, nil + return &Response{Reader: bytes.NewBuffer([]byte("2020-04-28T18:43:16.432691200Z hello\n"))}, nil }, }, expected: []telegraf.Metric{ @@ -104,7 +112,7 @@ func Test(t *testing.T) { "container_id": "deadbeef", "message": "hello", }, - time.Now(), + MustParse(time.RFC3339Nano, "2020-04-28T18:43:16.432691200Z"), ), }, }, @@ -130,7 +138,7 @@ func Test(t *testing.T) { ContainerLogsF: func(ctx context.Context, containerID string, options types.ContainerLogsOptions) (io.ReadCloser, error) { var buf bytes.Buffer w := stdcopy.NewStdWriter(&buf, stdcopy.Stdout) - w.Write([]byte("hello from stdout")) + w.Write([]byte("2020-04-28T18:42:16.432691200Z hello from stdout")) return &Response{Reader: &buf}, nil }, }, @@ -148,7 +156,7 @@ func Test(t *testing.T) { "container_id": "deadbeef", "message": "hello from stdout", }, - time.Now(), + MustParse(time.RFC3339Nano, "2020-04-28T18:42:16.432691200Z"), ), }, }, @@ -172,7 +180,9 @@ func Test(t *testing.T) { acc.Wait(len(tt.expected)) plugin.Stop() - testutil.RequireMetricsEqual(t, tt.expected, acc.GetTelegrafMetrics(), testutil.IgnoreTime()) + require.Nil(t, acc.Errors) // no errors during gathering + + testutil.RequireMetricsEqual(t, tt.expected, acc.GetTelegrafMetrics()) }) } }