Compare commits

...

28 Commits

Author SHA1 Message Date
Daniel Nelson
f93615672b Generate sha256 hashes when packaging
(cherry picked from commit 0b6db905ff)
2017-05-31 12:30:51 -07:00
Daniel Nelson
444f1ba09c Update changelog
(cherry picked from commit 9529199a44)
2017-05-30 17:41:06 -07:00
Daniel Nelson
5417a12ee3 Fix length calculation of split metric buffer (#2869)
(cherry picked from commit be03abd464)
2017-05-30 17:41:00 -07:00
Daniel Nelson
f6c986b5ca Update changelog
(cherry picked from commit 04aa732e94)
2017-05-30 11:05:56 -07:00
Steve Nardone
7f67bf593f Fix panic in mongo input (#2848)
(cherry picked from commit e7f9db297e)
2017-05-30 11:05:28 -07:00
Daniel Nelson
7ca902f987 Update changelog
(cherry picked from commit 7d7206b3e2)
2017-05-25 16:24:46 -07:00
Daniel Nelson
dc69cacaa1 Update gopsutil version
fixes #2856

(cherry picked from commit 03ca3975b5)
2017-05-25 16:24:33 -07:00
Daniel Nelson
29671154bc Update changelog
(cherry picked from commit e1088b9eee)
2017-05-25 13:39:55 -07:00
Daniel Nelson
4f8341670e Fix influxdb output database quoting (#2851)
(cherry picked from commit f47924ffc5)
2017-05-25 13:27:16 -07:00
Daniel Nelson
99edca80ef Handle process termination during read from /proc (#2816)
Fixes #2815.
(cherry picked from commit c53d9fa9b7)
2017-05-18 18:03:54 -07:00
Daniel Nelson
44654e011c Reuse transports in input plugins (#2782)
(cherry picked from commit a47aa0dcc2)
2017-05-18 18:02:09 -07:00
Daniel Nelson
389439111b Fixed sqlserver input to work with case sensitive server collation. (#2749)
Fixed a problem with sqlserver input where database properties are not returned by Telegraf when SQL Server has been set up with a case sensitive server-level collation.

* Added bugfix entry to CHANGELOG.md for sqlserver collation input fix.

(cherry picked from commit e2983383e4)
2017-05-18 17:59:14 -07:00
Daniel Nelson
2bc5594b44 Add release date for 1.3.0 2017-05-15 20:05:22 -07:00
Daniel Nelson
99b53c8745 Add back the changelog entry for 2141 2017-05-15 12:56:11 -07:00
Daniel Nelson
27b89dff48 Only split metrics if there is an udp output (#2799) 2017-05-12 15:34:31 -07:00
Sebastian Borza
b16eb6eae6 split metrics based on UDPPayload size (#2795) 2017-05-12 14:42:18 -07:00
Daniel Nelson
feaf76913b Add missing plugins to README 2017-05-09 13:51:26 -07:00
Daniel Nelson
ff704fbe0d Add SLES11 support to rpm package (#2768) 2017-05-05 14:30:31 -07:00
Sébastien
ebef47f56a fix systemd path in order to add compatibility with SuSe (#2499) 2017-05-05 14:30:24 -07:00
Daniel Nelson
18fd2d987d Return an error if no valid patterns. (#2753) 2017-05-02 14:55:16 -07:00
Alexander Blagoev
5e70cb3e44 Improve redis input documentation (#2708) 2017-05-02 14:12:09 -07:00
Patrick Hemmer
ce203dc687 fix close on closed socket_writer (#2748) 2017-05-02 11:07:58 -07:00
Daniel Nelson
b0a2e8e1bd Add initial documentation for rabbitmq input. (#2745) 2017-05-01 18:57:19 -07:00
Daniel Nelson
499495f844 Don't log error creating database on connect (#2740)
closes #2739
2017-04-28 15:59:28 -07:00
Daniel Nelson
20ab8fb2c3 Update telegraf.conf 2017-04-28 13:49:09 -07:00
Daniel Nelson
bc474d3a53 Clarify retention policy option for influxdb output
closes #2696
2017-04-28 13:48:24 -07:00
Daniel Nelson
547be87d79 Clarify retention policy option for influxdb output
closes #2696
2017-04-28 13:43:00 -07:00
Daniel Nelson
619d4d5d29 Use go 1.8.1 for CI and Release builds (#2732) 2017-04-27 16:22:41 -07:00
30 changed files with 714 additions and 239 deletions

View File

@@ -1,4 +1,16 @@
## v1.3 [unreleased] ## v1.3.1 [unreleased]
### Bugfixes
- [#2749](https://github.com/influxdata/telegraf/pull/2749): Fixed sqlserver input to work with case sensitive server collation.
- [#2782](https://github.com/influxdata/telegraf/pull/2782): Reuse transports in input plugins
- [#2815](https://github.com/influxdata/telegraf/issues/2815): Inputs processes fails with "no such process".
- [#2851](https://github.com/influxdata/telegraf/pull/2851): Fix InfluxDB output database quoting.
- [#2856](https://github.com/influxdata/telegraf/issues/2856): Fix net input on older Linux kernels.
- [#2848](https://github.com/influxdata/telegraf/pull/2848): Fix panic in mongo input.
- [#2869](https://github.com/influxdata/telegraf/pull/2869): Fix length calculation of split metric buffer.
## v1.3 [2017-05-15]
### Release Notes ### Release Notes
@@ -79,6 +91,9 @@ be deprecated eventually.
- [#2705](https://github.com/influxdata/telegraf/pull/2705): Kinesis output: add use_random_partitionkey option - [#2705](https://github.com/influxdata/telegraf/pull/2705): Kinesis output: add use_random_partitionkey option
- [#2635](https://github.com/influxdata/telegraf/issues/2635): add tcp keep-alive to socket_listener & socket_writer - [#2635](https://github.com/influxdata/telegraf/issues/2635): add tcp keep-alive to socket_listener & socket_writer
- [#2031](https://github.com/influxdata/telegraf/pull/2031): Add Kapacitor input plugin - [#2031](https://github.com/influxdata/telegraf/pull/2031): Add Kapacitor input plugin
- [#2732](https://github.com/influxdata/telegraf/pull/2732): Use go 1.8.1
- [#2712](https://github.com/influxdata/telegraf/issues/2712): Documentation for rabbitmq input plugin
- [#2141](https://github.com/influxdata/telegraf/pull/2141): Logparser handles newly-created files.
### Bugfixes ### Bugfixes
@@ -118,6 +133,7 @@ be deprecated eventually.
- [#1911](https://github.com/influxdata/telegraf/issues/1911): Sysstat plugin needs LANG=C or similar locale - [#1911](https://github.com/influxdata/telegraf/issues/1911): Sysstat plugin needs LANG=C or similar locale
- [#2528](https://github.com/influxdata/telegraf/issues/2528): File output closes standard streams on reload. - [#2528](https://github.com/influxdata/telegraf/issues/2528): File output closes standard streams on reload.
- [#2603](https://github.com/influxdata/telegraf/issues/2603): AMQP output disconnect blocks all outputs - [#2603](https://github.com/influxdata/telegraf/issues/2603): AMQP output disconnect blocks all outputs
- [#2706](https://github.com/influxdata/telegraf/issues/2706): Improve documentation for redis input plugin
## v1.2.1 [2017-02-01] ## v1.2.1 [2017-02-01]

2
Godeps
View File

@@ -46,7 +46,7 @@ github.com/prometheus/procfs 1878d9fbb537119d24b21ca07effd591627cd160
github.com/rcrowley/go-metrics 1f30fe9094a513ce4c700b9a54458bbb0c96996c github.com/rcrowley/go-metrics 1f30fe9094a513ce4c700b9a54458bbb0c96996c
github.com/samuel/go-zookeeper 1d7be4effb13d2d908342d349d71a284a7542693 github.com/samuel/go-zookeeper 1d7be4effb13d2d908342d349d71a284a7542693
github.com/satori/go.uuid 5bf94b69c6b68ee1b541973bb8e1144db23a194b github.com/satori/go.uuid 5bf94b69c6b68ee1b541973bb8e1144db23a194b
github.com/shirou/gopsutil 70693b6a3da51a8a686d31f1b346077bbc066062 github.com/shirou/gopsutil 9a4a9167ad3b4355dbf1c2c7a0f5f0d3fb1e9ab9
github.com/soniah/gosnmp 5ad50dc75ab389f8a1c9f8a67d3a1cd85f67ed15 github.com/soniah/gosnmp 5ad50dc75ab389f8a1c9f8a67d3a1cd85f67ed15
github.com/streadway/amqp 63795daa9a446c920826655f26ba31c81c860fd6 github.com/streadway/amqp 63795daa9a446c920826655f26ba31c81c860fd6
github.com/stretchr/testify 4d4bfba8f1d1027c4fdbe371823030df51419987 github.com/stretchr/testify 4d4bfba8f1d1027c4fdbe371823030df51419987

View File

@@ -111,6 +111,7 @@ configuration options.
* [couchbase](./plugins/inputs/couchbase) * [couchbase](./plugins/inputs/couchbase)
* [couchdb](./plugins/inputs/couchdb) * [couchdb](./plugins/inputs/couchdb)
* [disque](./plugins/inputs/disque) * [disque](./plugins/inputs/disque)
* [dmcache](./plugins/inputs/dmcache)
* [dns query time](./plugins/inputs/dns_query) * [dns query time](./plugins/inputs/dns_query)
* [docker](./plugins/inputs/docker) * [docker](./plugins/inputs/docker)
* [dovecot](./plugins/inputs/dovecot) * [dovecot](./plugins/inputs/dovecot)
@@ -127,6 +128,7 @@ configuration options.
* [ipmi_sensor](./plugins/inputs/ipmi_sensor) * [ipmi_sensor](./plugins/inputs/ipmi_sensor)
* [iptables](./plugins/inputs/iptables) * [iptables](./plugins/inputs/iptables)
* [jolokia](./plugins/inputs/jolokia) * [jolokia](./plugins/inputs/jolokia)
* [kapacitor](./plugins/inputs/kapacitor)
* [kubernetes](./plugins/inputs/kubernetes) * [kubernetes](./plugins/inputs/kubernetes)
* [leofs](./plugins/inputs/leofs) * [leofs](./plugins/inputs/leofs)
* [lustre2](./plugins/inputs/lustre2) * [lustre2](./plugins/inputs/lustre2)
@@ -195,6 +197,7 @@ Telegraf can also collect metrics via the following service plugins:
* [github](./plugins/inputs/webhooks/github) * [github](./plugins/inputs/webhooks/github)
* [mandrill](./plugins/inputs/webhooks/mandrill) * [mandrill](./plugins/inputs/webhooks/mandrill)
* [rollbar](./plugins/inputs/webhooks/rollbar) * [rollbar](./plugins/inputs/webhooks/rollbar)
* [papertrail](./plugins/inputs/webhooks/papertrail)
Telegraf is able to parse the following input data formats into metrics, these Telegraf is able to parse the following input data formats into metrics, these
formats may be used with input plugins supporting the `data_format` option: formats may be used with input plugins supporting the `data_format` option:

View File

@@ -1,13 +1,11 @@
machine: machine:
go:
version: 1.8.1
services: services:
- docker - docker
post: - memcached
- sudo service zookeeper stop - redis
- go version - rabbitmq-server
- sudo rm -rf /usr/local/go
- wget https://storage.googleapis.com/golang/go1.8.linux-amd64.tar.gz
- sudo tar -C /usr/local -xzf go1.8.linux-amd64.tar.gz
- go version
dependencies: dependencies:
override: override:

View File

@@ -95,7 +95,8 @@
## The target database for metrics (telegraf will create it if not exists). ## The target database for metrics (telegraf will create it if not exists).
database = "telegraf" # required database = "telegraf" # required
## Retention policy to write to. Empty string writes to the default rp. ## Name of existing retention policy to write to. Empty string writes to
## the default retention policy.
retention_policy = "" retention_policy = ""
## Write consistency (clusters only), can be: "any", "one", "quorum", "all" ## Write consistency (clusters only), can be: "any", "one", "quorum", "all"
write_consistency = "any" write_consistency = "any"
@@ -141,7 +142,7 @@
# ## described here: https://www.rabbitmq.com/plugins.html # ## described here: https://www.rabbitmq.com/plugins.html
# # auth_method = "PLAIN" # # auth_method = "PLAIN"
# ## Telegraf tag to use as a routing key # ## Telegraf tag to use as a routing key
# ## ie, if this tag exists, it's value will be used as the routing key # ## ie, if this tag exists, its value will be used as the routing key
# routing_tag = "host" # routing_tag = "host"
# #
# ## InfluxDB retention policy # ## InfluxDB retention policy
@@ -335,6 +336,10 @@
# ## Use SSL but skip chain & host verification # ## Use SSL but skip chain & host verification
# # insecure_skip_verify = false # # insecure_skip_verify = false
# #
# ## Optional SASL Config
# # sasl_username = "kafka"
# # sasl_password = "secret"
#
# ## Data format to output. # ## Data format to output.
# ## Each data format has its own unique set of configuration options, read # ## Each data format has its own unique set of configuration options, read
# ## more about them here: # ## more about them here:
@@ -1325,6 +1330,18 @@
# attribute = "LoadedClassCount,UnloadedClassCount,TotalLoadedClassCount" # attribute = "LoadedClassCount,UnloadedClassCount,TotalLoadedClassCount"
# # Read Kapacitor-formatted JSON metrics from one or more HTTP endpoints
# [[inputs.kapacitor]]
# ## Multiple URLs from which to read Kapacitor-formatted JSON
# ## Default is "http://localhost:9092/kapacitor/v1/debug/vars".
# urls = [
# "http://localhost:9092/kapacitor/v1/debug/vars"
# ]
#
# ## Time limit for http requests
# timeout = "5s"
# # Get kernel statistics from /proc/vmstat # # Get kernel statistics from /proc/vmstat
# [[inputs.kernel_vmstat]] # [[inputs.kernel_vmstat]]
# # no configuration # # no configuration

View File

@@ -29,6 +29,8 @@ type Apache struct {
SSLKey string `toml:"ssl_key"` SSLKey string `toml:"ssl_key"`
// Use SSL but skip chain & host verification // Use SSL but skip chain & host verification
InsecureSkipVerify bool InsecureSkipVerify bool
client *http.Client
} }
var sampleConfig = ` var sampleConfig = `
@@ -66,6 +68,14 @@ func (n *Apache) Gather(acc telegraf.Accumulator) error {
n.ResponseTimeout.Duration = time.Second * 5 n.ResponseTimeout.Duration = time.Second * 5
} }
if n.client == nil {
client, err := n.createHttpClient()
if err != nil {
return err
}
n.client = client
}
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(len(n.Urls)) wg.Add(len(n.Urls))
for _, u := range n.Urls { for _, u := range n.Urls {
@@ -85,31 +95,24 @@ func (n *Apache) Gather(acc telegraf.Accumulator) error {
return nil return nil
} }
func (n *Apache) gatherUrl(addr *url.URL, acc telegraf.Accumulator) error { func (n *Apache) createHttpClient() (*http.Client, error) {
tlsCfg, err := internal.GetTLSConfig(
var tr *http.Transport n.SSLCert, n.SSLKey, n.SSLCA, n.InsecureSkipVerify)
if err != nil {
if addr.Scheme == "https" { return nil, err
tlsCfg, err := internal.GetTLSConfig(
n.SSLCert, n.SSLKey, n.SSLCA, n.InsecureSkipVerify)
if err != nil {
return err
}
tr = &http.Transport{
ResponseHeaderTimeout: time.Duration(3 * time.Second),
TLSClientConfig: tlsCfg,
}
} else {
tr = &http.Transport{
ResponseHeaderTimeout: time.Duration(3 * time.Second),
}
} }
client := &http.Client{ client := &http.Client{
Transport: tr, Transport: &http.Transport{
Timeout: n.ResponseTimeout.Duration, TLSClientConfig: tlsCfg,
},
Timeout: n.ResponseTimeout.Duration,
} }
return client, nil
}
func (n *Apache) gatherUrl(addr *url.URL, acc telegraf.Accumulator) error {
req, err := http.NewRequest("GET", addr.String(), nil) req, err := http.NewRequest("GET", addr.String(), nil)
if err != nil { if err != nil {
return fmt.Errorf("error on new request to %s : %s\n", addr.String(), err) return fmt.Errorf("error on new request to %s : %s\n", addr.String(), err)
@@ -119,7 +122,7 @@ func (n *Apache) gatherUrl(addr *url.URL, acc telegraf.Accumulator) error {
req.SetBasicAuth(n.Username, n.Password) req.SetBasicAuth(n.Username, n.Password)
} }
resp, err := client.Do(req) resp, err := n.client.Do(req)
if err != nil { if err != nil {
return fmt.Errorf("error on request to %s : %s\n", addr.String(), err) return fmt.Errorf("error on request to %s : %s\n", addr.String(), err)
} }

View File

@@ -25,7 +25,6 @@ type HTTPResponse struct {
Headers map[string]string Headers map[string]string
FollowRedirects bool FollowRedirects bool
ResponseStringMatch string ResponseStringMatch string
compiledStringMatch *regexp.Regexp
// Path to CA file // Path to CA file
SSLCA string `toml:"ssl_ca"` SSLCA string `toml:"ssl_ca"`
@@ -35,6 +34,9 @@ type HTTPResponse struct {
SSLKey string `toml:"ssl_key"` SSLKey string `toml:"ssl_key"`
// Use SSL but skip chain & host verification // Use SSL but skip chain & host verification
InsecureSkipVerify bool InsecureSkipVerify bool
compiledStringMatch *regexp.Regexp
client *http.Client
} }
// Description returns the plugin Description // Description returns the plugin Description
@@ -88,13 +90,12 @@ func (h *HTTPResponse) createHttpClient() (*http.Client, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
tr := &http.Transport{
ResponseHeaderTimeout: h.ResponseTimeout.Duration,
TLSClientConfig: tlsCfg,
}
client := &http.Client{ client := &http.Client{
Transport: tr, Transport: &http.Transport{
Timeout: h.ResponseTimeout.Duration, DisableKeepAlives: true,
TLSClientConfig: tlsCfg,
},
Timeout: h.ResponseTimeout.Duration,
} }
if h.FollowRedirects == false { if h.FollowRedirects == false {
@@ -106,15 +107,10 @@ func (h *HTTPResponse) createHttpClient() (*http.Client, error) {
} }
// HTTPGather gathers all fields and returns any errors it encounters // HTTPGather gathers all fields and returns any errors it encounters
func (h *HTTPResponse) HTTPGather() (map[string]interface{}, error) { func (h *HTTPResponse) httpGather() (map[string]interface{}, error) {
// Prepare fields // Prepare fields
fields := make(map[string]interface{}) fields := make(map[string]interface{})
client, err := h.createHttpClient()
if err != nil {
return nil, err
}
var body io.Reader var body io.Reader
if h.Body != "" { if h.Body != "" {
body = strings.NewReader(h.Body) body = strings.NewReader(h.Body)
@@ -133,7 +129,7 @@ func (h *HTTPResponse) HTTPGather() (map[string]interface{}, error) {
// Start Timer // Start Timer
start := time.Now() start := time.Now()
resp, err := client.Do(request) resp, err := h.client.Do(request)
if err != nil { if err != nil {
if h.FollowRedirects { if h.FollowRedirects {
return nil, err return nil, err
@@ -145,6 +141,11 @@ func (h *HTTPResponse) HTTPGather() (map[string]interface{}, error) {
return nil, err return nil, err
} }
} }
defer func() {
io.Copy(ioutil.Discard, resp.Body)
resp.Body.Close()
}()
fields["response_time"] = time.Since(start).Seconds() fields["response_time"] = time.Since(start).Seconds()
fields["http_response_code"] = resp.StatusCode fields["http_response_code"] = resp.StatusCode
@@ -202,8 +203,17 @@ func (h *HTTPResponse) Gather(acc telegraf.Accumulator) error {
// Prepare data // Prepare data
tags := map[string]string{"server": h.Address, "method": h.Method} tags := map[string]string{"server": h.Address, "method": h.Method}
var fields map[string]interface{} var fields map[string]interface{}
if h.client == nil {
client, err := h.createHttpClient()
if err != nil {
return err
}
h.client = client
}
// Gather data // Gather data
fields, err = h.HTTPGather() fields, err = h.httpGather()
if err != nil { if err != nil {
return err return err
} }

View File

@@ -9,6 +9,7 @@ import (
"time" "time"
"github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
@@ -73,13 +74,13 @@ func TestHeaders(t *testing.T) {
"Host": "Hello", "Host": "Hello",
}, },
} }
fields, err := h.HTTPGather() var acc testutil.Accumulator
err := h.Gather(&acc)
require.NoError(t, err) require.NoError(t, err)
assert.NotEmpty(t, fields)
if assert.NotNil(t, fields["http_response_code"]) { value, ok := acc.IntField("http_response", "http_response_code")
assert.Equal(t, http.StatusOK, fields["http_response_code"]) require.True(t, ok)
} require.Equal(t, http.StatusOK, value)
assert.NotNil(t, fields["response_time"])
} }
func TestFields(t *testing.T) { func TestFields(t *testing.T) {
@@ -97,13 +98,14 @@ func TestFields(t *testing.T) {
}, },
FollowRedirects: true, FollowRedirects: true,
} }
fields, err := h.HTTPGather()
var acc testutil.Accumulator
err := h.Gather(&acc)
require.NoError(t, err) require.NoError(t, err)
assert.NotEmpty(t, fields)
if assert.NotNil(t, fields["http_response_code"]) { value, ok := acc.IntField("http_response", "http_response_code")
assert.Equal(t, http.StatusOK, fields["http_response_code"]) require.True(t, ok)
} require.Equal(t, http.StatusOK, value)
assert.NotNil(t, fields["response_time"])
} }
func TestRedirects(t *testing.T) { func TestRedirects(t *testing.T) {
@@ -121,12 +123,13 @@ func TestRedirects(t *testing.T) {
}, },
FollowRedirects: true, FollowRedirects: true,
} }
fields, err := h.HTTPGather() var acc testutil.Accumulator
err := h.Gather(&acc)
require.NoError(t, err) require.NoError(t, err)
assert.NotEmpty(t, fields)
if assert.NotNil(t, fields["http_response_code"]) { value, ok := acc.IntField("http_response", "http_response_code")
assert.Equal(t, http.StatusOK, fields["http_response_code"]) require.True(t, ok)
} require.Equal(t, http.StatusOK, value)
h = &HTTPResponse{ h = &HTTPResponse{
Address: ts.URL + "/badredirect", Address: ts.URL + "/badredirect",
@@ -138,8 +141,12 @@ func TestRedirects(t *testing.T) {
}, },
FollowRedirects: true, FollowRedirects: true,
} }
fields, err = h.HTTPGather() acc = testutil.Accumulator{}
err = h.Gather(&acc)
require.Error(t, err) require.Error(t, err)
value, ok = acc.IntField("http_response", "http_response_code")
require.False(t, ok)
} }
func TestMethod(t *testing.T) { func TestMethod(t *testing.T) {
@@ -157,12 +164,13 @@ func TestMethod(t *testing.T) {
}, },
FollowRedirects: true, FollowRedirects: true,
} }
fields, err := h.HTTPGather() var acc testutil.Accumulator
err := h.Gather(&acc)
require.NoError(t, err) require.NoError(t, err)
assert.NotEmpty(t, fields)
if assert.NotNil(t, fields["http_response_code"]) { value, ok := acc.IntField("http_response", "http_response_code")
assert.Equal(t, http.StatusOK, fields["http_response_code"]) require.True(t, ok)
} require.Equal(t, http.StatusOK, value)
h = &HTTPResponse{ h = &HTTPResponse{
Address: ts.URL + "/mustbepostmethod", Address: ts.URL + "/mustbepostmethod",
@@ -174,12 +182,13 @@ func TestMethod(t *testing.T) {
}, },
FollowRedirects: true, FollowRedirects: true,
} }
fields, err = h.HTTPGather() acc = testutil.Accumulator{}
err = h.Gather(&acc)
require.NoError(t, err) require.NoError(t, err)
assert.NotEmpty(t, fields)
if assert.NotNil(t, fields["http_response_code"]) { value, ok = acc.IntField("http_response", "http_response_code")
assert.Equal(t, http.StatusMethodNotAllowed, fields["http_response_code"]) require.True(t, ok)
} require.Equal(t, http.StatusMethodNotAllowed, value)
//check that lowercase methods work correctly //check that lowercase methods work correctly
h = &HTTPResponse{ h = &HTTPResponse{
@@ -192,12 +201,13 @@ func TestMethod(t *testing.T) {
}, },
FollowRedirects: true, FollowRedirects: true,
} }
fields, err = h.HTTPGather() acc = testutil.Accumulator{}
err = h.Gather(&acc)
require.NoError(t, err) require.NoError(t, err)
assert.NotEmpty(t, fields)
if assert.NotNil(t, fields["http_response_code"]) { value, ok = acc.IntField("http_response", "http_response_code")
assert.Equal(t, http.StatusMethodNotAllowed, fields["http_response_code"]) require.True(t, ok)
} require.Equal(t, http.StatusMethodNotAllowed, value)
} }
func TestBody(t *testing.T) { func TestBody(t *testing.T) {
@@ -215,12 +225,13 @@ func TestBody(t *testing.T) {
}, },
FollowRedirects: true, FollowRedirects: true,
} }
fields, err := h.HTTPGather() var acc testutil.Accumulator
err := h.Gather(&acc)
require.NoError(t, err) require.NoError(t, err)
assert.NotEmpty(t, fields)
if assert.NotNil(t, fields["http_response_code"]) { value, ok := acc.IntField("http_response", "http_response_code")
assert.Equal(t, http.StatusOK, fields["http_response_code"]) require.True(t, ok)
} require.Equal(t, http.StatusOK, value)
h = &HTTPResponse{ h = &HTTPResponse{
Address: ts.URL + "/musthaveabody", Address: ts.URL + "/musthaveabody",
@@ -231,12 +242,13 @@ func TestBody(t *testing.T) {
}, },
FollowRedirects: true, FollowRedirects: true,
} }
fields, err = h.HTTPGather() acc = testutil.Accumulator{}
err = h.Gather(&acc)
require.NoError(t, err) require.NoError(t, err)
assert.NotEmpty(t, fields)
if assert.NotNil(t, fields["http_response_code"]) { value, ok = acc.IntField("http_response", "http_response_code")
assert.Equal(t, http.StatusBadRequest, fields["http_response_code"]) require.True(t, ok)
} require.Equal(t, http.StatusBadRequest, value)
} }
func TestStringMatch(t *testing.T) { func TestStringMatch(t *testing.T) {
@@ -255,15 +267,18 @@ func TestStringMatch(t *testing.T) {
}, },
FollowRedirects: true, FollowRedirects: true,
} }
fields, err := h.HTTPGather() var acc testutil.Accumulator
err := h.Gather(&acc)
require.NoError(t, err) require.NoError(t, err)
assert.NotEmpty(t, fields)
if assert.NotNil(t, fields["http_response_code"]) {
assert.Equal(t, http.StatusOK, fields["http_response_code"])
}
assert.Equal(t, 1, fields["response_string_match"])
assert.NotNil(t, fields["response_time"])
value, ok := acc.IntField("http_response", "http_response_code")
require.True(t, ok)
require.Equal(t, http.StatusOK, value)
value, ok = acc.IntField("http_response", "response_string_match")
require.True(t, ok)
require.Equal(t, 1, value)
_, ok = acc.FloatField("http_response", "response_time")
require.True(t, ok)
} }
func TestStringMatchJson(t *testing.T) { func TestStringMatchJson(t *testing.T) {
@@ -282,15 +297,18 @@ func TestStringMatchJson(t *testing.T) {
}, },
FollowRedirects: true, FollowRedirects: true,
} }
fields, err := h.HTTPGather() var acc testutil.Accumulator
err := h.Gather(&acc)
require.NoError(t, err) require.NoError(t, err)
assert.NotEmpty(t, fields)
if assert.NotNil(t, fields["http_response_code"]) {
assert.Equal(t, http.StatusOK, fields["http_response_code"])
}
assert.Equal(t, 1, fields["response_string_match"])
assert.NotNil(t, fields["response_time"])
value, ok := acc.IntField("http_response", "http_response_code")
require.True(t, ok)
require.Equal(t, http.StatusOK, value)
value, ok = acc.IntField("http_response", "response_string_match")
require.True(t, ok)
require.Equal(t, 1, value)
_, ok = acc.FloatField("http_response", "response_time")
require.True(t, ok)
} }
func TestStringMatchFail(t *testing.T) { func TestStringMatchFail(t *testing.T) {
@@ -309,18 +327,26 @@ func TestStringMatchFail(t *testing.T) {
}, },
FollowRedirects: true, FollowRedirects: true,
} }
fields, err := h.HTTPGather()
require.NoError(t, err)
assert.NotEmpty(t, fields)
if assert.NotNil(t, fields["http_response_code"]) {
assert.Equal(t, http.StatusOK, fields["http_response_code"])
}
assert.Equal(t, 0, fields["response_string_match"])
assert.NotNil(t, fields["response_time"])
var acc testutil.Accumulator
err := h.Gather(&acc)
require.NoError(t, err)
value, ok := acc.IntField("http_response", "http_response_code")
require.True(t, ok)
require.Equal(t, http.StatusOK, value)
value, ok = acc.IntField("http_response", "response_string_match")
require.True(t, ok)
require.Equal(t, 0, value)
_, ok = acc.FloatField("http_response", "response_time")
require.True(t, ok)
} }
func TestTimeout(t *testing.T) { func TestTimeout(t *testing.T) {
if testing.Short() {
t.Skip("Skipping test with sleep in short mode.")
}
mux := setUpTestMux() mux := setUpTestMux()
ts := httptest.NewServer(mux) ts := httptest.NewServer(mux)
defer ts.Close() defer ts.Close()
@@ -335,6 +361,10 @@ func TestTimeout(t *testing.T) {
}, },
FollowRedirects: true, FollowRedirects: true,
} }
_, err := h.HTTPGather() var acc testutil.Accumulator
require.Error(t, err) err := h.Gather(&acc)
require.NoError(t, err)
ok := acc.HasIntField("http_response", "http_response_code")
require.False(t, ok)
} }

View File

@@ -118,11 +118,18 @@ func (p *Parser) Compile() error {
// Give Patterns fake names so that they can be treated as named // Give Patterns fake names so that they can be treated as named
// "custom patterns" // "custom patterns"
p.namedPatterns = make([]string, len(p.Patterns)) p.namedPatterns = make([]string, 0, len(p.Patterns))
for i, pattern := range p.Patterns { for i, pattern := range p.Patterns {
if pattern == "" {
continue
}
name := fmt.Sprintf("GROK_INTERNAL_PATTERN_%d", i) name := fmt.Sprintf("GROK_INTERNAL_PATTERN_%d", i)
p.CustomPatterns += "\n" + name + " " + pattern + "\n" p.CustomPatterns += "\n" + name + " " + pattern + "\n"
p.namedPatterns[i] = "%{" + name + "}" p.namedPatterns = append(p.namedPatterns, "%{"+name+"}")
}
if len(p.namedPatterns) == 0 {
return fmt.Errorf("pattern required")
} }
// Combine user-supplied CustomPatterns with DEFAULT_PATTERNS and parse // Combine user-supplied CustomPatterns with DEFAULT_PATTERNS and parse

View File

@@ -117,16 +117,11 @@ func (l *LogParserPlugin) Start(acc telegraf.Accumulator) error {
} }
// compile log parser patterns: // compile log parser patterns:
var haveError bool
for _, parser := range l.parsers { for _, parser := range l.parsers {
if err := parser.Compile(); err != nil { if err := parser.Compile(); err != nil {
acc.AddError(err) return err
haveError = true
} }
} }
if haveError {
return nil
}
l.wg.Add(1) l.wg.Add(1)
go l.parser() go l.parser()

View File

@@ -38,12 +38,8 @@ func TestGrokParseLogFilesNonExistPattern(t *testing.T) {
} }
acc := testutil.Accumulator{} acc := testutil.Accumulator{}
logparser.Start(&acc) err := logparser.Start(&acc)
if assert.NotEmpty(t, acc.Errors) { assert.Error(t, err)
assert.Error(t, acc.Errors[0])
}
logparser.Stop()
} }
func TestGrokParseLogFiles(t *testing.T) { func TestGrokParseLogFiles(t *testing.T) {

View File

@@ -564,7 +564,7 @@ func NewStatLine(oldMongo, newMongo MongoStatus, key string, all bool, sampleSec
// BEGIN code modification // BEGIN code modification
if newStat.Repl.IsMaster.(bool) { if newStat.Repl.IsMaster.(bool) {
returnVal.NodeType = "PRI" returnVal.NodeType = "PRI"
} else if newStat.Repl.Secondary.(bool) { } else if newStat.Repl.Secondary != nil && newStat.Repl.Secondary.(bool) {
returnVal.NodeType = "SEC" returnVal.NodeType = "SEC"
} else if newStat.Repl.ArbiterOnly != nil && newStat.Repl.ArbiterOnly.(bool) { } else if newStat.Repl.ArbiterOnly != nil && newStat.Repl.ArbiterOnly.(bool) {
returnVal.NodeType = "ARB" returnVal.NodeType = "ARB"

View File

@@ -4,7 +4,6 @@ import (
"errors" "errors"
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"net"
"net/http" "net/http"
"sync" "sync"
"time" "time"
@@ -32,6 +31,8 @@ type Prometheus struct {
SSLKey string `toml:"ssl_key"` SSLKey string `toml:"ssl_key"`
// Use SSL but skip chain & host verification // Use SSL but skip chain & host verification
InsecureSkipVerify bool InsecureSkipVerify bool
client *http.Client
} }
var sampleConfig = ` var sampleConfig = `
@@ -65,6 +66,14 @@ var ErrProtocolError = errors.New("prometheus protocol error")
// Reads stats from all configured servers accumulates stats. // Reads stats from all configured servers accumulates stats.
// Returns one of the errors encountered while gather stats (if any). // Returns one of the errors encountered while gather stats (if any).
func (p *Prometheus) Gather(acc telegraf.Accumulator) error { func (p *Prometheus) Gather(acc telegraf.Accumulator) error {
if p.client == nil {
client, err := p.createHttpClient()
if err != nil {
return err
}
p.client = client
}
var wg sync.WaitGroup var wg sync.WaitGroup
for _, serv := range p.Urls { for _, serv := range p.Urls {
@@ -89,29 +98,30 @@ var client = &http.Client{
Timeout: time.Duration(4 * time.Second), Timeout: time.Duration(4 * time.Second),
} }
func (p *Prometheus) createHttpClient() (*http.Client, error) {
tlsCfg, err := internal.GetTLSConfig(
p.SSLCert, p.SSLKey, p.SSLCA, p.InsecureSkipVerify)
if err != nil {
return nil, err
}
client := &http.Client{
Transport: &http.Transport{
TLSClientConfig: tlsCfg,
DisableKeepAlives: true,
},
Timeout: p.ResponseTimeout.Duration,
}
return client, nil
}
func (p *Prometheus) gatherURL(url string, acc telegraf.Accumulator) error { func (p *Prometheus) gatherURL(url string, acc telegraf.Accumulator) error {
var req, err = http.NewRequest("GET", url, nil) var req, err = http.NewRequest("GET", url, nil)
req.Header.Add("Accept", acceptHeader) req.Header.Add("Accept", acceptHeader)
var token []byte var token []byte
var resp *http.Response var resp *http.Response
tlsCfg, err := internal.GetTLSConfig(
p.SSLCert, p.SSLKey, p.SSLCA, p.InsecureSkipVerify)
if err != nil {
return err
}
var rt http.RoundTripper = &http.Transport{
Dial: (&net.Dialer{
Timeout: 5 * time.Second,
KeepAlive: 30 * time.Second,
}).Dial,
TLSHandshakeTimeout: 5 * time.Second,
TLSClientConfig: tlsCfg,
ResponseHeaderTimeout: p.ResponseTimeout.Duration,
DisableKeepAlives: true,
}
if p.BearerToken != "" { if p.BearerToken != "" {
token, err = ioutil.ReadFile(p.BearerToken) token, err = ioutil.ReadFile(p.BearerToken)
if err != nil { if err != nil {
@@ -120,7 +130,7 @@ func (p *Prometheus) gatherURL(url string, acc telegraf.Accumulator) error {
req.Header.Set("Authorization", "Bearer "+string(token)) req.Header.Set("Authorization", "Bearer "+string(token))
} }
resp, err = rt.RoundTrip(req) resp, err = p.client.Do(req)
if err != nil { if err != nil {
return fmt.Errorf("error making HTTP request to %s: %s", url, err) return fmt.Errorf("error making HTTP request to %s: %s", url, err)
} }

View File

@@ -0,0 +1,121 @@
# RabbitMQ Input Plugin
Reads metrics from RabbitMQ servers via the [Management Plugin](https://www.rabbitmq.com/management.html).
For additional details reference the [RabbitMQ Management HTTP Stats](https://cdn.rawgit.com/rabbitmq/rabbitmq-management/master/priv/www/doc/stats.html).
### Configuration:
```toml
[[inputs.rabbitmq]]
## Management Plugin url. (default: http://localhost:15672)
# url = "http://localhost:15672"
## Tag added to rabbitmq_overview series; deprecated: use tags
# name = "rmq-server-1"
## Credentials
# username = "guest"
# password = "guest"
## Optional SSL Config
# ssl_ca = "/etc/telegraf/ca.pem"
# ssl_cert = "/etc/telegraf/cert.pem"
# ssl_key = "/etc/telegraf/key.pem"
## Use SSL but skip chain & host verification
# insecure_skip_verify = false
## Optional request timeouts
##
## ResponseHeaderTimeout, if non-zero, specifies the amount of time to wait
## for a server's response headers after fully writing the request.
# header_timeout = "3s"
##
## client_timeout specifies a time limit for requests made by this client.
## Includes connection time, any redirects, and reading the response body.
# client_timeout = "4s"
## A list of nodes to pull metrics about. If not specified, metrics for
## all nodes are gathered.
# nodes = ["rabbit@node1", "rabbit@node2"]
```
### Measurements & Fields:
- rabbitmq_overview
- channels (int, channels)
- connections (int, connections)
- consumers (int, consumers)
- exchanges (int, exchanges)
- messages (int, messages)
- messages_acked (int, messages)
- messages_delivered (int, messages)
- messages_published (int, messages)
- messages_ready (int, messages)
- messages_unacked (int, messages)
- queues (int, queues)
- rabbitmq_node
- disk_free (int, bytes)
- disk_free_limit (int, bytes)
- fd_total (int, file descriptors)
- fd_used (int, file descriptors)
- mem_limit (int, bytes)
- mem_used (int, bytes)
- proc_total (int, erlang processes)
- proc_used (int, erlang processes)
- run_queue (int, erlang processes)
- sockets_total (int, sockets)
- sockets_used (int, sockets)
- rabbitmq_queue
- consumer_utilisation (float, percent)
- consumers (int, int)
- idle_since (string, time - e.g., "2006-01-02 15:04:05")
- memory (int, bytes)
- message_bytes (int, bytes)
- message_bytes_persist (int, bytes)
- message_bytes_ram (int, bytes)
- message_bytes_ready (int, bytes)
- message_bytes_unacked (int, bytes)
- messages (int, count)
- messages_ack (int, count)
- messages_ack_rate (float, messages per second)
- messages_deliver (int, count)
- messages_deliver_rate (float, messages per second)
- messages_deliver_get (int, count)
- messages_deliver_get_rate (float, messages per second)
- messages_publish (int, count)
- messages_publish_rate (float, messages per second)
- messages_ready (int, count)
- messages_redeliver (int, count)
- messages_redeliver_rate (float, messages per second)
- messages_unack (integer, count)
### Tags:
- All measurements have the following tags:
- url
- rabbitmq_overview
- name
- rabbitmq_node
- node
- rabbitmq_queue
- url
- queue
- vhost
- node
- durable
- auto_delete
### Sample Queries:
### Example Output:
```
rabbitmq_queue,url=http://amqp.example.org:15672,queue=telegraf,vhost=influxdb,node=rabbit@amqp.example.org,durable=true,auto_delete=false,host=amqp.example.org messages_deliver_get=0i,messages_publish=329i,messages_publish_rate=0.2,messages_redeliver_rate=0,message_bytes_ready=0i,message_bytes_unacked=0i,messages_deliver=329i,messages_unack=0i,consumers=1i,idle_since="",messages=0i,messages_deliver_rate=0.2,messages_deliver_get_rate=0.2,messages_redeliver=0i,memory=43032i,message_bytes_ram=0i,messages_ack=329i,messages_ready=0i,messages_ack_rate=0.2,consumer_utilisation=1,message_bytes=0i,message_bytes_persist=0i 1493684035000000000
rabbitmq_overview,url=http://amqp.example.org:15672,host=amqp.example.org channels=2i,consumers=1i,exchanges=17i,messages_acked=329i,messages=0i,messages_ready=0i,messages_unacked=0i,connections=2i,queues=1i,messages_delivered=329i,messages_published=329i 1493684035000000000
rabbitmq_node,url=http://amqp.example.org:15672,node=rabbit@amqp.example.org,host=amqp.example.org fd_total=1024i,fd_used=32i,mem_limit=8363329126i,sockets_total=829i,disk_free=8175935488i,disk_free_limit=50000000i,mem_used=58771080i,proc_total=1048576i,proc_used=267i,run_queue=0i,sockets_used=2i 149368403500000000
```

View File

@@ -140,8 +140,11 @@ type gatherFunc func(r *RabbitMQ, acc telegraf.Accumulator)
var gatherFunctions = []gatherFunc{gatherOverview, gatherNodes, gatherQueues} var gatherFunctions = []gatherFunc{gatherOverview, gatherNodes, gatherQueues}
var sampleConfig = ` var sampleConfig = `
## Management Plugin url. (default: http://localhost:15672)
# url = "http://localhost:15672" # url = "http://localhost:15672"
# name = "rmq-server-1" # optional tag ## Tag added to rabbitmq_overview series; deprecated: use tags
# name = "rmq-server-1"
## Credentials
# username = "guest" # username = "guest"
# password = "guest" # password = "guest"
@@ -174,7 +177,7 @@ func (r *RabbitMQ) SampleConfig() string {
// Description ... // Description ...
func (r *RabbitMQ) Description() string { func (r *RabbitMQ) Description() string {
return "Read metrics from one or many RabbitMQ servers via the management API" return "Reads metrics from RabbitMQ servers via the Management Plugin"
} }
// Gather ... // Gather ...

View File

@@ -18,47 +18,103 @@
### Measurements & Fields: ### Measurements & Fields:
- Measurement The plugin gathers the results of the [INFO](https://redis.io/commands/info) redis command.
- uptime_in_seconds There are two separate measurements: _redis_ and _redis\_keyspace_, the latter is used for gathering database related statistics.
- connected_clients
- used_memory Additionally the plugin also calculates the hit/miss ratio (keyspace\_hitrate) and the elapsed time since the last rdb save (rdb\_last\_save\_time\_elapsed).
- used_memory_rss
- used_memory_peak - redis
- used_memory_lua - keyspace_hitrate(float, number)
- rdb_changes_since_last_save - rdb_last_save_time_elapsed(int, seconds)
- total_connections_received
- total_commands_processed **Server**
- instantaneous_ops_per_sec - uptime(int, seconds)
- instantaneous_input_kbps - lru_clock(int, number)
- instantaneous_output_kbps
- sync_full **Clients**
- sync_partial_ok - clients(int, number)
- sync_partial_err - client_longest_output_list(int, number)
- expired_keys - client_biggest_input_buf(int, number)
- evicted_keys - blocked_clients(int, number)
- keyspace_hits
- keyspace_misses **Memory**
- pubsub_channels - used_memory(int, bytes)
- pubsub_patterns - used_memory_rss(int, bytes)
- latest_fork_usec - used_memory_peak(int, bytes)
- connected_slaves - total_system_memory(int, bytes)
- master_repl_offset - used_memory_lua(int, bytes)
- master_last_io_seconds_ago - maxmemory(int, bytes)
- repl_backlog_active - maxmemory_policy(string)
- repl_backlog_size - mem_fragmentation_ratio(float, number)
- repl_backlog_histlen
- mem_fragmentation_ratio **Persistance**
- used_cpu_sys - loading(int,flag)
- used_cpu_user - rdb_changes_since_last_save(int, number)
- used_cpu_sys_children - rdb_bgsave_in_progress(int, flag)
- used_cpu_user_children - rdb_last_save_time(int, seconds)
- rdb_last_bgsave_status(string)
- rdb_last_bgsave_time_sec(int, seconds)
- rdb_current_bgsave_time_sec(int, seconds)
- aof_enabled(int, flag)
- aof_rewrite_in_progress(int, flag)
- aof_rewrite_scheduled(int, flag)
- aof_last_rewrite_time_sec(int, seconds)
- aof_current_rewrite_time_sec(int, seconds)
- aof_last_bgrewrite_status(string)
- aof_last_write_status(string)
**Stats**
- total_connections_received(int, number)
- total_commands_processed(int, number)
- instantaneous_ops_per_sec(int, number)
- total_net_input_bytes(int, bytes)
- total_net_output_bytes(int, bytes)
- instantaneous_input_kbps(float, bytes)
- instantaneous_output_kbps(float, bytes)
- rejected_connections(int, number)
- sync_full(int, number)
- sync_partial_ok(int, number)
- sync_partial_err(int, number)
- expired_keys(int, number)
- evicted_keys(int, number)
- keyspace_hits(int, number)
- keyspace_misses(int, number)
- pubsub_channels(int, number)
- pubsub_patterns(int, number)
- latest_fork_usec(int, microseconds)
- migrate_cached_sockets(int, number)
**Replication**
- connected_slaves(int, number)
- master_repl_offset(int, number)
- repl_backlog_active(int, number)
- repl_backlog_size(int, bytes)
- repl_backlog_first_byte_offset(int, number)
- repl_backlog_histlen(int, bytes)
**CPU**
- used_cpu_sys(float, number)
- used_cpu_user(float, number)
- used_cpu_sys_children(float, number)
- used_cpu_user_children(float, number)
**Cluster**
- cluster_enabled(int, flag)
- redis_keyspace
- keys(int, number)
- expires(int, number)
- avg_ttl(int, number)
### Tags: ### Tags:
- All measurements have the following tags: - All measurements have the following tags:
- port - port
- server - server
- replication role - replication_role
- The redis_keyspace measurement has an additional database tag:
- database
### Example Output: ### Example Output:
@@ -84,5 +140,10 @@ When run with:
It produces: It produces:
``` ```
* Plugin: redis, Collection 1 * Plugin: redis, Collection 1
> redis,port=6379,server=localhost clients=1i,connected_slaves=0i,evicted_keys=0i,expired_keys=0i,instantaneous_ops_per_sec=0i,keyspace_hitrate=0,keyspace_hits=0i,keyspace_misses=2i,latest_fork_usec=0i,master_repl_offset=0i,mem_fragmentation_ratio=3.58,pubsub_channels=0i,pubsub_patterns=0i,rdb_changes_since_last_save=0i,repl_backlog_active=0i,repl_backlog_histlen=0i,repl_backlog_size=1048576i,sync_full=0i,sync_partial_err=0i,sync_partial_ok=0i,total_commands_processed=4i,total_connections_received=2i,uptime=869i,used_cpu_sys=0.07,used_cpu_sys_children=0,used_cpu_user=0.1,used_cpu_user_children=0,used_memory=502048i,used_memory_lua=33792i,used_memory_peak=501128i,used_memory_rss=1798144i 1457052084987848383 > redis,server=localhost,port=6379,replication_role=master,host=host keyspace_hitrate=1,clients=2i,blocked_clients=0i,instantaneous_input_kbps=0,sync_full=0i,pubsub_channels=0i,pubsub_patterns=0i,total_net_output_bytes=6659253i,used_memory=842448i,total_system_memory=8351916032i,aof_current_rewrite_time_sec=-1i,rdb_changes_since_last_save=0i,sync_partial_err=0i,latest_fork_usec=508i,instantaneous_output_kbps=0,expired_keys=0i,used_memory_peak=843416i,aof_rewrite_in_progress=0i,aof_last_bgrewrite_status="ok",migrate_cached_sockets=0i,connected_slaves=0i,maxmemory_policy="noeviction",aof_rewrite_scheduled=0i,total_net_input_bytes=3125i,used_memory_rss=9564160i,repl_backlog_histlen=0i,rdb_last_bgsave_status="ok",aof_last_rewrite_time_sec=-1i,keyspace_misses=0i,client_biggest_input_buf=5i,used_cpu_user=1.33,maxmemory=0i,rdb_current_bgsave_time_sec=-1i,total_commands_processed=271i,repl_backlog_size=1048576i,used_cpu_sys=3,uptime=2822i,lru_clock=16706281i,used_memory_lua=37888i,rejected_connections=0i,sync_partial_ok=0i,evicted_keys=0i,rdb_last_save_time_elapsed=1922i,rdb_last_save_time=1493099368i,instantaneous_ops_per_sec=0i,used_cpu_user_children=0,client_longest_output_list=0i,master_repl_offset=0i,repl_backlog_active=0i,keyspace_hits=2i,used_cpu_sys_children=0,clu
ster_enabled=0i,rdb_last_bgsave_time_sec=0i,aof_last_write_status="ok",total_connections_received=263i,aof_enabled=0i,repl_backlog_first_byte_offset=0i,mem_fragmentation_ratio=11.35,loading=0i,rdb_bgsave_in_progress=0i 1493101290000000000
```
redis_keyspace:
```
> redis_keyspace,database=db1,host=host,server=localhost,port=6379,replication_role=master keys=1i,expires=0i,avg_ttl=0i 1493101350000000000
``` ```

View File

@@ -843,7 +843,7 @@ FROM (SELECT DISTINCT DatabaseName FROM #Databases) AS bl
SET @DynamicPivotQuery = N' SET @DynamicPivotQuery = N'
SELECT measurement = Measurement, servername = REPLACE(@@SERVERNAME, ''\'', '':'') SELECT measurement = Measurement, servername = REPLACE(@@SERVERNAME, ''\'', '':'')
, type = ''Database properties'' , type = ''Database properties''
, ' + @ColumnName + ', total FROM , ' + @ColumnName + ', Total FROM
( (
SELECT Measurement, DatabaseName, Value SELECT Measurement, DatabaseName, Value
, Total = (SELECT SUM(Value) FROM #Databases WHERE Measurement = d.Measurement) , Total = (SELECT SUM(Value) FROM #Databases WHERE Measurement = d.Measurement)
@@ -856,7 +856,7 @@ UNION ALL
SELECT measurement = Measurement, servername = REPLACE(@@SERVERNAME, ''\'', '':'') SELECT measurement = Measurement, servername = REPLACE(@@SERVERNAME, ''\'', '':'')
, type = ''Database properties'' , type = ''Database properties''
, ' + @ColumnName + ', total FROM , ' + @ColumnName + ', Total FROM
( (
SELECT Measurement, DatabaseName, Value SELECT Measurement, DatabaseName, Value
, Total = (SELECT SUM(Value) FROM #Databases WHERE Measurement = d.Measurement) , Total = (SELECT SUM(Value) FROM #Databases WHERE Measurement = d.Measurement)
@@ -869,7 +869,7 @@ UNION ALL
SELECT measurement = Measurement, servername = REPLACE(@@SERVERNAME, ''\'', '':'') SELECT measurement = Measurement, servername = REPLACE(@@SERVERNAME, ''\'', '':'')
, type = ''Database properties'' , type = ''Database properties''
, ' + @ColumnName + ', total FROM , ' + @ColumnName + ', Total FROM
( (
SELECT Measurement, DatabaseName, Value SELECT Measurement, DatabaseName, Value
, Total = (SELECT SUM(Value) FROM #Databases WHERE Measurement = d.Measurement) , Total = (SELECT SUM(Value) FROM #Databases WHERE Measurement = d.Measurement)
@@ -883,7 +883,7 @@ UNION ALL
SELECT measurement = Measurement, servername = REPLACE(@@SERVERNAME, ''\'', '':'') SELECT measurement = Measurement, servername = REPLACE(@@SERVERNAME, ''\'', '':'')
, type = ''Database properties'' , type = ''Database properties''
, ' + @ColumnName + ', total FROM , ' + @ColumnName + ', Total FROM
( (
SELECT Measurement, DatabaseName, Value SELECT Measurement, DatabaseName, Value
, Total = (SELECT SUM(Value) FROM #Databases WHERE Measurement = d.Measurement) , Total = (SELECT SUM(Value) FROM #Databases WHERE Measurement = d.Measurement)
@@ -896,7 +896,7 @@ UNION ALL
SELECT measurement = Measurement, servername = REPLACE(@@SERVERNAME, ''\'', '':'') SELECT measurement = Measurement, servername = REPLACE(@@SERVERNAME, ''\'', '':'')
, type = ''Database properties'' , type = ''Database properties''
, ' + @ColumnName + ', total FROM , ' + @ColumnName + ', Total FROM
( (
SELECT Measurement, DatabaseName, Value SELECT Measurement, DatabaseName, Value
, Total = (SELECT SUM(Value) FROM #Databases WHERE Measurement = d.Measurement) , Total = (SELECT SUM(Value) FROM #Databases WHERE Measurement = d.Measurement)
@@ -909,7 +909,7 @@ UNION ALL
SELECT measurement = Measurement, servername = REPLACE(@@SERVERNAME, ''\'', '':'') SELECT measurement = Measurement, servername = REPLACE(@@SERVERNAME, ''\'', '':'')
, type = ''Database properties'' , type = ''Database properties''
, ' + @ColumnName + ', total FROM , ' + @ColumnName + ', Total FROM
( (
SELECT Measurement, DatabaseName, Value SELECT Measurement, DatabaseName, Value
, Total = (SELECT SUM(Value) FROM #Databases WHERE Measurement = d.Measurement) , Total = (SELECT SUM(Value) FROM #Databases WHERE Measurement = d.Measurement)
@@ -922,7 +922,7 @@ UNION ALL
SELECT measurement = Measurement, servername = REPLACE(@@SERVERNAME, ''\'', '':'') SELECT measurement = Measurement, servername = REPLACE(@@SERVERNAME, ''\'', '':'')
, type = ''Database properties'' , type = ''Database properties''
, ' + @ColumnName + ', total FROM , ' + @ColumnName + ', Total FROM
( (
SELECT Measurement, DatabaseName, Value SELECT Measurement, DatabaseName, Value
, Total = (SELECT SUM(Value) FROM #Databases WHERE Measurement = d.Measurement) , Total = (SELECT SUM(Value) FROM #Databases WHERE Measurement = d.Measurement)
@@ -935,7 +935,7 @@ UNION ALL
SELECT measurement = Measurement, servername = REPLACE(@@SERVERNAME, ''\'', '':'') SELECT measurement = Measurement, servername = REPLACE(@@SERVERNAME, ''\'', '':'')
, type = ''Database properties'' , type = ''Database properties''
, ' + @ColumnName + ', total FROM , ' + @ColumnName + ', Total FROM
( (
SELECT Measurement, DatabaseName, Value SELECT Measurement, DatabaseName, Value
, Total = (SELECT SUM(Value) FROM #Databases WHERE Measurement = d.Measurement) , Total = (SELECT SUM(Value) FROM #Databases WHERE Measurement = d.Measurement)
@@ -948,7 +948,7 @@ UNION ALL
SELECT measurement = Measurement, servername = REPLACE(@@SERVERNAME, ''\'', '':'') SELECT measurement = Measurement, servername = REPLACE(@@SERVERNAME, ''\'', '':'')
, type = ''Database properties'' , type = ''Database properties''
, ' + @ColumnName + ', total FROM , ' + @ColumnName + ', Total FROM
( (
SELECT Measurement, DatabaseName, Value SELECT Measurement, DatabaseName, Value
, Total = (SELECT SUM(Value) FROM #Databases WHERE Measurement = d.Measurement) , Total = (SELECT SUM(Value) FROM #Databases WHERE Measurement = d.Measurement)
@@ -961,7 +961,7 @@ UNION ALL
SELECT measurement = Measurement, servername = REPLACE(@@SERVERNAME, ''\'', '':'') SELECT measurement = Measurement, servername = REPLACE(@@SERVERNAME, ''\'', '':'')
, type = ''Database properties'' , type = ''Database properties''
, ' + @ColumnName + ', total FROM , ' + @ColumnName + ', Total FROM
( (
SELECT Measurement, DatabaseName, Value SELECT Measurement, DatabaseName, Value
, Total = (SELECT SUM(Value) FROM #Databases WHERE Measurement = d.Measurement) , Total = (SELECT SUM(Value) FROM #Databases WHERE Measurement = d.Measurement)

View File

@@ -12,6 +12,7 @@ import (
"path/filepath" "path/filepath"
"runtime" "runtime"
"strconv" "strconv"
"syscall"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
@@ -195,6 +196,13 @@ func readProcFile(filename string) ([]byte, error) {
if os.IsNotExist(err) { if os.IsNotExist(err) {
return nil, nil return nil, nil
} }
// Reading from /proc/<PID> fails with ESRCH if the process has
// been terminated between open() and read().
if perr, ok := err.(*os.PathError); ok && perr.Err == syscall.ESRCH {
return nil, nil
}
return nil, err return nil, err
} }

View File

@@ -18,7 +18,8 @@ This plugin writes to [InfluxDB](https://www.influxdb.com) via HTTP or UDP.
## The target database for metrics (telegraf will create it if not exists). ## The target database for metrics (telegraf will create it if not exists).
database = "telegraf" # required database = "telegraf" # required
## Retention policy to write to. Empty string writes to the default rp. ## Name of existing retention policy to write to. Empty string writes to
## the default retention policy.
retention_policy = "" retention_policy = ""
## Write consistency (clusters only), can be: "any", "one", "quorum", "all" ## Write consistency (clusters only), can be: "any", "one", "quorum", "all"
write_consistency = "any" write_consistency = "any"
@@ -52,7 +53,7 @@ to write to. Each URL should start with either `http://` or `udp://`
### Optional parameters: ### Optional parameters:
* `write_consistency`: Write consistency (clusters only), can be: "any", "one", "quorum", "all". * `write_consistency`: Write consistency (clusters only), can be: "any", "one", "quorum", "all".
* `retention_policy`: Retention policy to write to. * `retention_policy`: Name of existing retention policy to write to. Empty string writes to the default retention policy.
* `timeout`: Write timeout (for the InfluxDB client), formatted as a string. If not provided, will default to 5s. 0s means no timeout (not recommended). * `timeout`: Write timeout (for the InfluxDB client), formatted as a string. If not provided, will default to 5s. 0s means no timeout (not recommended).
* `username`: Username for influxdb * `username`: Username for influxdb
* `password`: Password for influxdb * `password`: Password for influxdb

View File

@@ -16,7 +16,6 @@ var (
defaultRequestTimeout = time.Second * 5 defaultRequestTimeout = time.Second * 5
) )
//
func NewHTTP(config HTTPConfig, defaultWP WriteParams) (Client, error) { func NewHTTP(config HTTPConfig, defaultWP WriteParams) (Client, error) {
// validate required parameters: // validate required parameters:
if len(config.URL) == 0 { if len(config.URL) == 0 {

View File

@@ -25,6 +25,7 @@ type UDPConfig struct {
PayloadSize int PayloadSize int
} }
// NewUDP will return an instance of the telegraf UDP output plugin for influxdb
func NewUDP(config UDPConfig) (Client, error) { func NewUDP(config UDPConfig) (Client, error) {
p, err := url.Parse(config.URL) p, err := url.Parse(config.URL)
if err != nil { if err != nil {
@@ -55,20 +56,22 @@ type udpClient struct {
buffer []byte buffer []byte
} }
// Query will send the provided query command to the client, returning an error if any issues arise
func (c *udpClient) Query(command string) error { func (c *udpClient) Query(command string) error {
return nil return nil
} }
// Write will send the byte stream to the given UDP client endpoint
func (c *udpClient) Write(b []byte) (int, error) { func (c *udpClient) Write(b []byte) (int, error) {
return c.WriteStream(bytes.NewReader(b), -1) return c.WriteStream(bytes.NewReader(b), -1)
} }
// write params are ignored by the UDP client // WriteWithParams are ignored by the UDP client, will forward to WriteStream
func (c *udpClient) WriteWithParams(b []byte, wp WriteParams) (int, error) { func (c *udpClient) WriteWithParams(b []byte, wp WriteParams) (int, error) {
return c.WriteStream(bytes.NewReader(b), -1) return c.WriteStream(bytes.NewReader(b), -1)
} }
// contentLength is ignored by the UDP client. // WriteStream will send the provided data through to the client, contentLength is ignored by the UDP client
func (c *udpClient) WriteStream(r io.Reader, contentLength int) (int, error) { func (c *udpClient) WriteStream(r io.Reader, contentLength int) (int, error) {
var totaln int var totaln int
for { for {
@@ -88,12 +91,13 @@ func (c *udpClient) WriteStream(r io.Reader, contentLength int) (int, error) {
return totaln, nil return totaln, nil
} }
// contentLength is ignored by the UDP client. // WriteStreamWithParams will forward the stream to the client backend, contentLength is ignored by the UDP client
// write params are ignored by the UDP client // write params are ignored by the UDP client
func (c *udpClient) WriteStreamWithParams(r io.Reader, contentLength int, wp WriteParams) (int, error) { func (c *udpClient) WriteStreamWithParams(r io.Reader, contentLength int, wp WriteParams) (int, error) {
return c.WriteStream(r, -1) return c.WriteStream(r, -1)
} }
// Close will terminate the provided client connection
func (c *udpClient) Close() error { func (c *udpClient) Close() error {
return c.conn.Close() return c.conn.Close()
} }

View File

@@ -15,6 +15,12 @@ import (
"github.com/influxdata/telegraf/plugins/outputs/influxdb/client" "github.com/influxdata/telegraf/plugins/outputs/influxdb/client"
) )
var (
// Quote Ident replacer.
qiReplacer = strings.NewReplacer("\n", `\n`, `\`, `\\`, `"`, `\"`)
)
// InfluxDB struct is the primary data structure for the plugin
type InfluxDB struct { type InfluxDB struct {
// URL is only for backwards compatability // URL is only for backwards compatability
URL string URL string
@@ -40,7 +46,8 @@ type InfluxDB struct {
// Precision is only here for legacy support. It will be ignored. // Precision is only here for legacy support. It will be ignored.
Precision string Precision string
clients []client.Client clients []client.Client
splitPayload bool
} }
var sampleConfig = ` var sampleConfig = `
@@ -55,7 +62,8 @@ var sampleConfig = `
## The target database for metrics (telegraf will create it if not exists). ## The target database for metrics (telegraf will create it if not exists).
database = "telegraf" # required database = "telegraf" # required
## Retention policy to write to. Empty string writes to the default rp. ## Name of existing retention policy to write to. Empty string writes to
## the default retention policy.
retention_policy = "" retention_policy = ""
## Write consistency (clusters only), can be: "any", "one", "quorum", "all" ## Write consistency (clusters only), can be: "any", "one", "quorum", "all"
write_consistency = "any" write_consistency = "any"
@@ -78,11 +86,10 @@ var sampleConfig = `
# insecure_skip_verify = false # insecure_skip_verify = false
` `
// Connect initiates the primary connection to the range of provided URLs
func (i *InfluxDB) Connect() error { func (i *InfluxDB) Connect() error {
var urls []string var urls []string
for _, u := range i.URLs { urls = append(urls, i.URLs...)
urls = append(urls, u)
}
// Backward-compatability with single Influx URL config files // Backward-compatability with single Influx URL config files
// This could eventually be removed in favor of specifying the urls as a list // This could eventually be removed in favor of specifying the urls as a list
@@ -108,6 +115,7 @@ func (i *InfluxDB) Connect() error {
return fmt.Errorf("Error creating UDP Client [%s]: %s", u, err) return fmt.Errorf("Error creating UDP Client [%s]: %s", u, err)
} }
i.clients = append(i.clients, c) i.clients = append(i.clients, c)
i.splitPayload = true
default: default:
// If URL doesn't start with "udp", assume HTTP client // If URL doesn't start with "udp", assume HTTP client
config := client.HTTPConfig{ config := client.HTTPConfig{
@@ -129,9 +137,11 @@ func (i *InfluxDB) Connect() error {
} }
i.clients = append(i.clients, c) i.clients = append(i.clients, c)
err = c.Query("CREATE DATABASE " + i.Database) err = c.Query(fmt.Sprintf(`CREATE DATABASE "%s"`, qiReplacer.Replace(i.Database)))
if err != nil { if err != nil {
log.Println("E! Database creation failed: " + err.Error()) if !strings.Contains(err.Error(), "Status Code [403]") {
log.Println("I! Database creation failed: " + err.Error())
}
continue continue
} }
} }
@@ -141,25 +151,43 @@ func (i *InfluxDB) Connect() error {
return nil return nil
} }
// Close will terminate the session to the backend, returning error if an issue arises
func (i *InfluxDB) Close() error { func (i *InfluxDB) Close() error {
return nil return nil
} }
// SampleConfig returns the formatted sample configuration for the plugin
func (i *InfluxDB) SampleConfig() string { func (i *InfluxDB) SampleConfig() string {
return sampleConfig return sampleConfig
} }
// Description returns the human-readable function definition of the plugin
func (i *InfluxDB) Description() string { func (i *InfluxDB) Description() string {
return "Configuration for influxdb server to send metrics to" return "Configuration for influxdb server to send metrics to"
} }
// Choose a random server in the cluster to write to until a successful write func (i *InfluxDB) split(metrics []telegraf.Metric) []telegraf.Metric {
if !i.splitPayload {
return metrics
}
split := make([]telegraf.Metric, 0)
for _, m := range metrics {
split = append(split, m.Split(i.UDPPayload)...)
}
return split
}
// Write will choose a random server in the cluster to write to until a successful write
// occurs, logging each unsuccessful. If all servers fail, return error. // occurs, logging each unsuccessful. If all servers fail, return error.
func (i *InfluxDB) Write(metrics []telegraf.Metric) error { func (i *InfluxDB) Write(metrics []telegraf.Metric) error {
metrics = i.split(metrics)
bufsize := 0 bufsize := 0
for _, m := range metrics { for _, m := range metrics {
bufsize += m.Len() bufsize += m.Len()
} }
r := metric.NewReader(metrics) r := metric.NewReader(metrics)
// This will get set to nil if a successful write occurs // This will get set to nil if a successful write occurs
@@ -170,7 +198,8 @@ func (i *InfluxDB) Write(metrics []telegraf.Metric) error {
if _, e := i.clients[n].WriteStream(r, bufsize); e != nil { if _, e := i.clients[n].WriteStream(r, bufsize); e != nil {
// If the database was not found, try to recreate it: // If the database was not found, try to recreate it:
if strings.Contains(e.Error(), "database not found") { if strings.Contains(e.Error(), "database not found") {
if errc := i.clients[n].Query("CREATE DATABASE " + i.Database); errc != nil { errc := i.clients[n].Query(fmt.Sprintf(`CREATE DATABASE "%s"`, qiReplacer.Replace(i.Database)))
if errc != nil {
log.Printf("E! Error: Database %s not found and failed to recreate\n", log.Printf("E! Error: Database %s not found and failed to recreate\n",
i.Database) i.Database)
} }

View File

@@ -2,15 +2,55 @@ package influxdb
import ( import (
"fmt" "fmt"
"io"
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
"testing" "testing"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/plugins/outputs/influxdb/client"
"github.com/influxdata/telegraf/testutil" "github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
func TestIdentQuoting(t *testing.T) {
var testCases = []struct {
database string
expected string
}{
{"x-y", `CREATE DATABASE "x-y"`},
{`x"y`, `CREATE DATABASE "x\"y"`},
{"x\ny", `CREATE DATABASE "x\ny"`},
{`x\y`, `CREATE DATABASE "x\\y"`},
}
for _, tc := range testCases {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
r.ParseForm()
q := r.Form.Get("q")
assert.Equal(t, tc.expected, q)
w.WriteHeader(http.StatusOK)
w.Header().Set("Content-Type", "application/json")
fmt.Fprintln(w, `{"results":[{}]}`)
}))
defer ts.Close()
i := InfluxDB{
URLs: []string{ts.URL},
Database: tc.database,
}
err := i.Connect()
require.NoError(t, err)
require.NoError(t, i.Close())
}
}
func TestUDPInflux(t *testing.T) { func TestUDPInflux(t *testing.T) {
i := InfluxDB{ i := InfluxDB{
URLs: []string{"udp://localhost:8089"}, URLs: []string{"udp://localhost:8089"},
@@ -23,6 +63,35 @@ func TestUDPInflux(t *testing.T) {
require.NoError(t, i.Close()) require.NoError(t, i.Close())
} }
func TestBasicSplit(t *testing.T) {
c := &MockClient{}
i := InfluxDB{
clients: []client.Client{c},
UDPPayload: 50,
splitPayload: true,
}
// Input metrics:
// test1,tag1=value1 value1=1 value2=2 1257894000000000000\n
//
// Split metrics:
// test1,tag1=value1 value1=1 1257894000000000000\n
// test1,tag1=value1 value2=2 1257894000000000000\n
m, err := metric.New("test1",
map[string]string{"tag1": "value1"},
map[string]interface{}{"value1": 1.0, "value2": 2.0},
time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC),
)
require.NoError(t, err)
metrics := []telegraf.Metric{m}
err = i.Write(metrics)
require.Equal(t, 1, c.writeStreamCalled)
require.Equal(t, 94, c.contentLength)
require.NoError(t, err)
}
func TestHTTPInflux(t *testing.T) { func TestHTTPInflux(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path { switch r.URL.Path {
@@ -164,3 +233,34 @@ func TestHTTPError_FieldTypeConflict(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
require.NoError(t, i.Close()) require.NoError(t, i.Close())
} }
type MockClient struct {
writeStreamCalled int
contentLength int
}
func (m *MockClient) Query(command string) error {
panic("not implemented")
}
func (m *MockClient) Write(b []byte) (int, error) {
panic("not implemented")
}
func (m *MockClient) WriteWithParams(b []byte, params client.WriteParams) (int, error) {
panic("not implemented")
}
func (m *MockClient) WriteStream(b io.Reader, contentLength int) (int, error) {
m.writeStreamCalled++
m.contentLength = contentLength
return 0, nil
}
func (m *MockClient) WriteStreamWithParams(b io.Reader, contentLength int, params client.WriteParams) (int, error) {
panic("not implemented")
}
func (m *MockClient) Close() error {
panic("not implemented")
}

View File

@@ -124,6 +124,16 @@ func (sw *SocketWriter) Write(metrics []telegraf.Metric) error {
return nil return nil
} }
// Close shuts down the current connection, if any, and forgets it so a
// subsequent Close is a no-op.
func (sw *SocketWriter) Close() error {
	conn := sw.Conn
	if conn == nil {
		return nil
	}
	closeErr := conn.Close()
	sw.Conn = nil
	return closeErr
}
func newSocketWriter() *SocketWriter { func newSocketWriter() *SocketWriter {
s, _ := serializers.NewInfluxSerializer() s, _ := serializers.NewInfluxSerializer()
return &SocketWriter{ return &SocketWriter{

View File

@@ -143,7 +143,7 @@ func TestSocketWriter_Write_err(t *testing.T) {
// close the socket to generate an error // close the socket to generate an error
lconn.Close() lconn.Close()
sw.Close() sw.Conn.Close()
err = sw.Write(metrics) err = sw.Write(metrics)
require.Error(t, err) require.Error(t, err)
assert.Nil(t, sw.Conn) assert.Nil(t, sw.Conn)

View File

@@ -499,13 +499,12 @@ def build(version=None,
logging.info("Time taken: {}s".format((end_time - start_time).total_seconds())) logging.info("Time taken: {}s".format((end_time - start_time).total_seconds()))
return True return True
def generate_md5_from_file(path): def generate_sha256_from_file(path):
"""Generate MD5 signature based on the contents of the file at path. """Generate SHA256 hash signature based on the contents of the file at path.
""" """
m = hashlib.md5() m = hashlib.sha256()
with open(path, 'rb') as f: with open(path, 'rb') as f:
for chunk in iter(lambda: f.read(4096), b""): m.update(f.read())
m.update(chunk)
return m.hexdigest() return m.hexdigest()
def generate_sig_from_file(path): def generate_sig_from_file(path):
@@ -636,6 +635,10 @@ def package(build_output, pkg_name, version, nightly=False, iteration=1, static=
elif package_type not in ['zip', 'tar'] and static or "static_" in arch: elif package_type not in ['zip', 'tar'] and static or "static_" in arch:
logging.info("Skipping package type '{}' for static builds.".format(package_type)) logging.info("Skipping package type '{}' for static builds.".format(package_type))
else: else:
if package_type == 'rpm' and release and '~' in package_version:
package_version, suffix = package_version.split('~', 1)
# The ~ indicates that this is a prerelease so we give it a leading 0.
package_iteration = "0.%s" % suffix
fpm_command = "fpm {} --name {} -a {} -t {} --version {} --iteration {} -C {} -p {} ".format( fpm_command = "fpm {} --name {} -a {} -t {} --version {} --iteration {} -C {} -p {} ".format(
fpm_common_args, fpm_common_args,
name, name,
@@ -664,9 +667,6 @@ def package(build_output, pkg_name, version, nightly=False, iteration=1, static=
if package_type == 'rpm': if package_type == 'rpm':
# rpm's convert any dashes to underscores # rpm's convert any dashes to underscores
package_version = package_version.replace("-", "_") package_version = package_version.replace("-", "_")
new_outfile = outfile.replace("{}-{}".format(package_version, package_iteration), package_version)
os.rename(outfile, new_outfile)
outfile = new_outfile
outfiles.append(os.path.join(os.getcwd(), outfile)) outfiles.append(os.path.join(os.getcwd(), outfile))
logging.debug("Produced package files: {}".format(outfiles)) logging.debug("Produced package files: {}".format(outfiles))
return outfiles return outfiles
@@ -789,9 +789,10 @@ def main(args):
if not upload_packages(packages, bucket_name=args.bucket, overwrite=args.upload_overwrite): if not upload_packages(packages, bucket_name=args.bucket, overwrite=args.upload_overwrite):
return 1 return 1
logging.info("Packages created:") logging.info("Packages created:")
for p in packages: for filename in packages:
logging.info("{} (MD5={})".format(p.split('/')[-1:][0], logging.info("%s (SHA256=%s)",
generate_md5_from_file(p))) os.path.basename(filename),
generate_sha256_from_file(filename))
if orig_branch != get_current_branch(): if orig_branch != get_current_branch():
logging.info("Moving back to original git branch: {}".format(args.branch)) logging.info("Moving back to original git branch: {}".format(args.branch))
run("git checkout {}".format(orig_branch)) run("git checkout {}".format(orig_branch))

View File

@@ -135,7 +135,9 @@ case $1 in
fi fi
log_success_msg "Starting the process" "$name" log_success_msg "Starting the process" "$name"
if which start-stop-daemon > /dev/null 2>&1; then if command -v startproc >/dev/null; then
startproc -u "$USER" -g "$GROUP" -p "$pidfile" -q -- "$daemon" -pidfile "$pidfile" -config "$config" -config-directory "$confdir" $TELEGRAF_OPTS
elif which start-stop-daemon > /dev/null 2>&1; then
start-stop-daemon --chuid $USER:$GROUP --start --quiet --pidfile $pidfile --exec $daemon -- -pidfile $pidfile -config $config -config-directory $confdir $TELEGRAF_OPTS >>$STDOUT 2>>$STDERR & start-stop-daemon --chuid $USER:$GROUP --start --quiet --pidfile $pidfile --exec $daemon -- -pidfile $pidfile -config $config -config-directory $confdir $TELEGRAF_OPTS >>$STDOUT 2>>$STDERR &
else else
su -s /bin/sh -c "nohup $daemon -pidfile $pidfile -config $config -config-directory $confdir $TELEGRAF_OPTS >>$STDOUT 2>>$STDERR &" $USER su -s /bin/sh -c "nohup $daemon -pidfile $pidfile -config $config -config-directory $confdir $TELEGRAF_OPTS >>$STDOUT 2>>$STDERR &" $USER

View File

@@ -11,7 +11,7 @@ function install_init {
} }
function install_systemd { function install_systemd {
cp -f $SCRIPT_DIR/telegraf.service /lib/systemd/system/telegraf.service cp -f $SCRIPT_DIR/telegraf.service $1
systemctl enable telegraf || true systemctl enable telegraf || true
systemctl daemon-reload || true systemctl daemon-reload || true
} }
@@ -24,12 +24,12 @@ function install_chkconfig {
chkconfig --add telegraf chkconfig --add telegraf
} }
if ! grep "^telegraf:" /etc/group &>/dev/null; then
groupadd -r telegraf
fi
if ! id telegraf &>/dev/null; then if ! id telegraf &>/dev/null; then
if ! grep "^telegraf:" /etc/group &>/dev/null; then useradd -r -M telegraf -s /bin/false -d /etc/telegraf -g telegraf
useradd -r -K USERGROUPS_ENAB=yes -M telegraf -s /bin/false -d /etc/telegraf
else
useradd -r -K USERGROUPS_ENAB=yes -M telegraf -s /bin/false -d /etc/telegraf -g telegraf
fi
fi fi
test -d $LOG_DIR || mkdir -p $LOG_DIR test -d $LOG_DIR || mkdir -p $LOG_DIR
@@ -56,10 +56,10 @@ if [[ ! -d /etc/telegraf/telegraf.d ]]; then
fi fi
# Distribution-specific logic # Distribution-specific logic
if [[ -f /etc/redhat-release ]]; then if [[ -f /etc/redhat-release ]] || [[ -f /etc/SuSE-release ]]; then
# RHEL-variant logic # RHEL-variant logic
if [[ "$(readlink /proc/1/exe)" == */systemd ]]; then if [[ "$(readlink /proc/1/exe)" == */systemd ]]; then
install_systemd install_systemd /usr/lib/systemd/system/telegraf.service
else else
# Assuming SysVinit # Assuming SysVinit
install_init install_init
@@ -73,10 +73,10 @@ if [[ -f /etc/redhat-release ]]; then
elif [[ -f /etc/debian_version ]]; then elif [[ -f /etc/debian_version ]]; then
# Debian/Ubuntu logic # Debian/Ubuntu logic
if [[ "$(readlink /proc/1/exe)" == */systemd ]]; then if [[ "$(readlink /proc/1/exe)" == */systemd ]]; then
install_systemd install_systemd /lib/systemd/system/telegraf.service
systemctl restart telegraf || echo "WARNING: systemd not running." systemctl restart telegraf || echo "WARNING: systemd not running."
else else
# Assuming SysVinit # Assuming SysVinit
install_init install_init
# Run update-rc.d or fallback to chkconfig if not available # Run update-rc.d or fallback to chkconfig if not available
if which update-rc.d &>/dev/null; then if which update-rc.d &>/dev/null; then
@@ -89,7 +89,7 @@ elif [[ -f /etc/debian_version ]]; then
elif [[ -f /etc/os-release ]]; then elif [[ -f /etc/os-release ]]; then
source /etc/os-release source /etc/os-release
if [[ $ID = "amzn" ]]; then if [[ $ID = "amzn" ]]; then
# Amazon Linux logic # Amazon Linux logic
install_init install_init
# Run update-rc.d or fallback to chkconfig if not available # Run update-rc.d or fallback to chkconfig if not available
if which update-rc.d &>/dev/null; then if which update-rc.d &>/dev/null; then
@@ -97,5 +97,6 @@ elif [[ -f /etc/os-release ]]; then
else else
install_chkconfig install_chkconfig
fi fi
/etc/init.d/telegraf restart
fi fi
fi fi

View File

@@ -2,7 +2,7 @@
function disable_systemd { function disable_systemd {
systemctl disable telegraf systemctl disable telegraf
rm -f /lib/systemd/system/telegraf.service rm -f $1
} }
function disable_update_rcd { function disable_update_rcd {
@@ -15,14 +15,14 @@ function disable_chkconfig {
rm -f /etc/init.d/telegraf rm -f /etc/init.d/telegraf
} }
if [[ -f /etc/redhat-release ]]; then if [[ -f /etc/redhat-release ]] || [[ -f /etc/SuSE-release ]]; then
# RHEL-variant logic # RHEL-variant logic
if [[ "$1" = "0" ]]; then if [[ "$1" = "0" ]]; then
# InfluxDB is no longer installed, remove from init system # InfluxDB is no longer installed, remove from init system
rm -f /etc/default/telegraf rm -f /etc/default/telegraf
if [[ "$(readlink /proc/1/exe)" == */systemd ]]; then if [[ "$(readlink /proc/1/exe)" == */systemd ]]; then
disable_systemd disable_systemd /usr/lib/systemd/system/telegraf.service
else else
# Assuming sysv # Assuming sysv
disable_chkconfig disable_chkconfig
@@ -35,7 +35,7 @@ elif [[ -f /etc/debian_version ]]; then
rm -f /etc/default/telegraf rm -f /etc/default/telegraf
if [[ "$(readlink /proc/1/exe)" == */systemd ]]; then if [[ "$(readlink /proc/1/exe)" == */systemd ]]; then
disable_systemd disable_systemd /lib/systemd/system/telegraf.service
else else
# Assuming sysv # Assuming sysv
# Run update-rc.d or fallback to chkconfig if not available # Run update-rc.d or fallback to chkconfig if not available

View File

@@ -417,3 +417,53 @@ func (a *Accumulator) HasMeasurement(measurement string) bool {
} }
return false return false
} }
// IntField looks up the first metric with the given measurement name that
// carries the named field and returns its value as an int. The boolean result
// is false when no such field exists or the stored value is not an int.
func (a *Accumulator) IntField(measurement string, field string) (int, bool) {
	a.Lock()
	defer a.Unlock()
	for _, m := range a.Metrics {
		if m.Measurement != measurement {
			continue
		}
		if raw, found := m.Fields[field]; found {
			v, ok := raw.(int)
			return v, ok
		}
	}
	return 0, false
}
// FloatField looks up the first metric with the given measurement name that
// carries the named field and returns its value as a float64. The boolean
// result is false when no such field exists or the stored value is not a
// float64.
func (a *Accumulator) FloatField(measurement string, field string) (float64, bool) {
	a.Lock()
	defer a.Unlock()
	for _, m := range a.Metrics {
		if m.Measurement != measurement {
			continue
		}
		if raw, found := m.Fields[field]; found {
			v, ok := raw.(float64)
			return v, ok
		}
	}
	return 0.0, false
}
// StringField looks up the first metric with the given measurement name that
// carries the named field and returns its value as a string. The boolean
// result is false when no such field exists or the stored value is not a
// string.
func (a *Accumulator) StringField(measurement string, field string) (string, bool) {
	a.Lock()
	defer a.Unlock()
	for _, m := range a.Metrics {
		if m.Measurement != measurement {
			continue
		}
		if raw, found := m.Fields[field]; found {
			v, ok := raw.(string)
			return v, ok
		}
	}
	return "", false
}