Use docker log timestamp as metric time (#7434)

This commit is contained in:
i-prudnikov 2020-05-06 21:20:44 +03:00 committed by GitHub
parent 022ff63d29
commit 0924ad2668
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 53 additions and 20 deletions

View File

@ -2,8 +2,10 @@ package docker_log
import (
"bufio"
"bytes"
"context"
"crypto/tls"
"fmt"
"io"
"strings"
"sync"
@ -287,7 +289,7 @@ func (d *DockerLogs) tailContainerLogs(
logOptions := types.ContainerLogsOptions{
ShowStdout: true,
ShowStderr: true,
Timestamps: false,
Timestamps: true,
Details: false,
Follow: true,
Tail: tail,
@ -311,6 +313,30 @@ func (d *DockerLogs) tailContainerLogs(
}
}
func parseLine(line []byte) (time.Time, string, error) {
parts := bytes.SplitN(line, []byte(" "), 2)
switch len(parts) {
case 1:
parts = append(parts, []byte(""))
}
tsString := string(parts[0])
// Keep any leading space, but remove whitespace from end of line.
// This preserves space in, for example, stacktraces, while removing
// annoying end of line characters and is similar to how other logging
// plugins such as syslog behave.
message := bytes.TrimRightFunc(parts[1], unicode.IsSpace)
ts, err := time.Parse(time.RFC3339Nano, tsString)
if err != nil {
return time.Time{}, "", fmt.Errorf("error parsing timestamp %q: %v", tsString, err)
}
return ts, string(message), nil
}
func tailStream(
acc telegraf.Accumulator,
baseTags map[string]string,
@ -328,22 +354,19 @@ func tailStream(
r := bufio.NewReaderSize(reader, 64*1024)
var err error
var message string
for {
message, err = r.ReadString('\n')
line, err := r.ReadBytes('\n')
// Keep any leading space, but remove whitespace from end of line.
// This preserves space in, for example, stacktraces, while removing
// annoying end of line characters and is similar to how other logging
// plugins such as syslog behave.
message = strings.TrimRightFunc(message, unicode.IsSpace)
if len(message) != 0 {
acc.AddFields("docker_log", map[string]interface{}{
"container_id": containerID,
"message": message,
}, tags)
if len(line) != 0 {
ts, message, err := parseLine(line)
if err != nil {
acc.AddError(err)
} else {
acc.AddFields("docker_log", map[string]interface{}{
"container_id": containerID,
"message": message,
}, tags, ts)
}
}
if err != nil {

View File

@ -53,6 +53,14 @@ func (r *Response) Close() error {
return nil
}
func MustParse(layout, value string) time.Time {
tm, err := time.Parse(layout, value)
if err != nil {
panic(err)
}
return tm
}
func Test(t *testing.T) {
tests := []struct {
name string
@ -87,7 +95,7 @@ func Test(t *testing.T) {
}, nil
},
ContainerLogsF: func(ctx context.Context, containerID string, options types.ContainerLogsOptions) (io.ReadCloser, error) {
return &Response{Reader: bytes.NewBuffer([]byte("hello\n"))}, nil
return &Response{Reader: bytes.NewBuffer([]byte("2020-04-28T18:43:16.432691200Z hello\n"))}, nil
},
},
expected: []telegraf.Metric{
@ -104,7 +112,7 @@ func Test(t *testing.T) {
"container_id": "deadbeef",
"message": "hello",
},
time.Now(),
MustParse(time.RFC3339Nano, "2020-04-28T18:43:16.432691200Z"),
),
},
},
@ -130,7 +138,7 @@ func Test(t *testing.T) {
ContainerLogsF: func(ctx context.Context, containerID string, options types.ContainerLogsOptions) (io.ReadCloser, error) {
var buf bytes.Buffer
w := stdcopy.NewStdWriter(&buf, stdcopy.Stdout)
w.Write([]byte("hello from stdout"))
w.Write([]byte("2020-04-28T18:42:16.432691200Z hello from stdout"))
return &Response{Reader: &buf}, nil
},
},
@ -148,7 +156,7 @@ func Test(t *testing.T) {
"container_id": "deadbeef",
"message": "hello from stdout",
},
time.Now(),
MustParse(time.RFC3339Nano, "2020-04-28T18:42:16.432691200Z"),
),
},
},
@ -172,7 +180,9 @@ func Test(t *testing.T) {
acc.Wait(len(tt.expected))
plugin.Stop()
testutil.RequireMetricsEqual(t, tt.expected, acc.GetTelegrafMetrics(), testutil.IgnoreTime())
require.Nil(t, acc.Errors) // no errors during gathering
testutil.RequireMetricsEqual(t, tt.expected, acc.GetTelegrafMetrics())
})
}
}