Compare commits

...

63 Commits

Author SHA1 Message Date
Daniel Nelson 7192e68b24
Set 1.3.5 release date
(cherry picked from commit 284ab79a37)
2017-07-26 15:54:45 -07:00
Daniel Nelson ca4234cfd0
Update changelog
(cherry picked from commit ffd1f25b75)
2017-07-25 16:10:34 -07:00
Daniel Nelson e7ce063f8a
Don't match pattern on any error (#3040)
This prevents a pattern with no wildcards from matching in case
permissions is denied.
(cherry picked from commit 31ce98fa91)
2017-07-25 16:08:41 -07:00
DanKans d4f59a1bd7
Fix ntpq parse issue when using dns_lookup (#3026)
(cherry picked from commit d2626f1da6)
2017-07-25 16:07:37 -07:00
Daniel Nelson 2a83f97546
Prevent startup if intervals are 0
(cherry picked from commit d8f7b76253)
2017-07-25 16:07:29 -07:00
Daniel Nelson 99650a20e3
Update changelog
(cherry picked from commit 1658404cea)
2017-07-25 15:47:38 -07:00
Daniel Nelson d4536ad29e
Fix prometheus output cannot be reloaded (#3053)
(cherry picked from commit 82ea04f188)
2017-07-25 15:46:54 -07:00
Daniel Nelson 7bbd3daa98
Add release date for 1.3.4
(cherry picked from commit d9d1ca5a46)
2017-07-12 17:16:31 -07:00
Daniel Nelson ebea9f16df
Update changelog
(cherry picked from commit 2c10806fef)
2017-07-12 12:05:11 -07:00
Daniel Nelson b7205e9a6c
Prevent Write from being called concurrently (#3011)
(cherry picked from commit 5d2c093105)
2017-07-12 12:05:01 -07:00
Daniel Nelson 8ea07a9678
Update changelog
(cherry picked from commit f68bab1667)
2017-07-11 15:56:25 -07:00
Daniel Nelson aef603889f
Do not allow metrics with trailing slashes (#3007)
It is not possible to encode a measurement, tag, or field whose last
character is a backslash due to it being an unescapable character.
Because the tight coupling between line protocol and the internal metric
model, prevent metrics like this from being created.

Measurements with a trailing slash are not allowed and the point will be
dropped.  Tags and fields with a trailing a slash will be dropped from
the point.
(cherry picked from commit 1388e2cf92)
2017-07-11 15:56:06 -07:00
Daniel Nelson 88c3a4bb4e
Update changelog
(cherry picked from commit af318f4959)
2017-07-11 14:11:38 -07:00
JSH c6e2500a84
Fix chrony plugin does not track system time offset (#2989)
(cherry picked from commit 9f244cf1ac)
2017-07-11 14:10:50 -07:00
Daniel Nelson 6751718fb2
Update changelog
(cherry picked from commit 885aa8e6e1)
2017-07-10 19:08:27 -07:00
Daniel Nelson 5b5d562bd4
Fix handling of escapes within fieldset (#3003)
Line protocol does not require or allow escaping of backslash, the only
requirement for a byte to be escaped is if it is an escapable char and
preceeded immediately by a slash.
(cherry picked from commit 945446b36f)
2017-07-10 19:07:50 -07:00
Daniel Nelson 46db92aad3
Set release date for 1.3.3
(cherry picked from commit 75dbf2b0f8)
2017-06-28 13:06:16 -07:00
Daniel Nelson 5fcc5d1bba
Update changelog
(cherry picked from commit f2bb4acd4a)
2017-06-26 15:25:38 -07:00
Bob Shannon 2e6f7055cb
Fix panic in elasticsearch input if cannot determine master (#2954)
(cherry picked from commit a7595c918a)
2017-06-26 15:25:26 -07:00
Daniel Nelson da640a8af7
Update changelog
(cherry picked from commit e028f10586)
2017-06-23 11:06:17 -07:00
Daniel Nelson 1e7750c502
Fix bug parsing default timestamps with modified precision (#2949)
(cherry picked from commit 9276318faf)
2017-06-23 11:06:01 -07:00
Daniel Nelson 0231b3ea0a
Update changelog
(cherry picked from commit 9211985c63)
2017-06-21 12:40:01 -07:00
Daniel Nelson b0ba853395
Remove label value sanitization in prometheus output (#2939)
(cherry picked from commit 929ba0a637)
2017-06-21 12:39:53 -07:00
Daniel Nelson 9722d675bb
Update changelog
(cherry picked from commit a729a44284)
2017-06-16 13:20:43 -07:00
Daniel Nelson a3acfa8163
Allow dos line endings in tail and logparser (#2920)
Parsing dos line ending delimited line protocol is still illegal in most
cases.
(cherry picked from commit 3ecfd32df5)
2017-06-16 13:20:10 -07:00
Daniel Nelson 0f419e9a0d
Update 1.3.2 release date
(cherry picked from commit ca72df5868)
2017-06-14 12:17:44 -07:00
Daniel Nelson 958d689274
Update changelog
(cherry picked from commit ea787b83bf)
2017-06-13 18:07:59 -07:00
Daniel Nelson 2370c04dd7
Ensure prometheus metrics have same set of labels (#2857)
(cherry picked from commit 949072e8dc)
2017-06-13 18:07:34 -07:00
Daniel Nelson 4978c8ccb9
Update changelog
(cherry picked from commit 0c53de6700)
2017-06-08 16:56:10 -07:00
Daniel Nelson abe5e28575
Fix support for mongodb/leofs urls without scheme (#2900)
This was broken by changes in go 1.8 to url.Parse.  This change allows
the string but prompts the user to move to the correct url string.

(cherry picked from commit b277e6e2d7)
2017-06-08 16:55:46 -07:00
Daniel Nelson 34da5a6e45
Update changelog
(cherry picked from commit 84dbf8bb25)
2017-06-07 13:48:58 -07:00
Daniel Nelson 4de1bf29cc
Fix metric splitting edge cases (#2896)
Metrics needing one extra byte to fit the output buffer would not be split, so we would emit lines without a line ending. Metrics which overflowed by exactly one field length would be split one field too late, causing truncated fields.
(cherry picked from commit a275e6792a)
2017-06-07 13:47:58 -07:00
Daniel Nelson f23845afef
Update changelog
(cherry picked from commit a47e6e6efe)
2017-06-05 12:51:58 -07:00
Daniel Nelson 721ed8b3f2
Fix udp metric splitting (#2880)
(cherry picked from commit 5bab4616ff)
2017-06-05 12:50:59 -07:00
Daniel Nelson 385c114d00
Update release date 2017-05-31 14:53:01 -07:00
Daniel Nelson f93615672b
Generate sha256 hashes when packaging
(cherry picked from commit 0b6db905ff)
2017-05-31 12:30:51 -07:00
Daniel Nelson 444f1ba09c
Update changelog
(cherry picked from commit 9529199a44)
2017-05-30 17:41:06 -07:00
Daniel Nelson 5417a12ee3
Fix length calculation of split metric buffer (#2869)
(cherry picked from commit be03abd464)
2017-05-30 17:41:00 -07:00
Daniel Nelson f6c986b5ca
Update changelog
(cherry picked from commit 04aa732e94)
2017-05-30 11:05:56 -07:00
Steve Nardone 7f67bf593f
Fix panic in mongo input (#2848)
(cherry picked from commit e7f9db297e)
2017-05-30 11:05:28 -07:00
Daniel Nelson 7ca902f987
Update changelog
(cherry picked from commit 7d7206b3e2)
2017-05-25 16:24:46 -07:00
Daniel Nelson dc69cacaa1
Update gopsutil version
fixes #2856

(cherry picked from commit 03ca3975b5)
2017-05-25 16:24:33 -07:00
Daniel Nelson 29671154bc
Update changelog
(cherry picked from commit e1088b9eee)
2017-05-25 13:39:55 -07:00
Daniel Nelson 4f8341670e
Fix influxdb output database quoting (#2851)
(cherry picked from commit f47924ffc5)
2017-05-25 13:27:16 -07:00
Daniel Nelson 99edca80ef
Handle process termination during read from /proc (#2816)
Fixes #2815.
(cherry picked from commit c53d9fa9b7)
2017-05-18 18:03:54 -07:00
Daniel Nelson 44654e011c
Reuse transports in input plugins (#2782)
(cherry picked from commit a47aa0dcc2)
2017-05-18 18:02:09 -07:00
Daniel Nelson 389439111b
Fixed sqlserver input to work with case sensitive server collation. (#2749)
Fixed a problem with sqlserver input where database properties are not returned by Telegraf when SQL Server has been set up with a case sensitive server-level collation.

* Added bugfix entry to CHANGELOG.md for sqlserver collation input fix.

(cherry picked from commit e2983383e4)
2017-05-18 17:59:14 -07:00
Daniel Nelson 2bc5594b44
Add release date for 1.3.0 2017-05-15 20:05:22 -07:00
Daniel Nelson 99b53c8745
Add back the changelog entry for 2141 2017-05-15 12:56:11 -07:00
Daniel Nelson 27b89dff48
Only split metrics if there is an udp output (#2799) 2017-05-12 15:34:31 -07:00
Sebastian Borza b16eb6eae6 split metrics based on UDPPayload size (#2795) 2017-05-12 14:42:18 -07:00
Daniel Nelson feaf76913b
Add missing plugins to README 2017-05-09 13:51:26 -07:00
Daniel Nelson ff704fbe0d
Add SLES11 support to rpm package (#2768) 2017-05-05 14:30:31 -07:00
Sébastien ebef47f56a
fix systemd path in order to add compatibility with SuSe (#2499) 2017-05-05 14:30:24 -07:00
Daniel Nelson 18fd2d987d
Return an error if no valid patterns. (#2753) 2017-05-02 14:55:16 -07:00
Alexander Blagoev 5e70cb3e44
Improve redis input documentation (#2708) 2017-05-02 14:12:09 -07:00
Patrick Hemmer ce203dc687
fix close on closed socket_writer (#2748) 2017-05-02 11:07:58 -07:00
Daniel Nelson b0a2e8e1bd
Add initial documentation for rabbitmq input. (#2745) 2017-05-01 18:57:19 -07:00
Daniel Nelson 499495f844
Don't log error creating database on connect (#2740)
closes #2739
2017-04-28 15:59:28 -07:00
Daniel Nelson 20ab8fb2c3
Update telegraf.conf 2017-04-28 13:49:09 -07:00
Daniel Nelson bc474d3a53
Clarify retention policy option for influxdb output
closes #2696
2017-04-28 13:48:24 -07:00
Daniel Nelson 547be87d79
Clarify retention policy option for influxdb output
closes #2696
2017-04-28 13:43:00 -07:00
Daniel Nelson 619d4d5d29
Use go 1.8.1 for CI and Release builds (#2732) 2017-04-27 16:22:41 -07:00
56 changed files with 1823 additions and 511 deletions

View File

@ -1,4 +1,51 @@
## v1.3 [unreleased]
## v1.3.5 [2017-07-26]
### Bugfixes
- [#3049](https://github.com/influxdata/telegraf/issues/3049): Fix prometheus output cannot be reloaded.
- [#3037](https://github.com/influxdata/telegraf/issues/3037): Fix filestat reporting exists when cannot list directory.
- [#2386](https://github.com/influxdata/telegraf/issues/2386): Fix ntpq parse issue when using dns_lookup.
- [#2554](https://github.com/influxdata/telegraf/issues/2554): Fix panic when agent.interval = "0s".
## v1.3.4 [2017-07-12]
### Bugfixes
- [#3001](https://github.com/influxdata/telegraf/issues/3001): Fix handling of escape characters within fields.
- [#2988](https://github.com/influxdata/telegraf/issues/2988): Fix chrony plugin does not track system time offset.
- [#3004](https://github.com/influxdata/telegraf/issues/3004): Do not allow metrics with trailing slashes.
- [#3011](https://github.com/influxdata/telegraf/issues/3011): Prevent Write from being called concurrently.
## v1.3.3 [2017-06-28]
### Bugfixes
- [#2915](https://github.com/influxdata/telegraf/issues/2915): Allow dos line endings in tail and logparser.
- [#2937](https://github.com/influxdata/telegraf/issues/2937): Remove label value sanitization in prometheus output.
- [#2948](https://github.com/influxdata/telegraf/issues/2948): Fix bug parsing default timestamps with modified precision.
- [#2954](https://github.com/influxdata/telegraf/issues/2954): Fix panic in elasticsearch input if cannot determine master.
## v1.3.2 [2017-06-14]
### Bugfixes
- [#2862](https://github.com/influxdata/telegraf/issues/2862): Fix InfluxDB UDP metric splitting.
- [#2888](https://github.com/influxdata/telegraf/issues/2888): Fix mongodb/leofs urls without scheme.
- [#2822](https://github.com/influxdata/telegraf/issues/2822): Fix inconsistent label dimensions in prometheus output.
## v1.3.1 [2017-05-31]
### Bugfixes
- [#2749](https://github.com/influxdata/telegraf/pull/2749): Fixed sqlserver input to work with case sensitive server collation.
- [#2782](https://github.com/influxdata/telegraf/pull/2782): Reuse transports in input plugins
- [#2815](https://github.com/influxdata/telegraf/issues/2815): Inputs processes fails with "no such process".
- [#2851](https://github.com/influxdata/telegraf/pull/2851): Fix InfluxDB output database quoting.
- [#2856](https://github.com/influxdata/telegraf/issues/2856): Fix net input on older Linux kernels.
- [#2848](https://github.com/influxdata/telegraf/pull/2848): Fix panic in mongo input.
- [#2869](https://github.com/influxdata/telegraf/pull/2869): Fix length calculation of split metric buffer.
## v1.3 [2017-05-15]
### Release Notes
@ -79,6 +126,9 @@ be deprecated eventually.
- [#2705](https://github.com/influxdata/telegraf/pull/2705): Kinesis output: add use_random_partitionkey option
- [#2635](https://github.com/influxdata/telegraf/issues/2635): add tcp keep-alive to socket_listener & socket_writer
- [#2031](https://github.com/influxdata/telegraf/pull/2031): Add Kapacitor input plugin
- [#2732](https://github.com/influxdata/telegraf/pull/2732): Use go 1.8.1
- [#2712](https://github.com/influxdata/telegraf/issues/2712): Documentation for rabbitmq input plugin
- [#2141](https://github.com/influxdata/telegraf/pull/2141): Logparser handles newly-created files.
### Bugfixes
@ -118,6 +168,7 @@ be deprecated eventually.
- [#1911](https://github.com/influxdata/telegraf/issues/1911): Sysstat plugin needs LANG=C or similar locale
- [#2528](https://github.com/influxdata/telegraf/issues/2528): File output closes standard streams on reload.
- [#2603](https://github.com/influxdata/telegraf/issues/2603): AMQP output disconnect blocks all outputs
- [#2706](https://github.com/influxdata/telegraf/issues/2706): Improve documentation for redis input plugin
## v1.2.1 [2017-02-01]

2
Godeps
View File

@ -46,7 +46,7 @@ github.com/prometheus/procfs 1878d9fbb537119d24b21ca07effd591627cd160
github.com/rcrowley/go-metrics 1f30fe9094a513ce4c700b9a54458bbb0c96996c
github.com/samuel/go-zookeeper 1d7be4effb13d2d908342d349d71a284a7542693
github.com/satori/go.uuid 5bf94b69c6b68ee1b541973bb8e1144db23a194b
github.com/shirou/gopsutil 70693b6a3da51a8a686d31f1b346077bbc066062
github.com/shirou/gopsutil 9a4a9167ad3b4355dbf1c2c7a0f5f0d3fb1e9ab9
github.com/soniah/gosnmp 5ad50dc75ab389f8a1c9f8a67d3a1cd85f67ed15
github.com/streadway/amqp 63795daa9a446c920826655f26ba31c81c860fd6
github.com/stretchr/testify 4d4bfba8f1d1027c4fdbe371823030df51419987

View File

@ -111,6 +111,7 @@ configuration options.
* [couchbase](./plugins/inputs/couchbase)
* [couchdb](./plugins/inputs/couchdb)
* [disque](./plugins/inputs/disque)
* [dmcache](./plugins/inputs/dmcache)
* [dns query time](./plugins/inputs/dns_query)
* [docker](./plugins/inputs/docker)
* [dovecot](./plugins/inputs/dovecot)
@ -127,6 +128,7 @@ configuration options.
* [ipmi_sensor](./plugins/inputs/ipmi_sensor)
* [iptables](./plugins/inputs/iptables)
* [jolokia](./plugins/inputs/jolokia)
* [kapacitor](./plugins/inputs/kapacitor)
* [kubernetes](./plugins/inputs/kubernetes)
* [leofs](./plugins/inputs/leofs)
* [lustre2](./plugins/inputs/lustre2)
@ -195,6 +197,7 @@ Telegraf can also collect metrics via the following service plugins:
* [github](./plugins/inputs/webhooks/github)
* [mandrill](./plugins/inputs/webhooks/mandrill)
* [rollbar](./plugins/inputs/webhooks/rollbar)
* [papertrail](./plugins/inputs/webhooks/papertrail)
Telegraf is able to parse the following input data formats into metrics, these
formats may be used with input plugins supporting the `data_format` option:

View File

@ -1,13 +1,11 @@
machine:
go:
version: 1.8.1
services:
- docker
post:
- sudo service zookeeper stop
- go version
- sudo rm -rf /usr/local/go
- wget https://storage.googleapis.com/golang/go1.8.linux-amd64.tar.gz
- sudo tar -C /usr/local -xzf go1.8.linux-amd64.tar.gz
- go version
- memcached
- redis
- rabbitmq-server
dependencies:
override:

View File

@ -151,6 +151,16 @@ func reloadLoop(
log.Fatalf("E! Error: no inputs found, did you provide a valid config file?")
}
if int64(c.Agent.Interval.Duration) <= 0 {
log.Fatalf("E! Agent interval must be positive, found %s",
c.Agent.Interval.Duration)
}
if int64(c.Agent.FlushInterval.Duration) <= 0 {
log.Fatalf("E! Agent flush_interval must be positive; found %s",
c.Agent.Interval.Duration)
}
ag, err := agent.NewAgent(c)
if err != nil {
log.Fatal("E! " + err.Error())

View File

@ -95,7 +95,8 @@
## The target database for metrics (telegraf will create it if not exists).
database = "telegraf" # required
## Retention policy to write to. Empty string writes to the default rp.
## Name of existing retention policy to write to. Empty string writes to
## the default retention policy.
retention_policy = ""
## Write consistency (clusters only), can be: "any", "one", "quorum", "all"
write_consistency = "any"
@ -141,7 +142,7 @@
# ## described here: https://www.rabbitmq.com/plugins.html
# # auth_method = "PLAIN"
# ## Telegraf tag to use as a routing key
# ## ie, if this tag exists, it's value will be used as the routing key
# ## ie, if this tag exists, its value will be used as the routing key
# routing_tag = "host"
#
# ## InfluxDB retention policy
@ -335,6 +336,10 @@
# ## Use SSL but skip chain & host verification
# # insecure_skip_verify = false
#
# ## Optional SASL Config
# # sasl_username = "kafka"
# # sasl_password = "secret"
#
# ## Data format to output.
# ## Each data format has its own unique set of configuration options, read
# ## more about them here:
@ -1325,6 +1330,18 @@
# attribute = "LoadedClassCount,UnloadedClassCount,TotalLoadedClassCount"
# # Read Kapacitor-formatted JSON metrics from one or more HTTP endpoints
# [[inputs.kapacitor]]
# ## Multiple URLs from which to read Kapacitor-formatted JSON
# ## Default is "http://localhost:9092/kapacitor/v1/debug/vars".
# urls = [
# "http://localhost:9092/kapacitor/v1/debug/vars"
# ]
#
# ## Time limit for http requests
# timeout = "5s"
# # Get kernel statistics from /proc/vmstat
# [[inputs.kernel_vmstat]]
# # no configuration

View File

@ -45,7 +45,7 @@ func (g *GlobPath) Match() map[string]os.FileInfo {
if !g.hasMeta {
out := make(map[string]os.FileInfo)
info, err := os.Stat(g.path)
if !os.IsNotExist(err) {
if err == nil {
out[g.path] = info
}
return out
@ -55,7 +55,7 @@ func (g *GlobPath) Match() map[string]os.FileInfo {
files, _ := filepath.Glob(g.path)
for _, file := range files {
info, err := os.Stat(file)
if !os.IsNotExist(err) {
if err == nil {
out[file] = info
}
}

View File

@ -1,6 +1,7 @@
package globpath
import (
"os"
"runtime"
"strings"
"testing"
@ -70,3 +71,20 @@ func getTestdataDir() string {
_, filename, _, _ := runtime.Caller(1)
return strings.Replace(filename, "globpath_test.go", "testdata", 1)
}
func TestMatch_ErrPermission(t *testing.T) {
tests := []struct {
input string
expected map[string]os.FileInfo
}{
{"/root/foo", map[string]os.FileInfo{}},
{"/root/f*", map[string]os.FileInfo{}},
}
for _, test := range tests {
glob, err := Compile(test.input)
require.NoError(t, err)
actual := glob.Match()
require.Equal(t, test.expected, actual)
}
}

View File

@ -3,6 +3,7 @@ package models
import (
"log"
"math"
"strings"
"time"
"github.com/influxdata/telegraf"
@ -77,7 +78,27 @@ func makemetric(
}
}
for k, v := range tags {
if strings.HasSuffix(k, `\`) {
log.Printf("D! Measurement [%s] tag [%s] "+
"ends with a backslash, skipping", measurement, k)
delete(tags, k)
continue
} else if strings.HasSuffix(v, `\`) {
log.Printf("D! Measurement [%s] tag [%s] has a value "+
"ending with a backslash, skipping", measurement, k)
delete(tags, k)
continue
}
}
for k, v := range fields {
if strings.HasSuffix(k, `\`) {
log.Printf("D! Measurement [%s] field [%s] "+
"ends with a backslash, skipping", measurement, k)
delete(fields, k)
continue
}
// Validate uint64 and float64 fields
// convert all int & uint types to int64
switch val := v.(type) {
@ -128,6 +149,14 @@ func makemetric(
delete(fields, k)
continue
}
case string:
if strings.HasSuffix(val, `\`) {
log.Printf("D! Measurement [%s] field [%s] has a value "+
"ending with a backslash, skipping", measurement, k)
delete(fields, k)
continue
}
fields[k] = v
default:
fields[k] = v
}

View File

@ -9,6 +9,7 @@ import (
"github.com/influxdata/telegraf"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestMakeMetricNoFields(t *testing.T) {
@ -332,6 +333,128 @@ func TestMakeMetricNameSuffix(t *testing.T) {
)
}
func TestMakeMetric_TrailingSlash(t *testing.T) {
now := time.Now()
tests := []struct {
name string
measurement string
fields map[string]interface{}
tags map[string]string
expectedNil bool
expectedMeasurement string
expectedFields map[string]interface{}
expectedTags map[string]string
}{
{
name: "Measurement cannot have trailing slash",
measurement: `cpu\`,
fields: map[string]interface{}{
"value": int64(42),
},
tags: map[string]string{},
expectedNil: true,
},
{
name: "Field key with trailing slash dropped",
measurement: `cpu`,
fields: map[string]interface{}{
"value": int64(42),
`bad\`: `xyzzy`,
},
tags: map[string]string{},
expectedMeasurement: `cpu`,
expectedFields: map[string]interface{}{
"value": int64(42),
},
expectedTags: map[string]string{},
},
{
name: "Field value with trailing slash dropped",
measurement: `cpu`,
fields: map[string]interface{}{
"value": int64(42),
"bad": `xyzzy\`,
},
tags: map[string]string{},
expectedMeasurement: `cpu`,
expectedFields: map[string]interface{}{
"value": int64(42),
},
expectedTags: map[string]string{},
},
{
name: "Must have one field after dropped",
measurement: `cpu`,
fields: map[string]interface{}{
"bad": `xyzzy\`,
},
tags: map[string]string{},
expectedNil: true,
},
{
name: "Tag key with trailing slash dropped",
measurement: `cpu`,
fields: map[string]interface{}{
"value": int64(42),
},
tags: map[string]string{
`host\`: "localhost",
"a": "x",
},
expectedMeasurement: `cpu`,
expectedFields: map[string]interface{}{
"value": int64(42),
},
expectedTags: map[string]string{
"a": "x",
},
},
{
name: "Tag value with trailing slash dropped",
measurement: `cpu`,
fields: map[string]interface{}{
"value": int64(42),
},
tags: map[string]string{
`host`: `localhost\`,
"a": "x",
},
expectedMeasurement: `cpu`,
expectedFields: map[string]interface{}{
"value": int64(42),
},
expectedTags: map[string]string{
"a": "x",
},
},
}
ri := NewRunningInput(&testInput{}, &InputConfig{
Name: "TestRunningInput",
})
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
m := ri.MakeMetric(
tc.measurement,
tc.fields,
tc.tags,
telegraf.Untyped,
now)
if tc.expectedNil {
require.Nil(t, m)
} else {
require.NotNil(t, m)
require.Equal(t, tc.expectedMeasurement, m.Name())
require.Equal(t, tc.expectedFields, m.Fields())
require.Equal(t, tc.expectedTags, m.Tags())
}
})
}
}
type testInput struct{}
func (t *testInput) Description() string { return "" }

View File

@ -2,6 +2,7 @@ package models
import (
"log"
"sync"
"time"
"github.com/influxdata/telegraf"
@ -34,6 +35,9 @@ type RunningOutput struct {
metrics *buffer.Buffer
failMetrics *buffer.Buffer
// Guards against concurrent calls to the Output as described in #3009
sync.Mutex
}
func NewRunningOutput(
@ -169,6 +173,8 @@ func (ro *RunningOutput) write(metrics []telegraf.Metric) error {
if nMetrics == 0 {
return nil
}
ro.Lock()
defer ro.Unlock()
start := time.Now()
err := ro.Output.Write(metrics)
elapsed := time.Since(start)

View File

@ -6,6 +6,7 @@ import (
"hash/fnv"
"sort"
"strconv"
"strings"
"time"
"github.com/influxdata/telegraf"
@ -26,6 +27,9 @@ func New(
if len(name) == 0 {
return nil, fmt.Errorf("Metric cannot be made with an empty name")
}
if strings.HasSuffix(name, `\`) {
return nil, fmt.Errorf("Metric cannot have measurement name ending with a backslash")
}
var thisType telegraf.ValueType
if len(mType) > 0 {
@ -44,6 +48,13 @@ func New(
// pre-allocate exact size of the tags slice
taglen := 0
for k, v := range tags {
if strings.HasSuffix(k, `\`) {
return nil, fmt.Errorf("Metric cannot have tag key ending with a backslash")
}
if strings.HasSuffix(v, `\`) {
return nil, fmt.Errorf("Metric cannot have tag value ending with a backslash")
}
if len(k) == 0 || len(v) == 0 {
continue
}
@ -66,7 +77,17 @@ func New(
// pre-allocate capacity of the fields slice
fieldlen := 0
for k, _ := range fields {
for k, v := range fields {
if strings.HasSuffix(k, `\`) {
return nil, fmt.Errorf("Metric cannot have field key ending with a backslash")
}
switch val := v.(type) {
case string:
if strings.HasSuffix(val, `\`) {
return nil, fmt.Errorf("Metric cannot have field value ending with a backslash")
}
}
// 10 bytes is completely arbitrary, but will at least prevent some
// amount of allocations. There's a small possibility this will create
// slightly more allocations for a metric that has many short fields.
@ -98,7 +119,7 @@ func indexUnescapedByte(buf []byte, b byte) int {
break
}
keyi += i
if countBackslashes(buf, keyi-1)%2 == 0 {
if buf[keyi-1] != '\\' {
break
} else {
keyi++
@ -107,24 +128,6 @@ func indexUnescapedByte(buf []byte, b byte) int {
return keyi
}
// countBackslashes counts the number of preceding backslashes starting at
// the 'start' index.
func countBackslashes(buf []byte, index int) int {
var count int
for {
if index < 0 {
return count
}
if buf[index] == '\\' {
count++
index--
} else {
break
}
}
return count
}
type metric struct {
name []byte
tags []byte
@ -218,7 +221,7 @@ func (m *metric) SerializeTo(dst []byte) int {
}
func (m *metric) Split(maxSize int) []telegraf.Metric {
if m.Len() < maxSize {
if m.Len() <= maxSize {
return []telegraf.Metric{m}
}
var out []telegraf.Metric
@ -248,7 +251,7 @@ func (m *metric) Split(maxSize int) []telegraf.Metric {
// if true, then we need to create a metric _not_ including the currently
// selected field
if len(m.fields[i:j])+len(fields)+constant > maxSize {
if len(m.fields[i:j])+len(fields)+constant >= maxSize {
// if false, then we'll create a metric including the currently
// selected field anyways. This means that the given maxSize is too
// small for a single field to fit.

View File

@ -10,6 +10,7 @@ import (
"github.com/influxdata/telegraf"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestNewMetric(t *testing.T) {
@ -249,11 +250,13 @@ func TestNewMetric_Fields(t *testing.T) {
"host": "localhost",
}
fields := map[string]interface{}{
"float": float64(1),
"int": int64(1),
"bool": true,
"false": false,
"string": "test",
"float": float64(1),
"int": int64(1),
"bool": true,
"false": false,
"string": "test",
"quote_string": `x"y`,
"backslash_quote_string": `x\"y`,
}
m, err := New("cpu", tags, fields, now)
assert.NoError(t, err)
@ -366,7 +369,7 @@ func TestIndexUnescapedByte(t *testing.T) {
{
in: []byte(`foo\\bar`),
b: 'b',
expected: 5,
expected: -1,
},
{
in: []byte(`foobar`),
@ -458,7 +461,7 @@ func TestSplitMetric(t *testing.T) {
assert.Len(t, split70, 3)
split60 := m.Split(60)
assert.Len(t, split60, 4)
assert.Len(t, split60, 5)
}
// test splitting metric into various max lengths
@ -578,6 +581,42 @@ func TestSplitMetric_OneField(t *testing.T) {
assert.Equal(t, "cpu,host=localhost float=100001 1480940990034083306\n", split[0].String())
}
func TestSplitMetric_ExactSize(t *testing.T) {
now := time.Unix(0, 1480940990034083306)
tags := map[string]string{
"host": "localhost",
}
fields := map[string]interface{}{
"float": float64(100001),
"int": int64(100001),
"bool": true,
"false": false,
"string": "test",
}
m, err := New("cpu", tags, fields, now)
assert.NoError(t, err)
actual := m.Split(m.Len())
// check that no copy was made
require.Equal(t, &m, &actual[0])
}
func TestSplitMetric_NoRoomForNewline(t *testing.T) {
now := time.Unix(0, 1480940990034083306)
tags := map[string]string{
"host": "localhost",
}
fields := map[string]interface{}{
"float": float64(100001),
"int": int64(100001),
"bool": true,
"false": false,
}
m, err := New("cpu", tags, fields, now)
assert.NoError(t, err)
actual := m.Split(m.Len() - 1)
require.Equal(t, 2, len(actual))
}
func TestNewMetricAggregate(t *testing.T) {
now := time.Now()
@ -648,3 +687,55 @@ func TestEmptyTagValueOrKey(t *testing.T) {
assert.NoError(t, err)
}
func TestNewMetric_TrailingSlash(t *testing.T) {
now := time.Now()
tests := []struct {
name string
tags map[string]string
fields map[string]interface{}
}{
{
name: `cpu\`,
fields: map[string]interface{}{
"value": int64(42),
},
},
{
name: "cpu",
fields: map[string]interface{}{
`value\`: "x",
},
},
{
name: "cpu",
fields: map[string]interface{}{
"value": `x\`,
},
},
{
name: "cpu",
tags: map[string]string{
`host\`: "localhost",
},
fields: map[string]interface{}{
"value": int64(42),
},
},
{
name: "cpu",
tags: map[string]string{
"host": `localhost\`,
},
fields: map[string]interface{}{
"value": int64(42),
},
},
}
for _, tc := range tests {
_, err := New(tc.name, tc.tags, tc.fields, now)
assert.Error(t, err)
}
}

View File

@ -129,7 +129,7 @@ func parseMetric(buf []byte,
// apply precision multiplier
var nsec int64
multiplier := getPrecisionMultiplier(precision)
if multiplier > 1 {
if len(ts) > 0 && multiplier > 1 {
tsint, err := parseIntBytes(ts, 10, 64)
if err != nil {
return nil, err

View File

@ -380,11 +380,25 @@ func TestParsePrecision(t *testing.T) {
} {
metrics, err := ParseWithDefaultTimePrecision(
[]byte(tt.line+"\n"), time.Now(), tt.precision)
assert.NoError(t, err, tt)
assert.NoError(t, err)
assert.Equal(t, tt.expected, metrics[0].UnixNano())
}
}
func TestParsePrecisionUnsetTime(t *testing.T) {
for _, tt := range []struct {
line string
precision string
}{
{"test v=42", "s"},
{"test v=42", "ns"},
} {
_, err := ParseWithDefaultTimePrecision(
[]byte(tt.line+"\n"), time.Now(), tt.precision)
assert.NoError(t, err)
}
}
func TestParseMaxKeyLength(t *testing.T) {
key := ""
for {

View File

@ -57,7 +57,7 @@ func (r *reader) Read(p []byte) (n int, err error) {
// this for-loop is the sunny-day scenario, where we are given a
// buffer that is large enough to hold at least a single metric.
// all of the cases below it are edge-cases.
if r.metrics[r.iM].Len() < len(p[i:]) {
if r.metrics[r.iM].Len() <= len(p[i:]) {
i += r.metrics[r.iM].SerializeTo(p[i:])
} else {
break
@ -76,7 +76,7 @@ func (r *reader) Read(p []byte) (n int, err error) {
if len(tmp) > 1 {
r.splitMetrics = tmp
r.state = split
if r.splitMetrics[0].Len() < len(p) {
if r.splitMetrics[0].Len() <= len(p) {
i += r.splitMetrics[0].SerializeTo(p)
r.iSM = 1
} else {
@ -99,7 +99,7 @@ func (r *reader) Read(p []byte) (n int, err error) {
}
case split:
if r.splitMetrics[r.iSM].Len() < len(p) {
if r.splitMetrics[r.iSM].Len() <= len(p) {
// write the current split metric
i += r.splitMetrics[r.iSM].SerializeTo(p)
r.iSM++
@ -131,6 +131,10 @@ func (r *reader) Read(p []byte) (n int, err error) {
r.iSM++
if r.iSM == len(r.splitMetrics) {
r.iM++
if r.iM == len(r.metrics) {
r.state = done
return i, io.EOF
}
r.state = normal
} else {
r.state = split

View File

@ -8,8 +8,8 @@ import (
"time"
"github.com/influxdata/telegraf"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func BenchmarkMetricReader(b *testing.B) {
@ -116,6 +116,140 @@ func TestMetricReader_OverflowMetric(t *testing.T) {
}
}
// Regression test for when a metric is the same size as the buffer.
//
// Previously EOF would not be set until the next call to Read.
func TestMetricReader_MetricSizeEqualsBufferSize(t *testing.T) {
ts := time.Unix(1481032190, 0)
m1, _ := New("foo", map[string]string{},
map[string]interface{}{"a": int64(1)}, ts)
metrics := []telegraf.Metric{m1}
r := NewReader(metrics)
buf := make([]byte, m1.Len())
for {
n, err := r.Read(buf)
// Should never read 0 bytes unless at EOF, unless input buffer is 0 length
if n == 0 {
require.Equal(t, io.EOF, err)
break
}
// Lines should be terminated with a LF
if err == io.EOF {
require.Equal(t, uint8('\n'), buf[n-1])
break
}
require.NoError(t, err)
}
}
// Regression test for when a metric requires to be split and one of the
// split metrics is exactly the size of the buffer.
//
// Previously an empty string would be returned on the next Read without error,
// and then next Read call would panic.
func TestMetricReader_SplitWithExactLengthSplit(t *testing.T) {
ts := time.Unix(1481032190, 0)
m1, _ := New("foo", map[string]string{},
map[string]interface{}{"a": int64(1), "bb": int64(2)}, ts)
metrics := []telegraf.Metric{m1}
r := NewReader(metrics)
buf := make([]byte, 30)
// foo a=1i,bb=2i 1481032190000000000\n // len 35
//
// Requires this specific split order:
// foo a=1i 1481032190000000000\n // len 29
// foo bb=2i 1481032190000000000\n // len 30
for {
n, err := r.Read(buf)
// Should never read 0 bytes unless at EOF, unless input buffer is 0 length
if n == 0 {
require.Equal(t, io.EOF, err)
break
}
// Lines should be terminated with a LF
if err == io.EOF {
require.Equal(t, uint8('\n'), buf[n-1])
break
}
require.NoError(t, err)
}
}
// Regresssion test for when a metric requires to be split and one of the
// split metrics is larger than the buffer.
//
// Previously the metric index would be set incorrectly causing a panic.
func TestMetricReader_SplitOverflowOversized(t *testing.T) {
ts := time.Unix(1481032190, 0)
m1, _ := New("foo", map[string]string{},
map[string]interface{}{
"a": int64(1),
"bbb": int64(2),
}, ts)
metrics := []telegraf.Metric{m1}
r := NewReader(metrics)
buf := make([]byte, 30)
// foo a=1i,bbb=2i 1481032190000000000\n // len 36
//
// foo a=1i 1481032190000000000\n // len 29
// foo bbb=2i 1481032190000000000\n // len 31
for {
n, err := r.Read(buf)
// Should never read 0 bytes unless at EOF, unless input buffer is 0 length
if n == 0 {
require.Equal(t, io.EOF, err)
break
}
// Lines should be terminated with a LF
if err == io.EOF {
require.Equal(t, uint8('\n'), buf[n-1])
break
}
require.NoError(t, err)
}
}
// Regresssion test for when a split metric exactly fits in the buffer.
//
// Previously the metric would be overflow split when not required.
func TestMetricReader_SplitOverflowUneeded(t *testing.T) {
ts := time.Unix(1481032190, 0)
m1, _ := New("foo", map[string]string{},
map[string]interface{}{"a": int64(1), "b": int64(2)}, ts)
metrics := []telegraf.Metric{m1}
r := NewReader(metrics)
buf := make([]byte, 29)
// foo a=1i,b=2i 1481032190000000000\n // len 34
//
// foo a=1i 1481032190000000000\n // len 29
// foo b=2i 1481032190000000000\n // len 29
for {
n, err := r.Read(buf)
// Should never read 0 bytes unless at EOF, unless input buffer is 0 length
if n == 0 {
require.Equal(t, io.EOF, err)
break
}
// Lines should be terminated with a LF
if err == io.EOF {
require.Equal(t, uint8('\n'), buf[n-1])
break
}
require.NoError(t, err)
}
}
func TestMetricReader_OverflowMultipleMetrics(t *testing.T) {
ts := time.Unix(1481032190, 0)
m, _ := New("foo", map[string]string{},
@ -485,3 +619,17 @@ func TestMetricReader_SplitMetricChangingBuffer2(t *testing.T) {
assert.Equal(t, test.err, err, test.expRegex)
}
}
func TestMetricRoundtrip(t *testing.T) {
const lp = `nstat,bu=linux,cls=server,dc=cer,env=production,host=hostname,name=netstat,sr=database IpExtInBcastOctets=12570626154i,IpExtInBcastPkts=95541226i,IpExtInCEPkts=0i,IpExtInCsumErrors=0i,IpExtInECT0Pkts=55674i,IpExtInECT1Pkts=0i,IpExtInMcastOctets=5928296i,IpExtInMcastPkts=174365i,IpExtInNoECTPkts=17965863529i,IpExtInNoRoutes=20i,IpExtInOctets=3334866321815i,IpExtInTruncatedPkts=0i,IpExtOutBcastOctets=0i,IpExtOutBcastPkts=0i,IpExtOutMcastOctets=0i,IpExtOutMcastPkts=0i,IpExtOutOctets=31397892391399i,TcpExtArpFilter=0i,TcpExtBusyPollRxPackets=0i,TcpExtDelayedACKLocked=14094i,TcpExtDelayedACKLost=302083i,TcpExtDelayedACKs=55486507i,TcpExtEmbryonicRsts=11879i,TcpExtIPReversePathFilter=0i,TcpExtListenDrops=1736i,TcpExtListenOverflows=0i,TcpExtLockDroppedIcmps=0i,TcpExtOfoPruned=0i,TcpExtOutOfWindowIcmps=8i,TcpExtPAWSActive=0i,TcpExtPAWSEstab=974i,TcpExtPAWSPassive=0i,TcpExtPruneCalled=0i,TcpExtRcvPruned=0i,TcpExtSyncookiesFailed=12593i,TcpExtSyncookiesRecv=0i,TcpExtSyncookiesSent=0i,TcpExtTCPACKSkippedChallenge=0i,TcpExtTCPACKSkippedFinWait2=0i,TcpExtTCPACKSkippedPAWS=806i,TcpExtTCPACKSkippedSeq=519i,TcpExtTCPACKSkippedSynRecv=0i,TcpExtTCPACKSkippedTimeWait=0i,TcpExtTCPAbortFailed=0i,TcpExtTCPAbortOnClose=22i,TcpExtTCPAbortOnData=36593i,TcpExtTCPAbortOnLinger=0i,TcpExtTCPAbortOnMemory=0i,TcpExtTCPAbortOnTimeout=674i,TcpExtTCPAutoCorking=494253233i,TcpExtTCPBacklogDrop=0i,TcpExtTCPChallengeACK=281i,TcpExtTCPDSACKIgnoredNoUndo=93354i,TcpExtTCPDSACKIgnoredOld=336i,TcpExtTCPDSACKOfoRecv=0i,TcpExtTCPDSACKOfoSent=7i,TcpExtTCPDSACKOldSent=302073i,TcpExtTCPDSACKRecv=215884i,TcpExtTCPDSACKUndo=7633i,TcpExtTCPDeferAcceptDrop=0i,TcpExtTCPDirectCopyFromBacklog=0i,TcpExtTCPDirectCopyFromPrequeue=0i,TcpExtTCPFACKReorder=1320i,TcpExtTCPFastOpenActive=0i,TcpExtTCPFastOpenActiveFail=0i,TcpExtTCPFastOpenCookieReqd=0i,TcpExtTCPFastOpenListenOverflow=0i,TcpExtTCPFastOpenPassive=0i,TcpExtTCPFastOpenPassiveFail=0i,TcpExtTCPFastRetrans=350681i,TcpExtTCPForwardRetrans=142168i,TcpExtTCPFromZeroWindowAdv=4317i,TcpExtTCPFullUndo=29502i,TcpExtTCPHPAcks=10267073000i,TcpExtTCPHPHits=5629837098i,TcpExtTCPHPHitsToUser=0i,TcpExtTCPHystartDelayCwnd=285127i,TcpExtTCPHystartDelayDetect=12318i,TcpExtTCPHystartTrainCwnd=69160570i,TcpExtTCPHystartTrainDetect=3315799i,TcpExtTCPLossFailures=109i,TcpExtTCPLossProbeRecovery=110819i,TcpExtTCPLossProbes=233995i,TcpExtTCPLossUndo=5276i,TcpExtTCPLostRetransmit=397i,TcpExtTCPMD5NotFound=0i,TcpExtTCPMD5Unexpected=0i,TcpExtTCPMemoryPressures=0i,TcpExtTCPMinTTLDrop=0i,TcpExtTCPOFODrop=0i,TcpExtTCPOFOMerge=7i,TcpExtTCPOFOQueue=15196i,TcpExtTCPOrigDataSent=29055119435i,TcpExtTCPPartialUndo=21320i,TcpExtTCPPrequeueDropped=0i,TcpExtTCPPrequeued=0i,TcpExtTCPPureAcks=1236441827i,TcpExtTCPRcvCoalesce=225590473i,TcpExtTCPRcvCollapsed=0i,TcpExtTCPRenoFailures=0i,TcpExtTCPRenoRecovery=0i,TcpExtTCPRenoRecoveryFail=0i,TcpExtTCPRenoReorder=0i,TcpExtTCPReqQFullDoCookies=0i,TcpExtTCPReqQFullDrop=0i,TcpExtTCPRetransFail=41i,TcpExtTCPSACKDiscard=0i,TcpExtTCPSACKReneging=0i,TcpExtTCPSACKReorder=4307i,TcpExtTCPSYNChallenge=244i,TcpExtTCPSackFailures=1698i,TcpExtTCPSackMerged=184668i,TcpExtTCPSackRecovery=97369i,TcpExtTCPSackRecoveryFail=381i,TcpExtTCPSackShiftFallback=2697079i,TcpExtTCPSackShifted=760299i,TcpExtTCPSchedulerFailed=0i,TcpExtTCPSlowStartRetrans=9276i,TcpExtTCPSpuriousRTOs=959i,TcpExtTCPSpuriousRtxHostQueues=2973i,TcpExtTCPSynRetrans=200970i,TcpExtTCPTSReorder=15221i,TcpExtTCPTimeWaitOverflow=0i,TcpExtTCPTimeouts=70127i,TcpExtTCPToZeroWindowAdv=4317i,TcpExtTCPWantZeroWindowAdv=2133i,TcpExtTW=24809813i,TcpExtTWKilled=0i,TcpExtTWRecycled=0i 1496460785000000000
nstat,bu=linux,cls=server,dc=cer,env=production,host=hostname,name=snmp,sr=database IcmpInAddrMaskReps=0i,IcmpInAddrMasks=90i,IcmpInCsumErrors=0i,IcmpInDestUnreachs=284401i,IcmpInEchoReps=9i,IcmpInEchos=1761912i,IcmpInErrors=407i,IcmpInMsgs=2047767i,IcmpInParmProbs=0i,IcmpInRedirects=0i,IcmpInSrcQuenchs=0i,IcmpInTimeExcds=46i,IcmpInTimestampReps=0i,IcmpInTimestamps=1309i,IcmpMsgInType0=9i,IcmpMsgInType11=46i,IcmpMsgInType13=1309i,IcmpMsgInType17=90i,IcmpMsgInType3=284401i,IcmpMsgInType8=1761912i,IcmpMsgOutType0=1761912i,IcmpMsgOutType14=1248i,IcmpMsgOutType3=108709i,IcmpMsgOutType8=9i,IcmpOutAddrMaskReps=0i,IcmpOutAddrMasks=0i,IcmpOutDestUnreachs=108709i,IcmpOutEchoReps=1761912i,IcmpOutEchos=9i,IcmpOutErrors=0i,IcmpOutMsgs=1871878i,IcmpOutParmProbs=0i,IcmpOutRedirects=0i,IcmpOutSrcQuenchs=0i,IcmpOutTimeExcds=0i,IcmpOutTimestampReps=1248i,IcmpOutTimestamps=0i,IpDefaultTTL=64i,IpForwDatagrams=0i,IpForwarding=2i,IpFragCreates=0i,IpFragFails=0i,IpFragOKs=0i,IpInAddrErrors=0i,IpInDelivers=17658795773i,IpInDiscards=0i,IpInHdrErrors=0i,IpInReceives=17659269339i,IpInUnknownProtos=0i,IpOutDiscards=236976i,IpOutNoRoutes=1009i,IpOutRequests=23466783734i,IpReasmFails=0i,IpReasmOKs=0i,IpReasmReqds=0i,IpReasmTimeout=0i,TcpActiveOpens=23308977i,TcpAttemptFails=3757543i,TcpCurrEstab=280i,TcpEstabResets=184792i,TcpInCsumErrors=0i,TcpInErrs=232i,TcpInSegs=17536573089i,TcpMaxConn=-1i,TcpOutRsts=4051451i,TcpOutSegs=29836254873i,TcpPassiveOpens=176546974i,TcpRetransSegs=878085i,TcpRtoAlgorithm=1i,TcpRtoMax=120000i,TcpRtoMin=200i,UdpInCsumErrors=0i,UdpInDatagrams=24441661i,UdpInErrors=0i,UdpLiteInCsumErrors=0i,UdpLiteInDatagrams=0i,UdpLiteInErrors=0i,UdpLiteNoPorts=0i,UdpLiteOutDatagrams=0i,UdpLiteRcvbufErrors=0i,UdpLiteSndbufErrors=0i,UdpNoPorts=17660i,UdpOutDatagrams=51807896i,UdpRcvbufErrors=0i,UdpSndbufErrors=236922i 1496460785000000000
`
metrics, err := Parse([]byte(lp))
require.NoError(t, err)
r := NewReader(metrics)
buf := make([]byte, 128)
_, err = r.Read(buf)
require.NoError(t, err)
metrics, err = Parse(buf)
require.NoError(t, err)
}

View File

@ -29,6 +29,8 @@ type Apache struct {
SSLKey string `toml:"ssl_key"`
// Use SSL but skip chain & host verification
InsecureSkipVerify bool
client *http.Client
}
var sampleConfig = `
@ -66,6 +68,14 @@ func (n *Apache) Gather(acc telegraf.Accumulator) error {
n.ResponseTimeout.Duration = time.Second * 5
}
if n.client == nil {
client, err := n.createHttpClient()
if err != nil {
return err
}
n.client = client
}
var wg sync.WaitGroup
wg.Add(len(n.Urls))
for _, u := range n.Urls {
@ -85,31 +95,24 @@ func (n *Apache) Gather(acc telegraf.Accumulator) error {
return nil
}
func (n *Apache) gatherUrl(addr *url.URL, acc telegraf.Accumulator) error {
var tr *http.Transport
if addr.Scheme == "https" {
tlsCfg, err := internal.GetTLSConfig(
n.SSLCert, n.SSLKey, n.SSLCA, n.InsecureSkipVerify)
if err != nil {
return err
}
tr = &http.Transport{
ResponseHeaderTimeout: time.Duration(3 * time.Second),
TLSClientConfig: tlsCfg,
}
} else {
tr = &http.Transport{
ResponseHeaderTimeout: time.Duration(3 * time.Second),
}
func (n *Apache) createHttpClient() (*http.Client, error) {
tlsCfg, err := internal.GetTLSConfig(
n.SSLCert, n.SSLKey, n.SSLCA, n.InsecureSkipVerify)
if err != nil {
return nil, err
}
client := &http.Client{
Transport: tr,
Timeout: n.ResponseTimeout.Duration,
Transport: &http.Transport{
TLSClientConfig: tlsCfg,
},
Timeout: n.ResponseTimeout.Duration,
}
return client, nil
}
func (n *Apache) gatherUrl(addr *url.URL, acc telegraf.Accumulator) error {
req, err := http.NewRequest("GET", addr.String(), nil)
if err != nil {
return fmt.Errorf("error on new request to %s : %s\n", addr.String(), err)
@ -119,7 +122,7 @@ func (n *Apache) gatherUrl(addr *url.URL, acc telegraf.Accumulator) error {
req.SetBasicAuth(n.Username, n.Password)
}
resp, err := client.Do(req)
resp, err := n.client.Do(req)
if err != nil {
return fmt.Errorf("error on request to %s : %s\n", addr.String(), err)
}

View File

@ -63,6 +63,7 @@ Delete second or Not synchronised.
### Measurements & Fields:
- chrony
- system_time (float, seconds)
- last_offset (float, seconds)
- rms_offset (float, seconds)
- frequency (float, ppm)
@ -84,7 +85,7 @@ Delete second or Not synchronised.
```
$ telegraf -config telegraf.conf -input-filter chrony -test
* Plugin: chrony, Collection 1
> chrony,leap_status=normal,reference_id=192.168.1.1,stratum=3 frequency=-35.657,last_offset=-0.000013616,residual_freq=-0,rms_offset=0.000027073,root_delay=0.000644,root_dispersion=0.003444,skew=0.001,update_interval=1031.2 1463750789687639161
> chrony,leap_status=normal,reference_id=192.168.1.1,stratum=3 frequency=-35.657,system_time=0.000027073,last_offset=-0.000013616,residual_freq=-0,rms_offset=0.000027073,root_delay=0.000644,root_dispersion=0.003444,skew=0.001,update_interval=1031.2 1463750789687639161
```

View File

@ -92,7 +92,7 @@ func processChronycOutput(out string) (map[string]interface{}, map[string]string
}
name := strings.ToLower(strings.Replace(strings.TrimSpace(stats[0]), " ", "_", -1))
// ignore reference time
if strings.Contains(name, "time") {
if strings.Contains(name, "ref_time") {
continue
}
valueFields := strings.Fields(stats[1])

View File

@ -31,6 +31,7 @@ func TestGather(t *testing.T) {
"stratum": "3",
}
fields := map[string]interface{}{
"system_time": 0.000020390,
"last_offset": 0.000012651,
"rms_offset": 0.000025577,
"frequency": -16.001,

View File

@ -169,7 +169,10 @@ func (e *Elasticsearch) Gather(acc telegraf.Accumulator) error {
if e.ClusterStats {
// get cat/master information here so NodeStats can determine
// whether this node is the Master
e.setCatMaster(s + "/_cat/master")
if err := e.setCatMaster(s + "/_cat/master"); err != nil {
acc.AddError(fmt.Errorf(mask.ReplaceAllString(err.Error(), "http(s)://XXX:XXX@")))
return
}
}
// Always gather node states
@ -353,7 +356,7 @@ func (e *Elasticsearch) setCatMaster(url string) error {
// NOTE: we are not going to read/discard r.Body under the assumption we'd prefer
// to let the underlying transport close the connection and re-establish a new one for
// future calls.
return fmt.Errorf("status-code %d, expected %d", r.StatusCode, http.StatusOK)
return fmt.Errorf("elasticsearch: Unable to retrieve master node information. API responded with status-code %d, expected %d", r.StatusCode, http.StatusOK)
}
response, err := ioutil.ReadAll(r.Body)

View File

@ -25,7 +25,6 @@ type HTTPResponse struct {
Headers map[string]string
FollowRedirects bool
ResponseStringMatch string
compiledStringMatch *regexp.Regexp
// Path to CA file
SSLCA string `toml:"ssl_ca"`
@ -35,6 +34,9 @@ type HTTPResponse struct {
SSLKey string `toml:"ssl_key"`
// Use SSL but skip chain & host verification
InsecureSkipVerify bool
compiledStringMatch *regexp.Regexp
client *http.Client
}
// Description returns the plugin Description
@ -88,13 +90,12 @@ func (h *HTTPResponse) createHttpClient() (*http.Client, error) {
if err != nil {
return nil, err
}
tr := &http.Transport{
ResponseHeaderTimeout: h.ResponseTimeout.Duration,
TLSClientConfig: tlsCfg,
}
client := &http.Client{
Transport: tr,
Timeout: h.ResponseTimeout.Duration,
Transport: &http.Transport{
DisableKeepAlives: true,
TLSClientConfig: tlsCfg,
},
Timeout: h.ResponseTimeout.Duration,
}
if h.FollowRedirects == false {
@ -106,15 +107,10 @@ func (h *HTTPResponse) createHttpClient() (*http.Client, error) {
}
// HTTPGather gathers all fields and returns any errors it encounters
func (h *HTTPResponse) HTTPGather() (map[string]interface{}, error) {
func (h *HTTPResponse) httpGather() (map[string]interface{}, error) {
// Prepare fields
fields := make(map[string]interface{})
client, err := h.createHttpClient()
if err != nil {
return nil, err
}
var body io.Reader
if h.Body != "" {
body = strings.NewReader(h.Body)
@ -133,7 +129,7 @@ func (h *HTTPResponse) HTTPGather() (map[string]interface{}, error) {
// Start Timer
start := time.Now()
resp, err := client.Do(request)
resp, err := h.client.Do(request)
if err != nil {
if h.FollowRedirects {
return nil, err
@ -145,6 +141,11 @@ func (h *HTTPResponse) HTTPGather() (map[string]interface{}, error) {
return nil, err
}
}
defer func() {
io.Copy(ioutil.Discard, resp.Body)
resp.Body.Close()
}()
fields["response_time"] = time.Since(start).Seconds()
fields["http_response_code"] = resp.StatusCode
@ -202,8 +203,17 @@ func (h *HTTPResponse) Gather(acc telegraf.Accumulator) error {
// Prepare data
tags := map[string]string{"server": h.Address, "method": h.Method}
var fields map[string]interface{}
if h.client == nil {
client, err := h.createHttpClient()
if err != nil {
return err
}
h.client = client
}
// Gather data
fields, err = h.HTTPGather()
fields, err = h.httpGather()
if err != nil {
return err
}

View File

@ -9,6 +9,7 @@ import (
"time"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@ -73,13 +74,13 @@ func TestHeaders(t *testing.T) {
"Host": "Hello",
},
}
fields, err := h.HTTPGather()
var acc testutil.Accumulator
err := h.Gather(&acc)
require.NoError(t, err)
assert.NotEmpty(t, fields)
if assert.NotNil(t, fields["http_response_code"]) {
assert.Equal(t, http.StatusOK, fields["http_response_code"])
}
assert.NotNil(t, fields["response_time"])
value, ok := acc.IntField("http_response", "http_response_code")
require.True(t, ok)
require.Equal(t, http.StatusOK, value)
}
func TestFields(t *testing.T) {
@ -97,13 +98,14 @@ func TestFields(t *testing.T) {
},
FollowRedirects: true,
}
fields, err := h.HTTPGather()
var acc testutil.Accumulator
err := h.Gather(&acc)
require.NoError(t, err)
assert.NotEmpty(t, fields)
if assert.NotNil(t, fields["http_response_code"]) {
assert.Equal(t, http.StatusOK, fields["http_response_code"])
}
assert.NotNil(t, fields["response_time"])
value, ok := acc.IntField("http_response", "http_response_code")
require.True(t, ok)
require.Equal(t, http.StatusOK, value)
}
func TestRedirects(t *testing.T) {
@ -121,12 +123,13 @@ func TestRedirects(t *testing.T) {
},
FollowRedirects: true,
}
fields, err := h.HTTPGather()
var acc testutil.Accumulator
err := h.Gather(&acc)
require.NoError(t, err)
assert.NotEmpty(t, fields)
if assert.NotNil(t, fields["http_response_code"]) {
assert.Equal(t, http.StatusOK, fields["http_response_code"])
}
value, ok := acc.IntField("http_response", "http_response_code")
require.True(t, ok)
require.Equal(t, http.StatusOK, value)
h = &HTTPResponse{
Address: ts.URL + "/badredirect",
@ -138,8 +141,12 @@ func TestRedirects(t *testing.T) {
},
FollowRedirects: true,
}
fields, err = h.HTTPGather()
acc = testutil.Accumulator{}
err = h.Gather(&acc)
require.Error(t, err)
value, ok = acc.IntField("http_response", "http_response_code")
require.False(t, ok)
}
func TestMethod(t *testing.T) {
@ -157,12 +164,13 @@ func TestMethod(t *testing.T) {
},
FollowRedirects: true,
}
fields, err := h.HTTPGather()
var acc testutil.Accumulator
err := h.Gather(&acc)
require.NoError(t, err)
assert.NotEmpty(t, fields)
if assert.NotNil(t, fields["http_response_code"]) {
assert.Equal(t, http.StatusOK, fields["http_response_code"])
}
value, ok := acc.IntField("http_response", "http_response_code")
require.True(t, ok)
require.Equal(t, http.StatusOK, value)
h = &HTTPResponse{
Address: ts.URL + "/mustbepostmethod",
@ -174,12 +182,13 @@ func TestMethod(t *testing.T) {
},
FollowRedirects: true,
}
fields, err = h.HTTPGather()
acc = testutil.Accumulator{}
err = h.Gather(&acc)
require.NoError(t, err)
assert.NotEmpty(t, fields)
if assert.NotNil(t, fields["http_response_code"]) {
assert.Equal(t, http.StatusMethodNotAllowed, fields["http_response_code"])
}
value, ok = acc.IntField("http_response", "http_response_code")
require.True(t, ok)
require.Equal(t, http.StatusMethodNotAllowed, value)
//check that lowercase methods work correctly
h = &HTTPResponse{
@ -192,12 +201,13 @@ func TestMethod(t *testing.T) {
},
FollowRedirects: true,
}
fields, err = h.HTTPGather()
acc = testutil.Accumulator{}
err = h.Gather(&acc)
require.NoError(t, err)
assert.NotEmpty(t, fields)
if assert.NotNil(t, fields["http_response_code"]) {
assert.Equal(t, http.StatusMethodNotAllowed, fields["http_response_code"])
}
value, ok = acc.IntField("http_response", "http_response_code")
require.True(t, ok)
require.Equal(t, http.StatusMethodNotAllowed, value)
}
func TestBody(t *testing.T) {
@ -215,12 +225,13 @@ func TestBody(t *testing.T) {
},
FollowRedirects: true,
}
fields, err := h.HTTPGather()
var acc testutil.Accumulator
err := h.Gather(&acc)
require.NoError(t, err)
assert.NotEmpty(t, fields)
if assert.NotNil(t, fields["http_response_code"]) {
assert.Equal(t, http.StatusOK, fields["http_response_code"])
}
value, ok := acc.IntField("http_response", "http_response_code")
require.True(t, ok)
require.Equal(t, http.StatusOK, value)
h = &HTTPResponse{
Address: ts.URL + "/musthaveabody",
@ -231,12 +242,13 @@ func TestBody(t *testing.T) {
},
FollowRedirects: true,
}
fields, err = h.HTTPGather()
acc = testutil.Accumulator{}
err = h.Gather(&acc)
require.NoError(t, err)
assert.NotEmpty(t, fields)
if assert.NotNil(t, fields["http_response_code"]) {
assert.Equal(t, http.StatusBadRequest, fields["http_response_code"])
}
value, ok = acc.IntField("http_response", "http_response_code")
require.True(t, ok)
require.Equal(t, http.StatusBadRequest, value)
}
func TestStringMatch(t *testing.T) {
@ -255,15 +267,18 @@ func TestStringMatch(t *testing.T) {
},
FollowRedirects: true,
}
fields, err := h.HTTPGather()
var acc testutil.Accumulator
err := h.Gather(&acc)
require.NoError(t, err)
assert.NotEmpty(t, fields)
if assert.NotNil(t, fields["http_response_code"]) {
assert.Equal(t, http.StatusOK, fields["http_response_code"])
}
assert.Equal(t, 1, fields["response_string_match"])
assert.NotNil(t, fields["response_time"])
value, ok := acc.IntField("http_response", "http_response_code")
require.True(t, ok)
require.Equal(t, http.StatusOK, value)
value, ok = acc.IntField("http_response", "response_string_match")
require.True(t, ok)
require.Equal(t, 1, value)
_, ok = acc.FloatField("http_response", "response_time")
require.True(t, ok)
}
func TestStringMatchJson(t *testing.T) {
@ -282,15 +297,18 @@ func TestStringMatchJson(t *testing.T) {
},
FollowRedirects: true,
}
fields, err := h.HTTPGather()
var acc testutil.Accumulator
err := h.Gather(&acc)
require.NoError(t, err)
assert.NotEmpty(t, fields)
if assert.NotNil(t, fields["http_response_code"]) {
assert.Equal(t, http.StatusOK, fields["http_response_code"])
}
assert.Equal(t, 1, fields["response_string_match"])
assert.NotNil(t, fields["response_time"])
value, ok := acc.IntField("http_response", "http_response_code")
require.True(t, ok)
require.Equal(t, http.StatusOK, value)
value, ok = acc.IntField("http_response", "response_string_match")
require.True(t, ok)
require.Equal(t, 1, value)
_, ok = acc.FloatField("http_response", "response_time")
require.True(t, ok)
}
func TestStringMatchFail(t *testing.T) {
@ -309,18 +327,26 @@ func TestStringMatchFail(t *testing.T) {
},
FollowRedirects: true,
}
fields, err := h.HTTPGather()
require.NoError(t, err)
assert.NotEmpty(t, fields)
if assert.NotNil(t, fields["http_response_code"]) {
assert.Equal(t, http.StatusOK, fields["http_response_code"])
}
assert.Equal(t, 0, fields["response_string_match"])
assert.NotNil(t, fields["response_time"])
var acc testutil.Accumulator
err := h.Gather(&acc)
require.NoError(t, err)
value, ok := acc.IntField("http_response", "http_response_code")
require.True(t, ok)
require.Equal(t, http.StatusOK, value)
value, ok = acc.IntField("http_response", "response_string_match")
require.True(t, ok)
require.Equal(t, 0, value)
_, ok = acc.FloatField("http_response", "response_time")
require.True(t, ok)
}
func TestTimeout(t *testing.T) {
if testing.Short() {
t.Skip("Skipping test with sleep in short mode.")
}
mux := setUpTestMux()
ts := httptest.NewServer(mux)
defer ts.Close()
@ -335,6 +361,10 @@ func TestTimeout(t *testing.T) {
},
FollowRedirects: true,
}
_, err := h.HTTPGather()
require.Error(t, err)
var acc testutil.Accumulator
err := h.Gather(&acc)
require.NoError(t, err)
ok := acc.HasIntField("http_response", "http_response_code")
require.False(t, ok)
}

View File

@ -3,6 +3,7 @@ package leofs
import (
"bufio"
"fmt"
"log"
"net/url"
"os/exec"
"strconv"
@ -18,7 +19,7 @@ import (
const oid = ".1.3.6.1.4.1.35450"
// For Manager Master
const defaultEndpoint = "127.0.0.1:4020"
const defaultEndpoint = "udp://127.0.0.1:4020"
type ServerType int
@ -135,9 +136,9 @@ var serverTypeMapping = map[string]ServerType{
}
var sampleConfig = `
## An array of URI to gather stats about LeoFS.
## Specify an ip or hostname with port. ie 127.0.0.1:4020
servers = ["127.0.0.1:4021"]
## An array of URLs of the form:
## "udp://" host [ ":" port]
servers = ["udp://127.0.0.1:4020"]
`
func (l *LeoFS) SampleConfig() string {
@ -154,17 +155,28 @@ func (l *LeoFS) Gather(acc telegraf.Accumulator) error {
return nil
}
var wg sync.WaitGroup
for _, endpoint := range l.Servers {
_, err := url.Parse(endpoint)
for i, endpoint := range l.Servers {
if !strings.HasPrefix(endpoint, "udp://") {
// Preserve backwards compatibility for hostnames without a
// scheme, broken in go 1.8. Remove in Telegraf 2.0
endpoint = "udp://" + endpoint
log.Printf("W! [inputs.mongodb] Using %q as connection URL; please update your configuration to use an URL", endpoint)
l.Servers[i] = endpoint
}
u, err := url.Parse(endpoint)
if err != nil {
acc.AddError(fmt.Errorf("Unable to parse the address:%s, err:%s", endpoint, err))
acc.AddError(fmt.Errorf("Unable to parse address %q: %s", endpoint, err))
continue
}
port, err := retrieveTokenAfterColon(endpoint)
if err != nil {
acc.AddError(err)
if u.Host == "" {
acc.AddError(fmt.Errorf("Unable to parse address %q", endpoint))
continue
}
port := u.Port()
if port == "" {
port = "4020"
}
st, ok := serverTypeMapping[port]
if !ok {
st = ServerTypeStorage

View File

@ -118,11 +118,18 @@ func (p *Parser) Compile() error {
// Give Patterns fake names so that they can be treated as named
// "custom patterns"
p.namedPatterns = make([]string, len(p.Patterns))
p.namedPatterns = make([]string, 0, len(p.Patterns))
for i, pattern := range p.Patterns {
if pattern == "" {
continue
}
name := fmt.Sprintf("GROK_INTERNAL_PATTERN_%d", i)
p.CustomPatterns += "\n" + name + " " + pattern + "\n"
p.namedPatterns[i] = "%{" + name + "}"
p.namedPatterns = append(p.namedPatterns, "%{"+name+"}")
}
if len(p.namedPatterns) == 0 {
return fmt.Errorf("pattern required")
}
// Combine user-supplied CustomPatterns with DEFAULT_PATTERNS and parse

View File

@ -4,6 +4,7 @@ import (
"fmt"
"log"
"reflect"
"strings"
"sync"
"github.com/influxdata/tail"
@ -45,6 +46,7 @@ const sampleConfig = `
## /var/log/*/*.log -> find all .log files with a parent dir in /var/log
## /var/log/apache.log -> only tail the apache log file
files = ["/var/log/apache/access.log"]
## Read files that currently exist from the beginning. Files that are created
## while telegraf is running (and that match the "files" globs) will always
## be read from the beginning.
@ -117,16 +119,11 @@ func (l *LogParserPlugin) Start(acc telegraf.Accumulator) error {
}
// compile log parser patterns:
var haveError bool
for _, parser := range l.parsers {
if err := parser.Compile(); err != nil {
acc.AddError(err)
haveError = true
return err
}
}
if haveError {
return nil
}
l.wg.Add(1)
go l.parser()
@ -191,9 +188,12 @@ func (l *LogParserPlugin) receiver(tailer *tail.Tail) {
continue
}
// Fix up files with Windows line endings.
text := strings.TrimRight(line.Text, "\r")
select {
case <-l.done:
case l.lines <- line.Text:
case l.lines <- text:
}
}
}

View File

@ -38,12 +38,8 @@ func TestGrokParseLogFilesNonExistPattern(t *testing.T) {
}
acc := testutil.Accumulator{}
logparser.Start(&acc)
if assert.NotEmpty(t, acc.Errors) {
assert.Error(t, acc.Errors[0])
}
logparser.Stop()
err := logparser.Start(&acc)
assert.Error(t, err)
}
func TestGrokParseLogFiles(t *testing.T) {

View File

@ -4,12 +4,12 @@
```toml
[[inputs.mongodb]]
## An array of URI to gather stats about. Specify an ip or hostname
## with optional port add password. ie,
## An array of URLs of the form:
## "mongodb://" [user ":" pass "@"] host [ ":" port]
## For example:
## mongodb://user:auth_key@10.10.3.30:27017,
## mongodb://10.10.3.33:18832,
## 10.0.0.1:10000, etc.
servers = ["127.0.0.1:27017"]
servers = ["mongodb://127.0.0.1:27017"]
gather_perdb_stats = false
## Optional SSL Config
@ -19,15 +19,8 @@
## Use SSL but skip chain & host verification
# insecure_skip_verify = false
```
For authenticated mongodb instances use `mongodb://` connection URI
```toml
[[inputs.mongodb]]
servers = ["mongodb://username:password@10.XX.XX.XX:27101/mydatabase?authSource=admin"]
```
This connection uri may be different based on your environement and mongodb
setup. If the user doesn't have the required privilege to execute serverStatus
This connection uri may be different based on your environment and mongodb
setup. If the user doesn't have the required privilege to execute serverStatus
command the you will get this error on telegraf
```

View File

@ -4,8 +4,10 @@ import (
"crypto/tls"
"crypto/x509"
"fmt"
"log"
"net"
"net/url"
"strings"
"sync"
"time"
@ -37,12 +39,12 @@ type Ssl struct {
}
var sampleConfig = `
## An array of URI to gather stats about. Specify an ip or hostname
## with optional port add password. ie,
## An array of URLs of the form:
## "mongodb://" [user ":" pass "@"] host [ ":" port]
## For example:
## mongodb://user:auth_key@10.10.3.30:27017,
## mongodb://10.10.3.33:18832,
## 10.0.0.1:10000, etc.
servers = ["127.0.0.1:27017"]
servers = ["mongodb://127.0.0.1:27017"]
gather_perdb_stats = false
## Optional SSL Config
@ -61,7 +63,7 @@ func (*MongoDB) Description() string {
return "Read metrics from one or many MongoDB servers"
}
var localhost = &url.URL{Host: "127.0.0.1:27017"}
var localhost = &url.URL{Host: "mongodb://127.0.0.1:27017"}
// Reads stats from all configured servers accumulates stats.
// Returns one of the errors encountered while gather stats (if any).
@ -72,19 +74,25 @@ func (m *MongoDB) Gather(acc telegraf.Accumulator) error {
}
var wg sync.WaitGroup
for _, serv := range m.Servers {
for i, serv := range m.Servers {
if !strings.HasPrefix(serv, "mongodb://") {
// Preserve backwards compatibility for hostnames without a
// scheme, broken in go 1.8. Remove in Telegraf 2.0
serv = "mongodb://" + serv
log.Printf("W! [inputs.mongodb] Using %q as connection URL; please update your configuration to use an URL", serv)
m.Servers[i] = serv
}
u, err := url.Parse(serv)
if err != nil {
acc.AddError(fmt.Errorf("Unable to parse to address '%s': %s", serv, err))
acc.AddError(fmt.Errorf("Unable to parse address %q: %s", serv, err))
continue
} else if u.Scheme == "" {
u.Scheme = "mongodb"
// fallback to simple string based address (i.e. "10.0.0.1:10000")
u.Host = serv
if u.Path == u.Host {
u.Path = ""
}
}
if u.Host == "" {
acc.AddError(fmt.Errorf("Unable to parse address %q", serv))
continue
}
wg.Add(1)
go func(srv *Server) {
defer wg.Done()

View File

@ -564,7 +564,7 @@ func NewStatLine(oldMongo, newMongo MongoStatus, key string, all bool, sampleSec
// BEGIN code modification
if newStat.Repl.IsMaster.(bool) {
returnVal.NodeType = "PRI"
} else if newStat.Repl.Secondary.(bool) {
} else if newStat.Repl.Secondary != nil && newStat.Repl.Secondary.(bool) {
returnVal.NodeType = "SEC"
} else if newStat.Repl.ArbiterOnly != nil && newStat.Repl.ArbiterOnly.(bool) {
returnVal.NodeType = "ARB"

View File

@ -7,6 +7,7 @@ import (
"bytes"
"fmt"
"os/exec"
"regexp"
"strconv"
"strings"
@ -67,6 +68,14 @@ func (n *NTPQ) Gather(acc telegraf.Accumulator) error {
return err
}
// Due to problems with a parsing, we have to use regexp expression in order
// to remove string that starts from '(' and ends with space
// see: https://github.com/influxdata/telegraf/issues/2386
reg, err := regexp.Compile("\\([\\S]*")
if err != nil {
return err
}
lineCounter := 0
scanner := bufio.NewScanner(bytes.NewReader(out))
for scanner.Scan() {
@ -80,6 +89,8 @@ func (n *NTPQ) Gather(acc telegraf.Accumulator) error {
line = strings.TrimLeft(line, "*#o+x.-")
}
line = reg.ReplaceAllString(line, "")
fields := strings.Fields(line)
if len(fields) < 2 {
continue

View File

@ -247,6 +247,21 @@ func TestBadWhenNTPQ(t *testing.T) {
acc.AssertContainsTaggedFields(t, "ntpq", fields, tags)
}
// TestParserNTPQ - realated to:
// https://github.com/influxdata/telegraf/issues/2386
func TestParserNTPQ(t *testing.T) {
tt := tester{
ret: []byte(multiParserNTPQ),
err: nil,
}
n := &NTPQ{
runQ: tt.runqTest,
}
acc := testutil.Accumulator{}
assert.NoError(t, acc.GatherError(n.Gather))
}
func TestMultiNTPQ(t *testing.T) {
tt := tester{
ret: []byte(multiNTPQ),
@ -463,3 +478,9 @@ var multiNTPQ = ` remote refid st t when poll reach delay
5.9.29.107 10.177.80.37 2 u 703 1024 377 205.704 160.406 449602.
91.189.94.4 10.177.80.37 2 u 673 1024 377 143.047 274.726 449445.
`
var multiParserNTPQ = ` remote refid st t when poll reach delay offset jitter
==============================================================================
+37.58.57.238 (d 192.53.103.103 2 u 10 1024 377 1.748 0.373 0.101
+37.58.57.238 (domain) 192.53.103.103 2 u 10 1024 377 1.748 0.373 0.101
+37.58.57.238 ( 192.53.103.103 2 u 10 1024 377 1.748 0.373 0.101
`

View File

@ -4,7 +4,6 @@ import (
"errors"
"fmt"
"io/ioutil"
"net"
"net/http"
"sync"
"time"
@ -32,6 +31,8 @@ type Prometheus struct {
SSLKey string `toml:"ssl_key"`
// Use SSL but skip chain & host verification
InsecureSkipVerify bool
client *http.Client
}
var sampleConfig = `
@ -65,6 +66,14 @@ var ErrProtocolError = errors.New("prometheus protocol error")
// Reads stats from all configured servers accumulates stats.
// Returns one of the errors encountered while gather stats (if any).
func (p *Prometheus) Gather(acc telegraf.Accumulator) error {
if p.client == nil {
client, err := p.createHttpClient()
if err != nil {
return err
}
p.client = client
}
var wg sync.WaitGroup
for _, serv := range p.Urls {
@ -89,29 +98,30 @@ var client = &http.Client{
Timeout: time.Duration(4 * time.Second),
}
func (p *Prometheus) createHttpClient() (*http.Client, error) {
tlsCfg, err := internal.GetTLSConfig(
p.SSLCert, p.SSLKey, p.SSLCA, p.InsecureSkipVerify)
if err != nil {
return nil, err
}
client := &http.Client{
Transport: &http.Transport{
TLSClientConfig: tlsCfg,
DisableKeepAlives: true,
},
Timeout: p.ResponseTimeout.Duration,
}
return client, nil
}
func (p *Prometheus) gatherURL(url string, acc telegraf.Accumulator) error {
var req, err = http.NewRequest("GET", url, nil)
req.Header.Add("Accept", acceptHeader)
var token []byte
var resp *http.Response
tlsCfg, err := internal.GetTLSConfig(
p.SSLCert, p.SSLKey, p.SSLCA, p.InsecureSkipVerify)
if err != nil {
return err
}
var rt http.RoundTripper = &http.Transport{
Dial: (&net.Dialer{
Timeout: 5 * time.Second,
KeepAlive: 30 * time.Second,
}).Dial,
TLSHandshakeTimeout: 5 * time.Second,
TLSClientConfig: tlsCfg,
ResponseHeaderTimeout: p.ResponseTimeout.Duration,
DisableKeepAlives: true,
}
if p.BearerToken != "" {
token, err = ioutil.ReadFile(p.BearerToken)
if err != nil {
@ -120,7 +130,7 @@ func (p *Prometheus) gatherURL(url string, acc telegraf.Accumulator) error {
req.Header.Set("Authorization", "Bearer "+string(token))
}
resp, err = rt.RoundTrip(req)
resp, err = p.client.Do(req)
if err != nil {
return fmt.Errorf("error making HTTP request to %s: %s", url, err)
}

View File

@ -0,0 +1,121 @@
# RabbitMQ Input Plugin
Reads metrics from RabbitMQ servers via the [Management Plugin](https://www.rabbitmq.com/management.html).
For additional details reference the [RabbitMQ Management HTTP Stats](https://cdn.rawgit.com/rabbitmq/rabbitmq-management/master/priv/www/doc/stats.html).
### Configuration:
```toml
[[inputs.rabbitmq]]
## Management Plugin url. (default: http://localhost:15672)
# url = "http://localhost:15672"
## Tag added to rabbitmq_overview series; deprecated: use tags
# name = "rmq-server-1"
## Credentials
# username = "guest"
# password = "guest"
## Optional SSL Config
# ssl_ca = "/etc/telegraf/ca.pem"
# ssl_cert = "/etc/telegraf/cert.pem"
# ssl_key = "/etc/telegraf/key.pem"
## Use SSL but skip chain & host verification
# insecure_skip_verify = false
## Optional request timeouts
##
## ResponseHeaderTimeout, if non-zero, specifies the amount of time to wait
## for a server's response headers after fully writing the request.
# header_timeout = "3s"
##
## client_timeout specifies a time limit for requests made by this client.
## Includes connection time, any redirects, and reading the response body.
# client_timeout = "4s"
## A list of nodes to pull metrics about. If not specified, metrics for
## all nodes are gathered.
# nodes = ["rabbit@node1", "rabbit@node2"]
```
### Measurements & Fields:
- rabbitmq_overview
- channels (int, channels)
- connections (int, connections)
- consumers (int, consumers)
- exchanges (int, exchanges)
- messages (int, messages)
- messages_acked (int, messages)
- messages_delivered (int, messages)
- messages_published (int, messages)
- messages_ready (int, messages)
- messages_unacked (int, messages)
- queues (int, queues)
- rabbitmq_node
- disk_free (int, bytes)
- disk_free_limit (int, bytes)
- fd_total (int, file descriptors)
- fd_used (int, file descriptors)
- mem_limit (int, bytes)
- mem_used (int, bytes)
- proc_total (int, erlang processes)
- proc_used (int, erlang processes)
- run_queue (int, erlang processes)
- sockets_total (int, sockets)
- sockets_used (int, sockets)
- rabbitmq_queue
- consumer_utilisation (float, percent)
- consumers (int, int)
- idle_since (string, time - e.g., "2006-01-02 15:04:05")
- memory (int, bytes)
- message_bytes (int, bytes)
- message_bytes_persist (int, bytes)
- message_bytes_ram (int, bytes)
- message_bytes_ready (int, bytes)
- message_bytes_unacked (int, bytes)
- messages (int, count)
- messages_ack (int, count)
- messages_ack_rate (float, messages per second)
- messages_deliver (int, count)
- messages_deliver_rate (float, messages per second)
- messages_deliver_get (int, count)
- messages_deliver_get_rate (float, messages per second)
- messages_publish (int, count)
- messages_publish_rate (float, messages per second)
- messages_ready (int, count)
- messages_redeliver (int, count)
- messages_redeliver_rate (float, messages per second)
- messages_unack (integer, count)
### Tags:
- All measurements have the following tags:
- url
- rabbitmq_overview
- name
- rabbitmq_node
- node
- rabbitmq_queue
- url
- queue
- vhost
- node
- durable
- auto_delete
### Sample Queries:
### Example Output:
```
rabbitmq_queue,url=http://amqp.example.org:15672,queue=telegraf,vhost=influxdb,node=rabbit@amqp.example.org,durable=true,auto_delete=false,host=amqp.example.org messages_deliver_get=0i,messages_publish=329i,messages_publish_rate=0.2,messages_redeliver_rate=0,message_bytes_ready=0i,message_bytes_unacked=0i,messages_deliver=329i,messages_unack=0i,consumers=1i,idle_since="",messages=0i,messages_deliver_rate=0.2,messages_deliver_get_rate=0.2,messages_redeliver=0i,memory=43032i,message_bytes_ram=0i,messages_ack=329i,messages_ready=0i,messages_ack_rate=0.2,consumer_utilisation=1,message_bytes=0i,message_bytes_persist=0i 1493684035000000000
rabbitmq_overview,url=http://amqp.example.org:15672,host=amqp.example.org channels=2i,consumers=1i,exchanges=17i,messages_acked=329i,messages=0i,messages_ready=0i,messages_unacked=0i,connections=2i,queues=1i,messages_delivered=329i,messages_published=329i 1493684035000000000
rabbitmq_node,url=http://amqp.example.org:15672,node=rabbit@amqp.example.org,host=amqp.example.org fd_total=1024i,fd_used=32i,mem_limit=8363329126i,sockets_total=829i,disk_free=8175935488i,disk_free_limit=50000000i,mem_used=58771080i,proc_total=1048576i,proc_used=267i,run_queue=0i,sockets_used=2i 149368403500000000
```

View File

@ -140,8 +140,11 @@ type gatherFunc func(r *RabbitMQ, acc telegraf.Accumulator)
var gatherFunctions = []gatherFunc{gatherOverview, gatherNodes, gatherQueues}
var sampleConfig = `
## Management Plugin url. (default: http://localhost:15672)
# url = "http://localhost:15672"
# name = "rmq-server-1" # optional tag
## Tag added to rabbitmq_overview series; deprecated: use tags
# name = "rmq-server-1"
## Credentials
# username = "guest"
# password = "guest"
@ -174,7 +177,7 @@ func (r *RabbitMQ) SampleConfig() string {
// Description ...
func (r *RabbitMQ) Description() string {
return "Read metrics from one or many RabbitMQ servers via the management API"
return "Reads metrics from RabbitMQ servers via the Management Plugin"
}
// Gather ...

View File

@ -18,47 +18,103 @@
### Measurements & Fields:
- Measurement
- uptime_in_seconds
- connected_clients
- used_memory
- used_memory_rss
- used_memory_peak
- used_memory_lua
- rdb_changes_since_last_save
- total_connections_received
- total_commands_processed
- instantaneous_ops_per_sec
- instantaneous_input_kbps
- instantaneous_output_kbps
- sync_full
- sync_partial_ok
- sync_partial_err
- expired_keys
- evicted_keys
- keyspace_hits
- keyspace_misses
- pubsub_channels
- pubsub_patterns
- latest_fork_usec
- connected_slaves
- master_repl_offset
- master_last_io_seconds_ago
- repl_backlog_active
- repl_backlog_size
- repl_backlog_histlen
- mem_fragmentation_ratio
- used_cpu_sys
- used_cpu_user
- used_cpu_sys_children
- used_cpu_user_children
The plugin gathers the results of the [INFO](https://redis.io/commands/info) redis command.
There are two separate measurements: _redis_ and _redis\_keyspace_, the latter is used for gathering database related statistics.
Additionally the plugin also calculates the hit/miss ratio (keyspace\_hitrate) and the elapsed time since the last rdb save (rdb\_last\_save\_time\_elapsed).
- redis
- keyspace_hitrate(float, number)
- rdb_last_save_time_elapsed(int, seconds)
**Server**
- uptime(int, seconds)
- lru_clock(int, number)
**Clients**
- clients(int, number)
- client_longest_output_list(int, number)
- client_biggest_input_buf(int, number)
- blocked_clients(int, number)
**Memory**
- used_memory(int, bytes)
- used_memory_rss(int, bytes)
- used_memory_peak(int, bytes)
- total_system_memory(int, bytes)
- used_memory_lua(int, bytes)
- maxmemory(int, bytes)
- maxmemory_policy(string)
- mem_fragmentation_ratio(float, number)
**Persistance**
- loading(int,flag)
- rdb_changes_since_last_save(int, number)
- rdb_bgsave_in_progress(int, flag)
- rdb_last_save_time(int, seconds)
- rdb_last_bgsave_status(string)
- rdb_last_bgsave_time_sec(int, seconds)
- rdb_current_bgsave_time_sec(int, seconds)
- aof_enabled(int, flag)
- aof_rewrite_in_progress(int, flag)
- aof_rewrite_scheduled(int, flag)
- aof_last_rewrite_time_sec(int, seconds)
- aof_current_rewrite_time_sec(int, seconds)
- aof_last_bgrewrite_status(string)
- aof_last_write_status(string)
**Stats**
- total_connections_received(int, number)
- total_commands_processed(int, number)
- instantaneous_ops_per_sec(int, number)
- total_net_input_bytes(int, bytes)
- total_net_output_bytes(int, bytes)
- instantaneous_input_kbps(float, bytes)
- instantaneous_output_kbps(float, bytes)
- rejected_connections(int, number)
- sync_full(int, number)
- sync_partial_ok(int, number)
- sync_partial_err(int, number)
- expired_keys(int, number)
- evicted_keys(int, number)
- keyspace_hits(int, number)
- keyspace_misses(int, number)
- pubsub_channels(int, number)
- pubsub_patterns(int, number)
- latest_fork_usec(int, microseconds)
- migrate_cached_sockets(int, number)
**Replication**
- connected_slaves(int, number)
- master_repl_offset(int, number)
- repl_backlog_active(int, number)
- repl_backlog_size(int, bytes)
- repl_backlog_first_byte_offset(int, number)
- repl_backlog_histlen(int, bytes)
**CPU**
- used_cpu_sys(float, number)
- used_cpu_user(float, number)
- used_cpu_sys_children(float, number)
- used_cpu_user_children(float, number)
**Cluster**
- cluster_enabled(int, flag)
- redis_keyspace
- keys(int, number)
- expires(int, number)
- avg_ttl(int, number)
### Tags:
- All measurements have the following tags:
- port
- server
- replication role
- replication_role
- The redis_keyspace measurement has an additional database tag:
- database
### Example Output:
@ -84,5 +140,10 @@ When run with:
It produces:
```
* Plugin: redis, Collection 1
> redis,port=6379,server=localhost clients=1i,connected_slaves=0i,evicted_keys=0i,expired_keys=0i,instantaneous_ops_per_sec=0i,keyspace_hitrate=0,keyspace_hits=0i,keyspace_misses=2i,latest_fork_usec=0i,master_repl_offset=0i,mem_fragmentation_ratio=3.58,pubsub_channels=0i,pubsub_patterns=0i,rdb_changes_since_last_save=0i,repl_backlog_active=0i,repl_backlog_histlen=0i,repl_backlog_size=1048576i,sync_full=0i,sync_partial_err=0i,sync_partial_ok=0i,total_commands_processed=4i,total_connections_received=2i,uptime=869i,used_cpu_sys=0.07,used_cpu_sys_children=0,used_cpu_user=0.1,used_cpu_user_children=0,used_memory=502048i,used_memory_lua=33792i,used_memory_peak=501128i,used_memory_rss=1798144i 1457052084987848383
> redis,server=localhost,port=6379,replication_role=master,host=host keyspace_hitrate=1,clients=2i,blocked_clients=0i,instantaneous_input_kbps=0,sync_full=0i,pubsub_channels=0i,pubsub_patterns=0i,total_net_output_bytes=6659253i,used_memory=842448i,total_system_memory=8351916032i,aof_current_rewrite_time_sec=-1i,rdb_changes_since_last_save=0i,sync_partial_err=0i,latest_fork_usec=508i,instantaneous_output_kbps=0,expired_keys=0i,used_memory_peak=843416i,aof_rewrite_in_progress=0i,aof_last_bgrewrite_status="ok",migrate_cached_sockets=0i,connected_slaves=0i,maxmemory_policy="noeviction",aof_rewrite_scheduled=0i,total_net_input_bytes=3125i,used_memory_rss=9564160i,repl_backlog_histlen=0i,rdb_last_bgsave_status="ok",aof_last_rewrite_time_sec=-1i,keyspace_misses=0i,client_biggest_input_buf=5i,used_cpu_user=1.33,maxmemory=0i,rdb_current_bgsave_time_sec=-1i,total_commands_processed=271i,repl_backlog_size=1048576i,used_cpu_sys=3,uptime=2822i,lru_clock=16706281i,used_memory_lua=37888i,rejected_connections=0i,sync_partial_ok=0i,evicted_keys=0i,rdb_last_save_time_elapsed=1922i,rdb_last_save_time=1493099368i,instantaneous_ops_per_sec=0i,used_cpu_user_children=0,client_longest_output_list=0i,master_repl_offset=0i,repl_backlog_active=0i,keyspace_hits=2i,used_cpu_sys_children=0,cluster_enabled=0i,rdb_last_bgsave_time_sec=0i,aof_last_write_status="ok",total_connections_received=263i,aof_enabled=0i,repl_backlog_first_byte_offset=0i,mem_fragmentation_ratio=11.35,loading=0i,rdb_bgsave_in_progress=0i 1493101290000000000
```
redis_keyspace:
```
> redis_keyspace,database=db1,host=host,server=localhost,port=6379,replication_role=master keys=1i,expires=0i,avg_ttl=0i 1493101350000000000
```

View File

@ -843,7 +843,7 @@ FROM (SELECT DISTINCT DatabaseName FROM #Databases) AS bl
SET @DynamicPivotQuery = N'
SELECT measurement = Measurement, servername = REPLACE(@@SERVERNAME, ''\'', '':'')
, type = ''Database properties''
, ' + @ColumnName + ', total FROM
, ' + @ColumnName + ', Total FROM
(
SELECT Measurement, DatabaseName, Value
, Total = (SELECT SUM(Value) FROM #Databases WHERE Measurement = d.Measurement)
@ -856,7 +856,7 @@ UNION ALL
SELECT measurement = Measurement, servername = REPLACE(@@SERVERNAME, ''\'', '':'')
, type = ''Database properties''
, ' + @ColumnName + ', total FROM
, ' + @ColumnName + ', Total FROM
(
SELECT Measurement, DatabaseName, Value
, Total = (SELECT SUM(Value) FROM #Databases WHERE Measurement = d.Measurement)
@ -869,7 +869,7 @@ UNION ALL
SELECT measurement = Measurement, servername = REPLACE(@@SERVERNAME, ''\'', '':'')
, type = ''Database properties''
, ' + @ColumnName + ', total FROM
, ' + @ColumnName + ', Total FROM
(
SELECT Measurement, DatabaseName, Value
, Total = (SELECT SUM(Value) FROM #Databases WHERE Measurement = d.Measurement)
@ -883,7 +883,7 @@ UNION ALL
SELECT measurement = Measurement, servername = REPLACE(@@SERVERNAME, ''\'', '':'')
, type = ''Database properties''
, ' + @ColumnName + ', total FROM
, ' + @ColumnName + ', Total FROM
(
SELECT Measurement, DatabaseName, Value
, Total = (SELECT SUM(Value) FROM #Databases WHERE Measurement = d.Measurement)
@ -896,7 +896,7 @@ UNION ALL
SELECT measurement = Measurement, servername = REPLACE(@@SERVERNAME, ''\'', '':'')
, type = ''Database properties''
, ' + @ColumnName + ', total FROM
, ' + @ColumnName + ', Total FROM
(
SELECT Measurement, DatabaseName, Value
, Total = (SELECT SUM(Value) FROM #Databases WHERE Measurement = d.Measurement)
@ -909,7 +909,7 @@ UNION ALL
SELECT measurement = Measurement, servername = REPLACE(@@SERVERNAME, ''\'', '':'')
, type = ''Database properties''
, ' + @ColumnName + ', total FROM
, ' + @ColumnName + ', Total FROM
(
SELECT Measurement, DatabaseName, Value
, Total = (SELECT SUM(Value) FROM #Databases WHERE Measurement = d.Measurement)
@ -922,7 +922,7 @@ UNION ALL
SELECT measurement = Measurement, servername = REPLACE(@@SERVERNAME, ''\'', '':'')
, type = ''Database properties''
, ' + @ColumnName + ', total FROM
, ' + @ColumnName + ', Total FROM
(
SELECT Measurement, DatabaseName, Value
, Total = (SELECT SUM(Value) FROM #Databases WHERE Measurement = d.Measurement)
@ -935,7 +935,7 @@ UNION ALL
SELECT measurement = Measurement, servername = REPLACE(@@SERVERNAME, ''\'', '':'')
, type = ''Database properties''
, ' + @ColumnName + ', total FROM
, ' + @ColumnName + ', Total FROM
(
SELECT Measurement, DatabaseName, Value
, Total = (SELECT SUM(Value) FROM #Databases WHERE Measurement = d.Measurement)
@ -948,7 +948,7 @@ UNION ALL
SELECT measurement = Measurement, servername = REPLACE(@@SERVERNAME, ''\'', '':'')
, type = ''Database properties''
, ' + @ColumnName + ', total FROM
, ' + @ColumnName + ', Total FROM
(
SELECT Measurement, DatabaseName, Value
, Total = (SELECT SUM(Value) FROM #Databases WHERE Measurement = d.Measurement)
@ -961,7 +961,7 @@ UNION ALL
SELECT measurement = Measurement, servername = REPLACE(@@SERVERNAME, ''\'', '':'')
, type = ''Database properties''
, ' + @ColumnName + ', total FROM
, ' + @ColumnName + ', Total FROM
(
SELECT Measurement, DatabaseName, Value
, Total = (SELECT SUM(Value) FROM #Databases WHERE Measurement = d.Measurement)

View File

@ -12,6 +12,7 @@ import (
"path/filepath"
"runtime"
"strconv"
"syscall"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
@ -195,6 +196,13 @@ func readProcFile(filename string) ([]byte, error) {
if os.IsNotExist(err) {
return nil, nil
}
// Reading from /proc/<PID> fails with ESRCH if the process has
// been terminated between open() and read().
if perr, ok := err.(*os.PathError); ok && perr.Err == syscall.ESRCH {
return nil, nil
}
return nil, err
}

View File

@ -2,6 +2,7 @@ package tail
import (
"fmt"
"strings"
"sync"
"github.com/influxdata/tail"
@ -123,7 +124,10 @@ func (t *Tail) receiver(tailer *tail.Tail) {
tailer.Filename, err))
continue
}
m, err = t.parser.ParseLine(line.Text)
// Fix up files with Windows line endings.
text := strings.TrimRight(line.Text, "\r")
m, err = t.parser.ParseLine(text)
if err == nil {
t.acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time())
} else {

View File

@ -103,3 +103,33 @@ func TestTailBadLine(t *testing.T) {
acc.WaitError(1)
assert.Contains(t, acc.Errors[0].Error(), "E! Malformed log line")
}
func TestTailDosLineendings(t *testing.T) {
tmpfile, err := ioutil.TempFile("", "")
require.NoError(t, err)
defer os.Remove(tmpfile.Name())
_, err = tmpfile.WriteString("cpu usage_idle=100\r\ncpu2 usage_idle=200\r\n")
require.NoError(t, err)
tt := NewTail()
tt.FromBeginning = true
tt.Files = []string{tmpfile.Name()}
p, _ := parsers.NewInfluxParser()
tt.SetParser(p)
defer tt.Stop()
defer tmpfile.Close()
acc := testutil.Accumulator{}
require.NoError(t, tt.Start(&acc))
require.NoError(t, acc.GatherError(tt.Gather))
acc.Wait(2)
acc.AssertContainsFields(t, "cpu",
map[string]interface{}{
"usage_idle": float64(100),
})
acc.AssertContainsFields(t, "cpu2",
map[string]interface{}{
"usage_idle": float64(200),
})
}

View File

@ -18,7 +18,8 @@ This plugin writes to [InfluxDB](https://www.influxdb.com) via HTTP or UDP.
## The target database for metrics (telegraf will create it if not exists).
database = "telegraf" # required
## Retention policy to write to. Empty string writes to the default rp.
## Name of existing retention policy to write to. Empty string writes to
## the default retention policy.
retention_policy = ""
## Write consistency (clusters only), can be: "any", "one", "quorum", "all"
write_consistency = "any"
@ -52,7 +53,7 @@ to write to. Each URL should start with either `http://` or `udp://`
### Optional parameters:
* `write_consistency`: Write consistency (clusters only), can be: "any", "one", "quorum", "all".
* `retention_policy`: Retention policy to write to.
* `retention_policy`: Name of existing retention policy to write to. Empty string writes to the default retention policy.
* `timeout`: Write timeout (for the InfluxDB client), formatted as a string. If not provided, will default to 5s. 0s means no timeout (not recommended).
* `username`: Username for influxdb
* `password`: Password for influxdb

View File

@ -16,7 +16,6 @@ var (
defaultRequestTimeout = time.Second * 5
)
//
func NewHTTP(config HTTPConfig, defaultWP WriteParams) (Client, error) {
// validate required parameters:
if len(config.URL) == 0 {

View File

@ -4,6 +4,7 @@ import (
"bytes"
"fmt"
"io"
"log"
"net"
"net/url"
)
@ -25,6 +26,7 @@ type UDPConfig struct {
PayloadSize int
}
// NewUDP will return an instance of the telegraf UDP output plugin for influxdb
func NewUDP(config UDPConfig) (Client, error) {
p, err := url.Parse(config.URL)
if err != nil {
@ -55,20 +57,22 @@ type udpClient struct {
buffer []byte
}
// Query will send the provided query command to the client, returning an error if any issues arise
func (c *udpClient) Query(command string) error {
return nil
}
// Write will send the byte stream to the given UDP client endpoint
func (c *udpClient) Write(b []byte) (int, error) {
return c.WriteStream(bytes.NewReader(b), -1)
}
// write params are ignored by the UDP client
// WriteWithParams are ignored by the UDP client, will forward to WriteStream
func (c *udpClient) WriteWithParams(b []byte, wp WriteParams) (int, error) {
return c.WriteStream(bytes.NewReader(b), -1)
}
// contentLength is ignored by the UDP client.
// WriteStream will send the provided data through to the client, contentLength is ignored by the UDP client
func (c *udpClient) WriteStream(r io.Reader, contentLength int) (int, error) {
var totaln int
for {
@ -79,21 +83,40 @@ func (c *udpClient) WriteStream(r io.Reader, contentLength int) (int, error) {
if err != io.EOF && err != nil {
return totaln, err
}
nW, err := c.conn.Write(c.buffer[0:nR])
totaln += nW
if err != nil {
return totaln, err
if c.buffer[nR-1] == uint8('\n') {
nW, err := c.conn.Write(c.buffer[0:nR])
totaln += nW
if err != nil {
return totaln, err
}
} else {
log.Printf("E! Could not fit point into UDP payload; dropping")
// Scan forward until next line break to realign.
for {
nR, err := r.Read(c.buffer)
if nR == 0 {
break
}
if err != io.EOF && err != nil {
return totaln, err
}
if c.buffer[nR-1] == uint8('\n') {
break
}
}
}
}
return totaln, nil
}
// contentLength is ignored by the UDP client.
// WriteStreamWithParams will forward the stream to the client backend, contentLength is ignored by the UDP client
// write params are ignored by the UDP client
func (c *udpClient) WriteStreamWithParams(r io.Reader, contentLength int, wp WriteParams) (int, error) {
return c.WriteStream(r, -1)
}
// Close will terminate the provided client connection
func (c *udpClient) Close() error {
return c.conn.Close()
}

View File

@ -1,7 +1,6 @@
package client
import (
"bytes"
"net"
"testing"
"time"
@ -10,6 +9,7 @@ import (
"github.com/influxdata/telegraf/metric"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestUDPClient(t *testing.T) {
@ -72,63 +72,7 @@ func TestUDPClient_Write(t *testing.T) {
pkt := <-packets
assert.Equal(t, "cpu value=99\n", pkt)
metrics := `cpu value=99
cpu value=55
cpu value=44
cpu value=101
cpu value=91
cpu value=92
`
// test sending packet with 6 metrics in a stream.
reader := bytes.NewReader([]byte(metrics))
// contentLength is ignored:
n, err = client.WriteStream(reader, 10)
assert.Equal(t, n, len(metrics))
assert.NoError(t, err)
pkt = <-packets
assert.Equal(t, "cpu value=99\ncpu value=55\ncpu value=44\ncpu value=101\ncpu value=91\ncpu value=92\n", pkt)
//
// Test that UDP packets get broken up properly:
config2 := UDPConfig{
URL: "udp://localhost:8199",
PayloadSize: 25,
}
client2, err := NewUDP(config2)
assert.NoError(t, err)
wp := WriteParams{}
//
// Using Write():
buf := []byte(metrics)
n, err = client2.WriteWithParams(buf, wp)
assert.Equal(t, n, len(metrics))
assert.NoError(t, err)
pkt = <-packets
assert.Equal(t, "cpu value=99\ncpu value=55", pkt)
pkt = <-packets
assert.Equal(t, "\ncpu value=44\ncpu value=1", pkt)
pkt = <-packets
assert.Equal(t, "01\ncpu value=91\ncpu value", pkt)
pkt = <-packets
assert.Equal(t, "=92\n", pkt)
//
// Using WriteStream():
reader = bytes.NewReader([]byte(metrics))
n, err = client2.WriteStreamWithParams(reader, 10, wp)
assert.Equal(t, n, len(metrics))
assert.NoError(t, err)
pkt = <-packets
assert.Equal(t, "cpu value=99\ncpu value=55", pkt)
pkt = <-packets
assert.Equal(t, "\ncpu value=44\ncpu value=1", pkt)
pkt = <-packets
assert.Equal(t, "01\ncpu value=91\ncpu value", pkt)
pkt = <-packets
assert.Equal(t, "=92\n", pkt)
//
// Using WriteStream() & a metric.Reader:
config3 := UDPConfig{
@ -159,4 +103,27 @@ cpu value=92
assert.Equal(t, "test value=1.1 1484142942000000000\n", pkt)
assert.NoError(t, client.Close())
config = UDPConfig{
URL: "udp://localhost:8199",
PayloadSize: 40,
}
client4, err := NewUDP(config)
assert.NoError(t, err)
ts := time.Unix(1484142943, 0)
m1, _ = metric.New("test", map[string]string{},
map[string]interface{}{"this_is_a_very_long_field_name": 1.1}, ts)
m2, _ = metric.New("test", map[string]string{},
map[string]interface{}{"value": 1.1}, ts)
ms = []telegraf.Metric{m1, m2}
reader := metric.NewReader(ms)
n, err = client4.WriteStream(reader, 0)
assert.NoError(t, err)
require.Equal(t, 35, n)
assert.NoError(t, err)
pkt = <-packets
assert.Equal(t, "test value=1.1 1484142943000000000\n", pkt)
assert.NoError(t, client4.Close())
}

View File

@ -15,6 +15,12 @@ import (
"github.com/influxdata/telegraf/plugins/outputs/influxdb/client"
)
var (
// Quote Ident replacer.
qiReplacer = strings.NewReplacer("\n", `\n`, `\`, `\\`, `"`, `\"`)
)
// InfluxDB struct is the primary data structure for the plugin
type InfluxDB struct {
// URL is only for backwards compatability
URL string
@ -55,7 +61,8 @@ var sampleConfig = `
## The target database for metrics (telegraf will create it if not exists).
database = "telegraf" # required
## Retention policy to write to. Empty string writes to the default rp.
## Name of existing retention policy to write to. Empty string writes to
## the default retention policy.
retention_policy = ""
## Write consistency (clusters only), can be: "any", "one", "quorum", "all"
write_consistency = "any"
@ -78,11 +85,10 @@ var sampleConfig = `
# insecure_skip_verify = false
`
// Connect initiates the primary connection to the range of provided URLs
func (i *InfluxDB) Connect() error {
var urls []string
for _, u := range i.URLs {
urls = append(urls, u)
}
urls = append(urls, i.URLs...)
// Backward-compatability with single Influx URL config files
// This could eventually be removed in favor of specifying the urls as a list
@ -129,9 +135,11 @@ func (i *InfluxDB) Connect() error {
}
i.clients = append(i.clients, c)
err = c.Query("CREATE DATABASE " + i.Database)
err = c.Query(fmt.Sprintf(`CREATE DATABASE "%s"`, qiReplacer.Replace(i.Database)))
if err != nil {
log.Println("E! Database creation failed: " + err.Error())
if !strings.Contains(err.Error(), "Status Code [403]") {
log.Println("I! Database creation failed: " + err.Error())
}
continue
}
}
@ -141,25 +149,30 @@ func (i *InfluxDB) Connect() error {
return nil
}
// Close will terminate the session to the backend, returning error if an issue arises
func (i *InfluxDB) Close() error {
return nil
}
// SampleConfig returns the formatted sample configuration for the plugin
func (i *InfluxDB) SampleConfig() string {
return sampleConfig
}
// Description returns the human-readable function definition of the plugin
func (i *InfluxDB) Description() string {
return "Configuration for influxdb server to send metrics to"
}
// Choose a random server in the cluster to write to until a successful write
// Write will choose a random server in the cluster to write to until a successful write
// occurs, logging each unsuccessful. If all servers fail, return error.
func (i *InfluxDB) Write(metrics []telegraf.Metric) error {
bufsize := 0
for _, m := range metrics {
bufsize += m.Len()
}
r := metric.NewReader(metrics)
// This will get set to nil if a successful write occurs
@ -170,7 +183,8 @@ func (i *InfluxDB) Write(metrics []telegraf.Metric) error {
if _, e := i.clients[n].WriteStream(r, bufsize); e != nil {
// If the database was not found, try to recreate it:
if strings.Contains(e.Error(), "database not found") {
if errc := i.clients[n].Query("CREATE DATABASE " + i.Database); errc != nil {
errc := i.clients[n].Query(fmt.Sprintf(`CREATE DATABASE "%s"`, qiReplacer.Replace(i.Database)))
if errc != nil {
log.Printf("E! Error: Database %s not found and failed to recreate\n",
i.Database)
}

View File

@ -2,15 +2,52 @@ package influxdb
import (
"fmt"
"io"
"net/http"
"net/http/httptest"
"testing"
"github.com/influxdata/telegraf/plugins/outputs/influxdb/client"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestIdentQuoting(t *testing.T) {
var testCases = []struct {
database string
expected string
}{
{"x-y", `CREATE DATABASE "x-y"`},
{`x"y`, `CREATE DATABASE "x\"y"`},
{"x\ny", `CREATE DATABASE "x\ny"`},
{`x\y`, `CREATE DATABASE "x\\y"`},
}
for _, tc := range testCases {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
r.ParseForm()
q := r.Form.Get("q")
assert.Equal(t, tc.expected, q)
w.WriteHeader(http.StatusOK)
w.Header().Set("Content-Type", "application/json")
fmt.Fprintln(w, `{"results":[{}]}`)
}))
defer ts.Close()
i := InfluxDB{
URLs: []string{ts.URL},
Database: tc.database,
}
err := i.Connect()
require.NoError(t, err)
require.NoError(t, i.Close())
}
}
func TestUDPInflux(t *testing.T) {
i := InfluxDB{
URLs: []string{"udp://localhost:8089"},
@ -164,3 +201,34 @@ func TestHTTPError_FieldTypeConflict(t *testing.T) {
require.NoError(t, err)
require.NoError(t, i.Close())
}
type MockClient struct {
writeStreamCalled int
contentLength int
}
func (m *MockClient) Query(command string) error {
panic("not implemented")
}
func (m *MockClient) Write(b []byte) (int, error) {
panic("not implemented")
}
func (m *MockClient) WriteWithParams(b []byte, params client.WriteParams) (int, error) {
panic("not implemented")
}
func (m *MockClient) WriteStream(b io.Reader, contentLength int) (int, error) {
m.writeStreamCalled++
m.contentLength = contentLength
return 0, nil
}
func (m *MockClient) WriteStreamWithParams(b io.Reader, contentLength int, params client.WriteParams) (int, error) {
panic("not implemented")
}
func (m *MockClient) Close() error {
panic("not implemented")
}

View File

@ -6,6 +6,8 @@ import (
"log"
"net/http"
"regexp"
"sort"
"strings"
"sync"
"time"
@ -17,19 +19,40 @@ import (
var invalidNameCharRE = regexp.MustCompile(`[^a-zA-Z0-9_]`)
type MetricWithExpiration struct {
Metric prometheus.Metric
// SampleID uniquely identifies a Sample
type SampleID string
// Sample represents the current value of a series.
type Sample struct {
// Labels are the Prometheus labels.
Labels map[string]string
// Value is the value in the Prometheus output.
Value float64
// Expiration is the deadline that this Sample is valid until.
Expiration time.Time
}
// MetricFamily contains the data required to build valid prometheus Metrics.
type MetricFamily struct {
// Samples are the Sample belonging to this MetricFamily.
Samples map[SampleID]*Sample
// Type of the Value.
ValueType prometheus.ValueType
// LabelSet is the label counts for all Samples.
LabelSet map[string]int
}
type PrometheusClient struct {
Listen string
ExpirationInterval internal.Duration `toml:"expiration_interval"`
server *http.Server
metrics map[string]*MetricWithExpiration
server *http.Server
sync.Mutex
// fam is the non-expired MetricFamily by Prometheus metric name.
fam map[string]*MetricFamily
// now returns the current time.
now func() time.Time
}
var sampleConfig = `
@ -41,7 +64,6 @@ var sampleConfig = `
`
func (p *PrometheusClient) Start() error {
p.metrics = make(map[string]*MetricWithExpiration)
prometheus.Register(p)
if p.Listen == "" {
@ -72,7 +94,9 @@ func (p *PrometheusClient) Connect() error {
func (p *PrometheusClient) Close() error {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
return p.server.Shutdown(ctx)
err := p.server.Shutdown(ctx)
prometheus.Unregister(p)
return err
}
func (p *PrometheusClient) SampleConfig() string {
@ -88,96 +112,153 @@ func (p *PrometheusClient) Describe(ch chan<- *prometheus.Desc) {
prometheus.NewGauge(prometheus.GaugeOpts{Name: "Dummy", Help: "Dummy"}).Describe(ch)
}
// Implements prometheus.Collector
// Expire removes Samples that have expired.
func (p *PrometheusClient) Expire() {
now := p.now()
for name, family := range p.fam {
for key, sample := range family.Samples {
if p.ExpirationInterval.Duration != 0 && now.After(sample.Expiration) {
for k, _ := range sample.Labels {
family.LabelSet[k]--
}
delete(family.Samples, key)
if len(family.Samples) == 0 {
delete(p.fam, name)
}
}
}
}
}
// Collect implements prometheus.Collector
func (p *PrometheusClient) Collect(ch chan<- prometheus.Metric) {
p.Lock()
defer p.Unlock()
for key, m := range p.metrics {
if p.ExpirationInterval.Duration != 0 && time.Now().After(m.Expiration) {
delete(p.metrics, key)
} else {
ch <- m.Metric
p.Expire()
for name, family := range p.fam {
// Get list of all labels on MetricFamily
var labelNames []string
for k, v := range family.LabelSet {
if v > 0 {
labelNames = append(labelNames, k)
}
}
desc := prometheus.NewDesc(name, "Telegraf collected metric", labelNames, nil)
for _, sample := range family.Samples {
// Get labels for this sample; unset labels will be set to the
// empty string
var labels []string
for _, label := range labelNames {
v := sample.Labels[label]
labels = append(labels, v)
}
metric, err := prometheus.NewConstMetric(desc, family.ValueType, sample.Value, labels...)
if err != nil {
log.Printf("E! Error creating prometheus metric, "+
"key: %s, labels: %v,\nerr: %s\n",
name, labels, err.Error())
}
ch <- metric
}
}
}
func sanitize(value string) string {
return invalidNameCharRE.ReplaceAllString(value, "_")
}
func valueType(tt telegraf.ValueType) prometheus.ValueType {
switch tt {
case telegraf.Counter:
return prometheus.CounterValue
case telegraf.Gauge:
return prometheus.GaugeValue
default:
return prometheus.UntypedValue
}
}
// CreateSampleID creates a SampleID based on the tags of a telegraf.Metric.
func CreateSampleID(tags map[string]string) SampleID {
pairs := make([]string, 0, len(tags))
for k, v := range tags {
pairs = append(pairs, fmt.Sprintf("%s=%s", k, v))
}
sort.Strings(pairs)
return SampleID(strings.Join(pairs, ","))
}
func (p *PrometheusClient) Write(metrics []telegraf.Metric) error {
p.Lock()
defer p.Unlock()
if len(metrics) == 0 {
return nil
}
now := p.now()
for _, point := range metrics {
key := point.Name()
key = invalidNameCharRE.ReplaceAllString(key, "_")
tags := point.Tags()
vt := valueType(point.Type())
sampleID := CreateSampleID(tags)
// convert tags into prometheus labels
var labels []string
l := prometheus.Labels{}
for k, v := range point.Tags() {
k = invalidNameCharRE.ReplaceAllString(k, "_")
if len(k) == 0 {
continue
}
labels = append(labels, k)
l[k] = v
labels := make(map[string]string)
for k, v := range tags {
labels[sanitize(k)] = v
}
// Get a type if it's available, defaulting to Untyped
var mType prometheus.ValueType
switch point.Type() {
case telegraf.Counter:
mType = prometheus.CounterValue
case telegraf.Gauge:
mType = prometheus.GaugeValue
default:
mType = prometheus.UntypedValue
}
for n, val := range point.Fields() {
for fn, fv := range point.Fields() {
// Ignore string and bool fields.
switch val.(type) {
case string:
continue
case bool:
continue
}
// sanitize the measurement name
n = invalidNameCharRE.ReplaceAllString(n, "_")
var mname string
if n == "value" {
mname = key
} else {
mname = fmt.Sprintf("%s_%s", key, n)
}
desc := prometheus.NewDesc(mname, "Telegraf collected metric", nil, l)
var metric prometheus.Metric
var err error
// switch for field type
switch val := val.(type) {
var value float64
switch fv := fv.(type) {
case int64:
metric, err = prometheus.NewConstMetric(desc, mType, float64(val))
value = float64(fv)
case float64:
metric, err = prometheus.NewConstMetric(desc, mType, val)
value = fv
default:
continue
}
if err != nil {
log.Printf("E! Error creating prometheus metric, "+
"key: %s, labels: %v,\nerr: %s\n",
mname, l, err.Error())
sample := &Sample{
Labels: labels,
Value: value,
Expiration: now.Add(p.ExpirationInterval.Duration),
}
p.metrics[desc.String()] = &MetricWithExpiration{
Metric: metric,
Expiration: time.Now().Add(p.ExpirationInterval.Duration),
// Special handling of value field; supports passthrough from
// the prometheus input.
var mname string
if fn == "value" {
mname = sanitize(point.Name())
} else {
mname = sanitize(fmt.Sprintf("%s_%s", point.Name(), fn))
}
var fam *MetricFamily
var ok bool
if fam, ok = p.fam[mname]; !ok {
fam = &MetricFamily{
Samples: make(map[SampleID]*Sample),
ValueType: vt,
LabelSet: make(map[string]int),
}
p.fam[mname] = fam
} else {
if fam.ValueType != vt {
// Don't return an error since this would be a permanent error
log.Printf("Mixed ValueType for measurement %q; dropping point", point.Name())
break
}
}
for k, _ := range sample.Labels {
fam.LabelSet[k]++
}
fam.Samples[sampleID] = sample
}
}
return nil
@ -187,6 +268,8 @@ func init() {
outputs.Add("prometheus_client", func() telegraf.Output {
return &PrometheusClient{
ExpirationInterval: internal.Duration{Duration: time.Second * 60},
fam: make(map[string]*MetricFamily),
now: time.Now,
}
})
}

View File

@ -4,16 +4,314 @@ import (
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/plugins/inputs/prometheus"
prometheus_input "github.com/influxdata/telegraf/plugins/inputs/prometheus"
"github.com/influxdata/telegraf/testutil"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
)
func setUnixTime(client *PrometheusClient, sec int64) {
client.now = func() time.Time {
return time.Unix(sec, 0)
}
}
// NewClient initializes a PrometheusClient.
func NewClient() *PrometheusClient {
return &PrometheusClient{
ExpirationInterval: internal.Duration{Duration: time.Second * 60},
fam: make(map[string]*MetricFamily),
now: time.Now,
}
}
func TestWrite_Basic(t *testing.T) {
now := time.Now()
pt1, err := metric.New(
"foo",
make(map[string]string),
map[string]interface{}{"value": 0.0},
now)
var metrics = []telegraf.Metric{
pt1,
}
client := NewClient()
err = client.Write(metrics)
require.NoError(t, err)
fam, ok := client.fam["foo"]
require.True(t, ok)
require.Equal(t, prometheus.UntypedValue, fam.ValueType)
require.Equal(t, map[string]int{}, fam.LabelSet)
sample, ok := fam.Samples[CreateSampleID(pt1.Tags())]
require.True(t, ok)
require.Equal(t, 0.0, sample.Value)
require.True(t, now.Before(sample.Expiration))
}
func TestWrite_IntField(t *testing.T) {
client := NewClient()
p1, err := metric.New(
"foo",
make(map[string]string),
map[string]interface{}{"value": 42},
time.Now())
err = client.Write([]telegraf.Metric{p1})
require.NoError(t, err)
fam, ok := client.fam["foo"]
require.True(t, ok)
for _, v := range fam.Samples {
require.Equal(t, 42.0, v.Value)
}
}
func TestWrite_FieldNotValue(t *testing.T) {
client := NewClient()
p1, err := metric.New(
"foo",
make(map[string]string),
map[string]interface{}{"howdy": 0.0},
time.Now())
err = client.Write([]telegraf.Metric{p1})
require.NoError(t, err)
fam, ok := client.fam["foo_howdy"]
require.True(t, ok)
for _, v := range fam.Samples {
require.Equal(t, 0.0, v.Value)
}
}
func TestWrite_SkipNonNumberField(t *testing.T) {
client := NewClient()
p1, err := metric.New(
"foo",
make(map[string]string),
map[string]interface{}{"value": "howdy"},
time.Now())
err = client.Write([]telegraf.Metric{p1})
require.NoError(t, err)
_, ok := client.fam["foo"]
require.False(t, ok)
}
func TestWrite_Counter(t *testing.T) {
client := NewClient()
p1, err := metric.New(
"foo",
make(map[string]string),
map[string]interface{}{"value": 42},
time.Now(),
telegraf.Counter)
err = client.Write([]telegraf.Metric{p1})
require.NoError(t, err)
fam, ok := client.fam["foo"]
require.True(t, ok)
require.Equal(t, prometheus.CounterValue, fam.ValueType)
}
func TestWrite_Sanitize(t *testing.T) {
client := NewClient()
p1, err := metric.New(
"foo.bar",
map[string]string{"tag-with-dash": "localhost.local"},
map[string]interface{}{"field-with-dash": 42},
time.Now(),
telegraf.Counter)
err = client.Write([]telegraf.Metric{p1})
require.NoError(t, err)
fam, ok := client.fam["foo_bar_field_with_dash"]
require.True(t, ok)
require.Equal(t, map[string]int{"tag_with_dash": 1}, fam.LabelSet)
sample1, ok := fam.Samples[CreateSampleID(p1.Tags())]
require.True(t, ok)
require.Equal(t, map[string]string{
"tag_with_dash": "localhost.local"}, sample1.Labels)
}
func TestWrite_Gauge(t *testing.T) {
client := NewClient()
p1, err := metric.New(
"foo",
make(map[string]string),
map[string]interface{}{"value": 42},
time.Now(),
telegraf.Gauge)
err = client.Write([]telegraf.Metric{p1})
require.NoError(t, err)
fam, ok := client.fam["foo"]
require.True(t, ok)
require.Equal(t, prometheus.GaugeValue, fam.ValueType)
}
func TestWrite_MixedValueType(t *testing.T) {
now := time.Now()
p1, err := metric.New(
"foo",
make(map[string]string),
map[string]interface{}{"value": 1.0},
now,
telegraf.Counter)
p2, err := metric.New(
"foo",
make(map[string]string),
map[string]interface{}{"value": 2.0},
now,
telegraf.Gauge)
var metrics = []telegraf.Metric{p1, p2}
client := NewClient()
err = client.Write(metrics)
require.NoError(t, err)
fam, ok := client.fam["foo"]
require.True(t, ok)
require.Equal(t, 1, len(fam.Samples))
}
func TestWrite_Tags(t *testing.T) {
now := time.Now()
p1, err := metric.New(
"foo",
make(map[string]string),
map[string]interface{}{"value": 1.0},
now)
p2, err := metric.New(
"foo",
map[string]string{"host": "localhost"},
map[string]interface{}{"value": 2.0},
now)
var metrics = []telegraf.Metric{p1, p2}
client := NewClient()
err = client.Write(metrics)
require.NoError(t, err)
fam, ok := client.fam["foo"]
require.True(t, ok)
require.Equal(t, prometheus.UntypedValue, fam.ValueType)
require.Equal(t, map[string]int{"host": 1}, fam.LabelSet)
sample1, ok := fam.Samples[CreateSampleID(p1.Tags())]
require.True(t, ok)
require.Equal(t, 1.0, sample1.Value)
require.True(t, now.Before(sample1.Expiration))
sample2, ok := fam.Samples[CreateSampleID(p2.Tags())]
require.True(t, ok)
require.Equal(t, 2.0, sample2.Value)
require.True(t, now.Before(sample2.Expiration))
}
func TestExpire(t *testing.T) {
client := NewClient()
p1, err := metric.New(
"foo",
make(map[string]string),
map[string]interface{}{"value": 1.0},
time.Now())
setUnixTime(client, 0)
err = client.Write([]telegraf.Metric{p1})
require.NoError(t, err)
p2, err := metric.New(
"bar",
make(map[string]string),
map[string]interface{}{"value": 2.0},
time.Now())
setUnixTime(client, 1)
err = client.Write([]telegraf.Metric{p2})
setUnixTime(client, 61)
require.Equal(t, 2, len(client.fam))
client.Expire()
require.Equal(t, 1, len(client.fam))
}
func TestExpire_TagsNoDecrement(t *testing.T) {
client := NewClient()
p1, err := metric.New(
"foo",
make(map[string]string),
map[string]interface{}{"value": 1.0},
time.Now())
setUnixTime(client, 0)
err = client.Write([]telegraf.Metric{p1})
require.NoError(t, err)
p2, err := metric.New(
"foo",
map[string]string{"host": "localhost"},
map[string]interface{}{"value": 2.0},
time.Now())
setUnixTime(client, 1)
err = client.Write([]telegraf.Metric{p2})
setUnixTime(client, 61)
fam, ok := client.fam["foo"]
require.True(t, ok)
require.Equal(t, 2, len(fam.Samples))
client.Expire()
require.Equal(t, 1, len(fam.Samples))
require.Equal(t, map[string]int{"host": 1}, fam.LabelSet)
}
func TestExpire_TagsWithDecrement(t *testing.T) {
client := NewClient()
p1, err := metric.New(
"foo",
map[string]string{"host": "localhost"},
map[string]interface{}{"value": 1.0},
time.Now())
setUnixTime(client, 0)
err = client.Write([]telegraf.Metric{p1})
require.NoError(t, err)
p2, err := metric.New(
"foo",
make(map[string]string),
map[string]interface{}{"value": 2.0},
time.Now())
setUnixTime(client, 1)
err = client.Write([]telegraf.Metric{p2})
setUnixTime(client, 61)
fam, ok := client.fam["foo"]
require.True(t, ok)
require.Equal(t, 2, len(fam.Samples))
client.Expire()
require.Equal(t, 1, len(fam.Samples))
require.Equal(t, map[string]int{"host": 0}, fam.LabelSet)
}
var pTesting *PrometheusClient
func TestPrometheusWritePointEmptyTag(t *testing.T) {
@ -93,74 +391,21 @@ func TestPrometheusWritePointEmptyTag(t *testing.T) {
}
}
func TestPrometheusExpireOldMetrics(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
pClient, p, err := setupPrometheus()
pClient.ExpirationInterval = internal.Duration{Duration: time.Second * 10}
require.NoError(t, err)
defer pClient.Stop()
now := time.Now()
tags := make(map[string]string)
pt1, _ := metric.New(
"test_point_1",
tags,
map[string]interface{}{"value": 0.0},
now)
var metrics = []telegraf.Metric{pt1}
require.NoError(t, pClient.Write(metrics))
for _, m := range pClient.metrics {
m.Expiration = now.Add(time.Duration(-15) * time.Second)
}
pt2, _ := metric.New(
"test_point_2",
tags,
map[string]interface{}{"value": 1.0},
now)
var metrics2 = []telegraf.Metric{pt2}
require.NoError(t, pClient.Write(metrics2))
expected := []struct {
name string
value float64
tags map[string]string
}{
{"test_point_2", 1.0, tags},
}
var acc testutil.Accumulator
require.NoError(t, p.Gather(&acc))
for _, e := range expected {
acc.AssertContainsFields(t, e.name,
map[string]interface{}{"value": e.value})
}
acc.AssertDoesNotContainMeasurement(t, "test_point_1")
// Confirm that it's not in the PrometheusClient map anymore
assert.Equal(t, 1, len(pClient.metrics))
}
func setupPrometheus() (*PrometheusClient, *prometheus.Prometheus, error) {
func setupPrometheus() (*PrometheusClient, *prometheus_input.Prometheus, error) {
if pTesting == nil {
pTesting = &PrometheusClient{Listen: "localhost:9127"}
pTesting = NewClient()
pTesting.Listen = "localhost:9127"
err := pTesting.Start()
if err != nil {
return nil, nil, err
}
} else {
pTesting.metrics = make(map[string]*MetricWithExpiration)
pTesting.fam = make(map[string]*MetricFamily)
}
time.Sleep(time.Millisecond * 200)
p := &prometheus.Prometheus{
p := &prometheus_input.Prometheus{
Urls: []string{"http://localhost:9127/metrics"},
}

View File

@ -124,6 +124,16 @@ func (sw *SocketWriter) Write(metrics []telegraf.Metric) error {
return nil
}
// Close closes the connection. Noop if already closed.
func (sw *SocketWriter) Close() error {
if sw.Conn == nil {
return nil
}
err := sw.Conn.Close()
sw.Conn = nil
return err
}
func newSocketWriter() *SocketWriter {
s, _ := serializers.NewInfluxSerializer()
return &SocketWriter{

View File

@ -143,7 +143,7 @@ func TestSocketWriter_Write_err(t *testing.T) {
// close the socket to generate an error
lconn.Close()
sw.Close()
sw.Conn.Close()
err = sw.Write(metrics)
require.Error(t, err)
assert.Nil(t, sw.Conn)

View File

@ -499,13 +499,12 @@ def build(version=None,
logging.info("Time taken: {}s".format((end_time - start_time).total_seconds()))
return True
def generate_md5_from_file(path):
"""Generate MD5 signature based on the contents of the file at path.
def generate_sha256_from_file(path):
"""Generate SHA256 hash signature based on the contents of the file at path.
"""
m = hashlib.md5()
m = hashlib.sha256()
with open(path, 'rb') as f:
for chunk in iter(lambda: f.read(4096), b""):
m.update(chunk)
m.update(f.read())
return m.hexdigest()
def generate_sig_from_file(path):
@ -636,6 +635,10 @@ def package(build_output, pkg_name, version, nightly=False, iteration=1, static=
elif package_type not in ['zip', 'tar'] and static or "static_" in arch:
logging.info("Skipping package type '{}' for static builds.".format(package_type))
else:
if package_type == 'rpm' and release and '~' in package_version:
package_version, suffix = package_version.split('~', 1)
# The ~ indicatees that this is a prerelease so we give it a leading 0.
package_iteration = "0.%s" % suffix
fpm_command = "fpm {} --name {} -a {} -t {} --version {} --iteration {} -C {} -p {} ".format(
fpm_common_args,
name,
@ -664,9 +667,6 @@ def package(build_output, pkg_name, version, nightly=False, iteration=1, static=
if package_type == 'rpm':
# rpm's convert any dashes to underscores
package_version = package_version.replace("-", "_")
new_outfile = outfile.replace("{}-{}".format(package_version, package_iteration), package_version)
os.rename(outfile, new_outfile)
outfile = new_outfile
outfiles.append(os.path.join(os.getcwd(), outfile))
logging.debug("Produced package files: {}".format(outfiles))
return outfiles
@ -789,9 +789,10 @@ def main(args):
if not upload_packages(packages, bucket_name=args.bucket, overwrite=args.upload_overwrite):
return 1
logging.info("Packages created:")
for p in packages:
logging.info("{} (MD5={})".format(p.split('/')[-1:][0],
generate_md5_from_file(p)))
for filename in packages:
logging.info("%s (SHA256=%s)",
os.path.basename(filename),
generate_sha256_from_file(filename))
if orig_branch != get_current_branch():
logging.info("Moving back to original git branch: {}".format(args.branch))
run("git checkout {}".format(orig_branch))

View File

@ -135,7 +135,9 @@ case $1 in
fi
log_success_msg "Starting the process" "$name"
if which start-stop-daemon > /dev/null 2>&1; then
if command -v startproc >/dev/null; then
startproc -u "$USER" -g "$GROUP" -p "$pidfile" -q -- "$daemon" -pidfile "$pidfile" -config "$config" -config-directory "$confdir" $TELEGRAF_OPTS
elif which start-stop-daemon > /dev/null 2>&1; then
start-stop-daemon --chuid $USER:$GROUP --start --quiet --pidfile $pidfile --exec $daemon -- -pidfile $pidfile -config $config -config-directory $confdir $TELEGRAF_OPTS >>$STDOUT 2>>$STDERR &
else
su -s /bin/sh -c "nohup $daemon -pidfile $pidfile -config $config -config-directory $confdir $TELEGRAF_OPTS >>$STDOUT 2>>$STDERR &" $USER

View File

@ -11,7 +11,7 @@ function install_init {
}
function install_systemd {
cp -f $SCRIPT_DIR/telegraf.service /lib/systemd/system/telegraf.service
cp -f $SCRIPT_DIR/telegraf.service $1
systemctl enable telegraf || true
systemctl daemon-reload || true
}
@ -24,12 +24,12 @@ function install_chkconfig {
chkconfig --add telegraf
}
if ! grep "^telegraf:" /etc/group &>/dev/null; then
groupadd -r telegraf
fi
if ! id telegraf &>/dev/null; then
if ! grep "^telegraf:" /etc/group &>/dev/null; then
useradd -r -K USERGROUPS_ENAB=yes -M telegraf -s /bin/false -d /etc/telegraf
else
useradd -r -K USERGROUPS_ENAB=yes -M telegraf -s /bin/false -d /etc/telegraf -g telegraf
fi
useradd -r -M telegraf -s /bin/false -d /etc/telegraf -g telegraf
fi
test -d $LOG_DIR || mkdir -p $LOG_DIR
@ -56,10 +56,10 @@ if [[ ! -d /etc/telegraf/telegraf.d ]]; then
fi
# Distribution-specific logic
if [[ -f /etc/redhat-release ]]; then
if [[ -f /etc/redhat-release ]] || [[ -f /etc/SuSE-release ]]; then
# RHEL-variant logic
if [[ "$(readlink /proc/1/exe)" == */systemd ]]; then
install_systemd
install_systemd /usr/lib/systemd/system/telegraf.service
else
# Assuming SysVinit
install_init
@ -73,10 +73,10 @@ if [[ -f /etc/redhat-release ]]; then
elif [[ -f /etc/debian_version ]]; then
# Debian/Ubuntu logic
if [[ "$(readlink /proc/1/exe)" == */systemd ]]; then
install_systemd
install_systemd /lib/systemd/system/telegraf.service
systemctl restart telegraf || echo "WARNING: systemd not running."
else
# Assuming SysVinit
# Assuming SysVinit
install_init
# Run update-rc.d or fallback to chkconfig if not available
if which update-rc.d &>/dev/null; then
@ -89,7 +89,7 @@ elif [[ -f /etc/debian_version ]]; then
elif [[ -f /etc/os-release ]]; then
source /etc/os-release
if [[ $ID = "amzn" ]]; then
# Amazon Linux logic
# Amazon Linux logic
install_init
# Run update-rc.d or fallback to chkconfig if not available
if which update-rc.d &>/dev/null; then
@ -97,5 +97,6 @@ elif [[ -f /etc/os-release ]]; then
else
install_chkconfig
fi
/etc/init.d/telegraf restart
fi
fi

View File

@ -2,7 +2,7 @@
function disable_systemd {
systemctl disable telegraf
rm -f /lib/systemd/system/telegraf.service
rm -f $1
}
function disable_update_rcd {
@ -15,14 +15,14 @@ function disable_chkconfig {
rm -f /etc/init.d/telegraf
}
if [[ -f /etc/redhat-release ]]; then
if [[ -f /etc/redhat-release ]] || [[ -f /etc/SuSE-release ]]; then
# RHEL-variant logic
if [[ "$1" = "0" ]]; then
# InfluxDB is no longer installed, remove from init system
rm -f /etc/default/telegraf
if [[ "$(readlink /proc/1/exe)" == */systemd ]]; then
disable_systemd
disable_systemd /usr/lib/systemd/system/telegraf.service
else
# Assuming sysv
disable_chkconfig
@ -35,7 +35,7 @@ elif [[ -f /etc/debian_version ]]; then
rm -f /etc/default/telegraf
if [[ "$(readlink /proc/1/exe)" == */systemd ]]; then
disable_systemd
disable_systemd /lib/systemd/system/telegraf.service
else
# Assuming sysv
# Run update-rc.d or fallback to chkconfig if not available

View File

@ -417,3 +417,53 @@ func (a *Accumulator) HasMeasurement(measurement string) bool {
}
return false
}
func (a *Accumulator) IntField(measurement string, field string) (int, bool) {
a.Lock()
defer a.Unlock()
for _, p := range a.Metrics {
if p.Measurement == measurement {
for fieldname, value := range p.Fields {
if fieldname == field {
v, ok := value.(int)
return v, ok
}
}
}
}
return 0, false
}
func (a *Accumulator) FloatField(measurement string, field string) (float64, bool) {
a.Lock()
defer a.Unlock()
for _, p := range a.Metrics {
if p.Measurement == measurement {
for fieldname, value := range p.Fields {
if fieldname == field {
v, ok := value.(float64)
return v, ok
}
}
}
}
return 0.0, false
}
func (a *Accumulator) StringField(measurement string, field string) (string, bool) {
a.Lock()
defer a.Unlock()
for _, p := range a.Metrics {
if p.Measurement == measurement {
for fieldname, value := range p.Fields {
if fieldname == field {
v, ok := value.(string)
return v, ok
}
}
}
}
return "", false
}