Refactor InfluxDB listener (#6974)

Use streaming parser in InfluxDB listener
reimda authored this change on 2020-03-04 11:13:44 -07:00; committed by GitHub.
parent ab8438dcc6
commit a0276385b1
21 changed files with 19898 additions and 18303 deletions

View File

@@ -6,27 +6,27 @@ import (
 	"net/http"
 )

-// ErrorFunc is a callback for writing an error response.
-type ErrorFunc func(rw http.ResponseWriter, code int)
+type BasicAuthErrorFunc func(rw http.ResponseWriter)

 // AuthHandler returns a http handler that requires HTTP basic auth
 // credentials to match the given username and password.
-func AuthHandler(username, password string, onError ErrorFunc) func(h http.Handler) http.Handler {
+func AuthHandler(username, password, realm string, onError BasicAuthErrorFunc) func(h http.Handler) http.Handler {
 	return func(h http.Handler) http.Handler {
 		return &basicAuthHandler{
 			username: username,
 			password: password,
+			realm:    realm,
 			onError:  onError,
 			next:     h,
 		}
 	}
 }

 type basicAuthHandler struct {
 	username string
 	password string
-	onError  ErrorFunc
+	realm    string
+	onError  BasicAuthErrorFunc
 	next     http.Handler
 }

@@ -37,7 +37,9 @@ func (h *basicAuthHandler) ServeHTTP(rw http.ResponseWriter, req *http.Request)
 			subtle.ConstantTimeCompare([]byte(reqUsername), []byte(h.username)) != 1 ||
 			subtle.ConstantTimeCompare([]byte(reqPassword), []byte(h.password)) != 1 {
-			h.onError(rw, http.StatusUnauthorized)
+			rw.Header().Set("WWW-Authenticate", "Basic realm=\""+h.realm+"\"")
+			h.onError(rw)
+			http.Error(rw, http.StatusText(http.StatusUnauthorized), http.StatusUnauthorized)
 			return
 		}
 	}

@@ -45,6 +47,9 @@ func (h *basicAuthHandler) ServeHTTP(rw http.ResponseWriter, req *http.Request)
 	h.next.ServeHTTP(rw, req)
 }

+// ErrorFunc is a callback for writing an error response.
+type ErrorFunc func(rw http.ResponseWriter, code int)
+
 // IPRangeHandler returns a http handler that requires the remote address to be
 // in the specified network.
 func IPRangeHandler(network []*net.IPNet, onError ErrorFunc) func(h http.Handler) http.Handler {
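For orientation, here is a minimal sketch of how the new realm-aware AuthHandler composes; the realm string, port, and write handler below are illustrative, not taken from this commit. Note that the error callback is now purely observational, since AuthHandler itself writes the WWW-Authenticate header and the 401 response:

package main

import (
	"net/http"

	"github.com/influxdata/telegraf/internal"
)

func main() {
	// The callback only records the failure; the 401 and the
	// WWW-Authenticate header are written by AuthHandler itself.
	authHandler := internal.AuthHandler("user", "pass", "telegraf", func(_ http.ResponseWriter) {
		// e.g. increment an auth-failure counter here
	})

	mux := http.NewServeMux()
	mux.Handle("/write", authHandler(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusNoContent)
	})))

	if err := http.ListenAndServe(":8186", mux); err != nil {
		panic(err)
	}
}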

View File

@@ -1,55 +0,0 @@
package metric
import (
"time"
"github.com/influxdata/telegraf"
)
type TimeFunc func() time.Time
type Builder struct {
TimeFunc
TimePrecision time.Duration
*metric
}
func NewBuilder() *Builder {
b := &Builder{
TimeFunc: time.Now,
TimePrecision: 1 * time.Nanosecond,
}
b.Reset()
return b
}
func (b *Builder) SetName(name string) {
b.name = name
}
func (b *Builder) AddTag(key string, value string) {
b.metric.AddTag(key, value)
}
func (b *Builder) AddField(key string, value interface{}) {
b.metric.AddField(key, value)
}
func (b *Builder) SetTime(tm time.Time) {
b.tm = tm
}
func (b *Builder) Reset() {
b.metric = &metric{
tp: telegraf.Untyped,
}
}
func (b *Builder) Metric() (telegraf.Metric, error) {
if b.tm.IsZero() {
b.tm = b.TimeFunc().Truncate(b.TimePrecision)
}
return b.metric, nil
}

View File

@@ -50,6 +50,7 @@ func New(
 		sort.Slice(m.tags, func(i, j int) bool { return m.tags[i].Key < m.tags[j].Key })
 	}

+	if len(fields) > 0 {
 		m.fields = make([]*telegraf.Field, 0, len(fields))
 		for k, v := range fields {
 			v := convertField(v)
@@ -58,6 +59,7 @@ func New(
 			}
 			m.AddField(k, v)
 		}
+	}

 	return m, nil
 }

View File

@@ -30,13 +30,13 @@ submits data to InfluxDB determines the destination database.
   ## maximum duration before timing out write of the response
   write_timeout = "10s"

-  ## Maximum allowed http request body size in bytes.
-  ## 0 means to use the default of 536,870,912 bytes (500 mebibytes)
+  ## Maximum allowed HTTP request body size in bytes.
+  ## 0 means to use the default of 32MiB.
   max_body_size = 0

   ## Maximum line size allowed to be sent in bytes.
-  ## 0 means to use the default of 65536 bytes (64 kibibytes)
-  max_line_size = 0
+  ## deprecated in 1.14; parser now handles lines of unlimited length and option is ignored
+  # max_line_size = 0

   ## Set one or more allowed client CA certificate file names to
   ## enable mutually authenticated TLS connections
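A quick way to exercise a listener configured as above is to POST line protocol to /write; a minimal client sketch in Go (the address and measurement names are illustrative):

package main

import (
	"bytes"
	"fmt"
	"net/http"
)

func main() {
	// Two lines of line protocol; db and precision are the query
	// parameters the listener's /write handler reads.
	body := bytes.NewBufferString("cpu,host=a value=42\ncpu,host=b value=43\n")

	resp, err := http.Post("http://localhost:8186/write?db=mydb&precision=s", "text/plain", body)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	fmt.Println(resp.Status) // "204 No Content" when every line parses
}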

View File

@@ -1,43 +0,0 @@
package http_listener
import (
"sync/atomic"
)
type pool struct {
buffers chan []byte
size int
created int64
}
// NewPool returns a new pool object.
// n is the number of buffers
// bufSize is the size (in bytes) of each buffer
func NewPool(n, bufSize int) *pool {
return &pool{
buffers: make(chan []byte, n),
size: bufSize,
}
}
func (p *pool) get() []byte {
select {
case b := <-p.buffers:
return b
default:
atomic.AddInt64(&p.created, 1)
return make([]byte, p.size)
}
}
func (p *pool) put(b []byte) {
select {
case p.buffers <- b:
default:
}
}
func (p *pool) ncreated() int64 {
return atomic.LoadInt64(&p.created)
}
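The deleted pool is a lock-free free list built on a buffered channel: get() falls back to a fresh allocation when the channel is empty, and put() silently drops buffers once the channel is full, so neither call ever blocks. A usage sketch, assuming the pool type above is in scope in the same package (buffer counts are illustrative):

// Illustrative use of the channel-backed pool defined above.
p := NewPool(2, 64*1024) // at most 2 pooled 64 KiB buffers
buf := p.get()           // pool empty: allocates and bumps the created counter
// ... read a request body into buf ...
p.put(buf)               // returned to the free list; puts beyond 2 are dropped
_ = p.ncreated()         // 1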

View File

@@ -1,464 +0,0 @@
package http_listener
import (
"bytes"
"compress/gzip"
"crypto/subtle"
"crypto/tls"
"encoding/json"
"fmt"
"io"
"net"
"net/http"
"sync"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
tlsint "github.com/influxdata/telegraf/internal/tls"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/parsers/influx"
"github.com/influxdata/telegraf/selfstat"
)
const (
// DEFAULT_MAX_BODY_SIZE is the default maximum request body size, in bytes.
// if the request body is over this size, we will return an HTTP 413 error.
// 500 MB
DEFAULT_MAX_BODY_SIZE = 500 * 1024 * 1024
// MAX_LINE_SIZE is the maximum size, in bytes, that can be allocated for
// a single InfluxDB point.
// 64 KB
DEFAULT_MAX_LINE_SIZE = 64 * 1024
)
type TimeFunc func() time.Time
type HTTPListener struct {
ServiceAddress string `toml:"service_address"`
// Port gets pulled out of ServiceAddress
Port int
tlsint.ServerConfig
ReadTimeout internal.Duration `toml:"read_timeout"`
WriteTimeout internal.Duration `toml:"write_timeout"`
MaxBodySize internal.Size `toml:"max_body_size"`
MaxLineSize internal.Size `toml:"max_line_size"`
BasicUsername string `toml:"basic_username"`
BasicPassword string `toml:"basic_password"`
DatabaseTag string `toml:"database_tag"`
TimeFunc
mu sync.Mutex
wg sync.WaitGroup
listener net.Listener
handler *influx.MetricHandler
parser *influx.Parser
acc telegraf.Accumulator
pool *pool
BytesRecv selfstat.Stat
RequestsServed selfstat.Stat
WritesServed selfstat.Stat
QueriesServed selfstat.Stat
PingsServed selfstat.Stat
RequestsRecv selfstat.Stat
WritesRecv selfstat.Stat
QueriesRecv selfstat.Stat
PingsRecv selfstat.Stat
NotFoundsServed selfstat.Stat
BuffersCreated selfstat.Stat
AuthFailures selfstat.Stat
Log telegraf.Logger
longLines selfstat.Stat
}
const sampleConfig = `
## Address and port to host HTTP listener on
service_address = ":8186"
## maximum duration before timing out read of the request
read_timeout = "10s"
## maximum duration before timing out write of the response
write_timeout = "10s"
## Maximum allowed http request body size in bytes.
## 0 means to use the default of 524,288,000 bytes (500 mebibytes)
max_body_size = "500MiB"
## Maximum line size allowed to be sent in bytes.
## 0 means to use the default of 65536 bytes (64 kibibytes)
max_line_size = "64KiB"
## Optional tag name used to store the database.
## If the write has a database in the query string then it will be kept in this tag name.
## This tag can be used in downstream outputs.
## The default value of nothing means it will be off and the database will not be recorded.
# database_tag = ""
## Set one or more allowed client CA certificate file names to
## enable mutually authenticated TLS connections
tls_allowed_cacerts = ["/etc/telegraf/clientca.pem"]
## Add service certificate and key
tls_cert = "/etc/telegraf/cert.pem"
tls_key = "/etc/telegraf/key.pem"
## Optional username and password to accept for HTTP basic authentication.
## You probably want to make sure you have TLS configured above for this.
# basic_username = "foobar"
# basic_password = "barfoo"
`
func (h *HTTPListener) SampleConfig() string {
return sampleConfig
}
func (h *HTTPListener) Description() string {
return "Influx HTTP write listener"
}
func (h *HTTPListener) Gather(_ telegraf.Accumulator) error {
h.BuffersCreated.Set(h.pool.ncreated())
return nil
}
// Start starts the http listener service.
func (h *HTTPListener) Start(acc telegraf.Accumulator) error {
h.mu.Lock()
defer h.mu.Unlock()
tags := map[string]string{
"address": h.ServiceAddress,
}
h.BytesRecv = selfstat.Register("http_listener", "bytes_received", tags)
h.RequestsServed = selfstat.Register("http_listener", "requests_served", tags)
h.WritesServed = selfstat.Register("http_listener", "writes_served", tags)
h.QueriesServed = selfstat.Register("http_listener", "queries_served", tags)
h.PingsServed = selfstat.Register("http_listener", "pings_served", tags)
h.RequestsRecv = selfstat.Register("http_listener", "requests_received", tags)
h.WritesRecv = selfstat.Register("http_listener", "writes_received", tags)
h.QueriesRecv = selfstat.Register("http_listener", "queries_received", tags)
h.PingsRecv = selfstat.Register("http_listener", "pings_received", tags)
h.NotFoundsServed = selfstat.Register("http_listener", "not_founds_served", tags)
h.BuffersCreated = selfstat.Register("http_listener", "buffers_created", tags)
h.AuthFailures = selfstat.Register("http_listener", "auth_failures", tags)
h.longLines = selfstat.Register("http_listener", "long_lines", tags)
if h.MaxBodySize.Size == 0 {
h.MaxBodySize.Size = DEFAULT_MAX_BODY_SIZE
}
if h.MaxLineSize.Size == 0 {
h.MaxLineSize.Size = DEFAULT_MAX_LINE_SIZE
}
if h.ReadTimeout.Duration < time.Second {
h.ReadTimeout.Duration = time.Second * 10
}
if h.WriteTimeout.Duration < time.Second {
h.WriteTimeout.Duration = time.Second * 10
}
h.acc = acc
h.pool = NewPool(200, int(h.MaxLineSize.Size))
tlsConf, err := h.ServerConfig.TLSConfig()
if err != nil {
return err
}
server := &http.Server{
Addr: h.ServiceAddress,
Handler: h,
ReadTimeout: h.ReadTimeout.Duration,
WriteTimeout: h.WriteTimeout.Duration,
TLSConfig: tlsConf,
}
var listener net.Listener
if tlsConf != nil {
listener, err = tls.Listen("tcp", h.ServiceAddress, tlsConf)
} else {
listener, err = net.Listen("tcp", h.ServiceAddress)
}
if err != nil {
return err
}
h.listener = listener
h.Port = listener.Addr().(*net.TCPAddr).Port
h.handler = influx.NewMetricHandler()
h.parser = influx.NewParser(h.handler)
h.wg.Add(1)
go func() {
defer h.wg.Done()
server.Serve(h.listener)
}()
h.Log.Infof("Started HTTP listener service on %s", h.ServiceAddress)
return nil
}
// Stop cleans up all resources
func (h *HTTPListener) Stop() {
h.mu.Lock()
defer h.mu.Unlock()
h.listener.Close()
h.wg.Wait()
h.Log.Infof("Stopped HTTP listener service on %s", h.ServiceAddress)
}
func (h *HTTPListener) ServeHTTP(res http.ResponseWriter, req *http.Request) {
h.RequestsRecv.Incr(1)
defer h.RequestsServed.Incr(1)
switch req.URL.Path {
case "/write":
h.WritesRecv.Incr(1)
defer h.WritesServed.Incr(1)
h.AuthenticateIfSet(h.serveWrite, res, req)
case "/query":
h.QueriesRecv.Incr(1)
defer h.QueriesServed.Incr(1)
// Deliver a dummy response to the query endpoint, as some InfluxDB
// clients test endpoint availability with a query
h.AuthenticateIfSet(func(res http.ResponseWriter, req *http.Request) {
res.Header().Set("Content-Type", "application/json")
res.Header().Set("X-Influxdb-Version", "1.0")
res.WriteHeader(http.StatusOK)
res.Write([]byte("{\"results\":[]}"))
}, res, req)
case "/ping":
h.PingsRecv.Incr(1)
defer h.PingsServed.Incr(1)
verbose := req.URL.Query().Get("verbose")
// respond to ping requests
if verbose != "" && verbose != "0" && verbose != "false" {
res.WriteHeader(http.StatusOK)
b, _ := json.Marshal(map[string]string{"version": "1.0"}) // based on header set above
res.Write(b)
} else {
res.WriteHeader(http.StatusNoContent)
}
default:
defer h.NotFoundsServed.Incr(1)
// Don't know how to respond to calls to other endpoints
h.AuthenticateIfSet(http.NotFound, res, req)
}
}
func (h *HTTPListener) serveWrite(res http.ResponseWriter, req *http.Request) {
// Check that the content length is not too large for us to handle.
if req.ContentLength > h.MaxBodySize.Size {
tooLarge(res)
return
}
now := h.TimeFunc()
precision := req.URL.Query().Get("precision")
db := req.URL.Query().Get("db")
// Handle gzip request bodies
body := req.Body
if req.Header.Get("Content-Encoding") == "gzip" {
var err error
body, err = gzip.NewReader(req.Body)
if err != nil {
h.Log.Debug(err.Error())
badRequest(res, err.Error())
return
}
defer body.Close()
}
body = http.MaxBytesReader(res, body, h.MaxBodySize.Size)
var return400 bool
var hangingBytes bool
buf := h.pool.get()
defer h.pool.put(buf)
bufStart := 0
for {
n, err := io.ReadFull(body, buf[bufStart:])
if err != nil && err != io.ErrUnexpectedEOF && err != io.EOF {
h.Log.Debug(err.Error())
// problem reading the request body
badRequest(res, err.Error())
return
}
h.BytesRecv.Incr(int64(n))
if err == io.EOF {
if return400 {
badRequest(res, "")
} else {
res.WriteHeader(http.StatusNoContent)
}
return
}
if hangingBytes {
i := bytes.IndexByte(buf, '\n')
if i == -1 {
// still didn't find a newline, keep scanning
continue
}
// rotate the bit remaining after the first newline to the front of the buffer
i++ // start copying after the newline
bufStart = len(buf) - i
if bufStart > 0 {
copy(buf, buf[i:])
}
hangingBytes = false
continue
}
if err == io.ErrUnexpectedEOF {
// finished reading the request body
err = h.parse(buf[:n+bufStart], now, precision, db)
if err != nil {
h.Log.Debugf("%s: %s", err.Error(), bufStart+n)
return400 = true
}
if return400 {
if err != nil {
badRequest(res, err.Error())
} else {
badRequest(res, "")
}
} else {
res.WriteHeader(http.StatusNoContent)
}
return
}
// if we got down here it means that we filled our buffer, and there
// are still bytes remaining to be read. So we will parse up until the
// final newline, then push the rest of the bytes into the next buffer.
i := bytes.LastIndexByte(buf, '\n')
if i == -1 {
h.longLines.Incr(1)
// drop any line longer than the max buffer size
h.Log.Debugf("Http_listener received a single line longer than the maximum of %d bytes",
len(buf))
hangingBytes = true
return400 = true
bufStart = 0
continue
}
if err := h.parse(buf[:i+1], now, precision, db); err != nil {
h.Log.Debug(err.Error())
return400 = true
}
// rotate the bit remaining after the last newline to the front of the buffer
i++ // start copying after the newline
bufStart = len(buf) - i
if bufStart > 0 {
copy(buf, buf[i:])
}
}
}
func (h *HTTPListener) parse(b []byte, t time.Time, precision, db string) error {
h.mu.Lock()
defer h.mu.Unlock()
h.handler.SetTimePrecision(getPrecisionMultiplier(precision))
h.handler.SetTimeFunc(func() time.Time { return t })
metrics, err := h.parser.Parse(b)
if err != nil {
return fmt.Errorf("unable to parse: %s", err.Error())
}
for _, m := range metrics {
// Do we need to keep the database name in the query string.
// If a tag has been supplied to put the db in and we actually got a db query,
// then we write it in. This overwrites the database tag if one was sent.
// This makes it behave like the influx endpoint.
if h.DatabaseTag != "" && db != "" {
m.AddTag(h.DatabaseTag, db)
}
h.acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time())
}
return nil
}
func tooLarge(res http.ResponseWriter) {
res.Header().Set("Content-Type", "application/json")
res.Header().Set("X-Influxdb-Version", "1.0")
res.Header().Set("X-Influxdb-Error", "http: request body too large")
res.WriteHeader(http.StatusRequestEntityTooLarge)
res.Write([]byte(`{"error":"http: request body too large"}`))
}
func badRequest(res http.ResponseWriter, errString string) {
res.Header().Set("Content-Type", "application/json")
res.Header().Set("X-Influxdb-Version", "1.0")
if errString == "" {
errString = "http: bad request"
}
res.Header().Set("X-Influxdb-Error", errString)
res.WriteHeader(http.StatusBadRequest)
res.Write([]byte(fmt.Sprintf(`{"error":%q}`, errString)))
}
func (h *HTTPListener) AuthenticateIfSet(handler http.HandlerFunc, res http.ResponseWriter, req *http.Request) {
if h.BasicUsername != "" && h.BasicPassword != "" {
reqUsername, reqPassword, ok := req.BasicAuth()
if !ok ||
subtle.ConstantTimeCompare([]byte(reqUsername), []byte(h.BasicUsername)) != 1 ||
subtle.ConstantTimeCompare([]byte(reqPassword), []byte(h.BasicPassword)) != 1 {
h.AuthFailures.Incr(1)
http.Error(res, "Unauthorized.", http.StatusUnauthorized)
return
}
handler(res, req)
} else {
handler(res, req)
}
}
func getPrecisionMultiplier(precision string) time.Duration {
d := time.Nanosecond
switch precision {
case "u":
d = time.Microsecond
case "ms":
d = time.Millisecond
case "s":
d = time.Second
case "m":
d = time.Minute
case "h":
d = time.Hour
}
return d
}
func init() {
// http_listener deprecated in 1.9
inputs.Add("http_listener", func() telegraf.Input {
return &HTTPListener{
ServiceAddress: ":8186",
TimeFunc: time.Now,
}
})
inputs.Add("influxdb_listener", func() telegraf.Input {
return &HTTPListener{
ServiceAddress: ":8186",
TimeFunc: time.Now,
}
})
}

File diff suppressed because one or more lines are too long

View File

@@ -0,0 +1,406 @@
package influxdb_listener
import (
"compress/gzip"
"context"
"crypto/tls"
"encoding/json"
"fmt"
"net"
"net/http"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
tlsint "github.com/influxdata/telegraf/internal/tls"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/parsers/influx"
"github.com/influxdata/telegraf/selfstat"
)
const (
// defaultMaxBodySize is the default maximum request body size, in bytes.
// if the request body is over this size, we will return an HTTP 413 error.
defaultMaxBodySize = 32 * 1024 * 1024
)
type InfluxDBListener struct {
ServiceAddress string `toml:"service_address"`
port int
tlsint.ServerConfig
ReadTimeout internal.Duration `toml:"read_timeout"`
WriteTimeout internal.Duration `toml:"write_timeout"`
MaxBodySize internal.Size `toml:"max_body_size"`
MaxLineSize internal.Size `toml:"max_line_size"` // deprecated in 1.14; ignored
BasicUsername string `toml:"basic_username"`
BasicPassword string `toml:"basic_password"`
DatabaseTag string `toml:"database_tag"`
timeFunc influx.TimeFunc
listener net.Listener
server http.Server
acc telegraf.Accumulator
bytesRecv selfstat.Stat
requestsServed selfstat.Stat
writesServed selfstat.Stat
queriesServed selfstat.Stat
pingsServed selfstat.Stat
requestsRecv selfstat.Stat
notFoundsServed selfstat.Stat
buffersCreated selfstat.Stat
authFailures selfstat.Stat
Log telegraf.Logger `toml:"-"`
mux http.ServeMux
}
const sampleConfig = `
## Address and port to host InfluxDB listener on
service_address = ":8186"
## maximum duration before timing out read of the request
read_timeout = "10s"
## maximum duration before timing out write of the response
write_timeout = "10s"
## Maximum allowed HTTP request body size in bytes.
## 0 means to use the default of 32MiB.
max_body_size = "32MiB"
## Optional tag name used to store the database.
## If the write has a database in the query string then it will be kept in this tag name.
## This tag can be used in downstream outputs.
## The default value of nothing means it will be off and the database will not be recorded.
# database_tag = ""
## Set one or more allowed client CA certificate file names to
## enable mutually authenticated TLS connections
tls_allowed_cacerts = ["/etc/telegraf/clientca.pem"]
## Add service certificate and key
tls_cert = "/etc/telegraf/cert.pem"
tls_key = "/etc/telegraf/key.pem"
## Optional username and password to accept for HTTP basic authentication.
## You probably want to make sure you have TLS configured above for this.
# basic_username = "foobar"
# basic_password = "barfoo"
`
func (h *InfluxDBListener) SampleConfig() string {
return sampleConfig
}
func (h *InfluxDBListener) Description() string {
return "Accept metrics over InfluxDB 1.x HTTP API"
}
func (h *InfluxDBListener) Gather(_ telegraf.Accumulator) error {
return nil
}
func (h *InfluxDBListener) routes() {
authHandler := internal.AuthHandler(h.BasicUsername, h.BasicPassword, "influxdb",
func(_ http.ResponseWriter) {
h.authFailures.Incr(1)
},
)
h.mux.Handle("/write", authHandler(h.handleWrite()))
h.mux.Handle("/query", authHandler(h.handleQuery()))
h.mux.Handle("/ping", h.handlePing())
h.mux.Handle("/", authHandler(h.handleDefault()))
}
func (h *InfluxDBListener) Init() error {
tags := map[string]string{
"address": h.ServiceAddress,
}
h.bytesRecv = selfstat.Register("influxdb_listener", "bytes_received", tags)
h.requestsServed = selfstat.Register("influxdb_listener", "requests_served", tags)
h.writesServed = selfstat.Register("influxdb_listener", "writes_served", tags)
h.queriesServed = selfstat.Register("influxdb_listener", "queries_served", tags)
h.pingsServed = selfstat.Register("influxdb_listener", "pings_served", tags)
h.requestsRecv = selfstat.Register("influxdb_listener", "requests_received", tags)
h.notFoundsServed = selfstat.Register("influxdb_listener", "not_founds_served", tags)
h.buffersCreated = selfstat.Register("influxdb_listener", "buffers_created", tags)
h.authFailures = selfstat.Register("influxdb_listener", "auth_failures", tags)
h.routes()
if h.MaxBodySize.Size == 0 {
h.MaxBodySize.Size = defaultMaxBodySize
}
if h.MaxLineSize.Size != 0 {
h.Log.Warnf("Use of deprecated configuration: 'max_line_size'; parser now handles lines of unlimited length and option is ignored")
}
if h.ReadTimeout.Duration < time.Second {
h.ReadTimeout.Duration = time.Second * 10
}
if h.WriteTimeout.Duration < time.Second {
h.WriteTimeout.Duration = time.Second * 10
}
return nil
}
// Start starts the InfluxDB listener service.
func (h *InfluxDBListener) Start(acc telegraf.Accumulator) error {
h.acc = acc
tlsConf, err := h.ServerConfig.TLSConfig()
if err != nil {
return err
}
h.server = http.Server{
Addr: h.ServiceAddress,
Handler: h,
ReadTimeout: h.ReadTimeout.Duration,
WriteTimeout: h.WriteTimeout.Duration,
TLSConfig: tlsConf,
}
var listener net.Listener
if tlsConf != nil {
listener, err = tls.Listen("tcp", h.ServiceAddress, tlsConf)
if err != nil {
return err
}
} else {
listener, err = net.Listen("tcp", h.ServiceAddress)
if err != nil {
return err
}
}
h.listener = listener
h.port = listener.Addr().(*net.TCPAddr).Port
go func() {
err = h.server.Serve(h.listener)
if err != http.ErrServerClosed {
h.Log.Infof("Error serving HTTP on %s", h.ServiceAddress)
}
}()
h.Log.Infof("Started HTTP listener service on %s", h.ServiceAddress)
return nil
}
// Stop cleans up all resources
func (h *InfluxDBListener) Stop() {
err := h.server.Shutdown(context.Background())
if err != nil {
h.Log.Infof("Error shutting down HTTP server: %v", err.Error())
}
}
func (h *InfluxDBListener) ServeHTTP(res http.ResponseWriter, req *http.Request) {
h.requestsRecv.Incr(1)
h.mux.ServeHTTP(res, req)
h.requestsServed.Incr(1)
}
func (h *InfluxDBListener) handleQuery() http.HandlerFunc {
return func(res http.ResponseWriter, req *http.Request) {
defer h.queriesServed.Incr(1)
// Deliver a dummy response to the query endpoint, as some InfluxDB
// clients test endpoint availability with a query
res.Header().Set("Content-Type", "application/json")
res.Header().Set("X-Influxdb-Version", "1.0")
res.WriteHeader(http.StatusOK)
res.Write([]byte("{\"results\":[]}"))
}
}
func (h *InfluxDBListener) handlePing() http.HandlerFunc {
return func(res http.ResponseWriter, req *http.Request) {
defer h.pingsServed.Incr(1)
verbose := req.URL.Query().Get("verbose")
// respond to ping requests
if verbose != "" && verbose != "0" && verbose != "false" {
res.WriteHeader(http.StatusOK)
b, _ := json.Marshal(map[string]string{"version": "1.0"}) // based on header set above
res.Write(b)
} else {
res.WriteHeader(http.StatusNoContent)
}
}
}
func (h *InfluxDBListener) handleDefault() http.HandlerFunc {
return func(res http.ResponseWriter, req *http.Request) {
defer h.notFoundsServed.Incr(1)
http.NotFound(res, req)
}
}
func (h *InfluxDBListener) handleWrite() http.HandlerFunc {
return func(res http.ResponseWriter, req *http.Request) {
defer h.writesServed.Incr(1)
// Check that the content length is not too large for us to handle.
if req.ContentLength > h.MaxBodySize.Size {
tooLarge(res)
return
}
db := req.URL.Query().Get("db")
body := req.Body
body = http.MaxBytesReader(res, body, h.MaxBodySize.Size)
// Handle gzip request bodies
if req.Header.Get("Content-Encoding") == "gzip" {
var err error
body, err = gzip.NewReader(body)
if err != nil {
h.Log.Debugf("Error decompressing request body: %v", err.Error())
badRequest(res, err.Error())
return
}
defer body.Close()
}
parser := influx.NewStreamParser(body)
parser.SetTimeFunc(h.timeFunc)
precisionStr := req.URL.Query().Get("precision")
if precisionStr != "" {
precision := getPrecisionMultiplier(precisionStr)
parser.SetTimePrecision(precision)
}
var m telegraf.Metric
var err error
var parseErrorCount int
var lastPos int = 0
var firstParseErrorStr string
for {
select {
case <-req.Context().Done():
// Shutting down before parsing is finished.
res.WriteHeader(http.StatusServiceUnavailable)
return
default:
}
m, err = parser.Next()
pos := parser.Position()
h.bytesRecv.Incr(int64(pos - lastPos))
lastPos = pos
// Continue parsing metrics even if some are malformed
if parseErr, ok := err.(*influx.ParseError); ok {
parseErrorCount += 1
errStr := parseErr.Error()
if firstParseErrorStr == "" {
firstParseErrorStr = errStr
}
continue
} else if err != nil {
// Either we're exiting cleanly (err ==
// influx.EOF) or there's an unexpected error
break
}
if h.DatabaseTag != "" && db != "" {
m.AddTag(h.DatabaseTag, db)
}
h.acc.AddMetric(m)
}
if err != influx.EOF {
h.Log.Debugf("Error parsing the request body: %v", err.Error())
badRequest(res, err.Error())
return
}
if parseErrorCount > 0 {
var partialErrorString string
switch parseErrorCount {
case 1:
partialErrorString = fmt.Sprintf("%s", firstParseErrorStr)
case 2:
partialErrorString = fmt.Sprintf("%s (and 1 other parse error)", firstParseErrorStr)
default:
partialErrorString = fmt.Sprintf("%s (and %d other parse errors)", firstParseErrorStr, parseErrorCount-1)
}
partialWrite(res, partialErrorString)
return
}
// http request success
res.WriteHeader(http.StatusNoContent)
}
}
func tooLarge(res http.ResponseWriter) {
res.Header().Set("Content-Type", "application/json")
res.Header().Set("X-Influxdb-Version", "1.0")
res.Header().Set("X-Influxdb-Error", "http: request body too large")
res.WriteHeader(http.StatusRequestEntityTooLarge)
res.Write([]byte(`{"error":"http: request body too large"}`))
}
func badRequest(res http.ResponseWriter, errString string) {
res.Header().Set("Content-Type", "application/json")
res.Header().Set("X-Influxdb-Version", "1.0")
if errString == "" {
errString = "http: bad request"
}
res.Header().Set("X-Influxdb-Error", errString)
res.WriteHeader(http.StatusBadRequest)
res.Write([]byte(fmt.Sprintf(`{"error":%q}`, errString)))
}
func partialWrite(res http.ResponseWriter, errString string) {
res.Header().Set("Content-Type", "application/json")
res.Header().Set("X-Influxdb-Version", "1.0")
res.Header().Set("X-Influxdb-Error", errString)
res.WriteHeader(http.StatusBadRequest)
res.Write([]byte(fmt.Sprintf(`{"error":%q}`, errString)))
}
func getPrecisionMultiplier(precision string) time.Duration {
// Influxdb defaults silently to nanoseconds if precision isn't
// one of the following:
var d time.Duration
switch precision {
case "u":
d = time.Microsecond
case "ms":
d = time.Millisecond
case "s":
d = time.Second
case "m":
d = time.Minute
case "h":
d = time.Hour
default:
d = time.Nanosecond
}
return d
}
func init() {
// http_listener deprecated in 1.9
inputs.Add("http_listener", func() telegraf.Input {
return &InfluxDBListener{
ServiceAddress: ":8186",
timeFunc: time.Now,
}
})
inputs.Add("influxdb_listener", func() telegraf.Input {
return &InfluxDBListener{
ServiceAddress: ":8186",
timeFunc: time.Now,
}
})
}
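handleWrite above also accepts gzip-compressed bodies when the client sets Content-Encoding: gzip. A client sketch (address and payload are illustrative):

package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"net/http"
)

func main() {
	// Compress the payload; the listener transparently wraps the body
	// in a gzip.Reader when it sees the Content-Encoding header.
	var buf bytes.Buffer
	zw := gzip.NewWriter(&buf)
	zw.Write([]byte("cpu,host=a value=42\n"))
	zw.Close()

	req, err := http.NewRequest("POST", "http://localhost:8186/write?db=mydb", &buf)
	if err != nil {
		panic(err)
	}
	req.Header.Set("Content-Encoding", "gzip")

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	// 204 on success; 400 with an X-Influxdb-Error header on parse errors.
	fmt.Println(resp.Status)
}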

View File

@@ -0,0 +1,108 @@
package influxdb_listener
import (
"fmt"
"net/http"
"net/http/httptest"
"strings"
"testing"
"time"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/selfstat"
"github.com/influxdata/telegraf/testutil"
)
// newListener is the minimal InfluxDBListener construction to serve writes.
func newListener() *InfluxDBListener {
listener := &InfluxDBListener{
timeFunc: time.Now,
acc: &testutil.NopAccumulator{},
bytesRecv: selfstat.Register("influxdb_listener", "bytes_received", map[string]string{}),
writesServed: selfstat.Register("influxdb_listener", "writes_served", map[string]string{}),
MaxBodySize: internal.Size{
Size: defaultMaxBodySize,
},
}
return listener
}
func BenchmarkInfluxDBListener_serveWrite(b *testing.B) {
res := httptest.NewRecorder()
addr := "http://localhost/write?db=mydb"
benchmarks := []struct {
name string
lines string
}{
{
name: "single line, tag, and field",
lines: lines(1, 1, 1),
},
{
name: "single line, 10 tags and fields",
lines: lines(1, 10, 10),
},
{
name: "single line, 100 tags and fields",
lines: lines(1, 100, 100),
},
{
name: "1k lines, single tag and field",
lines: lines(1000, 1, 1),
},
{
name: "1k lines, 10 tags and fields",
lines: lines(1000, 10, 10),
},
{
name: "10k lines, 10 tags and fields",
lines: lines(10000, 10, 10),
},
{
name: "100k lines, 10 tags and fields",
lines: lines(100000, 10, 10),
},
}
for _, bm := range benchmarks {
b.Run(bm.name, func(b *testing.B) {
listener := newListener()
b.ResetTimer()
for n := 0; n < b.N; n++ {
req, err := http.NewRequest("POST", addr, strings.NewReader(bm.lines))
if err != nil {
b.Error(err)
}
listener.handleWrite()(res, req)
if res.Code != http.StatusNoContent {
b.Errorf("unexpected status %d", res.Code)
}
}
})
}
}
func lines(lines, numTags, numFields int) string {
lp := make([]string, lines)
for i := 0; i < lines; i++ {
tags := make([]string, numTags)
for j := 0; j < numTags; j++ {
tags[j] = fmt.Sprintf("t%d=v%d", j, j)
}
fields := make([]string, numFields)
for k := 0; k < numFields; k++ {
fields[k] = fmt.Sprintf("f%d=%d", k, k)
}
lp[i] = fmt.Sprintf("m%d,%s %s",
i,
strings.Join(tags, ","),
strings.Join(fields, ","),
)
}
return strings.Join(lp, "\n")
}

File diff suppressed because one or more lines are too long

View File

@@ -136,7 +136,7 @@ func (h *Health) Init() error {

 // Connect starts the HTTP server.
 func (h *Health) Connect() error {
-	authHandler := internal.AuthHandler(h.BasicUsername, h.BasicPassword, onAuthError)
+	authHandler := internal.AuthHandler(h.BasicUsername, h.BasicPassword, "health", onAuthError)

 	h.server = &http.Server{
 		Addr: h.ServiceAddress,
@@ -168,8 +168,7 @@ func (h *Health) Connect() error {
 	return nil
 }

-func onAuthError(rw http.ResponseWriter, code int) {
-	http.Error(rw, http.StatusText(code), code)
+func onAuthError(_ http.ResponseWriter) {
 }

 func (h *Health) listen() (net.Listener, error) {

View File

@@ -156,7 +156,7 @@ func (p *PrometheusClient) Init() error {
 		ipRange = append(ipRange, ipNet)
 	}

-	authHandler := internal.AuthHandler(p.BasicUsername, p.BasicPassword, onAuthError)
+	authHandler := internal.AuthHandler(p.BasicUsername, p.BasicPassword, "prometheus", onAuthError)
 	rangeHandler := internal.IPRangeHandler(ipRange, onError)
 	promHandler := promhttp.HandlerFor(registry, promhttp.HandlerOpts{ErrorHandling: promhttp.ContinueOnError})

@@ -219,9 +219,7 @@ func (p *PrometheusClient) Connect() error {
 	return nil
 }

-func onAuthError(rw http.ResponseWriter, code int) {
-	rw.Header().Set("WWW-Authenticate", `Basic realm="Restricted"`)
-	http.Error(rw, http.StatusText(code), code)
+func onAuthError(_ http.ResponseWriter) {
 }

 func onError(rw http.ResponseWriter, code int) {

View File

@@ -13,6 +13,8 @@ import (
 	"github.com/influxdata/telegraf/metric"
 )

+type TimeFunc func() time.Time
+
 type Parser struct {
 	MetricName     string
 	HeaderRowCount int
@@ -31,7 +33,7 @@ type Parser struct {
 	TimeFunc func() time.Time
 }

-func (p *Parser) SetTimeFunc(fn metric.TimeFunc) {
+func (p *Parser) SetTimeFunc(fn TimeFunc) {
 	p.TimeFunc = fn
 }

View File

@@ -17,6 +17,8 @@ import (
 var fieldEscaper = strings.NewReplacer("\\", "\\\\", "\"", "\\\"")
 var keyEscaper = strings.NewReplacer(" ", "\\ ", ",", "\\,", "=", "\\=")

+type TimeFunc func() time.Time
+
 // Parser parses json inputs containing dropwizard metrics,
 // either top-level or embedded inside a json field.
 // This parser is using gjson for retrieving paths within the json file.
@@ -48,7 +50,7 @@ type parser struct {
 	separator      string
 	templateEngine *templating.Engine

-	timeFunc metric.TimeFunc
+	timeFunc TimeFunc

 	// seriesParser parses line protocol measurement + tags
 	seriesParser *influx.Parser
@@ -267,6 +269,6 @@ func (p *parser) readDWMetrics(metricType string, dwms interface{}, metrics []te
 	return metrics
 }

-func (p *parser) SetTimeFunc(f metric.TimeFunc) {
+func (p *parser) SetTimeFunc(f TimeFunc) {
 	p.timeFunc = f
 }

View File

@@ -13,7 +13,7 @@ import (
 	"github.com/stretchr/testify/require"
 )

-var TimeFunc = func() time.Time {
+var testTimeFunc = func() time.Time {
 	return time.Unix(0, 0)
 }

@@ -528,7 +528,7 @@ func TestDropWizard(t *testing.T) {
 					map[string]interface{}{
 						"value": 42.0,
 					},
-					TimeFunc(),
+					testTimeFunc(),
 				),
 			),
 		},
@@ -547,7 +547,7 @@ func TestDropWizard(t *testing.T) {
 					map[string]interface{}{
 						"value": 42.0,
 					},
-					TimeFunc(),
+					testTimeFunc(),
 				),
 			),
 		},
@@ -573,7 +573,7 @@ func TestDropWizard(t *testing.T) {
 					map[string]interface{}{
 						"value": 42.0,
 					},
-					TimeFunc(),
+					testTimeFunc(),
 				),
 			),
 		},
@@ -584,7 +584,7 @@ func TestDropWizard(t *testing.T) {
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
 			parser := NewParser()
-			parser.SetTimeFunc(TimeFunc)
+			parser.SetTimeFunc(testTimeFunc)
 			metrics, err := parser.Parse(tt.input)
 			tt.errFunc(t, err)

View File

@@ -10,43 +10,53 @@ import (
 	"github.com/influxdata/telegraf/metric"
 )

+// MetricHandler implements the Handler interface and produces telegraf.Metric.
 type MetricHandler struct {
-	builder   *metric.Builder
 	err       error
-	precision time.Duration
+	timePrecision time.Duration
+	timeFunc      TimeFunc
+	metric        telegraf.Metric
 }

 func NewMetricHandler() *MetricHandler {
 	return &MetricHandler{
-		builder:   metric.NewBuilder(),
-		precision: time.Nanosecond,
+		timePrecision: time.Nanosecond,
+		timeFunc:      time.Now,
 	}
 }

-func (h *MetricHandler) SetTimeFunc(f metric.TimeFunc) {
-	h.builder.TimeFunc = f
+func (h *MetricHandler) SetTimePrecision(p time.Duration) {
+	h.timePrecision = p
+	// When the timestamp is omitted from the metric, the timestamp
+	// comes from the server clock, truncated to the nearest unit of
+	// measurement provided in precision.
+	//
+	// When a timestamp is provided in the metric, precision is
+	// overloaded to hold the unit of measurement of the timestamp.
 }

-func (h *MetricHandler) SetTimePrecision(precision time.Duration) {
-	h.builder.TimePrecision = precision
-	h.precision = precision
+func (h *MetricHandler) SetTimeFunc(f TimeFunc) {
+	h.timeFunc = f
 }

 func (h *MetricHandler) Metric() (telegraf.Metric, error) {
-	m, err := h.builder.Metric()
-	h.builder.Reset()
-	return m, err
+	if h.metric.Time().IsZero() {
+		h.metric.SetTime(h.timeFunc().Truncate(h.timePrecision))
+	}
+	return h.metric, nil
 }

 func (h *MetricHandler) SetMeasurement(name []byte) error {
-	h.builder.SetName(nameUnescape(name))
-	return nil
+	var err error
+	h.metric, err = metric.New(nameUnescape(name),
+		nil, nil, time.Time{})
+	return err
 }

 func (h *MetricHandler) AddTag(key []byte, value []byte) error {
 	tk := unescape(key)
 	tv := unescape(value)
-	h.builder.AddTag(tk, tv)
+	h.metric.AddTag(tk, tv)
 	return nil
 }

@@ -59,7 +69,7 @@ func (h *MetricHandler) AddInt(key []byte, value []byte) error {
 		}
 		return err
 	}
-	h.builder.AddField(fk, fv)
+	h.metric.AddField(fk, fv)
 	return nil
 }

@@ -72,7 +82,7 @@ func (h *MetricHandler) AddUint(key []byte, value []byte) error {
 		}
 		return err
 	}
-	h.builder.AddField(fk, fv)
+	h.metric.AddField(fk, fv)
 	return nil
 }

@@ -85,14 +95,14 @@ func (h *MetricHandler) AddFloat(key []byte, value []byte) error {
 		}
 		return err
 	}
-	h.builder.AddField(fk, fv)
+	h.metric.AddField(fk, fv)
 	return nil
 }

 func (h *MetricHandler) AddString(key []byte, value []byte) error {
 	fk := unescape(key)
 	fv := stringFieldUnescape(value)
-	h.builder.AddField(fk, fv)
+	h.metric.AddField(fk, fv)
 	return nil
 }

@@ -102,7 +112,7 @@ func (h *MetricHandler) AddBool(key []byte, value []byte) error {
 	if err != nil {
 		return errors.New("unparseable bool")
 	}
-	h.builder.AddField(fk, fv)
+	h.metric.AddField(fk, fv)
 	return nil
 }

@@ -114,11 +124,9 @@ func (h *MetricHandler) SetTimestamp(tm []byte) error {
 		}
 		return err
 	}
-	ns := v * int64(h.precision)
-	h.builder.SetTime(time.Unix(0, ns))
+
+	//time precision is overloaded to mean time unit here
+	ns := v * int64(h.timePrecision)
+	h.metric.SetTime(time.Unix(0, ns))
 	return nil
 }
-
-func (h *MetricHandler) Reset() {
-	h.builder.Reset()
-}
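The comment in SetTimePrecision describes two distinct uses of the precision value, which a small illustration makes concrete (numbers chosen for demonstration):

package main

import (
	"fmt"
	"time"
)

func main() {
	precision := time.Second

	// No timestamp in the line: the server clock is truncated to precision.
	now := time.Unix(42, 123456789)
	fmt.Println(now.Truncate(precision).UTC()) // 1970-01-01 00:00:42 +0000 UTC

	// Timestamp present in the line: precision is the unit of the value,
	// so "1500" with a precision of one second means 1500 s after the epoch.
	var v int64 = 1500
	ns := v * int64(precision)
	fmt.Println(time.Unix(0, ns).UTC()) // 1970-01-01 00:25:00 +0000 UTC
}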

File diff suppressed because it is too large

View File

@@ -2,6 +2,7 @@ package influx

 import (
 	"errors"
+	"io"
 )

@@ -70,8 +71,8 @@ action goto_align {
 	fgoto align;
 }

-action found_metric {
-	foundMetric = true
+action begin_metric {
+	m.beginMetric = true
 }

 action name {
@@ -84,11 +85,11 @@ action name {
 }

 action tagkey {
-	key = m.text()
+	m.key = m.text()
 }

 action tagvalue {
-	err = m.handler.AddTag(key, m.text())
+	err = m.handler.AddTag(m.key, m.text())
 	if err != nil {
 		fhold;
 		fnext discard_line;
@@ -97,11 +98,11 @@ action tagvalue {
 }

 action fieldkey {
-	key = m.text()
+	m.key = m.text()
 }

 action integer {
-	err = m.handler.AddInt(key, m.text())
+	err = m.handler.AddInt(m.key, m.text())
 	if err != nil {
 		fhold;
 		fnext discard_line;
@@ -110,7 +111,7 @@ action integer {
 }

 action unsigned {
-	err = m.handler.AddUint(key, m.text())
+	err = m.handler.AddUint(m.key, m.text())
 	if err != nil {
 		fhold;
 		fnext discard_line;
@@ -119,7 +120,7 @@ action unsigned {
 }

 action float {
-	err = m.handler.AddFloat(key, m.text())
+	err = m.handler.AddFloat(m.key, m.text())
 	if err != nil {
 		fhold;
 		fnext discard_line;
@@ -128,7 +129,7 @@ action float {
 }

 action bool {
-	err = m.handler.AddBool(key, m.text())
+	err = m.handler.AddBool(m.key, m.text())
 	if err != nil {
 		fhold;
 		fnext discard_line;
@@ -137,7 +138,7 @@ action bool {
 }

 action string {
-	err = m.handler.AddString(key, m.text())
+	err = m.handler.AddString(m.key, m.text())
 	if err != nil {
 		fhold;
 		fnext discard_line;
@@ -161,15 +162,20 @@ action incr_newline {
 }

 action eol {
+	m.finishMetric = true
 	fnext align;
 	fbreak;
 }

+action finish_metric {
+	m.finishMetric = true
+}
+
 ws =
 	[\t\v\f ];

 newline =
-	'\r'? '\n' %to(incr_newline);
+	'\r'? '\n' >incr_newline;

 non_zero_digit =
 	[1-9];

@@ -273,7 +279,7 @@ line_without_term =
 main :=
 	(line_with_term*
 	(line_with_term | line_without_term?)
-	) >found_metric
+	) >begin_metric %eof(finish_metric)
 	;

 # The discard_line machine discards the current line. Useful for recovering
@@ -299,7 +305,7 @@ align :=
 # Series is a machine for matching measurement+tagset
 series :=
 	(measurement >err(name_error) tagset eol_break?)
-	>found_metric
+	>begin_metric
 	;
 }%%

@@ -325,6 +331,9 @@ type machine struct {
 	sol          int
 	handler      Handler
 	initState    int
+	key          []byte
+	beginMetric  bool
+	finishMetric bool
 }

 func NewMachine(handler Handler) *machine {
@@ -368,6 +377,9 @@ func (m *machine) SetData(data []byte) {
 	m.sol = 0
 	m.pe = len(data)
 	m.eof = len(data)
+	m.key = nil
+	m.beginMetric = false
+	m.finishMetric = false

 	%% write init;
 	m.cs = m.initState
@@ -382,10 +394,15 @@ func (m *machine) Next() error {
 		return EOF
 	}

-	var err error
-	var key []byte
-	foundMetric := false
+	m.key = nil
+	m.beginMetric = false
+	m.finishMetric = false
+
+	return m.exec()
+}
+
+func (m *machine) exec() error {
+	var err error

 	%% write exec;

 	if err != nil {
@@ -405,7 +422,7 @@ func (m *machine) Next() error {
 	//
 	// Otherwise we have successfully parsed a metric line, so if we are at
 	// the EOF we will report it the next call.
-	if !foundMetric && m.p == m.pe && m.pe == m.eof {
+	if !m.beginMetric && m.p == m.pe && m.pe == m.eof {
 		return EOF
 	}

@@ -437,3 +454,96 @@ func (m *machine) Column() int {

 func (m *machine) text() []byte {
 	return m.data[m.pb:m.p]
 }
type streamMachine struct {
machine *machine
reader io.Reader
}
func NewStreamMachine(r io.Reader, handler Handler) *streamMachine {
m := &streamMachine{
machine: NewMachine(handler),
reader: r,
}
m.machine.SetData(make([]byte, 1024))
m.machine.pe = 0
m.machine.eof = -1
return m
}
func (m *streamMachine) Next() error {
// Check if we are already at EOF, this should only happen if called again
// after already returning EOF.
if m.machine.p == m.machine.pe && m.machine.pe == m.machine.eof {
return EOF
}
copy(m.machine.data, m.machine.data[m.machine.p:])
m.machine.pe = m.machine.pe - m.machine.p
m.machine.sol = m.machine.sol - m.machine.p
m.machine.pb = 0
m.machine.p = 0
m.machine.eof = -1
m.machine.key = nil
m.machine.beginMetric = false
m.machine.finishMetric = false
for {
// Expand the buffer if it is full
if m.machine.pe == len(m.machine.data) {
expanded := make([]byte, 2 * len(m.machine.data))
copy(expanded, m.machine.data)
m.machine.data = expanded
}
n, err := m.reader.Read(m.machine.data[m.machine.pe:])
if n == 0 && err == io.EOF {
m.machine.eof = m.machine.pe
} else if err != nil && err != io.EOF {
return err
}
m.machine.pe += n
err = m.machine.exec()
if err != nil {
return err
}
// If we have successfully parsed a full metric line break out
if m.machine.finishMetric {
break
}
}
return nil
}
// Position returns the current byte offset into the data.
func (m *streamMachine) Position() int {
return m.machine.Position()
}
// LineOffset returns the byte offset of the current line.
func (m *streamMachine) LineOffset() int {
return m.machine.LineOffset()
}
// LineNumber returns the current line number. Lines are counted based on the
// regular expression `\r?\n`.
func (m *streamMachine) LineNumber() int {
return m.machine.LineNumber()
}
// Column returns the current column.
func (m *streamMachine) Column() int {
return m.machine.Column()
}
// LineText returns the text of the current line that has been parsed so far.
func (m *streamMachine) LineText() string {
return string(m.machine.data[0:m.machine.p])
}
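The compact-then-grow buffering in streamMachine.Next above is worth isolating: the consumed prefix is copied out on each call, and the buffer doubles whenever a read fills it, so an arbitrarily long line can always be accommodated. A standalone sketch of the same pattern (function and variable names are illustrative, not from this file):

package main

import (
	"fmt"
	"io"
	"strings"
)

// readMore compacts the already-consumed prefix of buf, grows the buffer
// when it is full, and appends fresh bytes from r. It mirrors the
// compact-then-double logic in streamMachine.Next.
func readMore(r io.Reader, buf []byte, consumed, filled int) ([]byte, int, error) {
	copy(buf, buf[consumed:filled]) // drop the consumed prefix
	filled -= consumed

	if filled == len(buf) { // no room left: double the buffer
		grown := make([]byte, 2*len(buf))
		copy(grown, buf)
		buf = grown
	}

	n, err := r.Read(buf[filled:])
	return buf, filled + n, err
}

func main() {
	r := strings.NewReader("cpu value=42\ncpu value=43\n")
	buf := make([]byte, 8)
	buf, filled, _ := readMore(r, buf, 0, 0)
	fmt.Printf("%q\n", buf[:filled]) // first 8 bytes of the stream
}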

View File

@@ -1,8 +1,10 @@
 package influx_test

 import (
+	"bytes"
 	"errors"
 	"fmt"
+	"io"
 	"testing"

 	"github.com/influxdata/telegraf/plugins/parsers/influx"
@@ -14,41 +16,59 @@ type TestingHandler struct {
 }

 func (h *TestingHandler) SetMeasurement(name []byte) error {
+	n := make([]byte, len(name))
+	copy(n, name)
+
 	mname := Result{
 		Name:  Measurement,
-		Value: name,
+		Value: n,
 	}
 	h.results = append(h.results, mname)
 	return nil
 }

 func (h *TestingHandler) AddTag(key []byte, value []byte) error {
+	k := make([]byte, len(key))
+	copy(k, key)
+	v := make([]byte, len(value))
+	copy(v, value)
+
 	tagkey := Result{
 		Name:  TagKey,
-		Value: key,
+		Value: k,
 	}
 	tagvalue := Result{
 		Name:  TagValue,
-		Value: value,
+		Value: v,
 	}
 	h.results = append(h.results, tagkey, tagvalue)
 	return nil
 }

 func (h *TestingHandler) AddInt(key []byte, value []byte) error {
+	k := make([]byte, len(key))
+	copy(k, key)
+	v := make([]byte, len(value))
+	copy(v, value)
+
 	fieldkey := Result{
 		Name:  FieldKey,
-		Value: key,
+		Value: k,
 	}
 	fieldvalue := Result{
 		Name:  FieldInt,
-		Value: value,
+		Value: v,
 	}
 	h.results = append(h.results, fieldkey, fieldvalue)
 	return nil
 }

 func (h *TestingHandler) AddUint(key []byte, value []byte) error {
+	k := make([]byte, len(key))
+	copy(k, key)
+	v := make([]byte, len(value))
+	copy(v, value)
+
 	fieldkey := Result{
 		Name:  FieldKey,
 		Value: key,
@@ -62,48 +82,66 @@ func (h *TestingHandler) AddUint(key []byte, value []byte) error {
 }

 func (h *TestingHandler) AddFloat(key []byte, value []byte) error {
+	k := make([]byte, len(key))
+	copy(k, key)
+	v := make([]byte, len(value))
+	copy(v, value)
+
 	fieldkey := Result{
 		Name:  FieldKey,
-		Value: key,
+		Value: k,
 	}
 	fieldvalue := Result{
 		Name:  FieldFloat,
-		Value: value,
+		Value: v,
 	}
 	h.results = append(h.results, fieldkey, fieldvalue)
 	return nil
 }

 func (h *TestingHandler) AddString(key []byte, value []byte) error {
+	k := make([]byte, len(key))
+	copy(k, key)
+	v := make([]byte, len(value))
+	copy(v, value)
+
 	fieldkey := Result{
 		Name:  FieldKey,
-		Value: key,
+		Value: k,
 	}
 	fieldvalue := Result{
 		Name:  FieldString,
-		Value: value,
+		Value: v,
 	}
 	h.results = append(h.results, fieldkey, fieldvalue)
 	return nil
 }

 func (h *TestingHandler) AddBool(key []byte, value []byte) error {
+	k := make([]byte, len(key))
+	copy(k, key)
+	v := make([]byte, len(value))
+	copy(v, value)
+
 	fieldkey := Result{
 		Name:  FieldKey,
-		Value: key,
+		Value: k,
 	}
 	fieldvalue := Result{
 		Name:  FieldBool,
-		Value: value,
+		Value: v,
 	}
 	h.results = append(h.results, fieldkey, fieldvalue)
 	return nil
 }

 func (h *TestingHandler) SetTimestamp(tm []byte) error {
+	t := make([]byte, len(tm))
+	copy(t, tm)
+
 	timestamp := Result{
 		Name:  Timestamp,
-		Value: tm,
+		Value: t,
 	}
 	h.results = append(h.results, timestamp)
 	return nil
@@ -1676,8 +1714,7 @@ func TestMachine(t *testing.T) {
 	}
 }

-func TestMachinePosition(t *testing.T) {
-	var tests = []struct {
+var positionTests = []struct {
 	name   string
 	input  []byte
 	lineno int
@@ -1732,7 +1769,9 @@ func TestMachinePosition(t *testing.T) {
 		column: 14,
 	},
 }
-	for _, tt := range tests {
+
+func TestMachinePosition(t *testing.T) {
+	for _, tt := range positionTests {
 		t.Run(tt.name, func(t *testing.T) {
 			handler := &TestingHandler{}
 			fsm := influx.NewMachine(handler)
@@ -1932,8 +1971,7 @@ func (h *MockHandler) SetTimestamp(tm []byte) error {
 	return h.SetTimestampF(tm)
 }

-func TestHandlerErrorRecovery(t *testing.T) {
-	var tests = []struct {
+var errorRecoveryTests = []struct {
 	name    string
 	input   []byte
 	handler *MockHandler
@@ -2060,7 +2098,9 @@ func TestHandlerErrorRecovery(t *testing.T) {
 		},
 	},
 }
-	for _, tt := range tests {
+
+func TestHandlerErrorRecovery(t *testing.T) {
+	for _, tt := range errorRecoveryTests {
 		t.Run(tt.name, func(t *testing.T) {
 			fsm := influx.NewMachine(tt.handler)
 			fsm.SetData(tt.input)
@@ -2078,3 +2118,79 @@ func TestHandlerErrorRecovery(t *testing.T) {
 		})
 	}
 }
func TestStreamMachine(t *testing.T) {
type testcase struct {
name string
input io.Reader
results []Result
err error
}
var tc []testcase
for _, tt := range tests {
tc = append(tc, testcase{
name: tt.name,
input: bytes.NewBuffer([]byte(tt.input)),
results: tt.results,
err: tt.err,
})
}
for _, tt := range tc {
t.Run(tt.name, func(t *testing.T) {
handler := &TestingHandler{}
fsm := influx.NewStreamMachine(tt.input, handler)
// Parse only up to 20 metrics; to avoid any bugs where the parser
// isn't terminated.
for i := 0; i < 20; i++ {
err := fsm.Next()
if err != nil && err == influx.EOF {
break
}
handler.Result(err)
}
results := handler.Results()
require.Equal(t, tt.results, results)
})
}
}
func TestStreamMachinePosition(t *testing.T) {
type testcase struct {
name string
input io.Reader
lineno int
column int
}
var tc []testcase
for _, tt := range positionTests {
tc = append(tc, testcase{
name: tt.name,
input: bytes.NewBuffer([]byte(tt.input)),
lineno: tt.lineno,
column: tt.column,
})
}
for _, tt := range tc {
t.Run(tt.name, func(t *testing.T) {
handler := &TestingHandler{}
fsm := influx.NewStreamMachine(tt.input, handler)
// Parse until an error or eof
for i := 0; i < 20; i++ {
err := fsm.Next()
if err != nil {
break
}
}
require.Equal(t, tt.lineno, fsm.LineNumber(), "lineno")
require.Equal(t, tt.column, fsm.Column(), "column")
})
}
}

View File

@@ -3,8 +3,10 @@ package influx

 import (
 	"errors"
 	"fmt"
+	"io"
 	"strings"
 	"sync"
+	"time"

 	"github.com/influxdata/telegraf"
 )

@@ -17,6 +19,9 @@ var (
 	ErrNoMetric = errors.New("no metric in line")
 )

+type TimeFunc func() time.Time
+
+// ParseError indicates an error in the parsing of the text.
 type ParseError struct {
 	Offset     int
 	LineOffset int
@@ -38,6 +43,8 @@ func (e *ParseError) Error() string {
 	return fmt.Sprintf("metric parse error: %s at %d:%d: %q", e.msg, e.LineNumber, e.Column, buffer)
 }

+// Parser is an InfluxDB Line Protocol parser that implements the
+// parsers.Parser interface.
 type Parser struct {
 	DefaultTags map[string]string

@@ -62,6 +69,10 @@ func NewSeriesParser(handler *MetricHandler) *Parser {
 	}
 }

+func (h *Parser) SetTimeFunc(f TimeFunc) {
+	h.handler.SetTimeFunc(f)
+}
+
 func (p *Parser) Parse(input []byte) ([]telegraf.Metric, error) {
 	p.Lock()
 	defer p.Unlock()
@@ -75,7 +86,6 @@ func (p *Parser) Parse(input []byte) ([]telegraf.Metric, error) {
 		}

 		if err != nil {
-			p.handler.Reset()
 			return nil, &ParseError{
 				Offset:     p.machine.Position(),
 				LineOffset: p.machine.LineOffset(),
@@ -88,7 +98,6 @@ func (p *Parser) Parse(input []byte) ([]telegraf.Metric, error) {

 		metric, err := p.handler.Metric()
 		if err != nil {
-			p.handler.Reset()
 			return nil, err
 		}

@@ -126,10 +135,93 @@ func (p *Parser) applyDefaultTags(metrics []telegraf.Metric) {
 	}

 	for _, m := range metrics {
+		p.applyDefaultTagsSingle(m)
+	}
+}
+
+func (p *Parser) applyDefaultTagsSingle(metric telegraf.Metric) {
 	for k, v := range p.DefaultTags {
-		if !m.HasTag(k) {
-			m.AddTag(k, v)
+		if !metric.HasTag(k) {
+			metric.AddTag(k, v)
 		}
 	}
 }
// StreamParser is an InfluxDB Line Protocol parser. It is not safe for
// concurrent use in multiple goroutines.
type StreamParser struct {
machine *streamMachine
handler *MetricHandler
}
func NewStreamParser(r io.Reader) *StreamParser {
handler := NewMetricHandler()
return &StreamParser{
machine: NewStreamMachine(r, handler),
handler: handler,
}
}
// SetTimeFunc changes the function used to determine the time of metrics
// without a timestamp. The default TimeFunc is time.Now. Useful mostly for
// testing, or perhaps if you want all metrics to have the same timestamp.
func (h *StreamParser) SetTimeFunc(f TimeFunc) {
h.handler.SetTimeFunc(f)
}
func (h *StreamParser) SetTimePrecision(u time.Duration) {
h.handler.SetTimePrecision(u)
}
// Next parses the next item from the stream. You can repeat calls to this
// function until it returns EOF.
func (p *StreamParser) Next() (telegraf.Metric, error) {
err := p.machine.Next()
if err == EOF {
return nil, EOF
}
if err != nil {
return nil, &ParseError{
Offset: p.machine.Position(),
LineOffset: p.machine.LineOffset(),
LineNumber: p.machine.LineNumber(),
Column: p.machine.Column(),
msg: err.Error(),
buf: p.machine.LineText(),
}
}
metric, err := p.handler.Metric()
if err != nil {
return nil, err
}
return metric, nil
}
// Position returns the current byte offset into the data.
func (p *StreamParser) Position() int {
return p.machine.Position()
}
// LineOffset returns the byte offset of the current line.
func (p *StreamParser) LineOffset() int {
return p.machine.LineOffset()
}
// LineNumber returns the current line number. Lines are counted based on the
// regular expression `\r?\n`.
func (p *StreamParser) LineNumber() int {
return p.machine.LineNumber()
}
// Column returns the current column.
func (p *StreamParser) Column() int {
return p.machine.Column()
}
// LineText returns the text of the current line that has been parsed so far.
func (p *StreamParser) LineText() string {
return p.machine.LineText()
}
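Putting the new API together, here is a minimal consumer of StreamParser. After a parse error the stream parser can keep going with the next line, which is how the listener's handleWrite uses it (the input below is illustrative):

package main

import (
	"fmt"
	"strings"

	"github.com/influxdata/telegraf/plugins/parsers/influx"
)

func main() {
	r := strings.NewReader("cpu,host=a value=42 1490362948000000000\ncpu value=oops\n")
	parser := influx.NewStreamParser(r)

	for {
		m, err := parser.Next()
		if err == influx.EOF {
			break // clean end of stream
		}
		if err != nil {
			// A *influx.ParseError carries line and column context;
			// calling Next again resumes with the following line.
			fmt.Println("parse error:", err)
			continue
		}
		fmt.Println(m.Name(), m.Fields(), m.Time())
	}
}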

View File

@@ -1,6 +1,7 @@
 package influx

 import (
+	"bytes"
 	"strconv"
 	"strings"
 	"testing"
@@ -8,6 +9,7 @@ import (

 	"github.com/influxdata/telegraf"
 	"github.com/influxdata/telegraf/metric"
+	"github.com/influxdata/telegraf/testutil"
 	"github.com/stretchr/testify/require"
 )

@@ -26,7 +28,6 @@ var ptests = []struct {
 	name      string
 	input     []byte
 	timeFunc  func() time.Time
-	precision time.Duration
 	metrics   []telegraf.Metric
 	err       error
 }{
@@ -495,7 +496,7 @@ var ptests = []struct {
 		err: nil,
 	},
 	{
-		name:  "no timestamp full precision",
+		name:  "no timestamp",
 		input: []byte("cpu value=42"),
 		timeFunc: func() time.Time {
 			return time.Unix(42, 123456789)
@@ -514,27 +515,6 @@ var ptests = []struct {
 		},
 		err: nil,
 	},
-	{
-		name:  "no timestamp partial precision",
-		input: []byte("cpu value=42"),
-		timeFunc: func() time.Time {
-			return time.Unix(42, 123456789)
-		},
-		precision: 1 * time.Millisecond,
-		metrics: []telegraf.Metric{
-			Metric(
-				metric.New(
-					"cpu",
-					map[string]string{},
-					map[string]interface{}{
-						"value": 42.0,
-					},
-					time.Unix(42, 123000000),
-				),
-			),
-		},
-		err: nil,
-	},
 	{
 		name:  "multiple lines",
 		input: []byte("cpu value=42\ncpu value=42"),
@@ -651,14 +631,11 @@ func TestParser(t *testing.T) {
 	for _, tt := range ptests {
 		t.Run(tt.name, func(t *testing.T) {
 			handler := NewMetricHandler()
-			handler.SetTimeFunc(DefaultTime)
-			if tt.timeFunc != nil {
-				handler.SetTimeFunc(tt.timeFunc)
-			}
-			if tt.precision > 0 {
-				handler.SetTimePrecision(tt.precision)
-			}
 			parser := NewParser(handler)
+			parser.SetTimeFunc(DefaultTime)
+			if tt.timeFunc != nil {
+				parser.SetTimeFunc(tt.timeFunc)
+			}

 			metrics, err := parser.Parse(tt.input)
 			require.Equal(t, tt.err, err)
@@ -688,12 +665,39 @@ func BenchmarkParser(b *testing.B) {
 	}
 }

+func TestStreamParser(t *testing.T) {
+	for _, tt := range ptests {
+		t.Run(tt.name, func(t *testing.T) {
+			r := bytes.NewBuffer(tt.input)
+			parser := NewStreamParser(r)
+			parser.SetTimeFunc(DefaultTime)
+			if tt.timeFunc != nil {
+				parser.SetTimeFunc(tt.timeFunc)
+			}
+
+			var i int
+			for {
+				m, err := parser.Next()
+				if err != nil {
+					if err == EOF {
+						break
+					}
+					require.Equal(t, tt.err, err)
+					break
+				}
+
+				testutil.RequireMetricEqual(t, tt.metrics[i], m)
+				i++
+			}
+		})
+	}
+}
+
 func TestSeriesParser(t *testing.T) {
 	var tests = []struct {
 		name      string
 		input     []byte
 		timeFunc  func() time.Time
-		precision time.Duration
 		metrics   []telegraf.Metric
 		err       error
 	}{
@@ -749,14 +753,10 @@ func TestSeriesParser(t *testing.T) {
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
 			handler := NewMetricHandler()
-			handler.SetTimeFunc(DefaultTime)
-			if tt.timeFunc != nil {
-				handler.SetTimeFunc(tt.timeFunc)
-			}
-			if tt.precision > 0 {
-				handler.SetTimePrecision(tt.precision)
-			}
 			parser := NewSeriesParser(handler)
+			if tt.timeFunc != nil {
+				parser.SetTimeFunc(tt.timeFunc)
+			}

 			metrics, err := parser.Parse(tt.input)
 			require.Equal(t, tt.err, err)
@@ -791,6 +791,11 @@ func TestParserErrorString(t *testing.T) {
 			input:     []byte("cpu " + strings.Repeat("ab", maxErrorBufferSize) + "=invalid\ncpu value=42"),
 			errString: "metric parse error: expected field at 1:2054: \"cpu " + strings.Repeat("ab", maxErrorBufferSize)[:maxErrorBufferSize-4] + "...\"",
 		},
+		{
+			name:      "multiple line error",
+			input:     []byte("cpu value=42\ncpu value=invalid\ncpu value=42\ncpu value=invalid"),
+			errString: `metric parse error: expected field at 2:11: "cpu value=invalid"`,
+		},
 	}

 	for _, tt := range ptests {
} }
func TestStreamParserErrorString(t *testing.T) {
var ptests = []struct {
name string
input []byte
errs []string
}{
{
name: "multiple line error",
input: []byte("cpu value=42\ncpu value=invalid\ncpu value=42"),
errs: []string{
`metric parse error: expected field at 2:11: "cpu value="`,
},
},
{
name: "handler error",
input: []byte("cpu value=9223372036854775808i\ncpu value=42"),
errs: []string{
`metric parse error: value out of range at 1:31: "cpu value=9223372036854775808i"`,
},
},
{
name: "buffer too long",
input: []byte("cpu " + strings.Repeat("ab", maxErrorBufferSize) + "=invalid\ncpu value=42"),
errs: []string{
"metric parse error: expected field at 1:2054: \"cpu " + strings.Repeat("ab", maxErrorBufferSize)[:maxErrorBufferSize-4] + "...\"",
},
},
{
name: "multiple errors",
input: []byte("foo value=1asdf2.0\nfoo value=2.0\nfoo value=3asdf2.0\nfoo value=4.0"),
errs: []string{
`metric parse error: expected field at 1:12: "foo value=1"`,
`metric parse error: expected field at 3:12: "foo value=3"`,
},
},
}
for _, tt := range ptests {
t.Run(tt.name, func(t *testing.T) {
parser := NewStreamParser(bytes.NewBuffer(tt.input))
var errs []error
for i := 0; i < 20; i++ {
_, err := parser.Next()
if err == EOF {
break
}
if err != nil {
errs = append(errs, err)
}
}
require.Equal(t, len(tt.errs), len(errs))
for i, err := range errs {
require.Equal(t, tt.errs[i], err.Error())
}
})
}
}