diff --git a/internal/internal.go b/internal/internal.go index a0a3ec0ec..368bc8bcf 100644 --- a/internal/internal.go +++ b/internal/internal.go @@ -7,20 +7,22 @@ import ( "context" "crypto/rand" "errors" + "fmt" "io" "log" + "math" "math/big" "os" "os/exec" + "regexp" + "runtime" "strconv" "strings" "syscall" "time" "unicode" - "fmt" "github.com/alecthomas/units" - "runtime" ) const alphanum string = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz" @@ -330,3 +332,53 @@ func CompressWithGzip(data io.Reader) (io.Reader, error) { return pipeReader, err } + +// ParseTimestamp parses a timestamp value as a unix epoch of various precision. +// +// format = "unix": epoch is assumed to be in seconds and can come as number or string. Can have a decimal part. +// format = "unix_ms": epoch is assumed to be in milliseconds and can come as number or string. Cannot have a decimal part. +// format = "unix_us": epoch is assumed to be in microseconds and can come as number or string. Cannot have a decimal part. +// format = "unix_ns": epoch is assumed to be in nanoseconds and can come as number or string. Cannot have a decimal part. +func ParseTimestamp(timestamp interface{}, format string) (time.Time, error) { + timeInt, timeFractional := int64(0), int64(0) + timeEpochStr, ok := timestamp.(string) + var err error + + if !ok { + timeEpochFloat, ok := timestamp.(float64) + if !ok { + return time.Time{}, fmt.Errorf("time: %v could not be converted to string nor float64", timestamp) + } + intPart, frac := math.Modf(timeEpochFloat) + timeInt, timeFractional = int64(intPart), int64(frac*1e9) + } else { + splitted := regexp.MustCompile("[.,]").Split(timeEpochStr, 2) + timeInt, err = strconv.ParseInt(splitted[0], 10, 64) + if err != nil { + return time.Parse(format, timeEpochStr) + } + + if len(splitted) == 2 { + if len(splitted[1]) > 9 { + splitted[1] = splitted[1][:9] //truncates decimal part to nanoseconds precision + } + nanosecStr := splitted[1] + strings.Repeat("0", 9-len(splitted[1])) //adds 0's to the right to obtain a valid number of nanoseconds + + timeFractional, err = strconv.ParseInt(nanosecStr, 10, 64) + if err != nil { + return time.Time{}, err + } + } + } + if strings.EqualFold(format, "unix") { + return time.Unix(timeInt, timeFractional).UTC(), nil + } else if strings.EqualFold(format, "unix_ms") { + return time.Unix(timeInt/1000, (timeInt%1000)*1e6).UTC(), nil + } else if strings.EqualFold(format, "unix_us") { + return time.Unix(0, timeInt*1e3).UTC(), nil + } else if strings.EqualFold(format, "unix_ns") { + return time.Unix(0, timeInt).UTC(), nil + } else { + return time.Time{}, errors.New("Invalid unix format") + } +} diff --git a/plugins/parsers/csv/parser.go b/plugins/parsers/csv/parser.go index 9401c1dd1..5f4fcc640 100644 --- a/plugins/parsers/csv/parser.go +++ b/plugins/parsers/csv/parser.go @@ -9,6 +9,7 @@ import ( "time" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/metric" ) @@ -239,22 +240,8 @@ func parseTimestamp(timeFunc func() time.Time, recordFields map[string]interface case "": err = fmt.Errorf("timestamp format must be specified") return - case "unix": - var unixTime int64 - unixTime, err = strconv.ParseInt(tStr, 10, 64) - if err != nil { - return - } - metricTime = time.Unix(unixTime, 0) - case "unix_ms": - var unixTime int64 - unixTime, err = strconv.ParseInt(tStr, 10, 64) - if err != nil { - return - } - metricTime = time.Unix(unixTime/1000, (unixTime%1000)*1e6) default: - metricTime, err = time.Parse(timestampFormat, tStr) + metricTime, err = internal.ParseTimestamp(tStr, timestampFormat) if err != nil { return } diff --git a/plugins/parsers/json/parser.go b/plugins/parsers/json/parser.go index b3bb9a488..2f939a84f 100644 --- a/plugins/parsers/json/parser.go +++ b/plugins/parsers/json/parser.go @@ -5,16 +5,15 @@ import ( "encoding/json" "fmt" "log" - "math" - "regexp" "strconv" "strings" "time" - "github.com/influxdata/telegraf" - "github.com/influxdata/telegraf/metric" - "github.com/pkg/errors" "github.com/tidwall/gjson" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/metric" ) var ( @@ -50,55 +49,6 @@ func (p *JSONParser) parseArray(buf []byte) ([]telegraf.Metric, error) { return metrics, nil } -// format = "unix": epoch is assumed to be in seconds and can come as number or string. Can have a decimal part. -// format = "unix_ms": epoch is assumed to be in milliseconds and can come as number or string. Cannot have a decimal part. -// format = "unix_us": epoch is assumed to be in microseconds and can come as number or string. Cannot have a decimal part. -// format = "unix_ns": epoch is assumed to be in nanoseconds and can come as number or string. Cannot have a decimal part. -func parseUnixTimestamp(jsonValue interface{}, format string) (time.Time, error) { - timeInt, timeFractional := int64(0), int64(0) - timeEpochStr, ok := jsonValue.(string) - var err error - - if !ok { - timeEpochFloat, ok := jsonValue.(float64) - if !ok { - err := fmt.Errorf("time: %v could not be converted to string nor float64", jsonValue) - return time.Time{}, err - } - intPart, frac := math.Modf(timeEpochFloat) - timeInt, timeFractional = int64(intPart), int64(frac*1e9) - } else { - splitted := regexp.MustCompile("[.,]").Split(timeEpochStr, 2) - timeInt, err = strconv.ParseInt(splitted[0], 10, 64) - if err != nil { - return time.Time{}, err - } - - if len(splitted) == 2 { - if len(splitted[1]) > 9 { - splitted[1] = splitted[1][:9] //truncates decimal part to nanoseconds precision - } - nanosecStr := splitted[1] + strings.Repeat("0", 9-len(splitted[1])) //adds 0's to the right to obtain a valid number of nanoseconds - - timeFractional, err = strconv.ParseInt(nanosecStr, 10, 64) - if err != nil { - return time.Time{}, err - } - } - } - if strings.EqualFold(format, "unix") { - return time.Unix(timeInt, timeFractional).UTC(), nil - } else if strings.EqualFold(format, "unix_ms") { - return time.Unix(timeInt/1000, (timeInt%1000)*1e6).UTC(), nil - } else if strings.EqualFold(format, "unix_us") { - return time.Unix(0, timeInt*1e3).UTC(), nil - } else if strings.EqualFold(format, "unix_ns") { - return time.Unix(0, timeInt).UTC(), nil - } else { - return time.Time{}, errors.New("Invalid unix format") - } -} - func (p *JSONParser) parseObject(metrics []telegraf.Metric, jsonOut map[string]interface{}) ([]telegraf.Metric, error) { tags := make(map[string]string) for k, v := range p.DefaultTags { @@ -132,24 +82,9 @@ func (p *JSONParser) parseObject(metrics []telegraf.Metric, jsonOut map[string]i return nil, err } - if strings.EqualFold(p.JSONTimeFormat, "unix") || - strings.EqualFold(p.JSONTimeFormat, "unix_ms") || - strings.EqualFold(p.JSONTimeFormat, "unix_us") || - strings.EqualFold(p.JSONTimeFormat, "unix_ns") { - nTime, err = parseUnixTimestamp(f.Fields[p.JSONTimeKey], p.JSONTimeFormat) - if err != nil { - return nil, err - } - } else { - timeStr, ok := f.Fields[p.JSONTimeKey].(string) - if !ok { - err := fmt.Errorf("time: %v could not be converted to string", f.Fields[p.JSONTimeKey]) - return nil, err - } - nTime, err = time.Parse(p.JSONTimeFormat, timeStr) - if err != nil { - return nil, err - } + nTime, err = internal.ParseTimestamp(f.Fields[p.JSONTimeKey], p.JSONTimeFormat) + if err != nil { + return nil, err } delete(f.Fields, p.JSONTimeKey)