Improve the InfluxDB through-put performance
This changes the current use of the InfluxDB client to instead use a baked-in client that uses the fasthttp library. This allows for significantly smaller allocations, the re-use of http body buffers, and the re-use of the actual bytes of the line-protocol metric representations.
This commit is contained in:
parent
a8b1fe6f00
commit
f5d892d7d3
2
Godeps
2
Godeps
|
@ -50,6 +50,8 @@ github.com/shirou/gopsutil 1516eb9ddc5e61ba58874047a98f8b44b5e585e8
|
||||||
github.com/soniah/gosnmp 3fe3beb30fa9700988893c56a63b1df8e1b68c26
|
github.com/soniah/gosnmp 3fe3beb30fa9700988893c56a63b1df8e1b68c26
|
||||||
github.com/streadway/amqp b4f3ceab0337f013208d31348b578d83c0064744
|
github.com/streadway/amqp b4f3ceab0337f013208d31348b578d83c0064744
|
||||||
github.com/stretchr/testify 1f4a1643a57e798696635ea4c126e9127adb7d3c
|
github.com/stretchr/testify 1f4a1643a57e798696635ea4c126e9127adb7d3c
|
||||||
|
github.com/valyala/bytebufferpool e746df99fe4a3986f4d4f79e13c1e0117ce9c2f7
|
||||||
|
github.com/valyala/fasthttp 2f4876aaf2b591786efc9b49f34b86ad44c25074
|
||||||
github.com/vjeantet/grok 83bfdfdfd1a8146795b28e547a8e3c8b28a466c2
|
github.com/vjeantet/grok 83bfdfdfd1a8146795b28e547a8e3c8b28a466c2
|
||||||
github.com/wvanbergen/kafka bc265fedb9ff5b5c5d3c0fdcef4a819b3523d3ee
|
github.com/wvanbergen/kafka bc265fedb9ff5b5c5d3c0fdcef4a819b3523d3ee
|
||||||
github.com/wvanbergen/kazoo-go 0f768712ae6f76454f987c3356177e138df258f8
|
github.com/wvanbergen/kazoo-go 0f768712ae6f76454f987c3356177e138df258f8
|
||||||
|
|
|
@ -19,8 +19,15 @@ const (
|
||||||
)
|
)
|
||||||
|
|
||||||
type Metric interface {
|
type Metric interface {
|
||||||
|
// Serialize serializes the metric into a line-protocol byte buffer,
|
||||||
|
// including a newline at the end.
|
||||||
Serialize() []byte
|
Serialize() []byte
|
||||||
String() string // convenience function for string(Serialize())
|
// same as Serialize, but avoids an allocation.
|
||||||
|
// returns number of bytes copied into dst.
|
||||||
|
SerializeTo(dst []byte) int
|
||||||
|
// String is the same as Serialize, but returns a string.
|
||||||
|
String() string
|
||||||
|
// Copy deep-copies the metric.
|
||||||
Copy() Metric
|
Copy() Metric
|
||||||
// Split will attempt to return multiple metrics with the same timestamp
|
// Split will attempt to return multiple metrics with the same timestamp
|
||||||
// whose string representations are no longer than maxSize.
|
// whose string representations are no longer than maxSize.
|
||||||
|
|
|
@ -178,6 +178,48 @@ func (m *metric) Serialize() []byte {
|
||||||
return tmp
|
return tmp
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *metric) SerializeTo(dst []byte) int {
|
||||||
|
i := 0
|
||||||
|
if i >= len(dst) {
|
||||||
|
return i
|
||||||
|
}
|
||||||
|
|
||||||
|
i += copy(dst[i:], m.name)
|
||||||
|
if i >= len(dst) {
|
||||||
|
return i
|
||||||
|
}
|
||||||
|
|
||||||
|
i += copy(dst[i:], m.tags)
|
||||||
|
if i >= len(dst) {
|
||||||
|
return i
|
||||||
|
}
|
||||||
|
|
||||||
|
dst[i] = ' '
|
||||||
|
i++
|
||||||
|
if i >= len(dst) {
|
||||||
|
return i
|
||||||
|
}
|
||||||
|
|
||||||
|
i += copy(dst[i:], m.fields)
|
||||||
|
if i >= len(dst) {
|
||||||
|
return i
|
||||||
|
}
|
||||||
|
|
||||||
|
dst[i] = ' '
|
||||||
|
i++
|
||||||
|
if i >= len(dst) {
|
||||||
|
return i
|
||||||
|
}
|
||||||
|
|
||||||
|
i += copy(dst[i:], m.t)
|
||||||
|
if i >= len(dst) {
|
||||||
|
return i
|
||||||
|
}
|
||||||
|
dst[i] = '\n'
|
||||||
|
|
||||||
|
return i + 1
|
||||||
|
}
|
||||||
|
|
||||||
func (m *metric) Split(maxSize int) []telegraf.Metric {
|
func (m *metric) Split(maxSize int) []telegraf.Metric {
|
||||||
if m.Len() < maxSize {
|
if m.Len() < maxSize {
|
||||||
return []telegraf.Metric{m}
|
return []telegraf.Metric{m}
|
||||||
|
|
|
@ -0,0 +1,155 @@
|
||||||
|
package metric
|
||||||
|
|
||||||
|
import (
|
||||||
|
"io"
|
||||||
|
|
||||||
|
"github.com/influxdata/telegraf"
|
||||||
|
)
|
||||||
|
|
||||||
|
type state int
|
||||||
|
|
||||||
|
const (
|
||||||
|
_ state = iota
|
||||||
|
// normal state copies whole metrics into the given buffer until we can't
|
||||||
|
// fit the next metric.
|
||||||
|
normal
|
||||||
|
// split state means that we have a metric that we were able to split, so
|
||||||
|
// that we can fit it into multiple metrics (and calls to Read)
|
||||||
|
split
|
||||||
|
// overflow state means that we have a metric that didn't fit into a single
|
||||||
|
// buffer, and needs to be split across multiple calls to Read.
|
||||||
|
overflow
|
||||||
|
// splitOverflow state means that a split metric didn't fit into a single
|
||||||
|
// buffer, and needs to be split across multiple calls to Read.
|
||||||
|
splitOverflow
|
||||||
|
// done means we're done reading metrics, and now always return (0, io.EOF)
|
||||||
|
done
|
||||||
|
)
|
||||||
|
|
||||||
|
type reader struct {
|
||||||
|
metrics []telegraf.Metric
|
||||||
|
splitMetrics []telegraf.Metric
|
||||||
|
buf []byte
|
||||||
|
state state
|
||||||
|
|
||||||
|
// metric index
|
||||||
|
iM int
|
||||||
|
// split metric index
|
||||||
|
iSM int
|
||||||
|
// buffer index
|
||||||
|
iB int
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewReader(metrics []telegraf.Metric) io.Reader {
|
||||||
|
return &reader{
|
||||||
|
metrics: metrics,
|
||||||
|
state: normal,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *reader) Read(p []byte) (n int, err error) {
|
||||||
|
var i int
|
||||||
|
switch r.state {
|
||||||
|
case done:
|
||||||
|
return 0, io.EOF
|
||||||
|
case normal:
|
||||||
|
for {
|
||||||
|
// this for-loop is the sunny-day scenario, where we are given a
|
||||||
|
// buffer that is large enough to hold at least a single metric.
|
||||||
|
// all of the cases below it are edge-cases.
|
||||||
|
if r.metrics[r.iM].Len() < len(p[i:]) {
|
||||||
|
i += r.metrics[r.iM].SerializeTo(p[i:])
|
||||||
|
} else {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
r.iM++
|
||||||
|
if r.iM == len(r.metrics) {
|
||||||
|
r.state = done
|
||||||
|
return i, io.EOF
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// if we haven't written any bytes, check if we can split the current
|
||||||
|
// metric into multiple full metrics at a smaller size.
|
||||||
|
if i == 0 {
|
||||||
|
tmp := r.metrics[r.iM].Split(len(p))
|
||||||
|
if len(tmp) > 1 {
|
||||||
|
r.splitMetrics = tmp
|
||||||
|
r.state = split
|
||||||
|
if r.splitMetrics[0].Len() < len(p) {
|
||||||
|
i += r.splitMetrics[0].SerializeTo(p)
|
||||||
|
r.iSM = 1
|
||||||
|
} else {
|
||||||
|
// splitting didn't quite work, so we'll drop down and
|
||||||
|
// overflow the metric.
|
||||||
|
r.state = normal
|
||||||
|
r.iSM = 0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// if we haven't written any bytes and we're not at the end of the metrics
|
||||||
|
// slice, then it means we have a single metric that is larger than the
|
||||||
|
// provided buffer.
|
||||||
|
if i == 0 {
|
||||||
|
r.buf = r.metrics[r.iM].Serialize()
|
||||||
|
i += copy(p, r.buf[r.iB:])
|
||||||
|
r.iB += i
|
||||||
|
r.state = overflow
|
||||||
|
}
|
||||||
|
|
||||||
|
case split:
|
||||||
|
if r.splitMetrics[r.iSM].Len() < len(p) {
|
||||||
|
// write the current split metric
|
||||||
|
i += r.splitMetrics[r.iSM].SerializeTo(p)
|
||||||
|
r.iSM++
|
||||||
|
if r.iSM >= len(r.splitMetrics) {
|
||||||
|
// done writing the current split metrics
|
||||||
|
r.iSM = 0
|
||||||
|
r.iM++
|
||||||
|
if r.iM == len(r.metrics) {
|
||||||
|
r.state = done
|
||||||
|
return i, io.EOF
|
||||||
|
}
|
||||||
|
r.state = normal
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// This would only happen if we split the metric, and then a
|
||||||
|
// subsequent buffer was smaller than the initial one given,
|
||||||
|
// so that our split metric no longer fits.
|
||||||
|
r.buf = r.splitMetrics[r.iSM].Serialize()
|
||||||
|
i += copy(p, r.buf[r.iB:])
|
||||||
|
r.iB += i
|
||||||
|
r.state = splitOverflow
|
||||||
|
}
|
||||||
|
|
||||||
|
case splitOverflow:
|
||||||
|
i = copy(p, r.buf[r.iB:])
|
||||||
|
r.iB += i
|
||||||
|
if r.iB >= len(r.buf) {
|
||||||
|
r.iB = 0
|
||||||
|
r.iSM++
|
||||||
|
if r.iSM == len(r.splitMetrics) {
|
||||||
|
r.iM++
|
||||||
|
r.state = normal
|
||||||
|
} else {
|
||||||
|
r.state = split
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
case overflow:
|
||||||
|
i = copy(p, r.buf[r.iB:])
|
||||||
|
r.iB += i
|
||||||
|
if r.iB >= len(r.buf) {
|
||||||
|
r.iB = 0
|
||||||
|
r.iM++
|
||||||
|
if r.iM == len(r.metrics) {
|
||||||
|
r.state = done
|
||||||
|
return i, io.EOF
|
||||||
|
}
|
||||||
|
r.state = normal
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return i, nil
|
||||||
|
}
|
|
@ -0,0 +1,487 @@
|
||||||
|
package metric
|
||||||
|
|
||||||
|
import (
|
||||||
|
"io"
|
||||||
|
"io/ioutil"
|
||||||
|
"regexp"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/influxdata/telegraf"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
)
|
||||||
|
|
||||||
|
func BenchmarkMetricReader(b *testing.B) {
|
||||||
|
metrics := make([]telegraf.Metric, 10)
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
metrics[i], _ = New("foo", map[string]string{},
|
||||||
|
map[string]interface{}{"value": int64(1)}, time.Now())
|
||||||
|
}
|
||||||
|
for n := 0; n < b.N; n++ {
|
||||||
|
r := NewReader(metrics)
|
||||||
|
io.Copy(ioutil.Discard, r)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMetricReader(t *testing.T) {
|
||||||
|
ts := time.Unix(1481032190, 0)
|
||||||
|
metrics := make([]telegraf.Metric, 10)
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
metrics[i], _ = New("foo", map[string]string{},
|
||||||
|
map[string]interface{}{"value": int64(1)}, ts)
|
||||||
|
}
|
||||||
|
|
||||||
|
r := NewReader(metrics)
|
||||||
|
|
||||||
|
buf := make([]byte, 35)
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
n, err := r.Read(buf)
|
||||||
|
if err != nil {
|
||||||
|
assert.True(t, err == io.EOF, err.Error())
|
||||||
|
}
|
||||||
|
assert.Equal(t, 33, n)
|
||||||
|
assert.Equal(t, "foo value=1i 1481032190000000000\n", string(buf[0:n]))
|
||||||
|
}
|
||||||
|
|
||||||
|
// reader should now be done, and always return 0, io.EOF
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
n, err := r.Read(buf)
|
||||||
|
assert.True(t, err == io.EOF, err.Error())
|
||||||
|
assert.Equal(t, 0, n)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMetricReader_OverflowMetric(t *testing.T) {
|
||||||
|
ts := time.Unix(1481032190, 0)
|
||||||
|
m, _ := New("foo", map[string]string{},
|
||||||
|
map[string]interface{}{"value": int64(10)}, ts)
|
||||||
|
metrics := []telegraf.Metric{m}
|
||||||
|
|
||||||
|
r := NewReader(metrics)
|
||||||
|
buf := make([]byte, 5)
|
||||||
|
|
||||||
|
tests := []struct {
|
||||||
|
exp string
|
||||||
|
err error
|
||||||
|
n int
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
"foo v",
|
||||||
|
nil,
|
||||||
|
5,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"alue=",
|
||||||
|
nil,
|
||||||
|
5,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"10i 1",
|
||||||
|
nil,
|
||||||
|
5,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"48103",
|
||||||
|
nil,
|
||||||
|
5,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"21900",
|
||||||
|
nil,
|
||||||
|
5,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"00000",
|
||||||
|
nil,
|
||||||
|
5,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"000\n",
|
||||||
|
io.EOF,
|
||||||
|
4,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"",
|
||||||
|
io.EOF,
|
||||||
|
0,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, test := range tests {
|
||||||
|
n, err := r.Read(buf)
|
||||||
|
assert.Equal(t, test.n, n)
|
||||||
|
assert.Equal(t, test.exp, string(buf[0:n]))
|
||||||
|
assert.Equal(t, test.err, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMetricReader_OverflowMultipleMetrics(t *testing.T) {
|
||||||
|
ts := time.Unix(1481032190, 0)
|
||||||
|
m, _ := New("foo", map[string]string{},
|
||||||
|
map[string]interface{}{"value": int64(10)}, ts)
|
||||||
|
metrics := []telegraf.Metric{m, m.Copy()}
|
||||||
|
|
||||||
|
r := NewReader(metrics)
|
||||||
|
buf := make([]byte, 10)
|
||||||
|
|
||||||
|
tests := []struct {
|
||||||
|
exp string
|
||||||
|
err error
|
||||||
|
n int
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
"foo value=",
|
||||||
|
nil,
|
||||||
|
10,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"10i 148103",
|
||||||
|
nil,
|
||||||
|
10,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"2190000000",
|
||||||
|
nil,
|
||||||
|
10,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"000\n",
|
||||||
|
nil,
|
||||||
|
4,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"foo value=",
|
||||||
|
nil,
|
||||||
|
10,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"10i 148103",
|
||||||
|
nil,
|
||||||
|
10,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"2190000000",
|
||||||
|
nil,
|
||||||
|
10,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"000\n",
|
||||||
|
io.EOF,
|
||||||
|
4,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"",
|
||||||
|
io.EOF,
|
||||||
|
0,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, test := range tests {
|
||||||
|
n, err := r.Read(buf)
|
||||||
|
assert.Equal(t, test.n, n)
|
||||||
|
assert.Equal(t, test.exp, string(buf[0:n]))
|
||||||
|
assert.Equal(t, test.err, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// test splitting a metric
|
||||||
|
func TestMetricReader_SplitMetric(t *testing.T) {
|
||||||
|
ts := time.Unix(1481032190, 0)
|
||||||
|
m1, _ := New("foo", map[string]string{},
|
||||||
|
map[string]interface{}{
|
||||||
|
"value1": int64(10),
|
||||||
|
"value2": int64(10),
|
||||||
|
"value3": int64(10),
|
||||||
|
"value4": int64(10),
|
||||||
|
"value5": int64(10),
|
||||||
|
"value6": int64(10),
|
||||||
|
},
|
||||||
|
ts,
|
||||||
|
)
|
||||||
|
metrics := []telegraf.Metric{m1}
|
||||||
|
|
||||||
|
r := NewReader(metrics)
|
||||||
|
buf := make([]byte, 60)
|
||||||
|
|
||||||
|
tests := []struct {
|
||||||
|
expRegex string
|
||||||
|
err error
|
||||||
|
n int
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
`foo value\d=10i,value\d=10i,value\d=10i 1481032190000000000\n`,
|
||||||
|
nil,
|
||||||
|
57,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
`foo value\d=10i,value\d=10i,value\d=10i 1481032190000000000\n`,
|
||||||
|
io.EOF,
|
||||||
|
57,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"",
|
||||||
|
io.EOF,
|
||||||
|
0,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, test := range tests {
|
||||||
|
n, err := r.Read(buf)
|
||||||
|
assert.Equal(t, test.n, n)
|
||||||
|
re := regexp.MustCompile(test.expRegex)
|
||||||
|
assert.True(t, re.MatchString(string(buf[0:n])), string(buf[0:n]))
|
||||||
|
assert.Equal(t, test.err, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// test an array with one split metric and one unsplit
|
||||||
|
func TestMetricReader_SplitMetric2(t *testing.T) {
|
||||||
|
ts := time.Unix(1481032190, 0)
|
||||||
|
m1, _ := New("foo", map[string]string{},
|
||||||
|
map[string]interface{}{
|
||||||
|
"value1": int64(10),
|
||||||
|
"value2": int64(10),
|
||||||
|
"value3": int64(10),
|
||||||
|
"value4": int64(10),
|
||||||
|
"value5": int64(10),
|
||||||
|
"value6": int64(10),
|
||||||
|
},
|
||||||
|
ts,
|
||||||
|
)
|
||||||
|
m2, _ := New("foo", map[string]string{},
|
||||||
|
map[string]interface{}{
|
||||||
|
"value1": int64(10),
|
||||||
|
},
|
||||||
|
ts,
|
||||||
|
)
|
||||||
|
metrics := []telegraf.Metric{m1, m2}
|
||||||
|
|
||||||
|
r := NewReader(metrics)
|
||||||
|
buf := make([]byte, 60)
|
||||||
|
|
||||||
|
tests := []struct {
|
||||||
|
expRegex string
|
||||||
|
err error
|
||||||
|
n int
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
`foo value\d=10i,value\d=10i,value\d=10i 1481032190000000000\n`,
|
||||||
|
nil,
|
||||||
|
57,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
`foo value\d=10i,value\d=10i,value\d=10i 1481032190000000000\n`,
|
||||||
|
nil,
|
||||||
|
57,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
`foo value1=10i 1481032190000000000\n`,
|
||||||
|
io.EOF,
|
||||||
|
35,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"",
|
||||||
|
io.EOF,
|
||||||
|
0,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, test := range tests {
|
||||||
|
n, err := r.Read(buf)
|
||||||
|
assert.Equal(t, test.n, n)
|
||||||
|
re := regexp.MustCompile(test.expRegex)
|
||||||
|
assert.True(t, re.MatchString(string(buf[0:n])), string(buf[0:n]))
|
||||||
|
assert.Equal(t, test.err, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// test split that results in metrics that are still too long, which results in
|
||||||
|
// the reader falling back to regular overflow.
|
||||||
|
func TestMetricReader_SplitMetricTooLong(t *testing.T) {
|
||||||
|
ts := time.Unix(1481032190, 0)
|
||||||
|
m1, _ := New("foo", map[string]string{},
|
||||||
|
map[string]interface{}{
|
||||||
|
"value1": int64(10),
|
||||||
|
"value2": int64(10),
|
||||||
|
},
|
||||||
|
ts,
|
||||||
|
)
|
||||||
|
metrics := []telegraf.Metric{m1}
|
||||||
|
|
||||||
|
r := NewReader(metrics)
|
||||||
|
buf := make([]byte, 30)
|
||||||
|
|
||||||
|
tests := []struct {
|
||||||
|
expRegex string
|
||||||
|
err error
|
||||||
|
n int
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
`foo value\d=10i,value\d=10i 1481`,
|
||||||
|
nil,
|
||||||
|
30,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
`032190000000000\n`,
|
||||||
|
io.EOF,
|
||||||
|
16,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"",
|
||||||
|
io.EOF,
|
||||||
|
0,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, test := range tests {
|
||||||
|
n, err := r.Read(buf)
|
||||||
|
assert.Equal(t, test.n, n)
|
||||||
|
re := regexp.MustCompile(test.expRegex)
|
||||||
|
assert.True(t, re.MatchString(string(buf[0:n])), string(buf[0:n]))
|
||||||
|
assert.Equal(t, test.err, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// test split with a changing buffer size in the middle of subsequent calls
|
||||||
|
// to Read
|
||||||
|
func TestMetricReader_SplitMetricChangingBuffer(t *testing.T) {
|
||||||
|
ts := time.Unix(1481032190, 0)
|
||||||
|
m1, _ := New("foo", map[string]string{},
|
||||||
|
map[string]interface{}{
|
||||||
|
"value1": int64(10),
|
||||||
|
"value2": int64(10),
|
||||||
|
"value3": int64(10),
|
||||||
|
},
|
||||||
|
ts,
|
||||||
|
)
|
||||||
|
m2, _ := New("foo", map[string]string{},
|
||||||
|
map[string]interface{}{
|
||||||
|
"value1": int64(10),
|
||||||
|
},
|
||||||
|
ts,
|
||||||
|
)
|
||||||
|
metrics := []telegraf.Metric{m1, m2}
|
||||||
|
|
||||||
|
r := NewReader(metrics)
|
||||||
|
|
||||||
|
tests := []struct {
|
||||||
|
expRegex string
|
||||||
|
err error
|
||||||
|
n int
|
||||||
|
buf []byte
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
`foo value\d=10i 1481032190000000000\n`,
|
||||||
|
nil,
|
||||||
|
35,
|
||||||
|
make([]byte, 36),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
`foo value\d=10i 148103219000000`,
|
||||||
|
nil,
|
||||||
|
30,
|
||||||
|
make([]byte, 30),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
`0000\n`,
|
||||||
|
nil,
|
||||||
|
5,
|
||||||
|
make([]byte, 30),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
`foo value\d=10i 1481032190000000000\n`,
|
||||||
|
nil,
|
||||||
|
35,
|
||||||
|
make([]byte, 36),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
`foo value1=10i 1481032190000000000\n`,
|
||||||
|
io.EOF,
|
||||||
|
35,
|
||||||
|
make([]byte, 36),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"",
|
||||||
|
io.EOF,
|
||||||
|
0,
|
||||||
|
make([]byte, 36),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, test := range tests {
|
||||||
|
n, err := r.Read(test.buf)
|
||||||
|
assert.Equal(t, test.n, n, test.expRegex)
|
||||||
|
re := regexp.MustCompile(test.expRegex)
|
||||||
|
assert.True(t, re.MatchString(string(test.buf[0:n])), string(test.buf[0:n]))
|
||||||
|
assert.Equal(t, test.err, err, test.expRegex)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// test split with a changing buffer size in the middle of subsequent calls
|
||||||
|
// to Read
|
||||||
|
func TestMetricReader_SplitMetricChangingBuffer2(t *testing.T) {
|
||||||
|
ts := time.Unix(1481032190, 0)
|
||||||
|
m1, _ := New("foo", map[string]string{},
|
||||||
|
map[string]interface{}{
|
||||||
|
"value1": int64(10),
|
||||||
|
"value2": int64(10),
|
||||||
|
},
|
||||||
|
ts,
|
||||||
|
)
|
||||||
|
m2, _ := New("foo", map[string]string{},
|
||||||
|
map[string]interface{}{
|
||||||
|
"value1": int64(10),
|
||||||
|
},
|
||||||
|
ts,
|
||||||
|
)
|
||||||
|
metrics := []telegraf.Metric{m1, m2}
|
||||||
|
|
||||||
|
r := NewReader(metrics)
|
||||||
|
|
||||||
|
tests := []struct {
|
||||||
|
expRegex string
|
||||||
|
err error
|
||||||
|
n int
|
||||||
|
buf []byte
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
`foo value\d=10i 1481032190000000000\n`,
|
||||||
|
nil,
|
||||||
|
35,
|
||||||
|
make([]byte, 36),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
`foo value\d=10i 148103219000000`,
|
||||||
|
nil,
|
||||||
|
30,
|
||||||
|
make([]byte, 30),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
`0000\n`,
|
||||||
|
nil,
|
||||||
|
5,
|
||||||
|
make([]byte, 30),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
`foo value1=10i 1481032190000000000\n`,
|
||||||
|
io.EOF,
|
||||||
|
35,
|
||||||
|
make([]byte, 36),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"",
|
||||||
|
io.EOF,
|
||||||
|
0,
|
||||||
|
make([]byte, 36),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, test := range tests {
|
||||||
|
n, err := r.Read(test.buf)
|
||||||
|
assert.Equal(t, test.n, n, test.expRegex)
|
||||||
|
re := regexp.MustCompile(test.expRegex)
|
||||||
|
assert.True(t, re.MatchString(string(test.buf[0:n])), string(test.buf[0:n]))
|
||||||
|
assert.Equal(t, test.err, err, test.expRegex)
|
||||||
|
}
|
||||||
|
}
|
|
@ -300,6 +300,9 @@ func (h *HTTPListener) serveWrite(res http.ResponseWriter, req *http.Request) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *HTTPListener) parse(b []byte, t time.Time) error {
|
func (h *HTTPListener) parse(b []byte, t time.Time) error {
|
||||||
|
if !bytes.HasSuffix(b, []byte("\n")) {
|
||||||
|
b = append(b, '\n')
|
||||||
|
}
|
||||||
metrics, err := h.parser.ParseWithDefaultTime(b, t)
|
metrics, err := h.parser.ParseWithDefaultTime(b, t)
|
||||||
|
|
||||||
for _, m := range metrics {
|
for _, m := range metrics {
|
||||||
|
|
|
@ -0,0 +1,22 @@
|
||||||
|
package client
|
||||||
|
|
||||||
|
import "io"
|
||||||
|
|
||||||
|
type Client interface {
|
||||||
|
Query(command string) error
|
||||||
|
|
||||||
|
Write(b []byte) (int, error)
|
||||||
|
WriteWithParams(b []byte, params WriteParams) (int, error)
|
||||||
|
|
||||||
|
WriteStream(b io.Reader, contentLength int) (int, error)
|
||||||
|
WriteStreamWithParams(b io.Reader, contentLength int, params WriteParams) (int, error)
|
||||||
|
|
||||||
|
Close() error
|
||||||
|
}
|
||||||
|
|
||||||
|
type WriteParams struct {
|
||||||
|
Database string
|
||||||
|
RetentionPolicy string
|
||||||
|
Precision string
|
||||||
|
Consistency string
|
||||||
|
}
|
|
@ -0,0 +1,258 @@
|
||||||
|
package client
|
||||||
|
|
||||||
|
import (
|
||||||
|
"crypto/tls"
|
||||||
|
"encoding/base64"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"net/url"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/valyala/fasthttp"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
defaultRequestTimeout = time.Second * 5
|
||||||
|
)
|
||||||
|
|
||||||
|
//
|
||||||
|
func NewHTTP(config HTTPConfig, defaultWP WriteParams) (Client, error) {
|
||||||
|
// validate required parameters:
|
||||||
|
if len(config.URL) == 0 {
|
||||||
|
return nil, fmt.Errorf("config.URL is required to create an HTTP client")
|
||||||
|
}
|
||||||
|
if len(defaultWP.Database) == 0 {
|
||||||
|
return nil, fmt.Errorf("A default database is required to create an HTTP client")
|
||||||
|
}
|
||||||
|
|
||||||
|
// set defaults:
|
||||||
|
if config.Timeout == 0 {
|
||||||
|
config.Timeout = defaultRequestTimeout
|
||||||
|
}
|
||||||
|
|
||||||
|
// parse URL:
|
||||||
|
u, err := url.Parse(config.URL)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("error parsing config.URL: %s", err)
|
||||||
|
}
|
||||||
|
if u.Scheme != "http" && u.Scheme != "https" {
|
||||||
|
return nil, fmt.Errorf("config.URL scheme must be http(s), got %s", u.Scheme)
|
||||||
|
}
|
||||||
|
|
||||||
|
wu := writeURL(u, defaultWP)
|
||||||
|
return &httpClient{
|
||||||
|
writeURL: []byte(wu),
|
||||||
|
config: config,
|
||||||
|
url: u,
|
||||||
|
client: &fasthttp.Client{
|
||||||
|
TLSConfig: config.TLSConfig,
|
||||||
|
},
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type HTTPConfig struct {
|
||||||
|
// URL should be of the form "http://host:port" (REQUIRED)
|
||||||
|
URL string
|
||||||
|
|
||||||
|
// UserAgent sets the User-Agent header.
|
||||||
|
UserAgent string
|
||||||
|
|
||||||
|
// Timeout is the time to wait for a response to each HTTP request (writes
|
||||||
|
// and queries).
|
||||||
|
Timeout time.Duration
|
||||||
|
|
||||||
|
// Username is the basic auth username for the server.
|
||||||
|
Username string
|
||||||
|
// Password is the basic auth password for the server.
|
||||||
|
Password string
|
||||||
|
|
||||||
|
// TLSConfig is the tls auth settings to use for each request.
|
||||||
|
TLSConfig *tls.Config
|
||||||
|
|
||||||
|
// Gzip, if true, compresses each payload using gzip.
|
||||||
|
// TODO
|
||||||
|
// Gzip bool
|
||||||
|
}
|
||||||
|
|
||||||
|
// Response represents a list of statement results.
|
||||||
|
type Response struct {
|
||||||
|
// ignore Results:
|
||||||
|
Results []interface{} `json:"-"`
|
||||||
|
Err string `json:"error,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// Error returns the first error from any statement.
|
||||||
|
// Returns nil if no errors occurred on any statements.
|
||||||
|
func (r *Response) Error() error {
|
||||||
|
if r.Err != "" {
|
||||||
|
return fmt.Errorf(r.Err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type httpClient struct {
|
||||||
|
writeURL []byte
|
||||||
|
config HTTPConfig
|
||||||
|
client *fasthttp.Client
|
||||||
|
url *url.URL
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *httpClient) Query(command string) error {
|
||||||
|
req := c.makeRequest()
|
||||||
|
req.Header.SetRequestURI(queryURL(c.url, command))
|
||||||
|
|
||||||
|
return c.doRequest(req, fasthttp.StatusOK)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *httpClient) Write(b []byte) (int, error) {
|
||||||
|
req := c.makeWriteRequest(len(b), c.writeURL)
|
||||||
|
req.SetBody(b)
|
||||||
|
|
||||||
|
err := c.doRequest(req, fasthttp.StatusNoContent)
|
||||||
|
if err == nil {
|
||||||
|
return len(b), nil
|
||||||
|
}
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *httpClient) WriteWithParams(b []byte, wp WriteParams) (int, error) {
|
||||||
|
req := c.makeWriteRequest(len(b), []byte(writeURL(c.url, wp)))
|
||||||
|
req.SetBody(b)
|
||||||
|
|
||||||
|
err := c.doRequest(req, fasthttp.StatusNoContent)
|
||||||
|
if err == nil {
|
||||||
|
return len(b), nil
|
||||||
|
}
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *httpClient) WriteStream(r io.Reader, contentLength int) (int, error) {
|
||||||
|
req := c.makeWriteRequest(contentLength, c.writeURL)
|
||||||
|
req.SetBodyStream(r, contentLength)
|
||||||
|
|
||||||
|
err := c.doRequest(req, fasthttp.StatusNoContent)
|
||||||
|
if err == nil {
|
||||||
|
return contentLength, nil
|
||||||
|
}
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *httpClient) WriteStreamWithParams(
|
||||||
|
r io.Reader,
|
||||||
|
contentLength int,
|
||||||
|
wp WriteParams,
|
||||||
|
) (int, error) {
|
||||||
|
req := c.makeWriteRequest(contentLength, []byte(writeURL(c.url, wp)))
|
||||||
|
req.SetBodyStream(r, contentLength)
|
||||||
|
|
||||||
|
err := c.doRequest(req, fasthttp.StatusNoContent)
|
||||||
|
if err == nil {
|
||||||
|
return contentLength, nil
|
||||||
|
}
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *httpClient) doRequest(
|
||||||
|
req *fasthttp.Request,
|
||||||
|
expectedCode int,
|
||||||
|
) error {
|
||||||
|
resp := fasthttp.AcquireResponse()
|
||||||
|
|
||||||
|
err := c.client.DoTimeout(req, resp, c.config.Timeout)
|
||||||
|
|
||||||
|
code := resp.StatusCode()
|
||||||
|
// If it's a "no content" response, then release and return nil
|
||||||
|
if code == fasthttp.StatusNoContent {
|
||||||
|
fasthttp.ReleaseResponse(resp)
|
||||||
|
fasthttp.ReleaseRequest(req)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// not a "no content" response, so parse the result:
|
||||||
|
var response Response
|
||||||
|
decErr := json.Unmarshal(resp.Body(), &response)
|
||||||
|
|
||||||
|
// If we got a JSON decode error, send that back
|
||||||
|
if decErr != nil {
|
||||||
|
err = fmt.Errorf("Unable to decode json: received status code %d err: %s", code, decErr)
|
||||||
|
}
|
||||||
|
// Unexpected response code OR error in JSON response body overrides
|
||||||
|
// a JSON decode error:
|
||||||
|
if code != expectedCode || response.Error() != nil {
|
||||||
|
err = fmt.Errorf("Response Error: Status Code [%d], expected [%d], [%v]",
|
||||||
|
code, expectedCode, response.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
fasthttp.ReleaseResponse(resp)
|
||||||
|
fasthttp.ReleaseRequest(req)
|
||||||
|
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *httpClient) makeWriteRequest(
|
||||||
|
contentLength int,
|
||||||
|
writeURL []byte,
|
||||||
|
) *fasthttp.Request {
|
||||||
|
req := c.makeRequest()
|
||||||
|
req.Header.SetContentLength(contentLength)
|
||||||
|
req.Header.SetRequestURIBytes(writeURL)
|
||||||
|
// TODO
|
||||||
|
// if gzip {
|
||||||
|
// req.Header.SetBytesKV([]byte("Content-Encoding"), []byte("gzip"))
|
||||||
|
// }
|
||||||
|
return req
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *httpClient) makeRequest() *fasthttp.Request {
|
||||||
|
req := fasthttp.AcquireRequest()
|
||||||
|
req.Header.SetContentTypeBytes([]byte("text/plain"))
|
||||||
|
req.Header.SetMethodBytes([]byte("POST"))
|
||||||
|
req.Header.SetUserAgent(c.config.UserAgent)
|
||||||
|
if c.config.Username != "" && c.config.Password != "" {
|
||||||
|
req.Header.Set("Authorization", "Basic "+basicAuth(c.config.Username, c.config.Password))
|
||||||
|
}
|
||||||
|
return req
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *httpClient) Close() error {
|
||||||
|
// Nothing to do.
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func writeURL(u *url.URL, wp WriteParams) string {
|
||||||
|
params := url.Values{}
|
||||||
|
params.Set("db", wp.Database)
|
||||||
|
if wp.RetentionPolicy != "" {
|
||||||
|
params.Set("rp", wp.RetentionPolicy)
|
||||||
|
}
|
||||||
|
if wp.Precision != "n" && wp.Precision != "" {
|
||||||
|
params.Set("precision", wp.Precision)
|
||||||
|
}
|
||||||
|
if wp.Consistency != "one" && wp.Consistency != "" {
|
||||||
|
params.Set("consistency", wp.Consistency)
|
||||||
|
}
|
||||||
|
|
||||||
|
u.RawQuery = params.Encode()
|
||||||
|
u.Path = "write"
|
||||||
|
return u.String()
|
||||||
|
}
|
||||||
|
|
||||||
|
func queryURL(u *url.URL, command string) string {
|
||||||
|
params := url.Values{}
|
||||||
|
params.Set("q", command)
|
||||||
|
|
||||||
|
u.RawQuery = params.Encode()
|
||||||
|
u.Path = "query"
|
||||||
|
return u.String()
|
||||||
|
}
|
||||||
|
|
||||||
|
// See 2 (end of page 4) http://www.ietf.org/rfc/rfc2617.txt
|
||||||
|
// "To receive authorization, the httpClient sends the userid and password,
|
||||||
|
// separated by a single colon (":") character, within a base64
|
||||||
|
// encoded string in the credentials."
|
||||||
|
// It is not meant to be urlencoded.
|
||||||
|
func basicAuth(username, password string) string {
|
||||||
|
auth := username + ":" + password
|
||||||
|
return base64.StdEncoding.EncodeToString([]byte(auth))
|
||||||
|
}
|
|
@ -0,0 +1,343 @@
|
||||||
|
package client
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"fmt"
|
||||||
|
"net/http"
|
||||||
|
"net/http/httptest"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestHTTPClient_Write(t *testing.T) {
|
||||||
|
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
switch r.URL.Path {
|
||||||
|
case "/write":
|
||||||
|
// test form values:
|
||||||
|
if r.FormValue("db") != "test" {
|
||||||
|
w.WriteHeader(http.StatusTeapot)
|
||||||
|
w.Header().Set("Content-Type", "application/json")
|
||||||
|
fmt.Fprintln(w, `{"results":[{}],"error":"wrong db name"}`)
|
||||||
|
}
|
||||||
|
if r.FormValue("rp") != "policy" {
|
||||||
|
w.WriteHeader(http.StatusTeapot)
|
||||||
|
w.Header().Set("Content-Type", "application/json")
|
||||||
|
fmt.Fprintln(w, `{"results":[{}],"error":"wrong rp name"}`)
|
||||||
|
}
|
||||||
|
if r.FormValue("precision") != "ns" {
|
||||||
|
w.WriteHeader(http.StatusTeapot)
|
||||||
|
w.Header().Set("Content-Type", "application/json")
|
||||||
|
fmt.Fprintln(w, `{"results":[{}],"error":"wrong precision"}`)
|
||||||
|
}
|
||||||
|
if r.FormValue("consistency") != "all" {
|
||||||
|
w.WriteHeader(http.StatusTeapot)
|
||||||
|
w.Header().Set("Content-Type", "application/json")
|
||||||
|
fmt.Fprintln(w, `{"results":[{}],"error":"wrong consistency"}`)
|
||||||
|
}
|
||||||
|
// test that user agent is set properly
|
||||||
|
if r.UserAgent() != "test-agent" {
|
||||||
|
w.WriteHeader(http.StatusTeapot)
|
||||||
|
w.Header().Set("Content-Type", "application/json")
|
||||||
|
fmt.Fprintln(w, `{"results":[{}],"error":"wrong agent name"}`)
|
||||||
|
}
|
||||||
|
// test basic auth params
|
||||||
|
user, pass, ok := r.BasicAuth()
|
||||||
|
if !ok {
|
||||||
|
w.WriteHeader(http.StatusTeapot)
|
||||||
|
w.Header().Set("Content-Type", "application/json")
|
||||||
|
fmt.Fprintln(w, `{"results":[{}],"error":"basic auth not set"}`)
|
||||||
|
}
|
||||||
|
if user != "test-user" || pass != "test-password" {
|
||||||
|
w.WriteHeader(http.StatusTeapot)
|
||||||
|
w.Header().Set("Content-Type", "application/json")
|
||||||
|
fmt.Fprintln(w, `{"results":[{}],"error":"basic auth incorrect"}`)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Validate Content-Length Header
|
||||||
|
if r.ContentLength != 13 {
|
||||||
|
w.WriteHeader(http.StatusTeapot)
|
||||||
|
w.Header().Set("Content-Type", "application/json")
|
||||||
|
msg := fmt.Sprintf(`{"results":[{}],"error":"Content-Length: expected [13], got [%d]"}`, r.ContentLength)
|
||||||
|
fmt.Fprintln(w, msg)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Validate the request body:
|
||||||
|
buf := make([]byte, 100)
|
||||||
|
n, _ := r.Body.Read(buf)
|
||||||
|
expected := "cpu value=99"
|
||||||
|
got := string(buf[0 : n-1])
|
||||||
|
if expected != got {
|
||||||
|
w.WriteHeader(http.StatusTeapot)
|
||||||
|
w.Header().Set("Content-Type", "application/json")
|
||||||
|
msg := fmt.Sprintf(`{"results":[{}],"error":"expected [%s], got [%s]"}`, expected, got)
|
||||||
|
fmt.Fprintln(w, msg)
|
||||||
|
}
|
||||||
|
|
||||||
|
w.WriteHeader(http.StatusNoContent)
|
||||||
|
w.Header().Set("Content-Type", "application/json")
|
||||||
|
case "/query":
|
||||||
|
w.WriteHeader(http.StatusOK)
|
||||||
|
w.Header().Set("Content-Type", "application/json")
|
||||||
|
fmt.Fprintln(w, `{"results":[{}]}`)
|
||||||
|
}
|
||||||
|
}))
|
||||||
|
defer ts.Close()
|
||||||
|
|
||||||
|
config := HTTPConfig{
|
||||||
|
URL: ts.URL,
|
||||||
|
UserAgent: "test-agent",
|
||||||
|
Username: "test-user",
|
||||||
|
Password: "test-password",
|
||||||
|
}
|
||||||
|
wp := WriteParams{
|
||||||
|
Database: "test",
|
||||||
|
RetentionPolicy: "policy",
|
||||||
|
Precision: "ns",
|
||||||
|
Consistency: "all",
|
||||||
|
}
|
||||||
|
client, err := NewHTTP(config, wp)
|
||||||
|
defer client.Close()
|
||||||
|
assert.NoError(t, err)
|
||||||
|
n, err := client.Write([]byte("cpu value=99\n"))
|
||||||
|
assert.Equal(t, 13, n)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
_, err = client.WriteStream(bytes.NewReader([]byte("cpu value=99\n")), 13)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestHTTPClient_WriteParamsOverride(t *testing.T) {
|
||||||
|
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
switch r.URL.Path {
|
||||||
|
case "/write":
|
||||||
|
// test that database is set properly
|
||||||
|
if r.FormValue("db") != "override" {
|
||||||
|
w.WriteHeader(http.StatusTeapot)
|
||||||
|
w.Header().Set("Content-Type", "application/json")
|
||||||
|
fmt.Fprintln(w, `{"results":[{}],"error":"wrong db name"}`)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Validate the request body:
|
||||||
|
buf := make([]byte, 100)
|
||||||
|
n, _ := r.Body.Read(buf)
|
||||||
|
expected := "cpu value=99"
|
||||||
|
got := string(buf[0 : n-1])
|
||||||
|
if expected != got {
|
||||||
|
w.WriteHeader(http.StatusTeapot)
|
||||||
|
w.Header().Set("Content-Type", "application/json")
|
||||||
|
msg := fmt.Sprintf(`{"results":[{}],"error":"expected [%s], got [%s]"}`, expected, got)
|
||||||
|
fmt.Fprintln(w, msg)
|
||||||
|
}
|
||||||
|
|
||||||
|
w.WriteHeader(http.StatusNoContent)
|
||||||
|
w.Header().Set("Content-Type", "application/json")
|
||||||
|
case "/query":
|
||||||
|
w.WriteHeader(http.StatusOK)
|
||||||
|
w.Header().Set("Content-Type", "application/json")
|
||||||
|
fmt.Fprintln(w, `{"results":[{}]}`)
|
||||||
|
}
|
||||||
|
}))
|
||||||
|
defer ts.Close()
|
||||||
|
|
||||||
|
config := HTTPConfig{
|
||||||
|
URL: ts.URL,
|
||||||
|
}
|
||||||
|
defaultWP := WriteParams{
|
||||||
|
Database: "test",
|
||||||
|
}
|
||||||
|
client, err := NewHTTP(config, defaultWP)
|
||||||
|
defer client.Close()
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
// test that WriteWithParams overrides the default write params
|
||||||
|
wp := WriteParams{
|
||||||
|
Database: "override",
|
||||||
|
}
|
||||||
|
n, err := client.WriteWithParams([]byte("cpu value=99\n"), wp)
|
||||||
|
assert.Equal(t, 13, n)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
_, err = client.WriteStreamWithParams(bytes.NewReader([]byte("cpu value=99\n")), 13, wp)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestHTTPClient_Write_Errors(t *testing.T) {
|
||||||
|
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
switch r.URL.Path {
|
||||||
|
case "/write":
|
||||||
|
w.WriteHeader(http.StatusTeapot)
|
||||||
|
case "/query":
|
||||||
|
w.WriteHeader(http.StatusOK)
|
||||||
|
w.Header().Set("Content-Type", "application/json")
|
||||||
|
fmt.Fprintln(w, `{"results":[{}]}`)
|
||||||
|
}
|
||||||
|
}))
|
||||||
|
defer ts.Close()
|
||||||
|
|
||||||
|
config := HTTPConfig{
|
||||||
|
URL: ts.URL,
|
||||||
|
}
|
||||||
|
defaultWP := WriteParams{
|
||||||
|
Database: "test",
|
||||||
|
}
|
||||||
|
client, err := NewHTTP(config, defaultWP)
|
||||||
|
defer client.Close()
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
lp := []byte("cpu value=99\n")
|
||||||
|
n, err := client.Write(lp)
|
||||||
|
assert.Equal(t, 0, n)
|
||||||
|
assert.Error(t, err)
|
||||||
|
|
||||||
|
n, err = client.WriteStream(bytes.NewReader(lp), 13)
|
||||||
|
assert.Equal(t, 0, n)
|
||||||
|
assert.Error(t, err)
|
||||||
|
|
||||||
|
wp := WriteParams{
|
||||||
|
Database: "override",
|
||||||
|
}
|
||||||
|
n, err = client.WriteWithParams(lp, wp)
|
||||||
|
assert.Equal(t, 0, n)
|
||||||
|
assert.Error(t, err)
|
||||||
|
|
||||||
|
n, err = client.WriteStreamWithParams(bytes.NewReader(lp), 13, wp)
|
||||||
|
assert.Equal(t, 0, n)
|
||||||
|
assert.Error(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestNewHTTPErrors(t *testing.T) {
|
||||||
|
// No URL:
|
||||||
|
config := HTTPConfig{}
|
||||||
|
defaultWP := WriteParams{
|
||||||
|
Database: "test",
|
||||||
|
}
|
||||||
|
client, err := NewHTTP(config, defaultWP)
|
||||||
|
assert.Error(t, err)
|
||||||
|
assert.Nil(t, client)
|
||||||
|
|
||||||
|
// No Database:
|
||||||
|
config = HTTPConfig{
|
||||||
|
URL: "http://localhost:8086",
|
||||||
|
}
|
||||||
|
defaultWP = WriteParams{}
|
||||||
|
client, err = NewHTTP(config, defaultWP)
|
||||||
|
assert.Nil(t, client)
|
||||||
|
assert.Error(t, err)
|
||||||
|
|
||||||
|
// Invalid URL:
|
||||||
|
config = HTTPConfig{
|
||||||
|
URL: "http://192.168.0.%31:8080/",
|
||||||
|
}
|
||||||
|
defaultWP = WriteParams{
|
||||||
|
Database: "test",
|
||||||
|
}
|
||||||
|
client, err = NewHTTP(config, defaultWP)
|
||||||
|
assert.Nil(t, client)
|
||||||
|
assert.Error(t, err)
|
||||||
|
|
||||||
|
// Invalid URL scheme:
|
||||||
|
config = HTTPConfig{
|
||||||
|
URL: "mailto://localhost:8086",
|
||||||
|
}
|
||||||
|
defaultWP = WriteParams{
|
||||||
|
Database: "test",
|
||||||
|
}
|
||||||
|
client, err = NewHTTP(config, defaultWP)
|
||||||
|
assert.Nil(t, client)
|
||||||
|
assert.Error(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestHTTPClient_Query(t *testing.T) {
|
||||||
|
command := "CREATE DATABASE test"
|
||||||
|
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
switch r.URL.Path {
|
||||||
|
case "/write":
|
||||||
|
w.WriteHeader(http.StatusNoContent)
|
||||||
|
case "/query":
|
||||||
|
// validate the create database command is correct
|
||||||
|
got := r.FormValue("q")
|
||||||
|
if got != command {
|
||||||
|
w.WriteHeader(http.StatusTeapot)
|
||||||
|
w.Header().Set("Content-Type", "application/json")
|
||||||
|
msg := fmt.Sprintf(`{"results":[{}],"error":"got %s, expected %s"}`, got, command)
|
||||||
|
fmt.Fprintln(w, msg)
|
||||||
|
}
|
||||||
|
|
||||||
|
w.WriteHeader(http.StatusOK)
|
||||||
|
w.Header().Set("Content-Type", "application/json")
|
||||||
|
fmt.Fprintln(w, `{"results":[{}]}`)
|
||||||
|
}
|
||||||
|
}))
|
||||||
|
defer ts.Close()
|
||||||
|
|
||||||
|
config := HTTPConfig{
|
||||||
|
URL: ts.URL,
|
||||||
|
}
|
||||||
|
defaultWP := WriteParams{
|
||||||
|
Database: "test",
|
||||||
|
}
|
||||||
|
client, err := NewHTTP(config, defaultWP)
|
||||||
|
defer client.Close()
|
||||||
|
assert.NoError(t, err)
|
||||||
|
err = client.Query(command)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestHTTPClient_Query_ResponseError(t *testing.T) {
|
||||||
|
command := "CREATE DATABASE test"
|
||||||
|
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
switch r.URL.Path {
|
||||||
|
case "/write":
|
||||||
|
w.WriteHeader(http.StatusNoContent)
|
||||||
|
case "/query":
|
||||||
|
w.WriteHeader(http.StatusTeapot)
|
||||||
|
w.Header().Set("Content-Type", "application/json")
|
||||||
|
msg := fmt.Sprintf(`{"results":[{}],"error":"couldnt create database"}`)
|
||||||
|
fmt.Fprintln(w, msg)
|
||||||
|
}
|
||||||
|
}))
|
||||||
|
defer ts.Close()
|
||||||
|
|
||||||
|
config := HTTPConfig{
|
||||||
|
URL: ts.URL,
|
||||||
|
}
|
||||||
|
defaultWP := WriteParams{
|
||||||
|
Database: "test",
|
||||||
|
}
|
||||||
|
client, err := NewHTTP(config, defaultWP)
|
||||||
|
defer client.Close()
|
||||||
|
assert.NoError(t, err)
|
||||||
|
err = client.Query(command)
|
||||||
|
assert.Error(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestHTTPClient_Query_JSONDecodeError(t *testing.T) {
|
||||||
|
command := "CREATE DATABASE test"
|
||||||
|
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
switch r.URL.Path {
|
||||||
|
case "/write":
|
||||||
|
w.WriteHeader(http.StatusNoContent)
|
||||||
|
case "/query":
|
||||||
|
w.WriteHeader(http.StatusOK)
|
||||||
|
w.Header().Set("Content-Type", "application/json")
|
||||||
|
// write JSON missing a ']'
|
||||||
|
msg := fmt.Sprintf(`{"results":[{}}`)
|
||||||
|
fmt.Fprintln(w, msg)
|
||||||
|
}
|
||||||
|
}))
|
||||||
|
defer ts.Close()
|
||||||
|
|
||||||
|
config := HTTPConfig{
|
||||||
|
URL: ts.URL,
|
||||||
|
}
|
||||||
|
defaultWP := WriteParams{
|
||||||
|
Database: "test",
|
||||||
|
}
|
||||||
|
client, err := NewHTTP(config, defaultWP)
|
||||||
|
defer client.Close()
|
||||||
|
assert.NoError(t, err)
|
||||||
|
err = client.Query(command)
|
||||||
|
assert.Error(t, err)
|
||||||
|
assert.Contains(t, err.Error(), "json")
|
||||||
|
}
|
|
@ -0,0 +1,99 @@
|
||||||
|
package client
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"net"
|
||||||
|
"net/url"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
// UDPPayloadSize is a reasonable default payload size for UDP packets that
|
||||||
|
// could be travelling over the internet.
|
||||||
|
UDPPayloadSize = 512
|
||||||
|
)
|
||||||
|
|
||||||
|
// UDPConfig is the config data needed to create a UDP Client
|
||||||
|
type UDPConfig struct {
|
||||||
|
// URL should be of the form "udp://host:port"
|
||||||
|
// or "udp://[ipv6-host%zone]:port".
|
||||||
|
URL string
|
||||||
|
|
||||||
|
// PayloadSize is the maximum size of a UDP client message, optional
|
||||||
|
// Tune this based on your network. Defaults to UDPPayloadSize.
|
||||||
|
PayloadSize int
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewUDP(config UDPConfig) (Client, error) {
|
||||||
|
p, err := url.Parse(config.URL)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("Error parsing UDP url [%s]: %s", config.URL, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
udpAddr, err := net.ResolveUDPAddr("udp", p.Host)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("Error resolving UDP Address [%s]: %s", p.Host, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
conn, err := net.DialUDP("udp", nil, udpAddr)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("Error dialing UDP address [%s]: %s",
|
||||||
|
udpAddr.String(), err)
|
||||||
|
}
|
||||||
|
|
||||||
|
size := config.PayloadSize
|
||||||
|
if size == 0 {
|
||||||
|
size = UDPPayloadSize
|
||||||
|
}
|
||||||
|
buf := make([]byte, size)
|
||||||
|
return &udpClient{conn: conn, buffer: buf}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type udpClient struct {
|
||||||
|
conn *net.UDPConn
|
||||||
|
buffer []byte
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *udpClient) Query(command string) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *udpClient) Write(b []byte) (int, error) {
|
||||||
|
return c.WriteStream(bytes.NewReader(b), -1)
|
||||||
|
}
|
||||||
|
|
||||||
|
// write params are ignored by the UDP client
|
||||||
|
func (c *udpClient) WriteWithParams(b []byte, wp WriteParams) (int, error) {
|
||||||
|
return c.WriteStream(bytes.NewReader(b), -1)
|
||||||
|
}
|
||||||
|
|
||||||
|
// contentLength is ignored by the UDP client.
|
||||||
|
func (c *udpClient) WriteStream(r io.Reader, contentLength int) (int, error) {
|
||||||
|
var totaln int
|
||||||
|
for {
|
||||||
|
nR, err := r.Read(c.buffer)
|
||||||
|
if nR == 0 {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
if err != io.EOF && err != nil {
|
||||||
|
return totaln, err
|
||||||
|
}
|
||||||
|
nW, err := c.conn.Write(c.buffer[0:nR])
|
||||||
|
totaln += nW
|
||||||
|
if err != nil {
|
||||||
|
return totaln, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return totaln, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// contentLength is ignored by the UDP client.
|
||||||
|
// write params are ignored by the UDP client
|
||||||
|
func (c *udpClient) WriteStreamWithParams(r io.Reader, contentLength int, wp WriteParams) (int, error) {
|
||||||
|
return c.WriteStream(r, -1)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *udpClient) Close() error {
|
||||||
|
return c.conn.Close()
|
||||||
|
}
|
|
@ -0,0 +1,163 @@
|
||||||
|
package client
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"net"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/influxdata/telegraf"
|
||||||
|
"github.com/influxdata/telegraf/metric"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestUDPClient(t *testing.T) {
|
||||||
|
config := UDPConfig{
|
||||||
|
URL: "udp://localhost:8089",
|
||||||
|
}
|
||||||
|
client, err := NewUDP(config)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
err = client.Query("ANY QUERY RETURNS NIL")
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
assert.NoError(t, client.Close())
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestNewUDPClient_Errors(t *testing.T) {
|
||||||
|
// url.Parse Error
|
||||||
|
config := UDPConfig{
|
||||||
|
URL: "udp://localhost%35:8089",
|
||||||
|
}
|
||||||
|
_, err := NewUDP(config)
|
||||||
|
assert.Error(t, err)
|
||||||
|
|
||||||
|
// ResolveUDPAddr Error
|
||||||
|
config = UDPConfig{
|
||||||
|
URL: "udp://localhost:999999",
|
||||||
|
}
|
||||||
|
_, err = NewUDP(config)
|
||||||
|
assert.Error(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestUDPClient_Write(t *testing.T) {
|
||||||
|
config := UDPConfig{
|
||||||
|
URL: "udp://localhost:8199",
|
||||||
|
}
|
||||||
|
client, err := NewUDP(config)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
packets := make(chan string, 100)
|
||||||
|
address, err := net.ResolveUDPAddr("udp", "localhost:8199")
|
||||||
|
assert.NoError(t, err)
|
||||||
|
listener, err := net.ListenUDP("udp", address)
|
||||||
|
defer listener.Close()
|
||||||
|
assert.NoError(t, err)
|
||||||
|
go func() {
|
||||||
|
buf := make([]byte, 200)
|
||||||
|
for {
|
||||||
|
n, _, err := listener.ReadFromUDP(buf)
|
||||||
|
if err != nil {
|
||||||
|
packets <- err.Error()
|
||||||
|
}
|
||||||
|
packets <- string(buf[0:n])
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
// test sending simple metric
|
||||||
|
time.Sleep(time.Second)
|
||||||
|
n, err := client.Write([]byte("cpu value=99\n"))
|
||||||
|
assert.Equal(t, n, 13)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
pkt := <-packets
|
||||||
|
assert.Equal(t, "cpu value=99\n", pkt)
|
||||||
|
|
||||||
|
metrics := `cpu value=99
|
||||||
|
cpu value=55
|
||||||
|
cpu value=44
|
||||||
|
cpu value=101
|
||||||
|
cpu value=91
|
||||||
|
cpu value=92
|
||||||
|
`
|
||||||
|
// test sending packet with 6 metrics in a stream.
|
||||||
|
reader := bytes.NewReader([]byte(metrics))
|
||||||
|
// contentLength is ignored:
|
||||||
|
n, err = client.WriteStream(reader, 10)
|
||||||
|
assert.Equal(t, n, len(metrics))
|
||||||
|
assert.NoError(t, err)
|
||||||
|
pkt = <-packets
|
||||||
|
assert.Equal(t, "cpu value=99\ncpu value=55\ncpu value=44\ncpu value=101\ncpu value=91\ncpu value=92\n", pkt)
|
||||||
|
|
||||||
|
//
|
||||||
|
// Test that UDP packets get broken up properly:
|
||||||
|
config2 := UDPConfig{
|
||||||
|
URL: "udp://localhost:8199",
|
||||||
|
PayloadSize: 25,
|
||||||
|
}
|
||||||
|
client2, err := NewUDP(config2)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
wp := WriteParams{}
|
||||||
|
|
||||||
|
//
|
||||||
|
// Using Write():
|
||||||
|
buf := []byte(metrics)
|
||||||
|
n, err = client2.WriteWithParams(buf, wp)
|
||||||
|
assert.Equal(t, n, len(metrics))
|
||||||
|
assert.NoError(t, err)
|
||||||
|
pkt = <-packets
|
||||||
|
assert.Equal(t, "cpu value=99\ncpu value=55", pkt)
|
||||||
|
pkt = <-packets
|
||||||
|
assert.Equal(t, "\ncpu value=44\ncpu value=1", pkt)
|
||||||
|
pkt = <-packets
|
||||||
|
assert.Equal(t, "01\ncpu value=91\ncpu value", pkt)
|
||||||
|
pkt = <-packets
|
||||||
|
assert.Equal(t, "=92\n", pkt)
|
||||||
|
|
||||||
|
//
|
||||||
|
// Using WriteStream():
|
||||||
|
reader = bytes.NewReader([]byte(metrics))
|
||||||
|
n, err = client2.WriteStreamWithParams(reader, 10, wp)
|
||||||
|
assert.Equal(t, n, len(metrics))
|
||||||
|
assert.NoError(t, err)
|
||||||
|
pkt = <-packets
|
||||||
|
assert.Equal(t, "cpu value=99\ncpu value=55", pkt)
|
||||||
|
pkt = <-packets
|
||||||
|
assert.Equal(t, "\ncpu value=44\ncpu value=1", pkt)
|
||||||
|
pkt = <-packets
|
||||||
|
assert.Equal(t, "01\ncpu value=91\ncpu value", pkt)
|
||||||
|
pkt = <-packets
|
||||||
|
assert.Equal(t, "=92\n", pkt)
|
||||||
|
|
||||||
|
//
|
||||||
|
// Using WriteStream() & a metric.Reader:
|
||||||
|
config3 := UDPConfig{
|
||||||
|
URL: "udp://localhost:8199",
|
||||||
|
PayloadSize: 40,
|
||||||
|
}
|
||||||
|
client3, err := NewUDP(config3)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
now := time.Unix(1484142942, 0)
|
||||||
|
m1, _ := metric.New("test", map[string]string{},
|
||||||
|
map[string]interface{}{"value": 1.1}, now)
|
||||||
|
m2, _ := metric.New("test", map[string]string{},
|
||||||
|
map[string]interface{}{"value": 1.1}, now)
|
||||||
|
m3, _ := metric.New("test", map[string]string{},
|
||||||
|
map[string]interface{}{"value": 1.1}, now)
|
||||||
|
ms := []telegraf.Metric{m1, m2, m3}
|
||||||
|
mReader := metric.NewReader(ms)
|
||||||
|
n, err = client3.WriteStreamWithParams(mReader, 10, wp)
|
||||||
|
// 3 metrics at 35 bytes each (including the newline)
|
||||||
|
assert.Equal(t, 105, n)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
pkt = <-packets
|
||||||
|
assert.Equal(t, "test value=1.1 1484142942000000000\n", pkt)
|
||||||
|
pkt = <-packets
|
||||||
|
assert.Equal(t, "test value=1.1 1484142942000000000\n", pkt)
|
||||||
|
pkt = <-packets
|
||||||
|
assert.Equal(t, "test value=1.1 1484142942000000000\n", pkt)
|
||||||
|
|
||||||
|
assert.NoError(t, client.Close())
|
||||||
|
}
|
|
@ -1,19 +1,18 @@
|
||||||
package influxdb
|
package influxdb
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
"log"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"net/url"
|
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/influxdata/telegraf"
|
"github.com/influxdata/telegraf"
|
||||||
"github.com/influxdata/telegraf/internal"
|
"github.com/influxdata/telegraf/internal"
|
||||||
|
"github.com/influxdata/telegraf/metric"
|
||||||
"github.com/influxdata/telegraf/plugins/outputs"
|
"github.com/influxdata/telegraf/plugins/outputs"
|
||||||
|
|
||||||
"github.com/influxdata/influxdb/client/v2"
|
"github.com/influxdata/telegraf/plugins/outputs/influxdb/client"
|
||||||
)
|
)
|
||||||
|
|
||||||
type InfluxDB struct {
|
type InfluxDB struct {
|
||||||
|
@ -41,7 +40,7 @@ type InfluxDB struct {
|
||||||
// Precision is only here for legacy support. It will be ignored.
|
// Precision is only here for legacy support. It will be ignored.
|
||||||
Precision string
|
Precision string
|
||||||
|
|
||||||
conns []client.Client
|
clients []client.Client
|
||||||
}
|
}
|
||||||
|
|
||||||
var sampleConfig = `
|
var sampleConfig = `
|
||||||
|
@ -88,79 +87,56 @@ func (i *InfluxDB) Connect() error {
|
||||||
urls = append(urls, i.URL)
|
urls = append(urls, i.URL)
|
||||||
}
|
}
|
||||||
|
|
||||||
tlsCfg, err := internal.GetTLSConfig(
|
tlsConfig, err := internal.GetTLSConfig(
|
||||||
i.SSLCert, i.SSLKey, i.SSLCA, i.InsecureSkipVerify)
|
i.SSLCert, i.SSLKey, i.SSLCA, i.InsecureSkipVerify)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
var conns []client.Client
|
|
||||||
for _, u := range urls {
|
for _, u := range urls {
|
||||||
switch {
|
switch {
|
||||||
case strings.HasPrefix(u, "udp"):
|
case strings.HasPrefix(u, "udp"):
|
||||||
parsed_url, err := url.Parse(u)
|
config := client.UDPConfig{
|
||||||
if err != nil {
|
URL: u,
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if i.UDPPayload == 0 {
|
|
||||||
i.UDPPayload = client.UDPPayloadSize
|
|
||||||
}
|
|
||||||
c, err := client.NewUDPClient(client.UDPConfig{
|
|
||||||
Addr: parsed_url.Host,
|
|
||||||
PayloadSize: i.UDPPayload,
|
PayloadSize: i.UDPPayload,
|
||||||
})
|
c, err := client.NewUDP(config)
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
conns = append(conns, c)
|
if err != nil {
|
||||||
|
return fmt.Errorf("Error creating UDP Client [%s]: %s", u, err)
|
||||||
|
}
|
||||||
|
i.clients = append(i.clients, c)
|
||||||
default:
|
default:
|
||||||
// If URL doesn't start with "udp", assume HTTP client
|
// If URL doesn't start with "udp", assume HTTP client
|
||||||
c, err := client.NewHTTPClient(client.HTTPConfig{
|
config := client.HTTPConfig{
|
||||||
Addr: u,
|
URL: u,
|
||||||
Username: i.Username,
|
|
||||||
Password: i.Password,
|
|
||||||
UserAgent: i.UserAgent,
|
|
||||||
Timeout: i.Timeout.Duration,
|
Timeout: i.Timeout.Duration,
|
||||||
TLSConfig: tlsCfg,
|
TLSConfig: tlsConfig,
|
||||||
})
|
UserAgent: i.UserAgent,
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
|
wp := client.WriteParams{
|
||||||
|
Database: i.Database,
|
||||||
|
RetentionPolicy: i.RetentionPolicy,
|
||||||
|
Consistency: i.WriteConsistency,
|
||||||
|
}
|
||||||
|
c, err := client.NewHTTP(config, wp)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("Error creating HTTP Client [%s]: %s", u, err)
|
||||||
|
}
|
||||||
|
i.clients = append(i.clients, c)
|
||||||
|
|
||||||
err = createDatabase(c, i.Database)
|
err = c.Query("CREATE DATABASE " + i.Database)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Println("E! Database creation failed: " + err.Error())
|
log.Println("E! Database creation failed: " + err.Error())
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
conns = append(conns, c)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
i.conns = conns
|
|
||||||
rand.Seed(time.Now().UnixNano())
|
rand.Seed(time.Now().UnixNano())
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func createDatabase(c client.Client, database string) error {
|
|
||||||
// Create Database if it doesn't exist
|
|
||||||
_, err := c.Query(client.Query{
|
|
||||||
Command: fmt.Sprintf("CREATE DATABASE \"%s\"", database),
|
|
||||||
})
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
func (i *InfluxDB) Close() error {
|
func (i *InfluxDB) Close() error {
|
||||||
var errS string
|
|
||||||
for j, _ := range i.conns {
|
|
||||||
if err := i.conns[j].Close(); err != nil {
|
|
||||||
errS += err.Error()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if errS != "" {
|
|
||||||
return fmt.Errorf("output influxdb close failed: %s", errS)
|
|
||||||
}
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -175,34 +151,24 @@ func (i *InfluxDB) Description() string {
|
||||||
// Choose a random server in the cluster to write to until a successful write
|
// Choose a random server in the cluster to write to until a successful write
|
||||||
// occurs, logging each unsuccessful. If all servers fail, return error.
|
// occurs, logging each unsuccessful. If all servers fail, return error.
|
||||||
func (i *InfluxDB) Write(metrics []telegraf.Metric) error {
|
func (i *InfluxDB) Write(metrics []telegraf.Metric) error {
|
||||||
if len(i.conns) == 0 {
|
bufsize := 0
|
||||||
err := i.Connect()
|
for _, m := range metrics {
|
||||||
if err != nil {
|
bufsize += m.Len()
|
||||||
return err
|
r := metric.NewReader(metrics)
|
||||||
}
|
|
||||||
}
|
|
||||||
bp, err := client.NewBatchPoints(client.BatchPointsConfig{
|
|
||||||
Database: i.Database,
|
|
||||||
RetentionPolicy: i.RetentionPolicy,
|
|
||||||
WriteConsistency: i.WriteConsistency,
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, metric := range metrics {
|
|
||||||
bp.AddPoint(metric.Point())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// This will get set to nil if a successful write occurs
|
// This will get set to nil if a successful write occurs
|
||||||
err = errors.New("Could not write to any InfluxDB server in cluster")
|
err := fmt.Errorf("Could not write to any InfluxDB server in cluster")
|
||||||
|
|
||||||
p := rand.Perm(len(i.conns))
|
p := rand.Perm(len(i.clients))
|
||||||
for _, n := range p {
|
for _, n := range p {
|
||||||
if e := i.conns[n].Write(bp); e != nil {
|
if _, e := i.clients[n].WriteStream(r, bufsize); e != nil {
|
||||||
// If the database was not found, try to recreate it
|
// Log write failure:
|
||||||
|
log.Printf("E! InfluxDB Output Error: %s", e)
|
||||||
|
|
||||||
|
// If the database was not found, try to recreate it:
|
||||||
if strings.Contains(e.Error(), "database not found") {
|
if strings.Contains(e.Error(), "database not found") {
|
||||||
if errc := createDatabase(i.conns[n], i.Database); errc != nil {
|
if errc := i.clients[n].Query("CREATE DATABASE " + i.Database); errc != nil {
|
||||||
log.Printf("E! Error: Database %s not found and failed to recreate\n",
|
log.Printf("E! Error: Database %s not found and failed to recreate\n",
|
||||||
i.Database)
|
i.Database)
|
||||||
}
|
}
|
||||||
|
@ -225,10 +191,12 @@ func (i *InfluxDB) Write(metrics []telegraf.Metric) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func init() {
|
func newInflux() *InfluxDB {
|
||||||
outputs.Add("influxdb", func() telegraf.Output {
|
return &InfluxDB{
|
||||||
return &InfluxDB{
|
Timeout: internal.Duration{Duration: time.Second * 5},
|
||||||
Timeout: internal.Duration{Duration: time.Second * 5},
|
}
|
||||||
}
|
}
|
||||||
})
|
|
||||||
|
func init() {
|
||||||
|
outputs.Add("influxdb", func() telegraf.Output { return newInflux() })
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,22 +20,123 @@ func TestUDPInflux(t *testing.T) {
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
err = i.Write(testutil.MockMetrics())
|
err = i.Write(testutil.MockMetrics())
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
require.NoError(t, i.Close())
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestHTTPInflux(t *testing.T) {
|
func TestHTTPInflux(t *testing.T) {
|
||||||
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
w.WriteHeader(http.StatusOK)
|
switch r.URL.Path {
|
||||||
w.Header().Set("Content-Type", "application/json")
|
case "/write":
|
||||||
fmt.Fprintln(w, `{"results":[{}]}`)
|
// test that database is set properly
|
||||||
|
if r.FormValue("db") != "test" {
|
||||||
|
w.WriteHeader(http.StatusTeapot)
|
||||||
|
w.Header().Set("Content-Type", "application/json")
|
||||||
|
}
|
||||||
|
// test that user agent is set properly
|
||||||
|
if r.UserAgent() != "telegraf" {
|
||||||
|
w.WriteHeader(http.StatusTeapot)
|
||||||
|
w.Header().Set("Content-Type", "application/json")
|
||||||
|
}
|
||||||
|
w.WriteHeader(http.StatusNoContent)
|
||||||
|
w.Header().Set("Content-Type", "application/json")
|
||||||
|
case "/query":
|
||||||
|
w.WriteHeader(http.StatusOK)
|
||||||
|
w.Header().Set("Content-Type", "application/json")
|
||||||
|
fmt.Fprintln(w, `{"results":[{}]}`)
|
||||||
|
}
|
||||||
}))
|
}))
|
||||||
defer ts.Close()
|
defer ts.Close()
|
||||||
|
|
||||||
i := InfluxDB{
|
i := newInflux()
|
||||||
URLs: []string{ts.URL},
|
i.URLs = []string{ts.URL}
|
||||||
}
|
i.Database = "test"
|
||||||
|
i.UserAgent = "telegraf"
|
||||||
|
|
||||||
err := i.Connect()
|
err := i.Connect()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
err = i.Write(testutil.MockMetrics())
|
err = i.Write(testutil.MockMetrics())
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
require.NoError(t, i.Close())
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestUDPConnectError(t *testing.T) {
|
||||||
|
i := InfluxDB{
|
||||||
|
URLs: []string{"udp://foobar:8089"},
|
||||||
|
}
|
||||||
|
|
||||||
|
err := i.Connect()
|
||||||
|
require.Error(t, err)
|
||||||
|
|
||||||
|
i = InfluxDB{
|
||||||
|
URLs: []string{"udp://localhost:9999999"},
|
||||||
|
}
|
||||||
|
|
||||||
|
err = i.Connect()
|
||||||
|
require.Error(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestHTTPConnectError_InvalidURL(t *testing.T) {
|
||||||
|
i := InfluxDB{
|
||||||
|
URLs: []string{"http://foobar:8089"},
|
||||||
|
}
|
||||||
|
|
||||||
|
err := i.Connect()
|
||||||
|
require.Error(t, err)
|
||||||
|
|
||||||
|
i = InfluxDB{
|
||||||
|
URLs: []string{"http://localhost:9999999"},
|
||||||
|
}
|
||||||
|
|
||||||
|
err = i.Connect()
|
||||||
|
require.Error(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestHTTPConnectError_DatabaseCreateFail(t *testing.T) {
|
||||||
|
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
switch r.URL.Path {
|
||||||
|
case "/query":
|
||||||
|
w.WriteHeader(http.StatusNotFound)
|
||||||
|
w.Header().Set("Content-Type", "application/json")
|
||||||
|
fmt.Fprintln(w, `{"results":[{}],"error":"test error"}`)
|
||||||
|
}
|
||||||
|
}))
|
||||||
|
defer ts.Close()
|
||||||
|
|
||||||
|
i := InfluxDB{
|
||||||
|
URLs: []string{ts.URL},
|
||||||
|
Database: "test",
|
||||||
|
}
|
||||||
|
|
||||||
|
// database creation errors do not return an error from Connect
|
||||||
|
// they are only logged.
|
||||||
|
err := i.Connect()
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NoError(t, i.Close())
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestHTTPError_DatabaseNotFound(t *testing.T) {
|
||||||
|
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
switch r.URL.Path {
|
||||||
|
case "/write":
|
||||||
|
w.WriteHeader(http.StatusNotFound)
|
||||||
|
w.Header().Set("Content-Type", "application/json")
|
||||||
|
fmt.Fprintln(w, `{"results":[{}],"error":"database not found"}`)
|
||||||
|
case "/query":
|
||||||
|
w.WriteHeader(http.StatusNotFound)
|
||||||
|
w.Header().Set("Content-Type", "application/json")
|
||||||
|
fmt.Fprintln(w, `{"results":[{}],"error":"database not found"}`)
|
||||||
|
}
|
||||||
|
}))
|
||||||
|
defer ts.Close()
|
||||||
|
|
||||||
|
i := InfluxDB{
|
||||||
|
URLs: []string{ts.URL},
|
||||||
|
Database: "test",
|
||||||
|
}
|
||||||
|
|
||||||
|
err := i.Connect()
|
||||||
|
require.NoError(t, err)
|
||||||
|
err = i.Write(testutil.MockMetrics())
|
||||||
|
require.Error(t, err)
|
||||||
|
require.NoError(t, i.Close())
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue