Add support for precision in http_listener (#2644)
This commit is contained in:
parent
db7c97be32
commit
147200f675
|
@ -65,6 +65,7 @@ be deprecated eventually.
|
|||
- [#2597](https://github.com/influxdata/telegraf/issues/2597): Add support for Linux sysctl-fs metrics.
|
||||
- [#2425](https://github.com/influxdata/telegraf/pull/2425): Support to include/exclude docker container labels as tags
|
||||
- [#1667](https://github.com/influxdata/telegraf/pull/1667): dmcache input plugin
|
||||
- [#2637](https://github.com/influxdata/telegraf/issues/2637): Add support for precision in http_listener
|
||||
|
||||
### Bugfixes
|
||||
|
||||
|
|
|
@ -4,6 +4,7 @@ import (
|
|||
"bytes"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
|
@ -40,10 +41,18 @@ const (
|
|||
)
|
||||
|
||||
func Parse(buf []byte) ([]telegraf.Metric, error) {
|
||||
return ParseWithDefaultTime(buf, time.Now())
|
||||
return ParseWithDefaultTimePrecision(buf, time.Now(), "")
|
||||
}
|
||||
|
||||
func ParseWithDefaultTime(buf []byte, t time.Time) ([]telegraf.Metric, error) {
|
||||
return ParseWithDefaultTimePrecision(buf, t, "")
|
||||
}
|
||||
|
||||
func ParseWithDefaultTimePrecision(
|
||||
buf []byte,
|
||||
t time.Time,
|
||||
precision string,
|
||||
) ([]telegraf.Metric, error) {
|
||||
if len(buf) == 0 {
|
||||
return []telegraf.Metric{}, nil
|
||||
}
|
||||
|
@ -63,7 +72,7 @@ func ParseWithDefaultTime(buf []byte, t time.Time) ([]telegraf.Metric, error) {
|
|||
continue
|
||||
}
|
||||
|
||||
m, err := parseMetric(buf[i:i+j], t)
|
||||
m, err := parseMetric(buf[i:i+j], t, precision)
|
||||
if err != nil {
|
||||
i += j + 1 // increment i past the previous newline
|
||||
errStr += " " + err.Error()
|
||||
|
@ -80,7 +89,10 @@ func ParseWithDefaultTime(buf []byte, t time.Time) ([]telegraf.Metric, error) {
|
|||
return metrics, nil
|
||||
}
|
||||
|
||||
func parseMetric(buf []byte, defaultTime time.Time) (telegraf.Metric, error) {
|
||||
func parseMetric(buf []byte,
|
||||
defaultTime time.Time,
|
||||
precision string,
|
||||
) (telegraf.Metric, error) {
|
||||
var dTime string
|
||||
// scan the first block which is measurement[,tag1=value1,tag2=value=2...]
|
||||
pos, key, err := scanKey(buf, 0)
|
||||
|
@ -114,9 +126,23 @@ func parseMetric(buf []byte, defaultTime time.Time) (telegraf.Metric, error) {
|
|||
return nil, err
|
||||
}
|
||||
|
||||
// apply precision multiplier
|
||||
var nsec int64
|
||||
multiplier := getPrecisionMultiplier(precision)
|
||||
if multiplier > 1 {
|
||||
tsint, err := parseIntBytes(ts, 10, 64)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
nsec := multiplier * tsint
|
||||
ts = []byte(strconv.FormatInt(nsec, 10))
|
||||
}
|
||||
|
||||
m := &metric{
|
||||
fields: fields,
|
||||
t: ts,
|
||||
nsec: nsec,
|
||||
}
|
||||
|
||||
// parse out the measurement name
|
||||
|
@ -628,3 +654,21 @@ func makeError(reason string, buf []byte, i int) error {
|
|||
return fmt.Errorf("metric parsing error, reason: [%s], buffer: [%s], index: [%d]",
|
||||
reason, buf, i)
|
||||
}
|
||||
|
||||
// getPrecisionMultiplier will return a multiplier for the precision specified.
|
||||
func getPrecisionMultiplier(precision string) int64 {
|
||||
d := time.Nanosecond
|
||||
switch precision {
|
||||
case "u":
|
||||
d = time.Microsecond
|
||||
case "ms":
|
||||
d = time.Millisecond
|
||||
case "s":
|
||||
d = time.Second
|
||||
case "m":
|
||||
d = time.Minute
|
||||
case "h":
|
||||
d = time.Hour
|
||||
}
|
||||
return int64(d)
|
||||
}
|
||||
|
|
|
@ -364,6 +364,27 @@ func TestParseNegativeTimestamps(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestParsePrecision(t *testing.T) {
|
||||
for _, tt := range []struct {
|
||||
line string
|
||||
precision string
|
||||
expected int64
|
||||
}{
|
||||
{"test v=42 1491847420", "s", 1491847420000000000},
|
||||
{"test v=42 1491847420123", "ms", 1491847420123000000},
|
||||
{"test v=42 1491847420123456", "u", 1491847420123456000},
|
||||
{"test v=42 1491847420123456789", "ns", 1491847420123456789},
|
||||
|
||||
{"test v=42 1491847420123456789", "1s", 1491847420123456789},
|
||||
{"test v=42 1491847420123456789", "asdf", 1491847420123456789},
|
||||
} {
|
||||
metrics, err := ParseWithDefaultTimePrecision(
|
||||
[]byte(tt.line+"\n"), time.Now(), tt.precision)
|
||||
assert.NoError(t, err, tt)
|
||||
assert.Equal(t, tt.expected, metrics[0].UnixNano())
|
||||
}
|
||||
}
|
||||
|
||||
func TestParseMaxKeyLength(t *testing.T) {
|
||||
key := ""
|
||||
for {
|
||||
|
|
|
@ -2,11 +2,18 @@
|
|||
|
||||
The HTTP listener is a service input plugin that listens for messages sent via HTTP POST.
|
||||
The plugin expects messages in the InfluxDB line-protocol ONLY, other Telegraf input data formats are not supported.
|
||||
The intent of the plugin is to allow Telegraf to serve as a proxy/router for the /write endpoint of the InfluxDB HTTP API.
|
||||
The intent of the plugin is to allow Telegraf to serve as a proxy/router for the `/write` endpoint of the InfluxDB HTTP API.
|
||||
|
||||
The `/write` endpoint supports the `precision` query parameter and can be set to one of `ns`, `u`, `ms`, `s`, `m`, `h`. All other parameters are ignored and defer to the output plugins configuration.
|
||||
|
||||
When chaining Telegraf instances using this plugin, CREATE DATABASE requests receive a 200 OK response with message body `{"results":[]}` but they are not relayed. The output configuration of the Telegraf instance which ultimately submits data to InfluxDB determines the destination database.
|
||||
|
||||
See: [Telegraf Input Data Formats](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#influx).
|
||||
Example: curl -i -XPOST 'http://localhost:8186/write' --data-binary 'cpu_load_short,host=server01,region=us-west value=0.64 1434055562000000000'
|
||||
|
||||
**Example:**
|
||||
```
|
||||
curl -i -XPOST 'http://localhost:8186/write' --data-binary 'cpu_load_short,host=server01,region=us-west value=0.64 1434055562000000000'
|
||||
```
|
||||
|
||||
### Configuration:
|
||||
|
||||
|
|
|
@ -207,10 +207,12 @@ func (h *HTTPListener) serveWrite(res http.ResponseWriter, req *http.Request) {
|
|||
}
|
||||
now := time.Now()
|
||||
|
||||
precision := req.URL.Query().Get("precision")
|
||||
|
||||
// Handle gzip request bodies
|
||||
body := req.Body
|
||||
var err error
|
||||
if req.Header.Get("Content-Encoding") == "gzip" {
|
||||
var err error
|
||||
body, err = gzip.NewReader(req.Body)
|
||||
defer body.Close()
|
||||
if err != nil {
|
||||
|
@ -263,7 +265,7 @@ func (h *HTTPListener) serveWrite(res http.ResponseWriter, req *http.Request) {
|
|||
|
||||
if err == io.ErrUnexpectedEOF {
|
||||
// finished reading the request body
|
||||
if err := h.parse(buf[:n+bufStart], now); err != nil {
|
||||
if err := h.parse(buf[:n+bufStart], now, precision); err != nil {
|
||||
log.Println("E! " + err.Error())
|
||||
return400 = true
|
||||
}
|
||||
|
@ -288,7 +290,7 @@ func (h *HTTPListener) serveWrite(res http.ResponseWriter, req *http.Request) {
|
|||
bufStart = 0
|
||||
continue
|
||||
}
|
||||
if err := h.parse(buf[:i+1], now); err != nil {
|
||||
if err := h.parse(buf[:i+1], now, precision); err != nil {
|
||||
log.Println("E! " + err.Error())
|
||||
return400 = true
|
||||
}
|
||||
|
@ -301,8 +303,8 @@ func (h *HTTPListener) serveWrite(res http.ResponseWriter, req *http.Request) {
|
|||
}
|
||||
}
|
||||
|
||||
func (h *HTTPListener) parse(b []byte, t time.Time) error {
|
||||
metrics, err := h.parser.ParseWithDefaultTime(b, t)
|
||||
func (h *HTTPListener) parse(b []byte, t time.Time, precision string) error {
|
||||
metrics, err := h.parser.ParseWithDefaultTimePrecision(b, t, precision)
|
||||
|
||||
for _, m := range metrics {
|
||||
h.acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time())
|
||||
|
|
File diff suppressed because one or more lines are too long
|
@ -15,13 +15,13 @@ type InfluxParser struct {
|
|||
DefaultTags map[string]string
|
||||
}
|
||||
|
||||
func (p *InfluxParser) ParseWithDefaultTime(buf []byte, t time.Time) ([]telegraf.Metric, error) {
|
||||
func (p *InfluxParser) ParseWithDefaultTimePrecision(buf []byte, t time.Time, precision string) ([]telegraf.Metric, error) {
|
||||
if !bytes.HasSuffix(buf, []byte("\n")) {
|
||||
buf = append(buf, '\n')
|
||||
}
|
||||
// parse even if the buffer begins with a newline
|
||||
buf = bytes.TrimPrefix(buf, []byte("\n"))
|
||||
metrics, err := metric.ParseWithDefaultTime(buf, t)
|
||||
metrics, err := metric.ParseWithDefaultTimePrecision(buf, t, precision)
|
||||
if len(p.DefaultTags) > 0 {
|
||||
for _, m := range metrics {
|
||||
for k, v := range p.DefaultTags {
|
||||
|
@ -41,7 +41,7 @@ func (p *InfluxParser) ParseWithDefaultTime(buf []byte, t time.Time) ([]telegraf
|
|||
// a non-nil error will be returned in addition to the metrics that parsed
|
||||
// successfully.
|
||||
func (p *InfluxParser) Parse(buf []byte) ([]telegraf.Metric, error) {
|
||||
return p.ParseWithDefaultTime(buf, time.Now())
|
||||
return p.ParseWithDefaultTimePrecision(buf, time.Now(), "")
|
||||
}
|
||||
|
||||
func (p *InfluxParser) ParseLine(line string) (telegraf.Metric, error) {
|
||||
|
|
Loading…
Reference in New Issue