Fix detection of layout timestamps (#6390)
This commit is contained in:
parent
7167a23c52
commit
6dc61be6eb
|
@ -13,7 +13,6 @@ import (
|
||||||
"math/big"
|
"math/big"
|
||||||
"os"
|
"os"
|
||||||
"os/exec"
|
"os/exec"
|
||||||
"regexp"
|
|
||||||
"runtime"
|
"runtime"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
@ -302,62 +301,115 @@ func CompressWithGzip(data io.Reader) (io.Reader, error) {
|
||||||
return pipeReader, err
|
return pipeReader, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// ParseTimestamp with no location provided parses a timestamp value as UTC
|
// ParseTimestamp parses a Time according to the standard Telegraf options.
|
||||||
func ParseTimestamp(timestamp interface{}, format string) (time.Time, error) {
|
// These are generally displayed in the toml similar to:
|
||||||
return ParseTimestampWithLocation(timestamp, format, "UTC")
|
// json_time_key= "timestamp"
|
||||||
|
// json_time_format = "2006-01-02T15:04:05Z07:00"
|
||||||
|
// json_timezone = "America/Los_Angeles"
|
||||||
|
//
|
||||||
|
// The format can be one of "unix", "unix_ms", "unix_us", "unix_ns", or a Go
|
||||||
|
// time layout suitable for time.Parse.
|
||||||
|
//
|
||||||
|
// When using the "unix" format, a optional fractional component is allowed.
|
||||||
|
// Specific unix time precisions cannot have a fractional component.
|
||||||
|
//
|
||||||
|
// Unix times may be an int64, float64, or string. When using a Go format
|
||||||
|
// string the timestamp must be a string.
|
||||||
|
//
|
||||||
|
// The location is a location string suitable for time.LoadLocation. Unix
|
||||||
|
// times do not use the location string, a unix time is always return in the
|
||||||
|
// UTC location.
|
||||||
|
func ParseTimestamp(format string, timestamp interface{}, location string) (time.Time, error) {
|
||||||
|
switch format {
|
||||||
|
case "unix", "unix_ms", "unix_us", "unix_ns":
|
||||||
|
return parseUnix(format, timestamp)
|
||||||
|
default:
|
||||||
|
if location == "" {
|
||||||
|
location = "UTC"
|
||||||
|
}
|
||||||
|
return parseTime(format, timestamp, location)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// ParseTimestamp parses a timestamp value as a unix epoch of various precision.
|
func parseUnix(format string, timestamp interface{}) (time.Time, error) {
|
||||||
//
|
integer, fractional, err := parseComponents(timestamp)
|
||||||
// format = "unix": epoch is assumed to be in seconds and can come as number or string. Can have a decimal part.
|
if err != nil {
|
||||||
// format = "unix_ms": epoch is assumed to be in milliseconds and can come as number or string. Cannot have a decimal part.
|
return time.Unix(0, 0), err
|
||||||
// format = "unix_us": epoch is assumed to be in microseconds and can come as number or string. Cannot have a decimal part.
|
}
|
||||||
// format = "unix_ns": epoch is assumed to be in nanoseconds and can come as number or string. Cannot have a decimal part.
|
|
||||||
func ParseTimestampWithLocation(timestamp interface{}, format string, location string) (time.Time, error) {
|
|
||||||
timeInt, timeFractional := int64(0), int64(0)
|
|
||||||
|
|
||||||
|
switch strings.ToLower(format) {
|
||||||
|
case "unix":
|
||||||
|
return time.Unix(integer, fractional).UTC(), nil
|
||||||
|
case "unix_ms":
|
||||||
|
return time.Unix(0, integer*1e6).UTC(), nil
|
||||||
|
case "unix_us":
|
||||||
|
return time.Unix(0, integer*1e3).UTC(), nil
|
||||||
|
case "unix_ns":
|
||||||
|
return time.Unix(0, integer).UTC(), nil
|
||||||
|
default:
|
||||||
|
return time.Unix(0, 0), errors.New("unsupported type")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Returns the integers before and after an optional decimal point. Both '.'
|
||||||
|
// and ',' are supported for the decimal point. The timestamp can be an int64,
|
||||||
|
// float64, or string.
|
||||||
|
// ex: "42.5" -> (42, 5, nil)
|
||||||
|
func parseComponents(timestamp interface{}) (int64, int64, error) {
|
||||||
switch ts := timestamp.(type) {
|
switch ts := timestamp.(type) {
|
||||||
case string:
|
case string:
|
||||||
var err error
|
parts := strings.SplitN(ts, ".", 2)
|
||||||
splitted := regexp.MustCompile("[.,]").Split(ts, 2)
|
if len(parts) == 2 {
|
||||||
timeInt, err = strconv.ParseInt(splitted[0], 10, 64)
|
return parseUnixTimeComponents(parts[0], parts[1])
|
||||||
|
}
|
||||||
|
|
||||||
|
parts = strings.SplitN(ts, ",", 2)
|
||||||
|
if len(parts) == 2 {
|
||||||
|
return parseUnixTimeComponents(parts[0], parts[1])
|
||||||
|
}
|
||||||
|
|
||||||
|
integer, err := strconv.ParseInt(ts, 10, 64)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
return 0, 0, err
|
||||||
|
}
|
||||||
|
return integer, 0, nil
|
||||||
|
case int64:
|
||||||
|
return ts, 0, nil
|
||||||
|
case float64:
|
||||||
|
integer, fractional := math.Modf(ts)
|
||||||
|
return int64(integer), int64(fractional * 1e9), nil
|
||||||
|
default:
|
||||||
|
return 0, 0, errors.New("unsupported type")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func parseUnixTimeComponents(first, second string) (int64, int64, error) {
|
||||||
|
integer, err := strconv.ParseInt(first, 10, 64)
|
||||||
|
if err != nil {
|
||||||
|
return 0, 0, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Convert to nanoseconds, dropping any greater precision.
|
||||||
|
buf := []byte("000000000")
|
||||||
|
copy(buf, second)
|
||||||
|
|
||||||
|
fractional, err := strconv.ParseInt(string(buf), 10, 64)
|
||||||
|
if err != nil {
|
||||||
|
return 0, 0, err
|
||||||
|
}
|
||||||
|
return integer, fractional, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// ParseTime parses a string timestamp according to the format string.
|
||||||
|
func parseTime(format string, timestamp interface{}, location string) (time.Time, error) {
|
||||||
|
switch ts := timestamp.(type) {
|
||||||
|
case string:
|
||||||
loc, err := time.LoadLocation(location)
|
loc, err := time.LoadLocation(location)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return time.Time{}, fmt.Errorf("location: %s could not be loaded as a location", location)
|
return time.Unix(0, 0), err
|
||||||
}
|
}
|
||||||
return time.ParseInLocation(format, ts, loc)
|
return time.ParseInLocation(format, ts, loc)
|
||||||
}
|
|
||||||
|
|
||||||
if len(splitted) == 2 {
|
|
||||||
if len(splitted[1]) > 9 {
|
|
||||||
splitted[1] = splitted[1][:9] //truncates decimal part to nanoseconds precision
|
|
||||||
}
|
|
||||||
nanosecStr := splitted[1] + strings.Repeat("0", 9-len(splitted[1])) //adds 0's to the right to obtain a valid number of nanoseconds
|
|
||||||
|
|
||||||
timeFractional, err = strconv.ParseInt(nanosecStr, 10, 64)
|
|
||||||
if err != nil {
|
|
||||||
return time.Time{}, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
case int64:
|
|
||||||
timeInt = ts
|
|
||||||
case float64:
|
|
||||||
intPart, frac := math.Modf(ts)
|
|
||||||
timeInt, timeFractional = int64(intPart), int64(frac*1e9)
|
|
||||||
default:
|
default:
|
||||||
return time.Time{}, fmt.Errorf("time: %v could not be converted to string nor float64", timestamp)
|
return time.Unix(0, 0), errors.New("unsupported type")
|
||||||
}
|
|
||||||
|
|
||||||
if strings.EqualFold(format, "unix") {
|
|
||||||
return time.Unix(timeInt, timeFractional).UTC(), nil
|
|
||||||
} else if strings.EqualFold(format, "unix_ms") {
|
|
||||||
return time.Unix(timeInt/1000, (timeInt%1000)*1e6).UTC(), nil
|
|
||||||
} else if strings.EqualFold(format, "unix_us") {
|
|
||||||
return time.Unix(0, timeInt*1e3).UTC(), nil
|
|
||||||
} else if strings.EqualFold(format, "unix_ns") {
|
|
||||||
return time.Unix(0, timeInt).UTC(), nil
|
|
||||||
} else {
|
|
||||||
return time.Time{}, errors.New("Invalid unix format")
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -331,32 +331,118 @@ func TestAlignTime(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestParseTimestamp(t *testing.T) {
|
func TestParseTimestamp(t *testing.T) {
|
||||||
time, err := ParseTimestamp("2019-02-20 21:50:34.029665", "2006-01-02 15:04:05.000000")
|
rfc3339 := func(value string) time.Time {
|
||||||
assert.Nil(t, err)
|
tm, err := time.Parse(time.RFC3339Nano, value)
|
||||||
assert.EqualValues(t, int64(1550699434029665000), time.UnixNano())
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
time, err = ParseTimestamp("2019-02-20 21:50:34.029665-04:00", "2006-01-02 15:04:05.000000-07:00")
|
}
|
||||||
assert.Nil(t, err)
|
return tm
|
||||||
assert.EqualValues(t, int64(1550713834029665000), time.UnixNano())
|
|
||||||
|
|
||||||
time, err = ParseTimestamp("2019-02-20 21:50:34.029665", "2006-01-02 15:04:05.000000-06:00")
|
|
||||||
assert.NotNil(t, err)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestParseTimestampWithLocation(t *testing.T) {
|
tests := []struct {
|
||||||
time, err := ParseTimestampWithLocation("2019-02-20 21:50:34.029665", "2006-01-02 15:04:05.000000", "UTC")
|
name string
|
||||||
assert.Nil(t, err)
|
format string
|
||||||
assert.EqualValues(t, int64(1550699434029665000), time.UnixNano())
|
timestamp interface{}
|
||||||
|
location string
|
||||||
time, err = ParseTimestampWithLocation("2019-02-20 21:50:34.029665", "2006-01-02 15:04:05.000000", "America/New_York")
|
expected time.Time
|
||||||
assert.Nil(t, err)
|
err bool
|
||||||
assert.EqualValues(t, int64(1550717434029665000), time.UnixNano())
|
}{
|
||||||
|
{
|
||||||
//Provided location is ignored if an offset is successfully parsed
|
name: "parse layout string in utc",
|
||||||
time, err = ParseTimestampWithLocation("2019-02-20 21:50:34.029665-07:00", "2006-01-02 15:04:05.000000-07:00", "America/New_York")
|
format: "2006-01-02 15:04:05",
|
||||||
assert.Nil(t, err)
|
timestamp: "2019-02-20 21:50:34",
|
||||||
assert.EqualValues(t, int64(1550724634029665000), time.UnixNano())
|
location: "UTC",
|
||||||
|
expected: rfc3339("2019-02-20T21:50:34Z"),
|
||||||
time, err = ParseTimestampWithLocation("2019-02-20 21:50:34.029665", "2006-01-02 15:04:05.000000", "InvalidTimeZone")
|
},
|
||||||
assert.NotNil(t, err)
|
{
|
||||||
|
name: "parse layout string with invalid timezone",
|
||||||
|
format: "2006-01-02 15:04:05",
|
||||||
|
timestamp: "2019-02-20 21:50:34",
|
||||||
|
location: "InvalidTimeZone",
|
||||||
|
err: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "layout regression 6386",
|
||||||
|
format: "02.01.2006 15:04:05",
|
||||||
|
timestamp: "09.07.2019 00:11:00",
|
||||||
|
expected: rfc3339("2019-07-09T00:11:00Z"),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "default location is utc",
|
||||||
|
format: "2006-01-02 15:04:05",
|
||||||
|
timestamp: "2019-02-20 21:50:34",
|
||||||
|
expected: rfc3339("2019-02-20T21:50:34Z"),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "unix seconds without fractional",
|
||||||
|
format: "unix",
|
||||||
|
timestamp: "1568338208",
|
||||||
|
expected: rfc3339("2019-09-13T01:30:08Z"),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "unix seconds with fractional",
|
||||||
|
format: "unix",
|
||||||
|
timestamp: "1568338208.500",
|
||||||
|
expected: rfc3339("2019-09-13T01:30:08.500Z"),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "unix seconds with fractional and comma decimal point",
|
||||||
|
format: "unix",
|
||||||
|
timestamp: "1568338208,500",
|
||||||
|
expected: rfc3339("2019-09-13T01:30:08.500Z"),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "unix seconds extra precision",
|
||||||
|
format: "unix",
|
||||||
|
timestamp: "1568338208.00000050042",
|
||||||
|
expected: rfc3339("2019-09-13T01:30:08.000000500Z"),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "unix seconds integer",
|
||||||
|
format: "unix",
|
||||||
|
timestamp: int64(1568338208),
|
||||||
|
expected: rfc3339("2019-09-13T01:30:08Z"),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "unix seconds float",
|
||||||
|
format: "unix",
|
||||||
|
timestamp: float64(1568338208.500),
|
||||||
|
expected: rfc3339("2019-09-13T01:30:08.500Z"),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "unix milliseconds",
|
||||||
|
format: "unix_ms",
|
||||||
|
timestamp: "1568338208500",
|
||||||
|
expected: rfc3339("2019-09-13T01:30:08.500Z"),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "unix milliseconds with fractional is ignored",
|
||||||
|
format: "unix_ms",
|
||||||
|
timestamp: "1568338208500.42",
|
||||||
|
expected: rfc3339("2019-09-13T01:30:08.500Z"),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "unix microseconds",
|
||||||
|
format: "unix_us",
|
||||||
|
timestamp: "1568338208000500",
|
||||||
|
expected: rfc3339("2019-09-13T01:30:08.000500Z"),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "unix nanoseconds",
|
||||||
|
format: "unix_ns",
|
||||||
|
timestamp: "1568338208000000500",
|
||||||
|
expected: rfc3339("2019-09-13T01:30:08.000000500Z"),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
tm, err := ParseTimestamp(tt.format, tt.timestamp, tt.location)
|
||||||
|
if tt.err {
|
||||||
|
require.Error(t, err)
|
||||||
|
} else {
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, tt.expected, tm)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -235,7 +235,7 @@ func parseTimestamp(timeFunc func() time.Time, recordFields map[string]interface
|
||||||
case "":
|
case "":
|
||||||
return time.Time{}, fmt.Errorf("timestamp format must be specified")
|
return time.Time{}, fmt.Errorf("timestamp format must be specified")
|
||||||
default:
|
default:
|
||||||
metricTime, err := internal.ParseTimestamp(recordFields[timestampColumn], timestampFormat)
|
metricTime, err := internal.ParseTimestamp(timestampFormat, recordFields[timestampColumn], "UTC")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return time.Time{}, err
|
return time.Time{}, err
|
||||||
}
|
}
|
||||||
|
|
|
@ -120,7 +120,7 @@ func (p *Parser) parseObject(data map[string]interface{}) ([]telegraf.Metric, er
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
nTime, err = internal.ParseTimestampWithLocation(f.Fields[p.timeKey], p.timeFormat, p.timezone)
|
nTime, err = internal.ParseTimestamp(p.timeFormat, f.Fields[p.timeKey], p.timezone)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue