Compare commits
19 Commits
| Author | SHA1 | Date |
|---|---|---|
| | 34b7a4c361 | |
| | f46370d982 | |
| | 07b7e09749 | |
| | e54795795d | |
| | b2b2bd8a27 | |
| | f96cbb48c7 | |
| | 9077cb83bc | |
| | 0f188f280f | |
| | b9420e73bd | |
| | 1e43e5e7ae | |
| | 5e104ad974 | |
| | cc9d8c700c | |
| | b15ec21ba7 | |
| | a9abfe8f08 | |
| | 307210242c | |
| | 0a41db16f1 | |
| | 7480267fd2 | |
| | 30949c4596 | |
| | 47264bc860 | |
@@ -1,4 +1,4 @@
-## v1.4 [unreleased]
+## v1.4 [2017-09-05]
 
 ### Release Notes
 
@@ -62,6 +62,7 @@
 - [#2978](https://github.com/influxdata/telegraf/pull/2978): Add gzip content-encoding support to influxdb output.
 - [#3127](https://github.com/influxdata/telegraf/pull/3127): Allow using system plugin in Windows.
 - [#3112](https://github.com/influxdata/telegraf/pull/3112): Add tomcat input plugin.
+- [#3182](https://github.com/influxdata/telegraf/pull/3182): HTTP headers can be added to InfluxDB output.
 
 ### Bugfixes
 
@@ -97,6 +98,12 @@
 - [#3015](https://github.com/influxdata/telegraf/issues/3015): Don't start Telegraf on install in Amazon Linux.
 - [#3153](https://github.com/influxdata/telegraf/issues/3053): Enable hddtemp input on all platforms.
 - [#3142](https://github.com/influxdata/telegraf/issues/3142): Escape backslash within string fields.
+- [#3162](https://github.com/influxdata/telegraf/issues/3162): Fix parsing of SHM remotes in ntpq input
+- [#3149](https://github.com/influxdata/telegraf/issues/3149): Don't fail parsing zpool stats if pool health is UNAVAIL on FreeBSD.
+- [#2672](https://github.com/influxdata/telegraf/issues/2672): Fix NSQ input plugin when used with version 1.0.0-compat.
+- [#2523](https://github.com/influxdata/telegraf/issues/2523): Added CloudWatch metric constraint validation.
+- [#3179](https://github.com/influxdata/telegraf/issues/3179): Skip non-numerical values in graphite format.
+- [#3187](https://github.com/influxdata/telegraf/issues/3187): Fix panic when handling string fields with escapes.
 
 ## v1.3.5 [2017-07-26]
 
@@ -96,6 +96,9 @@ tars.cpu-total.us-east-1.cpu.usage_user 0.89 1455320690
 tars.cpu-total.us-east-1.cpu.usage_idle 98.09 1455320690
 ```
 
+Fields with string values will be skipped. Boolean fields will be converted
+to 1 (true) or 0 (false).
+
 ### Graphite Configuration:
 
 ```toml
@@ -150,12 +150,6 @@ func makemetric(
                 continue
             }
         case string:
-            if strings.HasSuffix(val, `\`) {
-                log.Printf("D! Measurement [%s] field [%s] has a value "+
-                    "ending with a backslash, skipping", measurement, k)
-                delete(fields, k)
-                continue
-            }
             fields[k] = v
         default:
             fields[k] = v
@@ -370,16 +370,17 @@ func TestMakeMetric_TrailingSlash(t *testing.T) {
             expectedTags: map[string]string{},
         },
         {
-            name: "Field value with trailing slash dropped",
+            name: "Field value with trailing slash okay",
             measurement: `cpu`,
             fields: map[string]interface{}{
                 "value": int64(42),
-                "bad": `xyzzy\`,
+                "ok": `xyzzy\`,
             },
             tags: map[string]string{},
             expectedMeasurement: `cpu`,
             expectedFields: map[string]interface{}{
                 "value": int64(42),
+                "ok": `xyzzy\`,
             },
             expectedTags: map[string]string{},
         },
@@ -387,7 +388,7 @@ func TestMakeMetric_TrailingSlash(t *testing.T) {
             name: "Must have one field after dropped",
             measurement: `cpu`,
             fields: map[string]interface{}{
-                "bad": `xyzzy\`,
+                "bad": math.NaN(),
             },
             tags: map[string]string{},
             expectedNil: true,
@@ -21,14 +21,14 @@ func New(
     t time.Time,
     mType ...telegraf.ValueType,
 ) (telegraf.Metric, error) {
-    if len(fields) == 0 {
-        return nil, fmt.Errorf("Metric cannot be made without any fields")
-    }
     if len(name) == 0 {
-        return nil, fmt.Errorf("Metric cannot be made with an empty name")
+        return nil, fmt.Errorf("missing measurement name")
     }
+    if len(fields) == 0 {
+        return nil, fmt.Errorf("%s: must have one or more fields", name)
+    }
     if strings.HasSuffix(name, `\`) {
-        return nil, fmt.Errorf("Metric cannot have measurement name ending with a backslash")
+        return nil, fmt.Errorf("%s: measurement name cannot end with a backslash", name)
     }
 
     var thisType telegraf.ValueType
@@ -49,10 +49,10 @@ func New(
     taglen := 0
     for k, v := range tags {
         if strings.HasSuffix(k, `\`) {
-            return nil, fmt.Errorf("Metric cannot have tag key ending with a backslash")
+            return nil, fmt.Errorf("%s: tag key cannot end with a backslash: %s", name, k)
         }
         if strings.HasSuffix(v, `\`) {
-            return nil, fmt.Errorf("Metric cannot have tag value ending with a backslash")
+            return nil, fmt.Errorf("%s: tag value cannot end with a backslash: %s", name, v)
         }
 
         if len(k) == 0 || len(v) == 0 {
@@ -79,7 +79,7 @@ func New(
     fieldlen := 0
     for k, _ := range fields {
         if strings.HasSuffix(k, `\`) {
-            return nil, fmt.Errorf("Metric cannot have field key ending with a backslash")
+            return nil, fmt.Errorf("%s: field key cannot end with a backslash: %s", name, k)
        }
 
         // 10 bytes is completely arbitrary, but will at least prevent some
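The three hunks above rework `New`'s validation order and error strings. A minimal sketch of how the reworded errors surface to a caller, assuming the `github.com/influxdata/telegraf/metric` import path used elsewhere in this compare:

```go
package main

import (
	"fmt"
	"time"

	"github.com/influxdata/telegraf/metric"
)

func main() {
	now := time.Now()

	// missing name is now checked first
	_, err := metric.New("", nil, map[string]interface{}{"value": 42}, now)
	fmt.Println(err) // missing measurement name

	// no fields: the error now names the metric
	_, err = metric.New("cpu", nil, map[string]interface{}{}, now)
	fmt.Println(err) // cpu: must have one or more fields

	// trailing backslash in the measurement name
	_, err = metric.New(`cpu\`, nil, map[string]interface{}{"value": 42}, now)
	fmt.Println(err) // cpu\: measurement name cannot end with a backslash
}
```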
@@ -102,7 +102,8 @@ func New(
 }
 
 // indexUnescapedByte finds the index of the first byte equal to b in buf that
-// is not escaped. Returns -1 if not found.
+// is not escaped. Does not allow the escape char to be escaped. Returns -1 if
+// not found.
 func indexUnescapedByte(buf []byte, b byte) int {
     var keyi int
     for {
@@ -122,6 +123,46 @@ func indexUnescapedByte(buf []byte, b byte) int {
     return keyi
 }
 
+// indexUnescapedByteBackslashEscaping finds the index of the first byte equal
+// to b in buf that is not escaped. Allows for the escape char `\` to be
+// escaped. Returns -1 if not found.
+func indexUnescapedByteBackslashEscaping(buf []byte, b byte) int {
+    var keyi int
+    for {
+        i := bytes.IndexByte(buf[keyi:], b)
+        if i == -1 {
+            return -1
+        } else if i == 0 {
+            break
+        }
+        keyi += i
+        if countBackslashes(buf, keyi-1)%2 == 0 {
+            break
+        } else {
+            keyi++
+        }
+    }
+    return keyi
+}
+
+// countBackslashes counts the number of preceding backslashes starting at
+// the 'start' index.
+func countBackslashes(buf []byte, index int) int {
+    var count int
+    for {
+        if index < 0 {
+            return count
+        }
+        if buf[index] == '\\' {
+            count++
+            index--
+        } else {
+            break
+        }
+    }
+    return count
+}
+
 type metric struct {
     name []byte
     tags []byte
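A quick standalone exercise of the two helpers added above, showing why counting the run of preceding backslashes matters when scanning for a closing quote. The helper bodies are copied from the patch; only `main` is illustrative:

```go
package main

import (
	"bytes"
	"fmt"
)

func countBackslashes(buf []byte, index int) int {
	var count int
	for {
		if index < 0 {
			return count
		}
		if buf[index] == '\\' {
			count++
			index--
		} else {
			break
		}
	}
	return count
}

func indexUnescapedByteBackslashEscaping(buf []byte, b byte) int {
	var keyi int
	for {
		i := bytes.IndexByte(buf[keyi:], b)
		if i == -1 {
			return -1
		} else if i == 0 {
			break
		}
		keyi += i
		// an even number of backslashes means the byte itself is unescaped
		if countBackslashes(buf, keyi-1)%2 == 0 {
			break
		} else {
			keyi++
		}
	}
	return keyi
}

func main() {
	// the quote at index 2 is escaped, so the closing quote is at index 4
	fmt.Println(indexUnescapedByteBackslashEscaping([]byte(`x\"y"`), '"')) // 4
	// a doubled backslash escapes itself, so the quote at index 3 is unescaped
	fmt.Println(indexUnescapedByteBackslashEscaping([]byte(`x\\"`), '"')) // 3
}
```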
@@ -283,7 +324,7 @@ func (m *metric) Fields() map[string]interface{} {
             // end index of field value
             var i3 int
             if m.fields[i:][i2] == '"' {
-                i3 = indexUnescapedByte(m.fields[i:][i2+1:], '"')
+                i3 = indexUnescapedByteBackslashEscaping(m.fields[i:][i2+1:], '"')
                 if i3 == -1 {
                     i3 = len(m.fields[i:])
                 }
@@ -258,6 +258,7 @@ func TestNewMetric_Fields(t *testing.T) {
         "quote_string": `x"y`,
         "backslash_quote_string": `x\"y`,
         "backslash": `x\y`,
+        "ends_with_backslash": `x\`,
     }
     m, err := New("cpu", tags, fields, now)
     assert.NoError(t, err)
@@ -39,9 +39,9 @@ The following defaults are known to work with RabbitMQ:
   ## Use SSL but skip chain & host verification
   # insecure_skip_verify = false
 
-  ## Data format to output.
+  ## Data format to consume.
   ## Each data format has its own unique set of configuration options, read
   ## more about them here:
-  ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
+  ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
   data_format = "influx"
 ```
@@ -85,10 +85,10 @@ func (a *AMQPConsumer) SampleConfig() string {
   ## Use SSL but skip chain & host verification
   # insecure_skip_verify = false
 
-  ## Data format to output.
+  ## Data format to consume.
   ## Each data format has its own unique set of configuration options, read
   ## more about them here:
-  ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
+  ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
   data_format = "influx"
 `
 }
@@ -1,19 +1,19 @@
-# Fail2ban Plugin
+# Fail2ban Input Plugin
 
-The fail2ban plugin gathers counts of failed and banned ip addresses from fail2ban.
+The fail2ban plugin gathers the count of failed and banned ip addresses using [fail2ban](https://www.fail2ban.org).
 
-This plugin run fail2ban-client command, and fail2ban-client require root access.
-You have to grant telegraf to run fail2ban-client:
+This plugin runs the `fail2ban-client` command which generally requires root access.
+Acquiring the required permissions can be done using several methods:
 
-- Run telegraf as root. (deprecate)
-- Configure sudo to grant telegraf to fail2ban-client.
+- Use sudo run fail2ban-client.
+- Run telegraf as root. (not recommended)
 
 ### Using sudo
 
 You may edit your sudo configuration with the following:
 
 ``` sudo
-telegraf ALL=(root) NOPASSWD: /usr/bin/fail2ban-client status *
+telegraf ALL=(root) NOEXEC: NOPASSWD: /usr/bin/fail2ban-client status, /usr/bin/fail2ban-client status *
 ```
 
 ### Configuration:
@@ -21,10 +21,7 @@ telegraf ALL=(root) NOPASSWD: /usr/bin/fail2ban-client status *
 ``` toml
 # Read metrics from fail2ban.
 [[inputs.fail2ban]]
-  ## fail2ban-client require root access.
-  ## Setting 'use_sudo' to true will make use of sudo to run fail2ban-client.
-  ## Users must configure sudo to allow telegraf user to run fail2ban-client with no password.
-  ## This plugin run only "fail2ban-client status".
+  ## Use sudo to run fail2ban-client
   use_sudo = false
 ```
 
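A hedged sketch of what `use_sudo = true` plausibly implies for the invocation; the `-n` flag and the exact argument layout here are assumptions for illustration, not necessarily the plugin's exact code:

```go
package main

import (
	"fmt"
	"os/exec"
)

// fail2banStatusCmd builds the command the plugin would run. With sudo, -n
// makes the call fail instead of prompting if a password were required,
// matching the NOPASSWD sudoers rule shown in the README above. (assumed)
func fail2banStatusCmd(useSudo bool) *exec.Cmd {
	name := "fail2ban-client"
	args := []string{"status"}
	if useSudo {
		return exec.Command("sudo", append([]string{"-n", name}, args...)...)
	}
	return exec.Command(name, args...)
}

func main() {
	fmt.Println(fail2banStatusCmd(true).Args) // [sudo -n fail2ban-client status]
}
```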
@@ -38,7 +35,7 @@ telegraf ALL=(root) NOPASSWD: /usr/bin/fail2ban-client status *
 
 - All measurements have the following tags:
   - jail
 
 
 ### Example Output:
 
 ```
@@ -55,6 +52,5 @@ Status for the jail: sshd
 ```
 
 ```
 $ ./telegraf --config telegraf.conf --input-filter fail2ban --test
 fail2ban,jail=sshd failed=5i,banned=2i 1495868667000000000
 ```
@@ -6,9 +6,10 @@ import (
     "os/exec"
     "strings"
 
+    "strconv"
+
     "github.com/influxdata/telegraf"
     "github.com/influxdata/telegraf/plugins/inputs"
-    "strconv"
 )
 
 var (
@@ -21,10 +22,7 @@ type Fail2ban struct {
 }
 
 var sampleConfig = `
-  ## fail2ban-client require root access.
-  ## Setting 'use_sudo' to true will make use of sudo to run fail2ban-client.
-  ## Users must configure sudo to allow telegraf user to run fail2ban-client with no password.
-  ## This plugin run only "fail2ban-client status".
+  ## Use sudo to run fail2ban-client
   use_sudo = false
 `
 
@@ -25,6 +25,7 @@ package nsq
 import (
     "encoding/json"
     "fmt"
+    "io/ioutil"
     "net/http"
     "net/url"
     "strconv"
@@ -101,28 +102,42 @@ func (n *NSQ) gatherEndpoint(e string, acc telegraf.Accumulator) error {
         return fmt.Errorf("%s returned HTTP status %s", u.String(), r.Status)
     }
 
-    s := &NSQStats{}
-    err = json.NewDecoder(r.Body).Decode(s)
+    body, err := ioutil.ReadAll(r.Body)
+    if err != nil {
+        return fmt.Errorf(`Error reading body: %s`, err)
+    }
+
+    data := &NSQStatsData{}
+    err = json.Unmarshal(body, data)
     if err != nil {
         return fmt.Errorf(`Error parsing response: %s`, err)
     }
+    // Data was not parsed correctly attempt to use old format.
+    if len(data.Version) < 1 {
+        wrapper := &NSQStats{}
+        err = json.Unmarshal(body, wrapper)
+        if err != nil {
+            return fmt.Errorf(`Error parsing response: %s`, err)
+        }
+        data = &wrapper.Data
+    }
 
     tags := map[string]string{
         `server_host`: u.Host,
-        `server_version`: s.Data.Version,
+        `server_version`: data.Version,
     }
 
     fields := make(map[string]interface{})
-    if s.Data.Health == `OK` {
+    if data.Health == `OK` {
         fields["server_count"] = int64(1)
     } else {
         fields["server_count"] = int64(0)
     }
-    fields["topic_count"] = int64(len(s.Data.Topics))
+    fields["topic_count"] = int64(len(data.Topics))
 
     acc.AddFields("nsq_server", fields, tags)
-    for _, t := range s.Data.Topics {
-        topicStats(t, acc, u.Host, s.Data.Version)
+    for _, t := range data.Topics {
+        topicStats(t, acc, u.Host, data.Version)
     }
 
     return nil
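The decode logic above reads the body once and parses it at most twice: the flat (nsq >= 1.0) layout first, then the legacy wrapper if the version field comes back empty. A minimal sketch of the same fallback pattern with stand-in structs (`statsData` and `statsWrapper` are illustrative names, not the plugin's exact types):

```go
package main

import (
	"encoding/json"
	"fmt"
)

type statsData struct {
	Version string `json:"version"`
	Health  string `json:"health"`
}

type statsWrapper struct {
	Data statsData `json:"data"`
}

func decodeStats(body []byte) (statsData, error) {
	var data statsData
	if err := json.Unmarshal(body, &data); err != nil {
		return data, err
	}
	if data.Version == "" {
		// pre-1.0 responses nest everything under "data"
		var w statsWrapper
		if err := json.Unmarshal(body, &w); err != nil {
			return data, err
		}
		data = w.Data
	}
	return data, nil
}

func main() {
	v1 := []byte(`{"version":"1.0.0-compat","health":"OK"}`)
	old := []byte(`{"status_code":200,"data":{"version":"0.3.6","health":"OK"}}`)
	for _, b := range [][]byte{v1, old} {
		d, _ := decodeStats(b)
		fmt.Println(d.Version, d.Health)
	}
}
```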
@@ -189,7 +204,6 @@ func clientStats(c ClientStats, acc telegraf.Accumulator, host, version, topic,
         "server_version": version,
         "topic": topic,
         "channel": channel,
-        "client_name": c.Name,
         "client_id": c.ID,
         "client_hostname": c.Hostname,
         "client_version": c.Version,
@@ -199,6 +213,9 @@ func clientStats(c ClientStats, acc telegraf.Accumulator, host, version, topic,
         "client_snappy": strconv.FormatBool(c.Snappy),
         "client_deflate": strconv.FormatBool(c.Deflate),
     }
+    if len(c.Name) > 0 {
+        tags["client_name"] = c.Name
+    }
 
     fields := map[string]interface{}{
         "ready_count": c.ReadyCount,
@@ -248,7 +265,7 @@ type ChannelStats struct {
 }
 
 type ClientStats struct {
-    Name string `json:"name"`
+    Name string `json:"name"` // DEPRECATED 1.x+, still here as the structs are currently being shared for parsing v3.x and 1.x
     ID string `json:"client_id"`
     Hostname string `json:"hostname"`
     Version string `json:"version"`
@@ -12,10 +12,267 @@ import (
     "github.com/stretchr/testify/require"
 )
 
-func TestNSQStats(t *testing.T) {
+func TestNSQStatsV1(t *testing.T) {
     ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
         w.WriteHeader(http.StatusOK)
-        fmt.Fprintln(w, response)
+        fmt.Fprintln(w, responseV1)
     }))
     defer ts.Close()
 
+    n := &NSQ{
+        Endpoints: []string{ts.URL},
+    }
+
+    var acc testutil.Accumulator
+    err := acc.GatherError(n.Gather)
+    require.NoError(t, err)
+
+    u, err := url.Parse(ts.URL)
+    require.NoError(t, err)
+    host := u.Host
+
+    // actually validate the tests
+    tests := []struct {
+        m string
+        f map[string]interface{}
+        g map[string]string
+    }{
+        {
+            "nsq_server",
+            map[string]interface{}{
+                "server_count": int64(1),
+                "topic_count": int64(2),
+            },
+            map[string]string{
+                "server_host": host,
+                "server_version": "1.0.0-compat",
+            },
+        },
+        {
+            "nsq_topic",
+            map[string]interface{}{
+                "depth": int64(12),
+                "backend_depth": int64(13),
+                "message_count": int64(14),
+                "channel_count": int64(1),
+            },
+            map[string]string{
+                "server_host": host,
+                "server_version": "1.0.0-compat",
+                "topic": "t1"},
+        },
+        {
+            "nsq_channel",
+            map[string]interface{}{
+                "depth": int64(0),
+                "backend_depth": int64(1),
+                "inflight_count": int64(2),
+                "deferred_count": int64(3),
+                "message_count": int64(4),
+                "requeue_count": int64(5),
+                "timeout_count": int64(6),
+                "client_count": int64(1),
+            },
+            map[string]string{
+                "server_host": host,
+                "server_version": "1.0.0-compat",
+                "topic": "t1",
+                "channel": "c1",
+            },
+        },
+        {
+            "nsq_client",
+            map[string]interface{}{
+                "ready_count": int64(200),
+                "inflight_count": int64(7),
+                "message_count": int64(8),
+                "finish_count": int64(9),
+                "requeue_count": int64(10),
+            },
+            map[string]string{"server_host": host, "server_version": "1.0.0-compat",
+                "topic": "t1", "channel": "c1",
+                "client_id": "373a715cd990", "client_hostname": "373a715cd990",
+                "client_version": "V2", "client_address": "172.17.0.11:35560",
+                "client_tls": "false", "client_snappy": "false",
+                "client_deflate": "false",
+                "client_user_agent": "nsq_to_nsq/0.3.6 go-nsq/1.0.5"},
+        },
+        {
+            "nsq_topic",
+            map[string]interface{}{
+                "depth": int64(28),
+                "backend_depth": int64(29),
+                "message_count": int64(30),
+                "channel_count": int64(1),
+            },
+            map[string]string{
+                "server_host": host,
+                "server_version": "1.0.0-compat",
+                "topic": "t2"},
+        },
+        {
+            "nsq_channel",
+            map[string]interface{}{
+                "depth": int64(15),
+                "backend_depth": int64(16),
+                "inflight_count": int64(17),
+                "deferred_count": int64(18),
+                "message_count": int64(19),
+                "requeue_count": int64(20),
+                "timeout_count": int64(21),
+                "client_count": int64(1),
+            },
+            map[string]string{
+                "server_host": host,
+                "server_version": "1.0.0-compat",
+                "topic": "t2",
+                "channel": "c2",
+            },
+        },
+        {
+            "nsq_client",
+            map[string]interface{}{
+                "ready_count": int64(22),
+                "inflight_count": int64(23),
+                "message_count": int64(24),
+                "finish_count": int64(25),
+                "requeue_count": int64(26),
+            },
+            map[string]string{"server_host": host, "server_version": "1.0.0-compat",
+                "topic": "t2", "channel": "c2",
+                "client_id": "377569bd462b", "client_hostname": "377569bd462b",
+                "client_version": "V2", "client_address": "172.17.0.8:48145",
+                "client_user_agent": "go-nsq/1.0.5", "client_tls": "true",
+                "client_snappy": "true", "client_deflate": "true"},
+        },
+    }
+
+    for _, test := range tests {
+        acc.AssertContainsTaggedFields(t, test.m, test.f, test.g)
+    }
+}
+
+// v1 version of localhost/stats?format=json reesponse body
+var responseV1 = `
+{
+  "version": "1.0.0-compat",
+  "health": "OK",
+  "start_time": 1452021674,
+  "topics": [
+    {
+      "topic_name": "t1",
+      "channels": [
+        {
+          "channel_name": "c1",
+          "depth": 0,
+          "backend_depth": 1,
+          "in_flight_count": 2,
+          "deferred_count": 3,
+          "message_count": 4,
+          "requeue_count": 5,
+          "timeout_count": 6,
+          "clients": [
+            {
+              "client_id": "373a715cd990",
+              "hostname": "373a715cd990",
+              "version": "V2",
+              "remote_address": "172.17.0.11:35560",
+              "state": 3,
+              "ready_count": 200,
+              "in_flight_count": 7,
+              "message_count": 8,
+              "finish_count": 9,
+              "requeue_count": 10,
+              "connect_ts": 1452021675,
+              "sample_rate": 11,
+              "deflate": false,
+              "snappy": false,
+              "user_agent": "nsq_to_nsq\/0.3.6 go-nsq\/1.0.5",
+              "tls": false,
+              "tls_cipher_suite": "",
+              "tls_version": "",
+              "tls_negotiated_protocol": "",
+              "tls_negotiated_protocol_is_mutual": false
+            }
+          ],
+          "paused": false,
+          "e2e_processing_latency": {
+            "count": 0,
+            "percentiles": null
+          }
+        }
+      ],
+      "depth": 12,
+      "backend_depth": 13,
+      "message_count": 14,
+      "paused": false,
+      "e2e_processing_latency": {
+        "count": 0,
+        "percentiles": null
+      }
+    },
+    {
+      "topic_name": "t2",
+      "channels": [
+        {
+          "channel_name": "c2",
+          "depth": 15,
+          "backend_depth": 16,
+          "in_flight_count": 17,
+          "deferred_count": 18,
+          "message_count": 19,
+          "requeue_count": 20,
+          "timeout_count": 21,
+          "clients": [
+            {
+              "client_id": "377569bd462b",
+              "hostname": "377569bd462b",
+              "version": "V2",
+              "remote_address": "172.17.0.8:48145",
+              "state": 3,
+              "ready_count": 22,
+              "in_flight_count": 23,
+              "message_count": 24,
+              "finish_count": 25,
+              "requeue_count": 26,
+              "connect_ts": 1452021678,
+              "sample_rate": 27,
+              "deflate": true,
+              "snappy": true,
+              "user_agent": "go-nsq\/1.0.5",
+              "tls": true,
+              "tls_cipher_suite": "",
+              "tls_version": "",
+              "tls_negotiated_protocol": "",
+              "tls_negotiated_protocol_is_mutual": false
+            }
+          ],
+          "paused": false,
+          "e2e_processing_latency": {
+            "count": 0,
+            "percentiles": null
+          }
+        }
+      ],
+      "depth": 28,
+      "backend_depth": 29,
+      "message_count": 30,
+      "paused": false,
+      "e2e_processing_latency": {
+        "count": 0,
+        "percentiles": null
+      }
+    }
+  ]
+}
+
+`
+
+// TestNSQStatsPreV1 is for backwards compatibility with nsq versions < 1.0
+func TestNSQStatsPreV1(t *testing.T) {
+    ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+        w.WriteHeader(http.StatusOK)
+        fmt.Fprintln(w, responsePreV1)
+    }))
+    defer ts.Close()
@@ -152,7 +409,7 @@ func TestNSQStats(t *testing.T) {
     }
 }
 
-var response = `
+var responsePreV1 = `
 {
   "status_code": 200,
   "status_txt": "OK",
@@ -69,7 +69,7 @@ func (n *NTPQ) Gather(acc telegraf.Accumulator) error {
     // Due to problems with a parsing, we have to use regexp expression in order
     // to remove string that starts from '(' and ends with space
     // see: https://github.com/influxdata/telegraf/issues/2386
-    reg, err := regexp.Compile("\\([\\S]*")
+    reg, err := regexp.Compile("\\s+\\([\\S]*")
     if err != nil {
         return err
     }
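A quick check of what the tightened pattern changes, runnable on its own. The old expression also matched the parenthesis inside remotes like `SHM(0)`, mangling them; requiring leading whitespace leaves those intact while still stripping truncated `(domain` remnants:

```go
package main

import (
	"fmt"
	"regexp"
)

func main() {
	oldRe := regexp.MustCompile(`\([\S]*`)
	newRe := regexp.MustCompile(`\s+\([\S]*`)

	shm := "*SHM(0) .PPS. 1 u 60 64 377 0.000 0.045 1.012"
	trunc := "+37.58.57.238 (domain) 192.53.103.103 2 u 10 1024 377 1.748 0.373 0.101"

	// old pattern eats the "(0)" inside the remote name
	fmt.Printf("%q\n", oldRe.ReplaceAllString(shm, "")) // "*SHM .PPS. ..."
	// new pattern leaves SHM(0) alone...
	fmt.Printf("%q\n", newRe.ReplaceAllString(shm, "")) // unchanged
	// ...but still removes the parenthesized remnant after a remote
	fmt.Printf("%q\n", newRe.ReplaceAllString(trunc, "")) // "+37.58.57.238 192.53.103.103 ..."
}
```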
@@ -260,6 +260,57 @@ func TestParserNTPQ(t *testing.T) {
     }
     acc := testutil.Accumulator{}
     assert.NoError(t, acc.GatherError(n.Gather))
+
+    fields := map[string]interface{}{
+        "poll": int64(64),
+        "when": int64(60),
+        "reach": int64(377),
+        "delay": float64(0.0),
+        "offset": float64(0.045),
+        "jitter": float64(1.012),
+    }
+    tags := map[string]string{
+        "remote": "SHM(0)",
+        "state_prefix": "*",
+        "refid": ".PPS.",
+        "stratum": "1",
+        "type": "u",
+    }
+    acc.AssertContainsTaggedFields(t, "ntpq", fields, tags)
+
+    fields = map[string]interface{}{
+        "poll": int64(128),
+        "when": int64(121),
+        "reach": int64(377),
+        "delay": float64(0.0),
+        "offset": float64(10.105),
+        "jitter": float64(2.012),
+    }
+    tags = map[string]string{
+        "remote": "SHM(1)",
+        "state_prefix": "-",
+        "refid": ".GPS.",
+        "stratum": "1",
+        "type": "u",
+    }
+    acc.AssertContainsTaggedFields(t, "ntpq", fields, tags)
+
+    fields = map[string]interface{}{
+        "poll": int64(1024),
+        "when": int64(10),
+        "reach": int64(377),
+        "delay": float64(1.748),
+        "offset": float64(0.373),
+        "jitter": float64(0.101),
+    }
+    tags = map[string]string{
+        "remote": "37.58.57.238",
+        "state_prefix": "+",
+        "refid": "192.53.103.103",
+        "stratum": "2",
+        "type": "u",
+    }
+    acc.AssertContainsTaggedFields(t, "ntpq", fields, tags)
 }
 
 func TestMultiNTPQ(t *testing.T) {
@@ -480,7 +531,9 @@ var multiNTPQ = ` remote refid st t when poll reach delay
 `
 var multiParserNTPQ = ` remote refid st t when poll reach delay offset jitter
 ==============================================================================
 *SHM(0) .PPS. 1 u 60 64 377 0.000 0.045 1.012
 +37.58.57.238 (d 192.53.103.103 2 u 10 1024 377 1.748 0.373 0.101
++37.58.57.238 (domain) 192.53.103.103 2 u 10 1024 377 1.748 0.373 0.101
++37.58.57.238 ( 192.53.103.103 2 u 10 1024 377 1.748 0.373 0.101
 -SHM(1) .GPS. 1 u 121 128 377 0.000 10.105 2.012
 `
@@ -33,41 +33,48 @@ func (z *Zfs) gatherPoolStats(acc telegraf.Accumulator) (string, error) {
     tags := map[string]string{"pool": col[0], "health": col[8]}
     fields := map[string]interface{}{}
 
-    size, err := strconv.ParseInt(col[1], 10, 64)
-    if err != nil {
-        return "", fmt.Errorf("Error parsing size: %s", err)
-    }
-    fields["size"] = size
-
-    alloc, err := strconv.ParseInt(col[2], 10, 64)
-    if err != nil {
-        return "", fmt.Errorf("Error parsing allocation: %s", err)
-    }
-    fields["allocated"] = alloc
-
-    free, err := strconv.ParseInt(col[3], 10, 64)
-    if err != nil {
-        return "", fmt.Errorf("Error parsing free: %s", err)
-    }
-    fields["free"] = free
-
-    frag, err := strconv.ParseInt(strings.TrimSuffix(col[5], "%"), 10, 0)
-    if err != nil { // This might be - for RO devs
-        frag = 0
-    }
-    fields["fragmentation"] = frag
-
-    capval, err := strconv.ParseInt(col[6], 10, 0)
-    if err != nil {
-        return "", fmt.Errorf("Error parsing capacity: %s", err)
-    }
-    fields["capacity"] = capval
-
-    dedup, err := strconv.ParseFloat(strings.TrimSuffix(col[7], "x"), 32)
-    if err != nil {
-        return "", fmt.Errorf("Error parsing dedupratio: %s", err)
-    }
-    fields["dedupratio"] = dedup
+    if tags["health"] == "UNAVAIL" {
+
+        fields["size"] = int64(0)
+
+    } else {
+
+        size, err := strconv.ParseInt(col[1], 10, 64)
+        if err != nil {
+            return "", fmt.Errorf("Error parsing size: %s", err)
+        }
+        fields["size"] = size
+
+        alloc, err := strconv.ParseInt(col[2], 10, 64)
+        if err != nil {
+            return "", fmt.Errorf("Error parsing allocation: %s", err)
+        }
+        fields["allocated"] = alloc
+
+        free, err := strconv.ParseInt(col[3], 10, 64)
+        if err != nil {
+            return "", fmt.Errorf("Error parsing free: %s", err)
+        }
+        fields["free"] = free
+
+        frag, err := strconv.ParseInt(strings.TrimSuffix(col[5], "%"), 10, 0)
+        if err != nil { // This might be - for RO devs
+            frag = 0
+        }
+        fields["fragmentation"] = frag
+
+        capval, err := strconv.ParseInt(col[6], 10, 0)
+        if err != nil {
+            return "", fmt.Errorf("Error parsing capacity: %s", err)
+        }
+        fields["capacity"] = capval
+
+        dedup, err := strconv.ParseFloat(strings.TrimSuffix(col[7], "x"), 32)
+        if err != nil {
+            return "", fmt.Errorf("Error parsing dedupratio: %s", err)
+        }
+        fields["dedupratio"] = dedup
+    }
 
     acc.AddFields("zfs_pool", fields, tags)
 }
@@ -22,6 +22,15 @@ func mock_zpool() ([]string, error) {
     return zpool_output, nil
 }
 
+// $ zpool list -Hp
+var zpool_output_unavail = []string{
+    "temp2 - - - - - - - UNAVAIL -",
+}
+
+func mock_zpool_unavail() ([]string, error) {
+    return zpool_output_unavail, nil
+}
+
 // sysctl -q kstat.zfs.misc.arcstats
 
 // sysctl -q kstat.zfs.misc.vdev_cache_stats
@@ -82,6 +91,41 @@ func TestZfsPoolMetrics(t *testing.T) {
     acc.AssertContainsTaggedFields(t, "zfs_pool", poolMetrics, tags)
 }
 
+func TestZfsPoolMetrics_unavail(t *testing.T) {
+
+    var acc testutil.Accumulator
+
+    z := &Zfs{
+        KstatMetrics: []string{"vdev_cache_stats"},
+        sysctl: mock_sysctl,
+        zpool: mock_zpool_unavail,
+    }
+    err := z.Gather(&acc)
+    require.NoError(t, err)
+
+    require.False(t, acc.HasMeasurement("zfs_pool"))
+    acc.Metrics = nil
+
+    z = &Zfs{
+        KstatMetrics: []string{"vdev_cache_stats"},
+        PoolMetrics: true,
+        sysctl: mock_sysctl,
+        zpool: mock_zpool_unavail,
+    }
+    err = z.Gather(&acc)
+    require.NoError(t, err)
+
+    //one pool, UNAVAIL
+    tags := map[string]string{
+        "pool": "temp2",
+        "health": "UNAVAIL",
+    }
+
+    poolMetrics := getTemp2PoolMetrics()
+
+    acc.AssertContainsTaggedFields(t, "zfs_pool", poolMetrics, tags)
+}
+
 func TestZfsGeneratesMetrics(t *testing.T) {
     var acc testutil.Accumulator
 
@@ -128,6 +172,12 @@ func getFreeNasBootPoolMetrics() map[string]interface{} {
     }
 }
 
+func getTemp2PoolMetrics() map[string]interface{} {
+    return map[string]interface{}{
+        "size": int64(0),
+    }
+}
+
 func getKstatMetricsVdevOnly() map[string]interface{} {
     return map[string]interface{}{
         "vdev_cache_stats_misses": int64(87789),
@@ -193,6 +193,25 @@ func BuildMetricDatum(point telegraf.Metric) []*cloudwatch.MetricDatum {
             continue
         }
 
+        // Do CloudWatch boundary checking
+        // Constraints at: http://docs.aws.amazon.com/AmazonCloudWatch/latest/APIReference/API_MetricDatum.html
+        if math.IsNaN(value) {
+            datums = datums[:len(datums)-1]
+            continue
+        }
+        if math.IsInf(value, 0) {
+            datums = datums[:len(datums)-1]
+            continue
+        }
+        if value > 0 && value < float64(8.515920e-109) {
+            datums = datums[:len(datums)-1]
+            continue
+        }
+        if value > float64(1.174271e+108) {
+            datums = datums[:len(datums)-1]
+            continue
+        }
+
         datums[i] = &cloudwatch.MetricDatum{
             MetricName: aws.String(strings.Join([]string{point.Name(), k}, "_")),
             Value: aws.Float64(value),
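The four boundary checks above can be read as a single predicate. A minimal sketch mirroring the patch's constants (note that, like the patch, it only bounds the positive range; the constants come from the MetricDatum documentation linked in the diff):

```go
package main

import (
	"fmt"
	"math"
)

// validCloudWatchValue reports whether CloudWatch would accept the value,
// following the same checks as the patch above.
func validCloudWatchValue(v float64) bool {
	switch {
	case math.IsNaN(v), math.IsInf(v, 0):
		return false
	case v > 0 && v < 8.515920e-109: // below the smallest accepted magnitude
		return false
	case v > 1.174271e+108: // above the largest accepted magnitude
		return false
	}
	return true
}

func main() {
	for _, v := range []float64{1, 0, math.NaN(), 8.515919e-109, 1.174272e+108} {
		fmt.Println(v, validCloudWatchValue(v))
	}
}
```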
@@ -1,6 +1,8 @@
 package cloudwatch
 
 import (
+    "fmt"
+    "math"
     "sort"
     "testing"
 
@@ -51,22 +53,32 @@ func TestBuildDimensions(t *testing.T) {
 func TestBuildMetricDatums(t *testing.T) {
     assert := assert.New(t)
 
+    zero := 0.0
     validMetrics := []telegraf.Metric{
         testutil.TestMetric(1),
         testutil.TestMetric(int32(1)),
         testutil.TestMetric(int64(1)),
         testutil.TestMetric(float64(1)),
+        testutil.TestMetric(float64(0)),
+        testutil.TestMetric(math.Copysign(zero, -1)), // the CW documentation does not call out -0 as rejected
+        testutil.TestMetric(float64(8.515920e-109)),
+        testutil.TestMetric(float64(1.174271e+108)), // largest should be 1.174271e+108
         testutil.TestMetric(true),
     }
 
+    invalidMetrics := []telegraf.Metric{
+        testutil.TestMetric("Foo"),
+        testutil.TestMetric(math.Log(-1.0)),
+        testutil.TestMetric(float64(8.515919e-109)), // smallest should be 8.515920e-109
+        testutil.TestMetric(float64(1.174272e+108)), // largest should be 1.174271e+108
+    }
     for _, point := range validMetrics {
         datums := BuildMetricDatum(point)
-        assert.Equal(1, len(datums), "Valid type should create a Datum")
+        assert.Equal(1, len(datums), fmt.Sprintf("Valid point should create a Datum {value: %v}", point))
     }
+    for _, point := range invalidMetrics {
+        datums := BuildMetricDatum(point)
+        assert.Equal(0, len(datums), fmt.Sprintf("Valid point should not create a Datum {value: %v}", point))
+    }
 
     nonValidPoint := testutil.TestMetric("Foo")
 
     assert.Equal(0, len(BuildMetricDatum(nonValidPoint)), "Invalid type should not create a Datum")
 }
 
 func TestPartitionDatums(t *testing.T) {
@@ -78,10 +90,13 @@ func TestPartitionDatums(t *testing.T) {
         Value: aws.Float64(1),
     }
 
+    zeroDatum := []*cloudwatch.MetricDatum{}
     oneDatum := []*cloudwatch.MetricDatum{&testDatum}
     twoDatum := []*cloudwatch.MetricDatum{&testDatum, &testDatum}
+    threeDatum := []*cloudwatch.MetricDatum{&testDatum, &testDatum, &testDatum}
 
+    assert.Equal([][]*cloudwatch.MetricDatum{}, PartitionDatums(2, zeroDatum))
     assert.Equal([][]*cloudwatch.MetricDatum{oneDatum}, PartitionDatums(2, oneDatum))
     assert.Equal([][]*cloudwatch.MetricDatum{oneDatum}, PartitionDatums(2, oneDatum))
     assert.Equal([][]*cloudwatch.MetricDatum{twoDatum}, PartitionDatums(2, twoDatum))
+    assert.Equal([][]*cloudwatch.MetricDatum{twoDatum, oneDatum}, PartitionDatums(2, threeDatum))
@@ -44,6 +44,9 @@ This plugin writes to [InfluxDB](https://www.influxdb.com) via HTTP or UDP.
   ## HTTP Proxy Config
   # http_proxy = "http://corporate.proxy:3128"
 
+  ## Optional HTTP headers
+  # http_headers = {"X-Special-Header" = "Special-Value"}
+
   ## Compress each HTTP request payload using GZIP.
   # content_encoding = "gzip"
 ```
@@ -70,4 +73,5 @@ to write to. Each URL should start with either `http://` or `udp://`
 * `ssl_key`: SSL key
 * `insecure_skip_verify`: Use SSL but skip chain & host verification (default: false)
 * `http_proxy`: HTTP Proxy URI
+* `http_headers`: HTTP headers to add to each HTTP request
 * `content_encoding`: Compress each HTTP request payload using gzip if set to: "gzip"
@@ -68,6 +68,8 @@ func NewHTTP(config HTTPConfig, defaultWP WriteParams) (Client, error) {
     }, nil
 }
 
+type HTTPHeaders map[string]string
+
 type HTTPConfig struct {
     // URL should be of the form "http://host:port" (REQUIRED)
     URL string
@@ -95,6 +97,9 @@ type HTTPConfig struct {
     // Proxy URL should be of the form "http://host:port"
     HTTPProxy string
 
+    // HTTP headers to append to HTTP requests.
+    HTTPHeaders HTTPHeaders
+
     // The content encoding mechanism to use for each request.
     ContentEncoding string
 }
@@ -253,6 +258,11 @@ func (c *httpClient) makeRequest(uri string, body io.Reader) (*http.Request, err
     if err != nil {
         return nil, err
     }
 
+    for header, value := range c.config.HTTPHeaders {
+        req.Header.Set(header, value)
+    }
+
     req.Header.Set("Content-Type", "text/plain")
     req.Header.Set("User-Agent", c.config.UserAgent)
     if c.config.Username != "" && c.config.Password != "" {
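One detail worth noting in the loop above: user-supplied headers are applied before the fixed `Content-Type` and `User-Agent`, so those two cannot be overridden from configuration. A self-contained check of that ordering with net/http:

```go
package main

import (
	"fmt"
	"net/http"
	"strings"
)

func main() {
	// a user header plus an attempt to override Content-Type
	headers := map[string]string{
		"X-Test-Header": "Test-Value",
		"Content-Type":  "application/json",
	}
	req, _ := http.NewRequest("POST", "http://localhost:8086/write", strings.NewReader("cpu value=1"))

	// same order as makeRequest: custom headers first...
	for k, v := range headers {
		req.Header.Set(k, v)
	}
	// ...then the fixed header wins
	req.Header.Set("Content-Type", "text/plain")

	fmt.Println(req.Header.Get("X-Test-Header"), req.Header.Get("Content-Type"))
	// Output: Test-Value text/plain
}
```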
@@ -55,6 +55,13 @@ func TestHTTPClient_Write(t *testing.T) {
             fmt.Fprintln(w, `{"results":[{}],"error":"basic auth incorrect"}`)
         }
 
+        // test that user-specified http header is set properly
+        if r.Header.Get("X-Test-Header") != "Test-Value" {
+            w.WriteHeader(http.StatusTeapot)
+            w.Header().Set("Content-Type", "application/json")
+            fmt.Fprintln(w, `{"results":[{}],"error":"wrong http header value"}`)
+        }
+
         // Validate Content-Length Header
         if r.ContentLength != 13 {
             w.WriteHeader(http.StatusTeapot)
@@ -90,6 +97,9 @@ func TestHTTPClient_Write(t *testing.T) {
         UserAgent: "test-agent",
         Username: "test-user",
         Password: "test-password",
+        HTTPHeaders: HTTPHeaders{
+            "X-Test-Header": "Test-Value",
+        },
     }
     wp := WriteParams{
         Database: "test",
@@ -32,9 +32,10 @@ type InfluxDB struct {
     RetentionPolicy string
     WriteConsistency string
     Timeout internal.Duration
-    UDPPayload int `toml:"udp_payload"`
-    HTTPProxy string `toml:"http_proxy"`
-    ContentEncoding string `toml:"content_encoding"`
+    UDPPayload int `toml:"udp_payload"`
+    HTTPProxy string `toml:"http_proxy"`
+    HTTPHeaders map[string]string `toml:"http_headers"`
+    ContentEncoding string `toml:"content_encoding"`
 
     // Path to CA file
     SSLCA string `toml:"ssl_ca"`
@@ -89,6 +90,9 @@ var sampleConfig = `
   ## HTTP Proxy Config
   # http_proxy = "http://corporate.proxy:3128"
 
+  ## Optional HTTP headers
+  # http_headers = {"X-Special-Header" = "Special-Value"}
+
   ## Compress each HTTP request payload using GZIP.
   # content_encoding = "gzip"
 `
@@ -132,8 +136,12 @@ func (i *InfluxDB) Connect() error {
         Username: i.Username,
         Password: i.Password,
         HTTPProxy: i.HTTPProxy,
+        HTTPHeaders: client.HTTPHeaders{},
         ContentEncoding: i.ContentEncoding,
     }
+    for header, value := range i.HTTPHeaders {
+        config.HTTPHeaders[header] = value
+    }
     wp := client.WriteParams{
         Database: i.Database,
         RetentionPolicy: i.RetentionPolicy,
@@ -32,13 +32,22 @@ func (s *GraphiteSerializer) Serialize(metric telegraf.Metric) ([]byte, error) {
     }
 
     for fieldName, value := range metric.Fields() {
-        // Convert value to string
-        valueS := fmt.Sprintf("%#v", value)
-        point := []byte(fmt.Sprintf("%s %s %d\n",
+        switch v := value.(type) {
+        case string:
+            continue
+        case bool:
+            if v {
+                value = 1
+            } else {
+                value = 0
+            }
+        }
+        metricString := fmt.Sprintf("%s %#v %d\n",
             // insert "field" section of template
             sanitizedChars.Replace(InsertField(bucket, fieldName)),
-            sanitizedChars.Replace(valueS),
-            timestamp))
+            value,
+            timestamp)
+        point := []byte(metricString)
         out = append(out, point...)
     }
     return out, nil
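Why the switch converts booleans before formatting: `%#v` renders a bool as `true`/`false`, which graphite cannot ingest as a value, while string fields are dropped outright (the #3179 fix). A tiny illustration of the conversion step in isolation:

```go
package main

import "fmt"

func main() {
	// %#v on a raw bool produces a non-numeric token
	fmt.Printf("%#v\n", true) // true

	// the serializer's bool branch rewrites the value first
	var value interface{} = true
	if v, ok := value.(bool); ok {
		if v {
			value = 1
		} else {
			value = 0
		}
	}
	fmt.Printf("%#v\n", value) // 1 — safe for graphite
}
```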
@@ -165,6 +165,58 @@ func TestSerializeValueField2(t *testing.T) {
     assert.Equal(t, expS, mS)
 }
 
+func TestSerializeValueString(t *testing.T) {
+    now := time.Now()
+    tags := map[string]string{
+        "host": "localhost",
+        "cpu": "cpu0",
+        "datacenter": "us-west-2",
+    }
+    fields := map[string]interface{}{
+        "value": "asdasd",
+    }
+    m, err := metric.New("cpu", tags, fields, now)
+    assert.NoError(t, err)
+
+    s := GraphiteSerializer{
+        Template: "host.field.tags.measurement",
+    }
+    buf, _ := s.Serialize(m)
+    mS := strings.Split(strings.TrimSpace(string(buf)), "\n")
+    assert.NoError(t, err)
+    assert.Equal(t, "", mS[0])
+}
+
+func TestSerializeValueBoolean(t *testing.T) {
+    now := time.Now()
+    tags := map[string]string{
+        "host": "localhost",
+        "cpu": "cpu0",
+        "datacenter": "us-west-2",
+    }
+    fields := map[string]interface{}{
+        "enabled": true,
+        "disabled": false,
+    }
+    m, err := metric.New("cpu", tags, fields, now)
+    assert.NoError(t, err)
+
+    s := GraphiteSerializer{
+        Template: "host.field.tags.measurement",
+    }
+    buf, _ := s.Serialize(m)
+    mS := strings.Split(strings.TrimSpace(string(buf)), "\n")
+    assert.NoError(t, err)
+
+    expS := []string{
+        fmt.Sprintf("localhost.enabled.cpu0.us-west-2.cpu 1 %d", now.Unix()),
+        fmt.Sprintf("localhost.disabled.cpu0.us-west-2.cpu 0 %d", now.Unix()),
+    }
+    sort.Strings(mS)
+    sort.Strings(expS)
+    assert.Equal(t, expS, mS)
+}
+
 // test that fields with spaces get fixed.
 func TestSerializeFieldWithSpaces(t *testing.T) {
     now := time.Now()