Add new line protocol parser and serializer, influxdb output (#3924)
This commit is contained in:
@@ -10,7 +10,7 @@ import (
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/internal/templating"
|
||||
"github.com/influxdata/telegraf/metric"
|
||||
"github.com/influxdata/telegraf/plugins/parsers/influx"
|
||||
"github.com/tidwall/gjson"
|
||||
)
|
||||
|
||||
@@ -195,7 +195,7 @@ func (p *Parser) unmarshalMetrics(buf []byte) (map[string]interface{}, error) {
|
||||
return jsonOut, nil
|
||||
}
|
||||
|
||||
func (p *Parser) readDWMetrics(metricType string, dwms interface{}, metrics []telegraf.Metric, time time.Time) []telegraf.Metric {
|
||||
func (p *Parser) readDWMetrics(metricType string, dwms interface{}, metrics []telegraf.Metric, tm time.Time) []telegraf.Metric {
|
||||
|
||||
switch dwmsTyped := dwms.(type) {
|
||||
case map[string]interface{}:
|
||||
@@ -240,10 +240,15 @@ func (p *Parser) readDWMetrics(metricType string, dwms interface{}, metrics []te
|
||||
metricsBuffer.WriteString(strings.Join(fields, ","))
|
||||
metricsBuffer.WriteString("\n")
|
||||
}
|
||||
newMetrics, err := metric.ParseWithDefaultTime(metricsBuffer.Bytes(), time)
|
||||
|
||||
handler := influx.NewMetricHandler()
|
||||
handler.SetTimeFunc(func() time.Time { return tm })
|
||||
parser := influx.NewParser(handler)
|
||||
newMetrics, err := parser.Parse(metricsBuffer.Bytes())
|
||||
if err != nil {
|
||||
log.Printf("W! failed to create metric of type '%s': %s\n", metricType, err)
|
||||
}
|
||||
|
||||
return append(metrics, newMetrics...)
|
||||
default:
|
||||
return metrics
|
||||
|
||||
Reference in New Issue
Block a user