Unify time parsing in json/csv parsers (#5382)
This commit is contained in:
parent
7887e15446
commit
10ac030502
|
@ -7,20 +7,22 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"crypto/rand"
|
"crypto/rand"
|
||||||
"errors"
|
"errors"
|
||||||
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"log"
|
"log"
|
||||||
|
"math"
|
||||||
"math/big"
|
"math/big"
|
||||||
"os"
|
"os"
|
||||||
"os/exec"
|
"os/exec"
|
||||||
|
"regexp"
|
||||||
|
"runtime"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"syscall"
|
"syscall"
|
||||||
"time"
|
"time"
|
||||||
"unicode"
|
"unicode"
|
||||||
|
|
||||||
"fmt"
|
|
||||||
"github.com/alecthomas/units"
|
"github.com/alecthomas/units"
|
||||||
"runtime"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
const alphanum string = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"
|
const alphanum string = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"
|
||||||
|
@ -330,3 +332,53 @@ func CompressWithGzip(data io.Reader) (io.Reader, error) {
|
||||||
|
|
||||||
return pipeReader, err
|
return pipeReader, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ParseTimestamp parses a timestamp value as a unix epoch of various precision.
|
||||||
|
//
|
||||||
|
// format = "unix": epoch is assumed to be in seconds and can come as number or string. Can have a decimal part.
|
||||||
|
// format = "unix_ms": epoch is assumed to be in milliseconds and can come as number or string. Cannot have a decimal part.
|
||||||
|
// format = "unix_us": epoch is assumed to be in microseconds and can come as number or string. Cannot have a decimal part.
|
||||||
|
// format = "unix_ns": epoch is assumed to be in nanoseconds and can come as number or string. Cannot have a decimal part.
|
||||||
|
func ParseTimestamp(timestamp interface{}, format string) (time.Time, error) {
|
||||||
|
timeInt, timeFractional := int64(0), int64(0)
|
||||||
|
timeEpochStr, ok := timestamp.(string)
|
||||||
|
var err error
|
||||||
|
|
||||||
|
if !ok {
|
||||||
|
timeEpochFloat, ok := timestamp.(float64)
|
||||||
|
if !ok {
|
||||||
|
return time.Time{}, fmt.Errorf("time: %v could not be converted to string nor float64", timestamp)
|
||||||
|
}
|
||||||
|
intPart, frac := math.Modf(timeEpochFloat)
|
||||||
|
timeInt, timeFractional = int64(intPart), int64(frac*1e9)
|
||||||
|
} else {
|
||||||
|
splitted := regexp.MustCompile("[.,]").Split(timeEpochStr, 2)
|
||||||
|
timeInt, err = strconv.ParseInt(splitted[0], 10, 64)
|
||||||
|
if err != nil {
|
||||||
|
return time.Parse(format, timeEpochStr)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(splitted) == 2 {
|
||||||
|
if len(splitted[1]) > 9 {
|
||||||
|
splitted[1] = splitted[1][:9] //truncates decimal part to nanoseconds precision
|
||||||
|
}
|
||||||
|
nanosecStr := splitted[1] + strings.Repeat("0", 9-len(splitted[1])) //adds 0's to the right to obtain a valid number of nanoseconds
|
||||||
|
|
||||||
|
timeFractional, err = strconv.ParseInt(nanosecStr, 10, 64)
|
||||||
|
if err != nil {
|
||||||
|
return time.Time{}, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if strings.EqualFold(format, "unix") {
|
||||||
|
return time.Unix(timeInt, timeFractional).UTC(), nil
|
||||||
|
} else if strings.EqualFold(format, "unix_ms") {
|
||||||
|
return time.Unix(timeInt/1000, (timeInt%1000)*1e6).UTC(), nil
|
||||||
|
} else if strings.EqualFold(format, "unix_us") {
|
||||||
|
return time.Unix(0, timeInt*1e3).UTC(), nil
|
||||||
|
} else if strings.EqualFold(format, "unix_ns") {
|
||||||
|
return time.Unix(0, timeInt).UTC(), nil
|
||||||
|
} else {
|
||||||
|
return time.Time{}, errors.New("Invalid unix format")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -9,6 +9,7 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/influxdata/telegraf"
|
"github.com/influxdata/telegraf"
|
||||||
|
"github.com/influxdata/telegraf/internal"
|
||||||
"github.com/influxdata/telegraf/metric"
|
"github.com/influxdata/telegraf/metric"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -239,22 +240,8 @@ func parseTimestamp(timeFunc func() time.Time, recordFields map[string]interface
|
||||||
case "":
|
case "":
|
||||||
err = fmt.Errorf("timestamp format must be specified")
|
err = fmt.Errorf("timestamp format must be specified")
|
||||||
return
|
return
|
||||||
case "unix":
|
|
||||||
var unixTime int64
|
|
||||||
unixTime, err = strconv.ParseInt(tStr, 10, 64)
|
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
metricTime = time.Unix(unixTime, 0)
|
|
||||||
case "unix_ms":
|
|
||||||
var unixTime int64
|
|
||||||
unixTime, err = strconv.ParseInt(tStr, 10, 64)
|
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
metricTime = time.Unix(unixTime/1000, (unixTime%1000)*1e6)
|
|
||||||
default:
|
default:
|
||||||
metricTime, err = time.Parse(timestampFormat, tStr)
|
metricTime, err = internal.ParseTimestamp(tStr, timestampFormat)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
|
@ -5,16 +5,15 @@ import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
"log"
|
||||||
"math"
|
|
||||||
"regexp"
|
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/influxdata/telegraf"
|
|
||||||
"github.com/influxdata/telegraf/metric"
|
|
||||||
"github.com/pkg/errors"
|
|
||||||
"github.com/tidwall/gjson"
|
"github.com/tidwall/gjson"
|
||||||
|
|
||||||
|
"github.com/influxdata/telegraf"
|
||||||
|
"github.com/influxdata/telegraf/internal"
|
||||||
|
"github.com/influxdata/telegraf/metric"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
@ -50,55 +49,6 @@ func (p *JSONParser) parseArray(buf []byte) ([]telegraf.Metric, error) {
|
||||||
return metrics, nil
|
return metrics, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// format = "unix": epoch is assumed to be in seconds and can come as number or string. Can have a decimal part.
|
|
||||||
// format = "unix_ms": epoch is assumed to be in milliseconds and can come as number or string. Cannot have a decimal part.
|
|
||||||
// format = "unix_us": epoch is assumed to be in microseconds and can come as number or string. Cannot have a decimal part.
|
|
||||||
// format = "unix_ns": epoch is assumed to be in nanoseconds and can come as number or string. Cannot have a decimal part.
|
|
||||||
func parseUnixTimestamp(jsonValue interface{}, format string) (time.Time, error) {
|
|
||||||
timeInt, timeFractional := int64(0), int64(0)
|
|
||||||
timeEpochStr, ok := jsonValue.(string)
|
|
||||||
var err error
|
|
||||||
|
|
||||||
if !ok {
|
|
||||||
timeEpochFloat, ok := jsonValue.(float64)
|
|
||||||
if !ok {
|
|
||||||
err := fmt.Errorf("time: %v could not be converted to string nor float64", jsonValue)
|
|
||||||
return time.Time{}, err
|
|
||||||
}
|
|
||||||
intPart, frac := math.Modf(timeEpochFloat)
|
|
||||||
timeInt, timeFractional = int64(intPart), int64(frac*1e9)
|
|
||||||
} else {
|
|
||||||
splitted := regexp.MustCompile("[.,]").Split(timeEpochStr, 2)
|
|
||||||
timeInt, err = strconv.ParseInt(splitted[0], 10, 64)
|
|
||||||
if err != nil {
|
|
||||||
return time.Time{}, err
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(splitted) == 2 {
|
|
||||||
if len(splitted[1]) > 9 {
|
|
||||||
splitted[1] = splitted[1][:9] //truncates decimal part to nanoseconds precision
|
|
||||||
}
|
|
||||||
nanosecStr := splitted[1] + strings.Repeat("0", 9-len(splitted[1])) //adds 0's to the right to obtain a valid number of nanoseconds
|
|
||||||
|
|
||||||
timeFractional, err = strconv.ParseInt(nanosecStr, 10, 64)
|
|
||||||
if err != nil {
|
|
||||||
return time.Time{}, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if strings.EqualFold(format, "unix") {
|
|
||||||
return time.Unix(timeInt, timeFractional).UTC(), nil
|
|
||||||
} else if strings.EqualFold(format, "unix_ms") {
|
|
||||||
return time.Unix(timeInt/1000, (timeInt%1000)*1e6).UTC(), nil
|
|
||||||
} else if strings.EqualFold(format, "unix_us") {
|
|
||||||
return time.Unix(0, timeInt*1e3).UTC(), nil
|
|
||||||
} else if strings.EqualFold(format, "unix_ns") {
|
|
||||||
return time.Unix(0, timeInt).UTC(), nil
|
|
||||||
} else {
|
|
||||||
return time.Time{}, errors.New("Invalid unix format")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *JSONParser) parseObject(metrics []telegraf.Metric, jsonOut map[string]interface{}) ([]telegraf.Metric, error) {
|
func (p *JSONParser) parseObject(metrics []telegraf.Metric, jsonOut map[string]interface{}) ([]telegraf.Metric, error) {
|
||||||
tags := make(map[string]string)
|
tags := make(map[string]string)
|
||||||
for k, v := range p.DefaultTags {
|
for k, v := range p.DefaultTags {
|
||||||
|
@ -132,25 +82,10 @@ func (p *JSONParser) parseObject(metrics []telegraf.Metric, jsonOut map[string]i
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if strings.EqualFold(p.JSONTimeFormat, "unix") ||
|
nTime, err = internal.ParseTimestamp(f.Fields[p.JSONTimeKey], p.JSONTimeFormat)
|
||||||
strings.EqualFold(p.JSONTimeFormat, "unix_ms") ||
|
|
||||||
strings.EqualFold(p.JSONTimeFormat, "unix_us") ||
|
|
||||||
strings.EqualFold(p.JSONTimeFormat, "unix_ns") {
|
|
||||||
nTime, err = parseUnixTimestamp(f.Fields[p.JSONTimeKey], p.JSONTimeFormat)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
timeStr, ok := f.Fields[p.JSONTimeKey].(string)
|
|
||||||
if !ok {
|
|
||||||
err := fmt.Errorf("time: %v could not be converted to string", f.Fields[p.JSONTimeKey])
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
nTime, err = time.Parse(p.JSONTimeFormat, timeStr)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
delete(f.Fields, p.JSONTimeKey)
|
delete(f.Fields, p.JSONTimeKey)
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue