Compare commits

...

28 Commits
1.7.0 ... 1.3.1

Author SHA1 Message Date
Daniel Nelson
f93615672b Generate sha256 hashes when packaging
(cherry picked from commit 0b6db905ff)
2017-05-31 12:30:51 -07:00
Daniel Nelson
444f1ba09c Update changelog
(cherry picked from commit 9529199a44)
2017-05-30 17:41:06 -07:00
Daniel Nelson
5417a12ee3 Fix length calculation of split metric buffer (#2869)
(cherry picked from commit be03abd464)
2017-05-30 17:41:00 -07:00
Daniel Nelson
f6c986b5ca Update changelog
(cherry picked from commit 04aa732e94)
2017-05-30 11:05:56 -07:00
Steve Nardone
7f67bf593f Fix panic in mongo input (#2848)
(cherry picked from commit e7f9db297e)
2017-05-30 11:05:28 -07:00
Daniel Nelson
7ca902f987 Update changelog
(cherry picked from commit 7d7206b3e2)
2017-05-25 16:24:46 -07:00
Daniel Nelson
dc69cacaa1 Update gopsutil version
fixes #2856

(cherry picked from commit 03ca3975b5)
2017-05-25 16:24:33 -07:00
Daniel Nelson
29671154bc Update changelog
(cherry picked from commit e1088b9eee)
2017-05-25 13:39:55 -07:00
Daniel Nelson
4f8341670e Fix influxdb output database quoting (#2851)
(cherry picked from commit f47924ffc5)
2017-05-25 13:27:16 -07:00
Daniel Nelson
99edca80ef Handle process termination during read from /proc (#2816)
Fixes #2815.
(cherry picked from commit c53d9fa9b7)
2017-05-18 18:03:54 -07:00
Daniel Nelson
44654e011c Reuse transports in input plugins (#2782)
(cherry picked from commit a47aa0dcc2)
2017-05-18 18:02:09 -07:00
Daniel Nelson
389439111b Fixed sqlserver input to work with case sensitive server collation. (#2749)
Fixed a problem with sqlserver input where database properties are not returned by Telegraf when SQL Server has been set up with a case sensitive server-level collation.

* Added bugfix entry to CHANGELOG.md for sqlserver collation input fix.

(cherry picked from commit e2983383e4)
2017-05-18 17:59:14 -07:00
Daniel Nelson
2bc5594b44 Add release date for 1.3.0 2017-05-15 20:05:22 -07:00
Daniel Nelson
99b53c8745 Add back the changelog entry for 2141 2017-05-15 12:56:11 -07:00
Daniel Nelson
27b89dff48 Only split metrics if there is an udp output (#2799) 2017-05-12 15:34:31 -07:00
Sebastian Borza
b16eb6eae6 split metrics based on UDPPayload size (#2795) 2017-05-12 14:42:18 -07:00
Daniel Nelson
feaf76913b Add missing plugins to README 2017-05-09 13:51:26 -07:00
Daniel Nelson
ff704fbe0d Add SLES11 support to rpm package (#2768) 2017-05-05 14:30:31 -07:00
Sébastien
ebef47f56a fix systemd path in order to add compatibility with SuSe (#2499) 2017-05-05 14:30:24 -07:00
Daniel Nelson
18fd2d987d Return an error if no valid patterns. (#2753) 2017-05-02 14:55:16 -07:00
Alexander Blagoev
5e70cb3e44 Improve redis input documentation (#2708) 2017-05-02 14:12:09 -07:00
Patrick Hemmer
ce203dc687 fix close on closed socket_writer (#2748) 2017-05-02 11:07:58 -07:00
Daniel Nelson
b0a2e8e1bd Add initial documentation for rabbitmq input. (#2745) 2017-05-01 18:57:19 -07:00
Daniel Nelson
499495f844 Don't log error creating database on connect (#2740)
closes #2739
2017-04-28 15:59:28 -07:00
Daniel Nelson
20ab8fb2c3 Update telegraf.conf 2017-04-28 13:49:09 -07:00
Daniel Nelson
bc474d3a53 Clarify retention policy option for influxdb output
closes #2696
2017-04-28 13:48:24 -07:00
Daniel Nelson
547be87d79 Clarify retention policy option for influxdb output
closes #2696
2017-04-28 13:43:00 -07:00
Daniel Nelson
619d4d5d29 Use go 1.8.1 for CI and Release builds (#2732) 2017-04-27 16:22:41 -07:00
30 changed files with 714 additions and 239 deletions

View File

@@ -1,4 +1,16 @@
## v1.3 [unreleased]
## v1.3.1 [unreleased]
### Bugfixes
- [#2749](https://github.com/influxdata/telegraf/pull/2749): Fixed sqlserver input to work with case sensitive server collation.
- [#2782](https://github.com/influxdata/telegraf/pull/2782): Reuse transports in input plugins
- [#2815](https://github.com/influxdata/telegraf/issues/2815): Inputs processes fails with "no such process".
- [#2851](https://github.com/influxdata/telegraf/pull/2851): Fix InfluxDB output database quoting.
- [#2856](https://github.com/influxdata/telegraf/issues/2856): Fix net input on older Linux kernels.
- [#2848](https://github.com/influxdata/telegraf/pull/2848): Fix panic in mongo input.
- [#2869](https://github.com/influxdata/telegraf/pull/2869): Fix length calculation of split metric buffer.
## v1.3 [2017-05-15]
### Release Notes
@@ -79,6 +91,9 @@ be deprecated eventually.
- [#2705](https://github.com/influxdata/telegraf/pull/2705): Kinesis output: add use_random_partitionkey option
- [#2635](https://github.com/influxdata/telegraf/issues/2635): add tcp keep-alive to socket_listener & socket_writer
- [#2031](https://github.com/influxdata/telegraf/pull/2031): Add Kapacitor input plugin
- [#2732](https://github.com/influxdata/telegraf/pull/2732): Use go 1.8.1
- [#2712](https://github.com/influxdata/telegraf/issues/2712): Documentation for rabbitmq input plugin
- [#2141](https://github.com/influxdata/telegraf/pull/2141): Logparser handles newly-created files.
### Bugfixes
@@ -118,6 +133,7 @@ be deprecated eventually.
- [#1911](https://github.com/influxdata/telegraf/issues/1911): Sysstat plugin needs LANG=C or similar locale
- [#2528](https://github.com/influxdata/telegraf/issues/2528): File output closes standard streams on reload.
- [#2603](https://github.com/influxdata/telegraf/issues/2603): AMQP output disconnect blocks all outputs
- [#2706](https://github.com/influxdata/telegraf/issues/2706): Improve documentation for redis input plugin
## v1.2.1 [2017-02-01]

2
Godeps
View File

@@ -46,7 +46,7 @@ github.com/prometheus/procfs 1878d9fbb537119d24b21ca07effd591627cd160
github.com/rcrowley/go-metrics 1f30fe9094a513ce4c700b9a54458bbb0c96996c
github.com/samuel/go-zookeeper 1d7be4effb13d2d908342d349d71a284a7542693
github.com/satori/go.uuid 5bf94b69c6b68ee1b541973bb8e1144db23a194b
github.com/shirou/gopsutil 70693b6a3da51a8a686d31f1b346077bbc066062
github.com/shirou/gopsutil 9a4a9167ad3b4355dbf1c2c7a0f5f0d3fb1e9ab9
github.com/soniah/gosnmp 5ad50dc75ab389f8a1c9f8a67d3a1cd85f67ed15
github.com/streadway/amqp 63795daa9a446c920826655f26ba31c81c860fd6
github.com/stretchr/testify 4d4bfba8f1d1027c4fdbe371823030df51419987

View File

@@ -111,6 +111,7 @@ configuration options.
* [couchbase](./plugins/inputs/couchbase)
* [couchdb](./plugins/inputs/couchdb)
* [disque](./plugins/inputs/disque)
* [dmcache](./plugins/inputs/dmcache)
* [dns query time](./plugins/inputs/dns_query)
* [docker](./plugins/inputs/docker)
* [dovecot](./plugins/inputs/dovecot)
@@ -127,6 +128,7 @@ configuration options.
* [ipmi_sensor](./plugins/inputs/ipmi_sensor)
* [iptables](./plugins/inputs/iptables)
* [jolokia](./plugins/inputs/jolokia)
* [kapacitor](./plugins/inputs/kapacitor)
* [kubernetes](./plugins/inputs/kubernetes)
* [leofs](./plugins/inputs/leofs)
* [lustre2](./plugins/inputs/lustre2)
@@ -195,6 +197,7 @@ Telegraf can also collect metrics via the following service plugins:
* [github](./plugins/inputs/webhooks/github)
* [mandrill](./plugins/inputs/webhooks/mandrill)
* [rollbar](./plugins/inputs/webhooks/rollbar)
* [papertrail](./plugins/inputs/webhooks/papertrail)
Telegraf is able to parse the following input data formats into metrics, these
formats may be used with input plugins supporting the `data_format` option:

View File

@@ -1,13 +1,11 @@
machine:
go:
version: 1.8.1
services:
- docker
post:
- sudo service zookeeper stop
- go version
- sudo rm -rf /usr/local/go
- wget https://storage.googleapis.com/golang/go1.8.linux-amd64.tar.gz
- sudo tar -C /usr/local -xzf go1.8.linux-amd64.tar.gz
- go version
- memcached
- redis
- rabbitmq-server
dependencies:
override:

View File

@@ -95,7 +95,8 @@
## The target database for metrics (telegraf will create it if not exists).
database = "telegraf" # required
## Retention policy to write to. Empty string writes to the default rp.
## Name of existing retention policy to write to. Empty string writes to
## the default retention policy.
retention_policy = ""
## Write consistency (clusters only), can be: "any", "one", "quorum", "all"
write_consistency = "any"
@@ -141,7 +142,7 @@
# ## described here: https://www.rabbitmq.com/plugins.html
# # auth_method = "PLAIN"
# ## Telegraf tag to use as a routing key
# ## ie, if this tag exists, it's value will be used as the routing key
# ## ie, if this tag exists, its value will be used as the routing key
# routing_tag = "host"
#
# ## InfluxDB retention policy
@@ -335,6 +336,10 @@
# ## Use SSL but skip chain & host verification
# # insecure_skip_verify = false
#
# ## Optional SASL Config
# # sasl_username = "kafka"
# # sasl_password = "secret"
#
# ## Data format to output.
# ## Each data format has its own unique set of configuration options, read
# ## more about them here:
@@ -1325,6 +1330,18 @@
# attribute = "LoadedClassCount,UnloadedClassCount,TotalLoadedClassCount"
# # Read Kapacitor-formatted JSON metrics from one or more HTTP endpoints
# [[inputs.kapacitor]]
# ## Multiple URLs from which to read Kapacitor-formatted JSON
# ## Default is "http://localhost:9092/kapacitor/v1/debug/vars".
# urls = [
# "http://localhost:9092/kapacitor/v1/debug/vars"
# ]
#
# ## Time limit for http requests
# timeout = "5s"
# # Get kernel statistics from /proc/vmstat
# [[inputs.kernel_vmstat]]
# # no configuration

View File

@@ -29,6 +29,8 @@ type Apache struct {
SSLKey string `toml:"ssl_key"`
// Use SSL but skip chain & host verification
InsecureSkipVerify bool
client *http.Client
}
var sampleConfig = `
@@ -66,6 +68,14 @@ func (n *Apache) Gather(acc telegraf.Accumulator) error {
n.ResponseTimeout.Duration = time.Second * 5
}
if n.client == nil {
client, err := n.createHttpClient()
if err != nil {
return err
}
n.client = client
}
var wg sync.WaitGroup
wg.Add(len(n.Urls))
for _, u := range n.Urls {
@@ -85,31 +95,24 @@ func (n *Apache) Gather(acc telegraf.Accumulator) error {
return nil
}
func (n *Apache) gatherUrl(addr *url.URL, acc telegraf.Accumulator) error {
var tr *http.Transport
if addr.Scheme == "https" {
tlsCfg, err := internal.GetTLSConfig(
n.SSLCert, n.SSLKey, n.SSLCA, n.InsecureSkipVerify)
if err != nil {
return err
}
tr = &http.Transport{
ResponseHeaderTimeout: time.Duration(3 * time.Second),
TLSClientConfig: tlsCfg,
}
} else {
tr = &http.Transport{
ResponseHeaderTimeout: time.Duration(3 * time.Second),
}
func (n *Apache) createHttpClient() (*http.Client, error) {
tlsCfg, err := internal.GetTLSConfig(
n.SSLCert, n.SSLKey, n.SSLCA, n.InsecureSkipVerify)
if err != nil {
return nil, err
}
client := &http.Client{
Transport: tr,
Timeout: n.ResponseTimeout.Duration,
Transport: &http.Transport{
TLSClientConfig: tlsCfg,
},
Timeout: n.ResponseTimeout.Duration,
}
return client, nil
}
func (n *Apache) gatherUrl(addr *url.URL, acc telegraf.Accumulator) error {
req, err := http.NewRequest("GET", addr.String(), nil)
if err != nil {
return fmt.Errorf("error on new request to %s : %s\n", addr.String(), err)
@@ -119,7 +122,7 @@ func (n *Apache) gatherUrl(addr *url.URL, acc telegraf.Accumulator) error {
req.SetBasicAuth(n.Username, n.Password)
}
resp, err := client.Do(req)
resp, err := n.client.Do(req)
if err != nil {
return fmt.Errorf("error on request to %s : %s\n", addr.String(), err)
}

View File

@@ -25,7 +25,6 @@ type HTTPResponse struct {
Headers map[string]string
FollowRedirects bool
ResponseStringMatch string
compiledStringMatch *regexp.Regexp
// Path to CA file
SSLCA string `toml:"ssl_ca"`
@@ -35,6 +34,9 @@ type HTTPResponse struct {
SSLKey string `toml:"ssl_key"`
// Use SSL but skip chain & host verification
InsecureSkipVerify bool
compiledStringMatch *regexp.Regexp
client *http.Client
}
// Description returns the plugin Description
@@ -88,13 +90,12 @@ func (h *HTTPResponse) createHttpClient() (*http.Client, error) {
if err != nil {
return nil, err
}
tr := &http.Transport{
ResponseHeaderTimeout: h.ResponseTimeout.Duration,
TLSClientConfig: tlsCfg,
}
client := &http.Client{
Transport: tr,
Timeout: h.ResponseTimeout.Duration,
Transport: &http.Transport{
DisableKeepAlives: true,
TLSClientConfig: tlsCfg,
},
Timeout: h.ResponseTimeout.Duration,
}
if h.FollowRedirects == false {
@@ -106,15 +107,10 @@ func (h *HTTPResponse) createHttpClient() (*http.Client, error) {
}
// HTTPGather gathers all fields and returns any errors it encounters
func (h *HTTPResponse) HTTPGather() (map[string]interface{}, error) {
func (h *HTTPResponse) httpGather() (map[string]interface{}, error) {
// Prepare fields
fields := make(map[string]interface{})
client, err := h.createHttpClient()
if err != nil {
return nil, err
}
var body io.Reader
if h.Body != "" {
body = strings.NewReader(h.Body)
@@ -133,7 +129,7 @@ func (h *HTTPResponse) HTTPGather() (map[string]interface{}, error) {
// Start Timer
start := time.Now()
resp, err := client.Do(request)
resp, err := h.client.Do(request)
if err != nil {
if h.FollowRedirects {
return nil, err
@@ -145,6 +141,11 @@ func (h *HTTPResponse) HTTPGather() (map[string]interface{}, error) {
return nil, err
}
}
defer func() {
io.Copy(ioutil.Discard, resp.Body)
resp.Body.Close()
}()
fields["response_time"] = time.Since(start).Seconds()
fields["http_response_code"] = resp.StatusCode
@@ -202,8 +203,17 @@ func (h *HTTPResponse) Gather(acc telegraf.Accumulator) error {
// Prepare data
tags := map[string]string{"server": h.Address, "method": h.Method}
var fields map[string]interface{}
if h.client == nil {
client, err := h.createHttpClient()
if err != nil {
return err
}
h.client = client
}
// Gather data
fields, err = h.HTTPGather()
fields, err = h.httpGather()
if err != nil {
return err
}

View File

@@ -9,6 +9,7 @@ import (
"time"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@@ -73,13 +74,13 @@ func TestHeaders(t *testing.T) {
"Host": "Hello",
},
}
fields, err := h.HTTPGather()
var acc testutil.Accumulator
err := h.Gather(&acc)
require.NoError(t, err)
assert.NotEmpty(t, fields)
if assert.NotNil(t, fields["http_response_code"]) {
assert.Equal(t, http.StatusOK, fields["http_response_code"])
}
assert.NotNil(t, fields["response_time"])
value, ok := acc.IntField("http_response", "http_response_code")
require.True(t, ok)
require.Equal(t, http.StatusOK, value)
}
func TestFields(t *testing.T) {
@@ -97,13 +98,14 @@ func TestFields(t *testing.T) {
},
FollowRedirects: true,
}
fields, err := h.HTTPGather()
var acc testutil.Accumulator
err := h.Gather(&acc)
require.NoError(t, err)
assert.NotEmpty(t, fields)
if assert.NotNil(t, fields["http_response_code"]) {
assert.Equal(t, http.StatusOK, fields["http_response_code"])
}
assert.NotNil(t, fields["response_time"])
value, ok := acc.IntField("http_response", "http_response_code")
require.True(t, ok)
require.Equal(t, http.StatusOK, value)
}
func TestRedirects(t *testing.T) {
@@ -121,12 +123,13 @@ func TestRedirects(t *testing.T) {
},
FollowRedirects: true,
}
fields, err := h.HTTPGather()
var acc testutil.Accumulator
err := h.Gather(&acc)
require.NoError(t, err)
assert.NotEmpty(t, fields)
if assert.NotNil(t, fields["http_response_code"]) {
assert.Equal(t, http.StatusOK, fields["http_response_code"])
}
value, ok := acc.IntField("http_response", "http_response_code")
require.True(t, ok)
require.Equal(t, http.StatusOK, value)
h = &HTTPResponse{
Address: ts.URL + "/badredirect",
@@ -138,8 +141,12 @@ func TestRedirects(t *testing.T) {
},
FollowRedirects: true,
}
fields, err = h.HTTPGather()
acc = testutil.Accumulator{}
err = h.Gather(&acc)
require.Error(t, err)
value, ok = acc.IntField("http_response", "http_response_code")
require.False(t, ok)
}
func TestMethod(t *testing.T) {
@@ -157,12 +164,13 @@ func TestMethod(t *testing.T) {
},
FollowRedirects: true,
}
fields, err := h.HTTPGather()
var acc testutil.Accumulator
err := h.Gather(&acc)
require.NoError(t, err)
assert.NotEmpty(t, fields)
if assert.NotNil(t, fields["http_response_code"]) {
assert.Equal(t, http.StatusOK, fields["http_response_code"])
}
value, ok := acc.IntField("http_response", "http_response_code")
require.True(t, ok)
require.Equal(t, http.StatusOK, value)
h = &HTTPResponse{
Address: ts.URL + "/mustbepostmethod",
@@ -174,12 +182,13 @@ func TestMethod(t *testing.T) {
},
FollowRedirects: true,
}
fields, err = h.HTTPGather()
acc = testutil.Accumulator{}
err = h.Gather(&acc)
require.NoError(t, err)
assert.NotEmpty(t, fields)
if assert.NotNil(t, fields["http_response_code"]) {
assert.Equal(t, http.StatusMethodNotAllowed, fields["http_response_code"])
}
value, ok = acc.IntField("http_response", "http_response_code")
require.True(t, ok)
require.Equal(t, http.StatusMethodNotAllowed, value)
//check that lowercase methods work correctly
h = &HTTPResponse{
@@ -192,12 +201,13 @@ func TestMethod(t *testing.T) {
},
FollowRedirects: true,
}
fields, err = h.HTTPGather()
acc = testutil.Accumulator{}
err = h.Gather(&acc)
require.NoError(t, err)
assert.NotEmpty(t, fields)
if assert.NotNil(t, fields["http_response_code"]) {
assert.Equal(t, http.StatusMethodNotAllowed, fields["http_response_code"])
}
value, ok = acc.IntField("http_response", "http_response_code")
require.True(t, ok)
require.Equal(t, http.StatusMethodNotAllowed, value)
}
func TestBody(t *testing.T) {
@@ -215,12 +225,13 @@ func TestBody(t *testing.T) {
},
FollowRedirects: true,
}
fields, err := h.HTTPGather()
var acc testutil.Accumulator
err := h.Gather(&acc)
require.NoError(t, err)
assert.NotEmpty(t, fields)
if assert.NotNil(t, fields["http_response_code"]) {
assert.Equal(t, http.StatusOK, fields["http_response_code"])
}
value, ok := acc.IntField("http_response", "http_response_code")
require.True(t, ok)
require.Equal(t, http.StatusOK, value)
h = &HTTPResponse{
Address: ts.URL + "/musthaveabody",
@@ -231,12 +242,13 @@ func TestBody(t *testing.T) {
},
FollowRedirects: true,
}
fields, err = h.HTTPGather()
acc = testutil.Accumulator{}
err = h.Gather(&acc)
require.NoError(t, err)
assert.NotEmpty(t, fields)
if assert.NotNil(t, fields["http_response_code"]) {
assert.Equal(t, http.StatusBadRequest, fields["http_response_code"])
}
value, ok = acc.IntField("http_response", "http_response_code")
require.True(t, ok)
require.Equal(t, http.StatusBadRequest, value)
}
func TestStringMatch(t *testing.T) {
@@ -255,15 +267,18 @@ func TestStringMatch(t *testing.T) {
},
FollowRedirects: true,
}
fields, err := h.HTTPGather()
var acc testutil.Accumulator
err := h.Gather(&acc)
require.NoError(t, err)
assert.NotEmpty(t, fields)
if assert.NotNil(t, fields["http_response_code"]) {
assert.Equal(t, http.StatusOK, fields["http_response_code"])
}
assert.Equal(t, 1, fields["response_string_match"])
assert.NotNil(t, fields["response_time"])
value, ok := acc.IntField("http_response", "http_response_code")
require.True(t, ok)
require.Equal(t, http.StatusOK, value)
value, ok = acc.IntField("http_response", "response_string_match")
require.True(t, ok)
require.Equal(t, 1, value)
_, ok = acc.FloatField("http_response", "response_time")
require.True(t, ok)
}
func TestStringMatchJson(t *testing.T) {
@@ -282,15 +297,18 @@ func TestStringMatchJson(t *testing.T) {
},
FollowRedirects: true,
}
fields, err := h.HTTPGather()
var acc testutil.Accumulator
err := h.Gather(&acc)
require.NoError(t, err)
assert.NotEmpty(t, fields)
if assert.NotNil(t, fields["http_response_code"]) {
assert.Equal(t, http.StatusOK, fields["http_response_code"])
}
assert.Equal(t, 1, fields["response_string_match"])
assert.NotNil(t, fields["response_time"])
value, ok := acc.IntField("http_response", "http_response_code")
require.True(t, ok)
require.Equal(t, http.StatusOK, value)
value, ok = acc.IntField("http_response", "response_string_match")
require.True(t, ok)
require.Equal(t, 1, value)
_, ok = acc.FloatField("http_response", "response_time")
require.True(t, ok)
}
func TestStringMatchFail(t *testing.T) {
@@ -309,18 +327,26 @@ func TestStringMatchFail(t *testing.T) {
},
FollowRedirects: true,
}
fields, err := h.HTTPGather()
require.NoError(t, err)
assert.NotEmpty(t, fields)
if assert.NotNil(t, fields["http_response_code"]) {
assert.Equal(t, http.StatusOK, fields["http_response_code"])
}
assert.Equal(t, 0, fields["response_string_match"])
assert.NotNil(t, fields["response_time"])
var acc testutil.Accumulator
err := h.Gather(&acc)
require.NoError(t, err)
value, ok := acc.IntField("http_response", "http_response_code")
require.True(t, ok)
require.Equal(t, http.StatusOK, value)
value, ok = acc.IntField("http_response", "response_string_match")
require.True(t, ok)
require.Equal(t, 0, value)
_, ok = acc.FloatField("http_response", "response_time")
require.True(t, ok)
}
func TestTimeout(t *testing.T) {
if testing.Short() {
t.Skip("Skipping test with sleep in short mode.")
}
mux := setUpTestMux()
ts := httptest.NewServer(mux)
defer ts.Close()
@@ -335,6 +361,10 @@ func TestTimeout(t *testing.T) {
},
FollowRedirects: true,
}
_, err := h.HTTPGather()
require.Error(t, err)
var acc testutil.Accumulator
err := h.Gather(&acc)
require.NoError(t, err)
ok := acc.HasIntField("http_response", "http_response_code")
require.False(t, ok)
}

View File

@@ -118,11 +118,18 @@ func (p *Parser) Compile() error {
// Give Patterns fake names so that they can be treated as named
// "custom patterns"
p.namedPatterns = make([]string, len(p.Patterns))
p.namedPatterns = make([]string, 0, len(p.Patterns))
for i, pattern := range p.Patterns {
if pattern == "" {
continue
}
name := fmt.Sprintf("GROK_INTERNAL_PATTERN_%d", i)
p.CustomPatterns += "\n" + name + " " + pattern + "\n"
p.namedPatterns[i] = "%{" + name + "}"
p.namedPatterns = append(p.namedPatterns, "%{"+name+"}")
}
if len(p.namedPatterns) == 0 {
return fmt.Errorf("pattern required")
}
// Combine user-supplied CustomPatterns with DEFAULT_PATTERNS and parse

View File

@@ -117,16 +117,11 @@ func (l *LogParserPlugin) Start(acc telegraf.Accumulator) error {
}
// compile log parser patterns:
var haveError bool
for _, parser := range l.parsers {
if err := parser.Compile(); err != nil {
acc.AddError(err)
haveError = true
return err
}
}
if haveError {
return nil
}
l.wg.Add(1)
go l.parser()

View File

@@ -38,12 +38,8 @@ func TestGrokParseLogFilesNonExistPattern(t *testing.T) {
}
acc := testutil.Accumulator{}
logparser.Start(&acc)
if assert.NotEmpty(t, acc.Errors) {
assert.Error(t, acc.Errors[0])
}
logparser.Stop()
err := logparser.Start(&acc)
assert.Error(t, err)
}
func TestGrokParseLogFiles(t *testing.T) {

View File

@@ -564,7 +564,7 @@ func NewStatLine(oldMongo, newMongo MongoStatus, key string, all bool, sampleSec
// BEGIN code modification
if newStat.Repl.IsMaster.(bool) {
returnVal.NodeType = "PRI"
} else if newStat.Repl.Secondary.(bool) {
} else if newStat.Repl.Secondary != nil && newStat.Repl.Secondary.(bool) {
returnVal.NodeType = "SEC"
} else if newStat.Repl.ArbiterOnly != nil && newStat.Repl.ArbiterOnly.(bool) {
returnVal.NodeType = "ARB"

View File

@@ -4,7 +4,6 @@ import (
"errors"
"fmt"
"io/ioutil"
"net"
"net/http"
"sync"
"time"
@@ -32,6 +31,8 @@ type Prometheus struct {
SSLKey string `toml:"ssl_key"`
// Use SSL but skip chain & host verification
InsecureSkipVerify bool
client *http.Client
}
var sampleConfig = `
@@ -65,6 +66,14 @@ var ErrProtocolError = errors.New("prometheus protocol error")
// Reads stats from all configured servers accumulates stats.
// Returns one of the errors encountered while gather stats (if any).
func (p *Prometheus) Gather(acc telegraf.Accumulator) error {
if p.client == nil {
client, err := p.createHttpClient()
if err != nil {
return err
}
p.client = client
}
var wg sync.WaitGroup
for _, serv := range p.Urls {
@@ -89,29 +98,30 @@ var client = &http.Client{
Timeout: time.Duration(4 * time.Second),
}
func (p *Prometheus) createHttpClient() (*http.Client, error) {
tlsCfg, err := internal.GetTLSConfig(
p.SSLCert, p.SSLKey, p.SSLCA, p.InsecureSkipVerify)
if err != nil {
return nil, err
}
client := &http.Client{
Transport: &http.Transport{
TLSClientConfig: tlsCfg,
DisableKeepAlives: true,
},
Timeout: p.ResponseTimeout.Duration,
}
return client, nil
}
func (p *Prometheus) gatherURL(url string, acc telegraf.Accumulator) error {
var req, err = http.NewRequest("GET", url, nil)
req.Header.Add("Accept", acceptHeader)
var token []byte
var resp *http.Response
tlsCfg, err := internal.GetTLSConfig(
p.SSLCert, p.SSLKey, p.SSLCA, p.InsecureSkipVerify)
if err != nil {
return err
}
var rt http.RoundTripper = &http.Transport{
Dial: (&net.Dialer{
Timeout: 5 * time.Second,
KeepAlive: 30 * time.Second,
}).Dial,
TLSHandshakeTimeout: 5 * time.Second,
TLSClientConfig: tlsCfg,
ResponseHeaderTimeout: p.ResponseTimeout.Duration,
DisableKeepAlives: true,
}
if p.BearerToken != "" {
token, err = ioutil.ReadFile(p.BearerToken)
if err != nil {
@@ -120,7 +130,7 @@ func (p *Prometheus) gatherURL(url string, acc telegraf.Accumulator) error {
req.Header.Set("Authorization", "Bearer "+string(token))
}
resp, err = rt.RoundTrip(req)
resp, err = p.client.Do(req)
if err != nil {
return fmt.Errorf("error making HTTP request to %s: %s", url, err)
}

View File

@@ -0,0 +1,121 @@
# RabbitMQ Input Plugin
Reads metrics from RabbitMQ servers via the [Management Plugin](https://www.rabbitmq.com/management.html).
For additional details reference the [RabbitMQ Management HTTP Stats](https://cdn.rawgit.com/rabbitmq/rabbitmq-management/master/priv/www/doc/stats.html).
### Configuration:
```toml
[[inputs.rabbitmq]]
## Management Plugin url. (default: http://localhost:15672)
# url = "http://localhost:15672"
## Tag added to rabbitmq_overview series; deprecated: use tags
# name = "rmq-server-1"
## Credentials
# username = "guest"
# password = "guest"
## Optional SSL Config
# ssl_ca = "/etc/telegraf/ca.pem"
# ssl_cert = "/etc/telegraf/cert.pem"
# ssl_key = "/etc/telegraf/key.pem"
## Use SSL but skip chain & host verification
# insecure_skip_verify = false
## Optional request timeouts
##
## ResponseHeaderTimeout, if non-zero, specifies the amount of time to wait
## for a server's response headers after fully writing the request.
# header_timeout = "3s"
##
## client_timeout specifies a time limit for requests made by this client.
## Includes connection time, any redirects, and reading the response body.
# client_timeout = "4s"
## A list of nodes to pull metrics about. If not specified, metrics for
## all nodes are gathered.
# nodes = ["rabbit@node1", "rabbit@node2"]
```
### Measurements & Fields:
- rabbitmq_overview
- channels (int, channels)
- connections (int, connections)
- consumers (int, consumers)
- exchanges (int, exchanges)
- messages (int, messages)
- messages_acked (int, messages)
- messages_delivered (int, messages)
- messages_published (int, messages)
- messages_ready (int, messages)
- messages_unacked (int, messages)
- queues (int, queues)
- rabbitmq_node
- disk_free (int, bytes)
- disk_free_limit (int, bytes)
- fd_total (int, file descriptors)
- fd_used (int, file descriptors)
- mem_limit (int, bytes)
- mem_used (int, bytes)
- proc_total (int, erlang processes)
- proc_used (int, erlang processes)
- run_queue (int, erlang processes)
- sockets_total (int, sockets)
- sockets_used (int, sockets)
- rabbitmq_queue
- consumer_utilisation (float, percent)
- consumers (int, int)
- idle_since (string, time - e.g., "2006-01-02 15:04:05")
- memory (int, bytes)
- message_bytes (int, bytes)
- message_bytes_persist (int, bytes)
- message_bytes_ram (int, bytes)
- message_bytes_ready (int, bytes)
- message_bytes_unacked (int, bytes)
- messages (int, count)
- messages_ack (int, count)
- messages_ack_rate (float, messages per second)
- messages_deliver (int, count)
- messages_deliver_rate (float, messages per second)
- messages_deliver_get (int, count)
- messages_deliver_get_rate (float, messages per second)
- messages_publish (int, count)
- messages_publish_rate (float, messages per second)
- messages_ready (int, count)
- messages_redeliver (int, count)
- messages_redeliver_rate (float, messages per second)
- messages_unack (integer, count)
### Tags:
- All measurements have the following tags:
- url
- rabbitmq_overview
- name
- rabbitmq_node
- node
- rabbitmq_queue
- url
- queue
- vhost
- node
- durable
- auto_delete
### Sample Queries:
### Example Output:
```
rabbitmq_queue,url=http://amqp.example.org:15672,queue=telegraf,vhost=influxdb,node=rabbit@amqp.example.org,durable=true,auto_delete=false,host=amqp.example.org messages_deliver_get=0i,messages_publish=329i,messages_publish_rate=0.2,messages_redeliver_rate=0,message_bytes_ready=0i,message_bytes_unacked=0i,messages_deliver=329i,messages_unack=0i,consumers=1i,idle_since="",messages=0i,messages_deliver_rate=0.2,messages_deliver_get_rate=0.2,messages_redeliver=0i,memory=43032i,message_bytes_ram=0i,messages_ack=329i,messages_ready=0i,messages_ack_rate=0.2,consumer_utilisation=1,message_bytes=0i,message_bytes_persist=0i 1493684035000000000
rabbitmq_overview,url=http://amqp.example.org:15672,host=amqp.example.org channels=2i,consumers=1i,exchanges=17i,messages_acked=329i,messages=0i,messages_ready=0i,messages_unacked=0i,connections=2i,queues=1i,messages_delivered=329i,messages_published=329i 1493684035000000000
rabbitmq_node,url=http://amqp.example.org:15672,node=rabbit@amqp.example.org,host=amqp.example.org fd_total=1024i,fd_used=32i,mem_limit=8363329126i,sockets_total=829i,disk_free=8175935488i,disk_free_limit=50000000i,mem_used=58771080i,proc_total=1048576i,proc_used=267i,run_queue=0i,sockets_used=2i 149368403500000000
```

View File

@@ -140,8 +140,11 @@ type gatherFunc func(r *RabbitMQ, acc telegraf.Accumulator)
var gatherFunctions = []gatherFunc{gatherOverview, gatherNodes, gatherQueues}
var sampleConfig = `
## Management Plugin url. (default: http://localhost:15672)
# url = "http://localhost:15672"
# name = "rmq-server-1" # optional tag
## Tag added to rabbitmq_overview series; deprecated: use tags
# name = "rmq-server-1"
## Credentials
# username = "guest"
# password = "guest"
@@ -174,7 +177,7 @@ func (r *RabbitMQ) SampleConfig() string {
// Description ...
func (r *RabbitMQ) Description() string {
return "Read metrics from one or many RabbitMQ servers via the management API"
return "Reads metrics from RabbitMQ servers via the Management Plugin"
}
// Gather ...

View File

@@ -18,47 +18,103 @@
### Measurements & Fields:
- Measurement
- uptime_in_seconds
- connected_clients
- used_memory
- used_memory_rss
- used_memory_peak
- used_memory_lua
- rdb_changes_since_last_save
- total_connections_received
- total_commands_processed
- instantaneous_ops_per_sec
- instantaneous_input_kbps
- instantaneous_output_kbps
- sync_full
- sync_partial_ok
- sync_partial_err
- expired_keys
- evicted_keys
- keyspace_hits
- keyspace_misses
- pubsub_channels
- pubsub_patterns
- latest_fork_usec
- connected_slaves
- master_repl_offset
- master_last_io_seconds_ago
- repl_backlog_active
- repl_backlog_size
- repl_backlog_histlen
- mem_fragmentation_ratio
- used_cpu_sys
- used_cpu_user
- used_cpu_sys_children
- used_cpu_user_children
The plugin gathers the results of the [INFO](https://redis.io/commands/info) redis command.
There are two separate measurements: _redis_ and _redis\_keyspace_, the latter is used for gathering database related statistics.
Additionally the plugin also calculates the hit/miss ratio (keyspace\_hitrate) and the elapsed time since the last rdb save (rdb\_last\_save\_time\_elapsed).
- redis
- keyspace_hitrate(float, number)
- rdb_last_save_time_elapsed(int, seconds)
**Server**
- uptime(int, seconds)
- lru_clock(int, number)
**Clients**
- clients(int, number)
- client_longest_output_list(int, number)
- client_biggest_input_buf(int, number)
- blocked_clients(int, number)
**Memory**
- used_memory(int, bytes)
- used_memory_rss(int, bytes)
- used_memory_peak(int, bytes)
- total_system_memory(int, bytes)
- used_memory_lua(int, bytes)
- maxmemory(int, bytes)
- maxmemory_policy(string)
- mem_fragmentation_ratio(float, number)
**Persistance**
- loading(int,flag)
- rdb_changes_since_last_save(int, number)
- rdb_bgsave_in_progress(int, flag)
- rdb_last_save_time(int, seconds)
- rdb_last_bgsave_status(string)
- rdb_last_bgsave_time_sec(int, seconds)
- rdb_current_bgsave_time_sec(int, seconds)
- aof_enabled(int, flag)
- aof_rewrite_in_progress(int, flag)
- aof_rewrite_scheduled(int, flag)
- aof_last_rewrite_time_sec(int, seconds)
- aof_current_rewrite_time_sec(int, seconds)
- aof_last_bgrewrite_status(string)
- aof_last_write_status(string)
**Stats**
- total_connections_received(int, number)
- total_commands_processed(int, number)
- instantaneous_ops_per_sec(int, number)
- total_net_input_bytes(int, bytes)
- total_net_output_bytes(int, bytes)
- instantaneous_input_kbps(float, bytes)
- instantaneous_output_kbps(float, bytes)
- rejected_connections(int, number)
- sync_full(int, number)
- sync_partial_ok(int, number)
- sync_partial_err(int, number)
- expired_keys(int, number)
- evicted_keys(int, number)
- keyspace_hits(int, number)
- keyspace_misses(int, number)
- pubsub_channels(int, number)
- pubsub_patterns(int, number)
- latest_fork_usec(int, microseconds)
- migrate_cached_sockets(int, number)
**Replication**
- connected_slaves(int, number)
- master_repl_offset(int, number)
- repl_backlog_active(int, number)
- repl_backlog_size(int, bytes)
- repl_backlog_first_byte_offset(int, number)
- repl_backlog_histlen(int, bytes)
**CPU**
- used_cpu_sys(float, number)
- used_cpu_user(float, number)
- used_cpu_sys_children(float, number)
- used_cpu_user_children(float, number)
**Cluster**
- cluster_enabled(int, flag)
- redis_keyspace
- keys(int, number)
- expires(int, number)
- avg_ttl(int, number)
### Tags:
- All measurements have the following tags:
- port
- server
- replication role
- replication_role
- The redis_keyspace measurement has an additional database tag:
- database
### Example Output:
@@ -84,5 +140,10 @@ When run with:
It produces:
```
* Plugin: redis, Collection 1
> redis,port=6379,server=localhost clients=1i,connected_slaves=0i,evicted_keys=0i,expired_keys=0i,instantaneous_ops_per_sec=0i,keyspace_hitrate=0,keyspace_hits=0i,keyspace_misses=2i,latest_fork_usec=0i,master_repl_offset=0i,mem_fragmentation_ratio=3.58,pubsub_channels=0i,pubsub_patterns=0i,rdb_changes_since_last_save=0i,repl_backlog_active=0i,repl_backlog_histlen=0i,repl_backlog_size=1048576i,sync_full=0i,sync_partial_err=0i,sync_partial_ok=0i,total_commands_processed=4i,total_connections_received=2i,uptime=869i,used_cpu_sys=0.07,used_cpu_sys_children=0,used_cpu_user=0.1,used_cpu_user_children=0,used_memory=502048i,used_memory_lua=33792i,used_memory_peak=501128i,used_memory_rss=1798144i 1457052084987848383
> redis,server=localhost,port=6379,replication_role=master,host=host keyspace_hitrate=1,clients=2i,blocked_clients=0i,instantaneous_input_kbps=0,sync_full=0i,pubsub_channels=0i,pubsub_patterns=0i,total_net_output_bytes=6659253i,used_memory=842448i,total_system_memory=8351916032i,aof_current_rewrite_time_sec=-1i,rdb_changes_since_last_save=0i,sync_partial_err=0i,latest_fork_usec=508i,instantaneous_output_kbps=0,expired_keys=0i,used_memory_peak=843416i,aof_rewrite_in_progress=0i,aof_last_bgrewrite_status="ok",migrate_cached_sockets=0i,connected_slaves=0i,maxmemory_policy="noeviction",aof_rewrite_scheduled=0i,total_net_input_bytes=3125i,used_memory_rss=9564160i,repl_backlog_histlen=0i,rdb_last_bgsave_status="ok",aof_last_rewrite_time_sec=-1i,keyspace_misses=0i,client_biggest_input_buf=5i,used_cpu_user=1.33,maxmemory=0i,rdb_current_bgsave_time_sec=-1i,total_commands_processed=271i,repl_backlog_size=1048576i,used_cpu_sys=3,uptime=2822i,lru_clock=16706281i,used_memory_lua=37888i,rejected_connections=0i,sync_partial_ok=0i,evicted_keys=0i,rdb_last_save_time_elapsed=1922i,rdb_last_save_time=1493099368i,instantaneous_ops_per_sec=0i,used_cpu_user_children=0,client_longest_output_list=0i,master_repl_offset=0i,repl_backlog_active=0i,keyspace_hits=2i,used_cpu_sys_children=0,cluster_enabled=0i,rdb_last_bgsave_time_sec=0i,aof_last_write_status="ok",total_connections_received=263i,aof_enabled=0i,repl_backlog_first_byte_offset=0i,mem_fragmentation_ratio=11.35,loading=0i,rdb_bgsave_in_progress=0i 1493101290000000000
```
redis_keyspace:
```
> redis_keyspace,database=db1,host=host,server=localhost,port=6379,replication_role=master keys=1i,expires=0i,avg_ttl=0i 1493101350000000000
```

View File

@@ -843,7 +843,7 @@ FROM (SELECT DISTINCT DatabaseName FROM #Databases) AS bl
SET @DynamicPivotQuery = N'
SELECT measurement = Measurement, servername = REPLACE(@@SERVERNAME, ''\'', '':'')
, type = ''Database properties''
, ' + @ColumnName + ', total FROM
, ' + @ColumnName + ', Total FROM
(
SELECT Measurement, DatabaseName, Value
, Total = (SELECT SUM(Value) FROM #Databases WHERE Measurement = d.Measurement)
@@ -856,7 +856,7 @@ UNION ALL
SELECT measurement = Measurement, servername = REPLACE(@@SERVERNAME, ''\'', '':'')
, type = ''Database properties''
, ' + @ColumnName + ', total FROM
, ' + @ColumnName + ', Total FROM
(
SELECT Measurement, DatabaseName, Value
, Total = (SELECT SUM(Value) FROM #Databases WHERE Measurement = d.Measurement)
@@ -869,7 +869,7 @@ UNION ALL
SELECT measurement = Measurement, servername = REPLACE(@@SERVERNAME, ''\'', '':'')
, type = ''Database properties''
, ' + @ColumnName + ', total FROM
, ' + @ColumnName + ', Total FROM
(
SELECT Measurement, DatabaseName, Value
, Total = (SELECT SUM(Value) FROM #Databases WHERE Measurement = d.Measurement)
@@ -883,7 +883,7 @@ UNION ALL
SELECT measurement = Measurement, servername = REPLACE(@@SERVERNAME, ''\'', '':'')
, type = ''Database properties''
, ' + @ColumnName + ', total FROM
, ' + @ColumnName + ', Total FROM
(
SELECT Measurement, DatabaseName, Value
, Total = (SELECT SUM(Value) FROM #Databases WHERE Measurement = d.Measurement)
@@ -896,7 +896,7 @@ UNION ALL
SELECT measurement = Measurement, servername = REPLACE(@@SERVERNAME, ''\'', '':'')
, type = ''Database properties''
, ' + @ColumnName + ', total FROM
, ' + @ColumnName + ', Total FROM
(
SELECT Measurement, DatabaseName, Value
, Total = (SELECT SUM(Value) FROM #Databases WHERE Measurement = d.Measurement)
@@ -909,7 +909,7 @@ UNION ALL
SELECT measurement = Measurement, servername = REPLACE(@@SERVERNAME, ''\'', '':'')
, type = ''Database properties''
, ' + @ColumnName + ', total FROM
, ' + @ColumnName + ', Total FROM
(
SELECT Measurement, DatabaseName, Value
, Total = (SELECT SUM(Value) FROM #Databases WHERE Measurement = d.Measurement)
@@ -922,7 +922,7 @@ UNION ALL
SELECT measurement = Measurement, servername = REPLACE(@@SERVERNAME, ''\'', '':'')
, type = ''Database properties''
, ' + @ColumnName + ', total FROM
, ' + @ColumnName + ', Total FROM
(
SELECT Measurement, DatabaseName, Value
, Total = (SELECT SUM(Value) FROM #Databases WHERE Measurement = d.Measurement)
@@ -935,7 +935,7 @@ UNION ALL
SELECT measurement = Measurement, servername = REPLACE(@@SERVERNAME, ''\'', '':'')
, type = ''Database properties''
, ' + @ColumnName + ', total FROM
, ' + @ColumnName + ', Total FROM
(
SELECT Measurement, DatabaseName, Value
, Total = (SELECT SUM(Value) FROM #Databases WHERE Measurement = d.Measurement)
@@ -948,7 +948,7 @@ UNION ALL
SELECT measurement = Measurement, servername = REPLACE(@@SERVERNAME, ''\'', '':'')
, type = ''Database properties''
, ' + @ColumnName + ', total FROM
, ' + @ColumnName + ', Total FROM
(
SELECT Measurement, DatabaseName, Value
, Total = (SELECT SUM(Value) FROM #Databases WHERE Measurement = d.Measurement)
@@ -961,7 +961,7 @@ UNION ALL
SELECT measurement = Measurement, servername = REPLACE(@@SERVERNAME, ''\'', '':'')
, type = ''Database properties''
, ' + @ColumnName + ', total FROM
, ' + @ColumnName + ', Total FROM
(
SELECT Measurement, DatabaseName, Value
, Total = (SELECT SUM(Value) FROM #Databases WHERE Measurement = d.Measurement)

View File

@@ -12,6 +12,7 @@ import (
"path/filepath"
"runtime"
"strconv"
"syscall"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
@@ -195,6 +196,13 @@ func readProcFile(filename string) ([]byte, error) {
if os.IsNotExist(err) {
return nil, nil
}
// Reading from /proc/<PID> fails with ESRCH if the process has
// been terminated between open() and read().
if perr, ok := err.(*os.PathError); ok && perr.Err == syscall.ESRCH {
return nil, nil
}
return nil, err
}

View File

@@ -18,7 +18,8 @@ This plugin writes to [InfluxDB](https://www.influxdb.com) via HTTP or UDP.
## The target database for metrics (telegraf will create it if not exists).
database = "telegraf" # required
## Retention policy to write to. Empty string writes to the default rp.
## Name of existing retention policy to write to. Empty string writes to
## the default retention policy.
retention_policy = ""
## Write consistency (clusters only), can be: "any", "one", "quorum", "all"
write_consistency = "any"
@@ -52,7 +53,7 @@ to write to. Each URL should start with either `http://` or `udp://`
### Optional parameters:
* `write_consistency`: Write consistency (clusters only), can be: "any", "one", "quorum", "all".
* `retention_policy`: Retention policy to write to.
* `retention_policy`: Name of existing retention policy to write to. Empty string writes to the default retention policy.
* `timeout`: Write timeout (for the InfluxDB client), formatted as a string. If not provided, will default to 5s. 0s means no timeout (not recommended).
* `username`: Username for influxdb
* `password`: Password for influxdb

View File

@@ -16,7 +16,6 @@ var (
defaultRequestTimeout = time.Second * 5
)
//
func NewHTTP(config HTTPConfig, defaultWP WriteParams) (Client, error) {
// validate required parameters:
if len(config.URL) == 0 {

View File

@@ -25,6 +25,7 @@ type UDPConfig struct {
PayloadSize int
}
// NewUDP will return an instance of the telegraf UDP output plugin for influxdb
func NewUDP(config UDPConfig) (Client, error) {
p, err := url.Parse(config.URL)
if err != nil {
@@ -55,20 +56,22 @@ type udpClient struct {
buffer []byte
}
// Query will send the provided query command to the client, returning an error if any issues arise
func (c *udpClient) Query(command string) error {
return nil
}
// Write will send the byte stream to the given UDP client endpoint
func (c *udpClient) Write(b []byte) (int, error) {
return c.WriteStream(bytes.NewReader(b), -1)
}
// write params are ignored by the UDP client
// WriteWithParams are ignored by the UDP client, will forward to WriteStream
func (c *udpClient) WriteWithParams(b []byte, wp WriteParams) (int, error) {
return c.WriteStream(bytes.NewReader(b), -1)
}
// contentLength is ignored by the UDP client.
// WriteStream will send the provided data through to the client, contentLength is ignored by the UDP client
func (c *udpClient) WriteStream(r io.Reader, contentLength int) (int, error) {
var totaln int
for {
@@ -88,12 +91,13 @@ func (c *udpClient) WriteStream(r io.Reader, contentLength int) (int, error) {
return totaln, nil
}
// contentLength is ignored by the UDP client.
// WriteStreamWithParams will forward the stream to the client backend, contentLength is ignored by the UDP client
// write params are ignored by the UDP client
func (c *udpClient) WriteStreamWithParams(r io.Reader, contentLength int, wp WriteParams) (int, error) {
return c.WriteStream(r, -1)
}
// Close will terminate the provided client connection
func (c *udpClient) Close() error {
return c.conn.Close()
}

View File

@@ -15,6 +15,12 @@ import (
"github.com/influxdata/telegraf/plugins/outputs/influxdb/client"
)
var (
// Quote Ident replacer.
qiReplacer = strings.NewReplacer("\n", `\n`, `\`, `\\`, `"`, `\"`)
)
// InfluxDB struct is the primary data structure for the plugin
type InfluxDB struct {
// URL is only for backwards compatability
URL string
@@ -40,7 +46,8 @@ type InfluxDB struct {
// Precision is only here for legacy support. It will be ignored.
Precision string
clients []client.Client
clients []client.Client
splitPayload bool
}
var sampleConfig = `
@@ -55,7 +62,8 @@ var sampleConfig = `
## The target database for metrics (telegraf will create it if not exists).
database = "telegraf" # required
## Retention policy to write to. Empty string writes to the default rp.
## Name of existing retention policy to write to. Empty string writes to
## the default retention policy.
retention_policy = ""
## Write consistency (clusters only), can be: "any", "one", "quorum", "all"
write_consistency = "any"
@@ -78,11 +86,10 @@ var sampleConfig = `
# insecure_skip_verify = false
`
// Connect initiates the primary connection to the range of provided URLs
func (i *InfluxDB) Connect() error {
var urls []string
for _, u := range i.URLs {
urls = append(urls, u)
}
urls = append(urls, i.URLs...)
// Backward-compatability with single Influx URL config files
// This could eventually be removed in favor of specifying the urls as a list
@@ -108,6 +115,7 @@ func (i *InfluxDB) Connect() error {
return fmt.Errorf("Error creating UDP Client [%s]: %s", u, err)
}
i.clients = append(i.clients, c)
i.splitPayload = true
default:
// If URL doesn't start with "udp", assume HTTP client
config := client.HTTPConfig{
@@ -129,9 +137,11 @@ func (i *InfluxDB) Connect() error {
}
i.clients = append(i.clients, c)
err = c.Query("CREATE DATABASE " + i.Database)
err = c.Query(fmt.Sprintf(`CREATE DATABASE "%s"`, qiReplacer.Replace(i.Database)))
if err != nil {
log.Println("E! Database creation failed: " + err.Error())
if !strings.Contains(err.Error(), "Status Code [403]") {
log.Println("I! Database creation failed: " + err.Error())
}
continue
}
}
@@ -141,25 +151,43 @@ func (i *InfluxDB) Connect() error {
return nil
}
// Close will terminate the session to the backend, returning error if an issue arises
func (i *InfluxDB) Close() error {
return nil
}
// SampleConfig returns the formatted sample configuration for the plugin
func (i *InfluxDB) SampleConfig() string {
return sampleConfig
}
// Description returns the human-readable function definition of the plugin
func (i *InfluxDB) Description() string {
return "Configuration for influxdb server to send metrics to"
}
// Choose a random server in the cluster to write to until a successful write
func (i *InfluxDB) split(metrics []telegraf.Metric) []telegraf.Metric {
if !i.splitPayload {
return metrics
}
split := make([]telegraf.Metric, 0)
for _, m := range metrics {
split = append(split, m.Split(i.UDPPayload)...)
}
return split
}
// Write will choose a random server in the cluster to write to until a successful write
// occurs, logging each unsuccessful. If all servers fail, return error.
func (i *InfluxDB) Write(metrics []telegraf.Metric) error {
metrics = i.split(metrics)
bufsize := 0
for _, m := range metrics {
bufsize += m.Len()
}
r := metric.NewReader(metrics)
// This will get set to nil if a successful write occurs
@@ -170,7 +198,8 @@ func (i *InfluxDB) Write(metrics []telegraf.Metric) error {
if _, e := i.clients[n].WriteStream(r, bufsize); e != nil {
// If the database was not found, try to recreate it:
if strings.Contains(e.Error(), "database not found") {
if errc := i.clients[n].Query("CREATE DATABASE " + i.Database); errc != nil {
errc := i.clients[n].Query(fmt.Sprintf(`CREATE DATABASE "%s"`, qiReplacer.Replace(i.Database)))
if errc != nil {
log.Printf("E! Error: Database %s not found and failed to recreate\n",
i.Database)
}

View File

@@ -2,15 +2,55 @@ package influxdb
import (
"fmt"
"io"
"net/http"
"net/http/httptest"
"testing"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/plugins/outputs/influxdb/client"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestIdentQuoting(t *testing.T) {
var testCases = []struct {
database string
expected string
}{
{"x-y", `CREATE DATABASE "x-y"`},
{`x"y`, `CREATE DATABASE "x\"y"`},
{"x\ny", `CREATE DATABASE "x\ny"`},
{`x\y`, `CREATE DATABASE "x\\y"`},
}
for _, tc := range testCases {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
r.ParseForm()
q := r.Form.Get("q")
assert.Equal(t, tc.expected, q)
w.WriteHeader(http.StatusOK)
w.Header().Set("Content-Type", "application/json")
fmt.Fprintln(w, `{"results":[{}]}`)
}))
defer ts.Close()
i := InfluxDB{
URLs: []string{ts.URL},
Database: tc.database,
}
err := i.Connect()
require.NoError(t, err)
require.NoError(t, i.Close())
}
}
func TestUDPInflux(t *testing.T) {
i := InfluxDB{
URLs: []string{"udp://localhost:8089"},
@@ -23,6 +63,35 @@ func TestUDPInflux(t *testing.T) {
require.NoError(t, i.Close())
}
func TestBasicSplit(t *testing.T) {
c := &MockClient{}
i := InfluxDB{
clients: []client.Client{c},
UDPPayload: 50,
splitPayload: true,
}
// Input metrics:
// test1,tag1=value1 value1=1 value2=2 1257894000000000000\n
//
// Split metrics:
// test1,tag1=value1 value1=1 1257894000000000000\n
// test1,tag1=value1 value2=2 1257894000000000000\n
m, err := metric.New("test1",
map[string]string{"tag1": "value1"},
map[string]interface{}{"value1": 1.0, "value2": 2.0},
time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC),
)
require.NoError(t, err)
metrics := []telegraf.Metric{m}
err = i.Write(metrics)
require.Equal(t, 1, c.writeStreamCalled)
require.Equal(t, 94, c.contentLength)
require.NoError(t, err)
}
func TestHTTPInflux(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
@@ -164,3 +233,34 @@ func TestHTTPError_FieldTypeConflict(t *testing.T) {
require.NoError(t, err)
require.NoError(t, i.Close())
}
type MockClient struct {
writeStreamCalled int
contentLength int
}
func (m *MockClient) Query(command string) error {
panic("not implemented")
}
func (m *MockClient) Write(b []byte) (int, error) {
panic("not implemented")
}
func (m *MockClient) WriteWithParams(b []byte, params client.WriteParams) (int, error) {
panic("not implemented")
}
func (m *MockClient) WriteStream(b io.Reader, contentLength int) (int, error) {
m.writeStreamCalled++
m.contentLength = contentLength
return 0, nil
}
func (m *MockClient) WriteStreamWithParams(b io.Reader, contentLength int, params client.WriteParams) (int, error) {
panic("not implemented")
}
func (m *MockClient) Close() error {
panic("not implemented")
}

View File

@@ -124,6 +124,16 @@ func (sw *SocketWriter) Write(metrics []telegraf.Metric) error {
return nil
}
// Close closes the connection. Noop if already closed.
func (sw *SocketWriter) Close() error {
if sw.Conn == nil {
return nil
}
err := sw.Conn.Close()
sw.Conn = nil
return err
}
func newSocketWriter() *SocketWriter {
s, _ := serializers.NewInfluxSerializer()
return &SocketWriter{

View File

@@ -143,7 +143,7 @@ func TestSocketWriter_Write_err(t *testing.T) {
// close the socket to generate an error
lconn.Close()
sw.Close()
sw.Conn.Close()
err = sw.Write(metrics)
require.Error(t, err)
assert.Nil(t, sw.Conn)

View File

@@ -499,13 +499,12 @@ def build(version=None,
logging.info("Time taken: {}s".format((end_time - start_time).total_seconds()))
return True
def generate_md5_from_file(path):
"""Generate MD5 signature based on the contents of the file at path.
def generate_sha256_from_file(path):
"""Generate SHA256 hash signature based on the contents of the file at path.
"""
m = hashlib.md5()
m = hashlib.sha256()
with open(path, 'rb') as f:
for chunk in iter(lambda: f.read(4096), b""):
m.update(chunk)
m.update(f.read())
return m.hexdigest()
def generate_sig_from_file(path):
@@ -636,6 +635,10 @@ def package(build_output, pkg_name, version, nightly=False, iteration=1, static=
elif package_type not in ['zip', 'tar'] and static or "static_" in arch:
logging.info("Skipping package type '{}' for static builds.".format(package_type))
else:
if package_type == 'rpm' and release and '~' in package_version:
package_version, suffix = package_version.split('~', 1)
# The ~ indicatees that this is a prerelease so we give it a leading 0.
package_iteration = "0.%s" % suffix
fpm_command = "fpm {} --name {} -a {} -t {} --version {} --iteration {} -C {} -p {} ".format(
fpm_common_args,
name,
@@ -664,9 +667,6 @@ def package(build_output, pkg_name, version, nightly=False, iteration=1, static=
if package_type == 'rpm':
# rpm's convert any dashes to underscores
package_version = package_version.replace("-", "_")
new_outfile = outfile.replace("{}-{}".format(package_version, package_iteration), package_version)
os.rename(outfile, new_outfile)
outfile = new_outfile
outfiles.append(os.path.join(os.getcwd(), outfile))
logging.debug("Produced package files: {}".format(outfiles))
return outfiles
@@ -789,9 +789,10 @@ def main(args):
if not upload_packages(packages, bucket_name=args.bucket, overwrite=args.upload_overwrite):
return 1
logging.info("Packages created:")
for p in packages:
logging.info("{} (MD5={})".format(p.split('/')[-1:][0],
generate_md5_from_file(p)))
for filename in packages:
logging.info("%s (SHA256=%s)",
os.path.basename(filename),
generate_sha256_from_file(filename))
if orig_branch != get_current_branch():
logging.info("Moving back to original git branch: {}".format(args.branch))
run("git checkout {}".format(orig_branch))

View File

@@ -135,7 +135,9 @@ case $1 in
fi
log_success_msg "Starting the process" "$name"
if which start-stop-daemon > /dev/null 2>&1; then
if command -v startproc >/dev/null; then
startproc -u "$USER" -g "$GROUP" -p "$pidfile" -q -- "$daemon" -pidfile "$pidfile" -config "$config" -config-directory "$confdir" $TELEGRAF_OPTS
elif which start-stop-daemon > /dev/null 2>&1; then
start-stop-daemon --chuid $USER:$GROUP --start --quiet --pidfile $pidfile --exec $daemon -- -pidfile $pidfile -config $config -config-directory $confdir $TELEGRAF_OPTS >>$STDOUT 2>>$STDERR &
else
su -s /bin/sh -c "nohup $daemon -pidfile $pidfile -config $config -config-directory $confdir $TELEGRAF_OPTS >>$STDOUT 2>>$STDERR &" $USER

View File

@@ -11,7 +11,7 @@ function install_init {
}
function install_systemd {
cp -f $SCRIPT_DIR/telegraf.service /lib/systemd/system/telegraf.service
cp -f $SCRIPT_DIR/telegraf.service $1
systemctl enable telegraf || true
systemctl daemon-reload || true
}
@@ -24,12 +24,12 @@ function install_chkconfig {
chkconfig --add telegraf
}
if ! grep "^telegraf:" /etc/group &>/dev/null; then
groupadd -r telegraf
fi
if ! id telegraf &>/dev/null; then
if ! grep "^telegraf:" /etc/group &>/dev/null; then
useradd -r -K USERGROUPS_ENAB=yes -M telegraf -s /bin/false -d /etc/telegraf
else
useradd -r -K USERGROUPS_ENAB=yes -M telegraf -s /bin/false -d /etc/telegraf -g telegraf
fi
useradd -r -M telegraf -s /bin/false -d /etc/telegraf -g telegraf
fi
test -d $LOG_DIR || mkdir -p $LOG_DIR
@@ -56,10 +56,10 @@ if [[ ! -d /etc/telegraf/telegraf.d ]]; then
fi
# Distribution-specific logic
if [[ -f /etc/redhat-release ]]; then
if [[ -f /etc/redhat-release ]] || [[ -f /etc/SuSE-release ]]; then
# RHEL-variant logic
if [[ "$(readlink /proc/1/exe)" == */systemd ]]; then
install_systemd
install_systemd /usr/lib/systemd/system/telegraf.service
else
# Assuming SysVinit
install_init
@@ -73,10 +73,10 @@ if [[ -f /etc/redhat-release ]]; then
elif [[ -f /etc/debian_version ]]; then
# Debian/Ubuntu logic
if [[ "$(readlink /proc/1/exe)" == */systemd ]]; then
install_systemd
install_systemd /lib/systemd/system/telegraf.service
systemctl restart telegraf || echo "WARNING: systemd not running."
else
# Assuming SysVinit
# Assuming SysVinit
install_init
# Run update-rc.d or fallback to chkconfig if not available
if which update-rc.d &>/dev/null; then
@@ -89,7 +89,7 @@ elif [[ -f /etc/debian_version ]]; then
elif [[ -f /etc/os-release ]]; then
source /etc/os-release
if [[ $ID = "amzn" ]]; then
# Amazon Linux logic
# Amazon Linux logic
install_init
# Run update-rc.d or fallback to chkconfig if not available
if which update-rc.d &>/dev/null; then
@@ -97,5 +97,6 @@ elif [[ -f /etc/os-release ]]; then
else
install_chkconfig
fi
/etc/init.d/telegraf restart
fi
fi

View File

@@ -2,7 +2,7 @@
function disable_systemd {
systemctl disable telegraf
rm -f /lib/systemd/system/telegraf.service
rm -f $1
}
function disable_update_rcd {
@@ -15,14 +15,14 @@ function disable_chkconfig {
rm -f /etc/init.d/telegraf
}
if [[ -f /etc/redhat-release ]]; then
if [[ -f /etc/redhat-release ]] || [[ -f /etc/SuSE-release ]]; then
# RHEL-variant logic
if [[ "$1" = "0" ]]; then
# InfluxDB is no longer installed, remove from init system
rm -f /etc/default/telegraf
if [[ "$(readlink /proc/1/exe)" == */systemd ]]; then
disable_systemd
disable_systemd /usr/lib/systemd/system/telegraf.service
else
# Assuming sysv
disable_chkconfig
@@ -35,7 +35,7 @@ elif [[ -f /etc/debian_version ]]; then
rm -f /etc/default/telegraf
if [[ "$(readlink /proc/1/exe)" == */systemd ]]; then
disable_systemd
disable_systemd /lib/systemd/system/telegraf.service
else
# Assuming sysv
# Run update-rc.d or fallback to chkconfig if not available

View File

@@ -417,3 +417,53 @@ func (a *Accumulator) HasMeasurement(measurement string) bool {
}
return false
}
func (a *Accumulator) IntField(measurement string, field string) (int, bool) {
a.Lock()
defer a.Unlock()
for _, p := range a.Metrics {
if p.Measurement == measurement {
for fieldname, value := range p.Fields {
if fieldname == field {
v, ok := value.(int)
return v, ok
}
}
}
}
return 0, false
}
func (a *Accumulator) FloatField(measurement string, field string) (float64, bool) {
a.Lock()
defer a.Unlock()
for _, p := range a.Metrics {
if p.Measurement == measurement {
for fieldname, value := range p.Fields {
if fieldname == field {
v, ok := value.(float64)
return v, ok
}
}
}
}
return 0.0, false
}
func (a *Accumulator) StringField(measurement string, field string) (string, bool) {
a.Lock()
defer a.Unlock()
for _, p := range a.Metrics {
if p.Measurement == measurement {
for fieldname, value := range p.Fields {
if fieldname == field {
v, ok := value.(string)
return v, ok
}
}
}
}
return "", false
}