Compare commits
28 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f93615672b | ||
|
|
444f1ba09c | ||
|
|
5417a12ee3 | ||
|
|
f6c986b5ca | ||
|
|
7f67bf593f | ||
|
|
7ca902f987 | ||
|
|
dc69cacaa1 | ||
|
|
29671154bc | ||
|
|
4f8341670e | ||
|
|
99edca80ef | ||
|
|
44654e011c | ||
|
|
389439111b | ||
|
|
2bc5594b44 | ||
|
|
99b53c8745 | ||
|
|
27b89dff48 | ||
|
|
b16eb6eae6 | ||
|
|
feaf76913b | ||
|
|
ff704fbe0d | ||
|
|
ebef47f56a | ||
|
|
18fd2d987d | ||
|
|
5e70cb3e44 | ||
|
|
ce203dc687 | ||
|
|
b0a2e8e1bd | ||
|
|
499495f844 | ||
|
|
20ab8fb2c3 | ||
|
|
bc474d3a53 | ||
|
|
547be87d79 | ||
|
|
619d4d5d29 |
18
CHANGELOG.md
18
CHANGELOG.md
@@ -1,4 +1,16 @@
|
||||
## v1.3 [unreleased]
|
||||
## v1.3.1 [unreleased]
|
||||
|
||||
### Bugfixes
|
||||
|
||||
- [#2749](https://github.com/influxdata/telegraf/pull/2749): Fixed sqlserver input to work with case sensitive server collation.
|
||||
- [#2782](https://github.com/influxdata/telegraf/pull/2782): Reuse transports in input plugins
|
||||
- [#2815](https://github.com/influxdata/telegraf/issues/2815): Inputs processes fails with "no such process".
|
||||
- [#2851](https://github.com/influxdata/telegraf/pull/2851): Fix InfluxDB output database quoting.
|
||||
- [#2856](https://github.com/influxdata/telegraf/issues/2856): Fix net input on older Linux kernels.
|
||||
- [#2848](https://github.com/influxdata/telegraf/pull/2848): Fix panic in mongo input.
|
||||
- [#2869](https://github.com/influxdata/telegraf/pull/2869): Fix length calculation of split metric buffer.
|
||||
|
||||
## v1.3 [2017-05-15]
|
||||
|
||||
### Release Notes
|
||||
|
||||
@@ -79,6 +91,9 @@ be deprecated eventually.
|
||||
- [#2705](https://github.com/influxdata/telegraf/pull/2705): Kinesis output: add use_random_partitionkey option
|
||||
- [#2635](https://github.com/influxdata/telegraf/issues/2635): add tcp keep-alive to socket_listener & socket_writer
|
||||
- [#2031](https://github.com/influxdata/telegraf/pull/2031): Add Kapacitor input plugin
|
||||
- [#2732](https://github.com/influxdata/telegraf/pull/2732): Use go 1.8.1
|
||||
- [#2712](https://github.com/influxdata/telegraf/issues/2712): Documentation for rabbitmq input plugin
|
||||
- [#2141](https://github.com/influxdata/telegraf/pull/2141): Logparser handles newly-created files.
|
||||
|
||||
### Bugfixes
|
||||
|
||||
@@ -118,6 +133,7 @@ be deprecated eventually.
|
||||
- [#1911](https://github.com/influxdata/telegraf/issues/1911): Sysstat plugin needs LANG=C or similar locale
|
||||
- [#2528](https://github.com/influxdata/telegraf/issues/2528): File output closes standard streams on reload.
|
||||
- [#2603](https://github.com/influxdata/telegraf/issues/2603): AMQP output disconnect blocks all outputs
|
||||
- [#2706](https://github.com/influxdata/telegraf/issues/2706): Improve documentation for redis input plugin
|
||||
|
||||
## v1.2.1 [2017-02-01]
|
||||
|
||||
|
||||
2
Godeps
2
Godeps
@@ -46,7 +46,7 @@ github.com/prometheus/procfs 1878d9fbb537119d24b21ca07effd591627cd160
|
||||
github.com/rcrowley/go-metrics 1f30fe9094a513ce4c700b9a54458bbb0c96996c
|
||||
github.com/samuel/go-zookeeper 1d7be4effb13d2d908342d349d71a284a7542693
|
||||
github.com/satori/go.uuid 5bf94b69c6b68ee1b541973bb8e1144db23a194b
|
||||
github.com/shirou/gopsutil 70693b6a3da51a8a686d31f1b346077bbc066062
|
||||
github.com/shirou/gopsutil 9a4a9167ad3b4355dbf1c2c7a0f5f0d3fb1e9ab9
|
||||
github.com/soniah/gosnmp 5ad50dc75ab389f8a1c9f8a67d3a1cd85f67ed15
|
||||
github.com/streadway/amqp 63795daa9a446c920826655f26ba31c81c860fd6
|
||||
github.com/stretchr/testify 4d4bfba8f1d1027c4fdbe371823030df51419987
|
||||
|
||||
@@ -111,6 +111,7 @@ configuration options.
|
||||
* [couchbase](./plugins/inputs/couchbase)
|
||||
* [couchdb](./plugins/inputs/couchdb)
|
||||
* [disque](./plugins/inputs/disque)
|
||||
* [dmcache](./plugins/inputs/dmcache)
|
||||
* [dns query time](./plugins/inputs/dns_query)
|
||||
* [docker](./plugins/inputs/docker)
|
||||
* [dovecot](./plugins/inputs/dovecot)
|
||||
@@ -127,6 +128,7 @@ configuration options.
|
||||
* [ipmi_sensor](./plugins/inputs/ipmi_sensor)
|
||||
* [iptables](./plugins/inputs/iptables)
|
||||
* [jolokia](./plugins/inputs/jolokia)
|
||||
* [kapacitor](./plugins/inputs/kapacitor)
|
||||
* [kubernetes](./plugins/inputs/kubernetes)
|
||||
* [leofs](./plugins/inputs/leofs)
|
||||
* [lustre2](./plugins/inputs/lustre2)
|
||||
@@ -195,6 +197,7 @@ Telegraf can also collect metrics via the following service plugins:
|
||||
* [github](./plugins/inputs/webhooks/github)
|
||||
* [mandrill](./plugins/inputs/webhooks/mandrill)
|
||||
* [rollbar](./plugins/inputs/webhooks/rollbar)
|
||||
* [papertrail](./plugins/inputs/webhooks/papertrail)
|
||||
|
||||
Telegraf is able to parse the following input data formats into metrics, these
|
||||
formats may be used with input plugins supporting the `data_format` option:
|
||||
|
||||
12
circle.yml
12
circle.yml
@@ -1,13 +1,11 @@
|
||||
machine:
|
||||
go:
|
||||
version: 1.8.1
|
||||
services:
|
||||
- docker
|
||||
post:
|
||||
- sudo service zookeeper stop
|
||||
- go version
|
||||
- sudo rm -rf /usr/local/go
|
||||
- wget https://storage.googleapis.com/golang/go1.8.linux-amd64.tar.gz
|
||||
- sudo tar -C /usr/local -xzf go1.8.linux-amd64.tar.gz
|
||||
- go version
|
||||
- memcached
|
||||
- redis
|
||||
- rabbitmq-server
|
||||
|
||||
dependencies:
|
||||
override:
|
||||
|
||||
@@ -95,7 +95,8 @@
|
||||
## The target database for metrics (telegraf will create it if not exists).
|
||||
database = "telegraf" # required
|
||||
|
||||
## Retention policy to write to. Empty string writes to the default rp.
|
||||
## Name of existing retention policy to write to. Empty string writes to
|
||||
## the default retention policy.
|
||||
retention_policy = ""
|
||||
## Write consistency (clusters only), can be: "any", "one", "quorum", "all"
|
||||
write_consistency = "any"
|
||||
@@ -141,7 +142,7 @@
|
||||
# ## described here: https://www.rabbitmq.com/plugins.html
|
||||
# # auth_method = "PLAIN"
|
||||
# ## Telegraf tag to use as a routing key
|
||||
# ## ie, if this tag exists, it's value will be used as the routing key
|
||||
# ## ie, if this tag exists, its value will be used as the routing key
|
||||
# routing_tag = "host"
|
||||
#
|
||||
# ## InfluxDB retention policy
|
||||
@@ -335,6 +336,10 @@
|
||||
# ## Use SSL but skip chain & host verification
|
||||
# # insecure_skip_verify = false
|
||||
#
|
||||
# ## Optional SASL Config
|
||||
# # sasl_username = "kafka"
|
||||
# # sasl_password = "secret"
|
||||
#
|
||||
# ## Data format to output.
|
||||
# ## Each data format has its own unique set of configuration options, read
|
||||
# ## more about them here:
|
||||
@@ -1325,6 +1330,18 @@
|
||||
# attribute = "LoadedClassCount,UnloadedClassCount,TotalLoadedClassCount"
|
||||
|
||||
|
||||
# # Read Kapacitor-formatted JSON metrics from one or more HTTP endpoints
|
||||
# [[inputs.kapacitor]]
|
||||
# ## Multiple URLs from which to read Kapacitor-formatted JSON
|
||||
# ## Default is "http://localhost:9092/kapacitor/v1/debug/vars".
|
||||
# urls = [
|
||||
# "http://localhost:9092/kapacitor/v1/debug/vars"
|
||||
# ]
|
||||
#
|
||||
# ## Time limit for http requests
|
||||
# timeout = "5s"
|
||||
|
||||
|
||||
# # Get kernel statistics from /proc/vmstat
|
||||
# [[inputs.kernel_vmstat]]
|
||||
# # no configuration
|
||||
|
||||
@@ -29,6 +29,8 @@ type Apache struct {
|
||||
SSLKey string `toml:"ssl_key"`
|
||||
// Use SSL but skip chain & host verification
|
||||
InsecureSkipVerify bool
|
||||
|
||||
client *http.Client
|
||||
}
|
||||
|
||||
var sampleConfig = `
|
||||
@@ -66,6 +68,14 @@ func (n *Apache) Gather(acc telegraf.Accumulator) error {
|
||||
n.ResponseTimeout.Duration = time.Second * 5
|
||||
}
|
||||
|
||||
if n.client == nil {
|
||||
client, err := n.createHttpClient()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
n.client = client
|
||||
}
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(len(n.Urls))
|
||||
for _, u := range n.Urls {
|
||||
@@ -85,31 +95,24 @@ func (n *Apache) Gather(acc telegraf.Accumulator) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (n *Apache) gatherUrl(addr *url.URL, acc telegraf.Accumulator) error {
|
||||
|
||||
var tr *http.Transport
|
||||
|
||||
if addr.Scheme == "https" {
|
||||
tlsCfg, err := internal.GetTLSConfig(
|
||||
n.SSLCert, n.SSLKey, n.SSLCA, n.InsecureSkipVerify)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
tr = &http.Transport{
|
||||
ResponseHeaderTimeout: time.Duration(3 * time.Second),
|
||||
TLSClientConfig: tlsCfg,
|
||||
}
|
||||
} else {
|
||||
tr = &http.Transport{
|
||||
ResponseHeaderTimeout: time.Duration(3 * time.Second),
|
||||
}
|
||||
func (n *Apache) createHttpClient() (*http.Client, error) {
|
||||
tlsCfg, err := internal.GetTLSConfig(
|
||||
n.SSLCert, n.SSLKey, n.SSLCA, n.InsecureSkipVerify)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
client := &http.Client{
|
||||
Transport: tr,
|
||||
Timeout: n.ResponseTimeout.Duration,
|
||||
Transport: &http.Transport{
|
||||
TLSClientConfig: tlsCfg,
|
||||
},
|
||||
Timeout: n.ResponseTimeout.Duration,
|
||||
}
|
||||
|
||||
return client, nil
|
||||
}
|
||||
|
||||
func (n *Apache) gatherUrl(addr *url.URL, acc telegraf.Accumulator) error {
|
||||
req, err := http.NewRequest("GET", addr.String(), nil)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error on new request to %s : %s\n", addr.String(), err)
|
||||
@@ -119,7 +122,7 @@ func (n *Apache) gatherUrl(addr *url.URL, acc telegraf.Accumulator) error {
|
||||
req.SetBasicAuth(n.Username, n.Password)
|
||||
}
|
||||
|
||||
resp, err := client.Do(req)
|
||||
resp, err := n.client.Do(req)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error on request to %s : %s\n", addr.String(), err)
|
||||
}
|
||||
|
||||
@@ -25,7 +25,6 @@ type HTTPResponse struct {
|
||||
Headers map[string]string
|
||||
FollowRedirects bool
|
||||
ResponseStringMatch string
|
||||
compiledStringMatch *regexp.Regexp
|
||||
|
||||
// Path to CA file
|
||||
SSLCA string `toml:"ssl_ca"`
|
||||
@@ -35,6 +34,9 @@ type HTTPResponse struct {
|
||||
SSLKey string `toml:"ssl_key"`
|
||||
// Use SSL but skip chain & host verification
|
||||
InsecureSkipVerify bool
|
||||
|
||||
compiledStringMatch *regexp.Regexp
|
||||
client *http.Client
|
||||
}
|
||||
|
||||
// Description returns the plugin Description
|
||||
@@ -88,13 +90,12 @@ func (h *HTTPResponse) createHttpClient() (*http.Client, error) {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
tr := &http.Transport{
|
||||
ResponseHeaderTimeout: h.ResponseTimeout.Duration,
|
||||
TLSClientConfig: tlsCfg,
|
||||
}
|
||||
client := &http.Client{
|
||||
Transport: tr,
|
||||
Timeout: h.ResponseTimeout.Duration,
|
||||
Transport: &http.Transport{
|
||||
DisableKeepAlives: true,
|
||||
TLSClientConfig: tlsCfg,
|
||||
},
|
||||
Timeout: h.ResponseTimeout.Duration,
|
||||
}
|
||||
|
||||
if h.FollowRedirects == false {
|
||||
@@ -106,15 +107,10 @@ func (h *HTTPResponse) createHttpClient() (*http.Client, error) {
|
||||
}
|
||||
|
||||
// HTTPGather gathers all fields and returns any errors it encounters
|
||||
func (h *HTTPResponse) HTTPGather() (map[string]interface{}, error) {
|
||||
func (h *HTTPResponse) httpGather() (map[string]interface{}, error) {
|
||||
// Prepare fields
|
||||
fields := make(map[string]interface{})
|
||||
|
||||
client, err := h.createHttpClient()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var body io.Reader
|
||||
if h.Body != "" {
|
||||
body = strings.NewReader(h.Body)
|
||||
@@ -133,7 +129,7 @@ func (h *HTTPResponse) HTTPGather() (map[string]interface{}, error) {
|
||||
|
||||
// Start Timer
|
||||
start := time.Now()
|
||||
resp, err := client.Do(request)
|
||||
resp, err := h.client.Do(request)
|
||||
if err != nil {
|
||||
if h.FollowRedirects {
|
||||
return nil, err
|
||||
@@ -145,6 +141,11 @@ func (h *HTTPResponse) HTTPGather() (map[string]interface{}, error) {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
defer func() {
|
||||
io.Copy(ioutil.Discard, resp.Body)
|
||||
resp.Body.Close()
|
||||
}()
|
||||
|
||||
fields["response_time"] = time.Since(start).Seconds()
|
||||
fields["http_response_code"] = resp.StatusCode
|
||||
|
||||
@@ -202,8 +203,17 @@ func (h *HTTPResponse) Gather(acc telegraf.Accumulator) error {
|
||||
// Prepare data
|
||||
tags := map[string]string{"server": h.Address, "method": h.Method}
|
||||
var fields map[string]interface{}
|
||||
|
||||
if h.client == nil {
|
||||
client, err := h.createHttpClient()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
h.client = client
|
||||
}
|
||||
|
||||
// Gather data
|
||||
fields, err = h.HTTPGather()
|
||||
fields, err = h.httpGather()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -9,6 +9,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/telegraf/internal"
|
||||
"github.com/influxdata/telegraf/testutil"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
@@ -73,13 +74,13 @@ func TestHeaders(t *testing.T) {
|
||||
"Host": "Hello",
|
||||
},
|
||||
}
|
||||
fields, err := h.HTTPGather()
|
||||
var acc testutil.Accumulator
|
||||
err := h.Gather(&acc)
|
||||
require.NoError(t, err)
|
||||
assert.NotEmpty(t, fields)
|
||||
if assert.NotNil(t, fields["http_response_code"]) {
|
||||
assert.Equal(t, http.StatusOK, fields["http_response_code"])
|
||||
}
|
||||
assert.NotNil(t, fields["response_time"])
|
||||
|
||||
value, ok := acc.IntField("http_response", "http_response_code")
|
||||
require.True(t, ok)
|
||||
require.Equal(t, http.StatusOK, value)
|
||||
}
|
||||
|
||||
func TestFields(t *testing.T) {
|
||||
@@ -97,13 +98,14 @@ func TestFields(t *testing.T) {
|
||||
},
|
||||
FollowRedirects: true,
|
||||
}
|
||||
fields, err := h.HTTPGather()
|
||||
|
||||
var acc testutil.Accumulator
|
||||
err := h.Gather(&acc)
|
||||
require.NoError(t, err)
|
||||
assert.NotEmpty(t, fields)
|
||||
if assert.NotNil(t, fields["http_response_code"]) {
|
||||
assert.Equal(t, http.StatusOK, fields["http_response_code"])
|
||||
}
|
||||
assert.NotNil(t, fields["response_time"])
|
||||
|
||||
value, ok := acc.IntField("http_response", "http_response_code")
|
||||
require.True(t, ok)
|
||||
require.Equal(t, http.StatusOK, value)
|
||||
}
|
||||
|
||||
func TestRedirects(t *testing.T) {
|
||||
@@ -121,12 +123,13 @@ func TestRedirects(t *testing.T) {
|
||||
},
|
||||
FollowRedirects: true,
|
||||
}
|
||||
fields, err := h.HTTPGather()
|
||||
var acc testutil.Accumulator
|
||||
err := h.Gather(&acc)
|
||||
require.NoError(t, err)
|
||||
assert.NotEmpty(t, fields)
|
||||
if assert.NotNil(t, fields["http_response_code"]) {
|
||||
assert.Equal(t, http.StatusOK, fields["http_response_code"])
|
||||
}
|
||||
|
||||
value, ok := acc.IntField("http_response", "http_response_code")
|
||||
require.True(t, ok)
|
||||
require.Equal(t, http.StatusOK, value)
|
||||
|
||||
h = &HTTPResponse{
|
||||
Address: ts.URL + "/badredirect",
|
||||
@@ -138,8 +141,12 @@ func TestRedirects(t *testing.T) {
|
||||
},
|
||||
FollowRedirects: true,
|
||||
}
|
||||
fields, err = h.HTTPGather()
|
||||
acc = testutil.Accumulator{}
|
||||
err = h.Gather(&acc)
|
||||
require.Error(t, err)
|
||||
|
||||
value, ok = acc.IntField("http_response", "http_response_code")
|
||||
require.False(t, ok)
|
||||
}
|
||||
|
||||
func TestMethod(t *testing.T) {
|
||||
@@ -157,12 +164,13 @@ func TestMethod(t *testing.T) {
|
||||
},
|
||||
FollowRedirects: true,
|
||||
}
|
||||
fields, err := h.HTTPGather()
|
||||
var acc testutil.Accumulator
|
||||
err := h.Gather(&acc)
|
||||
require.NoError(t, err)
|
||||
assert.NotEmpty(t, fields)
|
||||
if assert.NotNil(t, fields["http_response_code"]) {
|
||||
assert.Equal(t, http.StatusOK, fields["http_response_code"])
|
||||
}
|
||||
|
||||
value, ok := acc.IntField("http_response", "http_response_code")
|
||||
require.True(t, ok)
|
||||
require.Equal(t, http.StatusOK, value)
|
||||
|
||||
h = &HTTPResponse{
|
||||
Address: ts.URL + "/mustbepostmethod",
|
||||
@@ -174,12 +182,13 @@ func TestMethod(t *testing.T) {
|
||||
},
|
||||
FollowRedirects: true,
|
||||
}
|
||||
fields, err = h.HTTPGather()
|
||||
acc = testutil.Accumulator{}
|
||||
err = h.Gather(&acc)
|
||||
require.NoError(t, err)
|
||||
assert.NotEmpty(t, fields)
|
||||
if assert.NotNil(t, fields["http_response_code"]) {
|
||||
assert.Equal(t, http.StatusMethodNotAllowed, fields["http_response_code"])
|
||||
}
|
||||
|
||||
value, ok = acc.IntField("http_response", "http_response_code")
|
||||
require.True(t, ok)
|
||||
require.Equal(t, http.StatusMethodNotAllowed, value)
|
||||
|
||||
//check that lowercase methods work correctly
|
||||
h = &HTTPResponse{
|
||||
@@ -192,12 +201,13 @@ func TestMethod(t *testing.T) {
|
||||
},
|
||||
FollowRedirects: true,
|
||||
}
|
||||
fields, err = h.HTTPGather()
|
||||
acc = testutil.Accumulator{}
|
||||
err = h.Gather(&acc)
|
||||
require.NoError(t, err)
|
||||
assert.NotEmpty(t, fields)
|
||||
if assert.NotNil(t, fields["http_response_code"]) {
|
||||
assert.Equal(t, http.StatusMethodNotAllowed, fields["http_response_code"])
|
||||
}
|
||||
|
||||
value, ok = acc.IntField("http_response", "http_response_code")
|
||||
require.True(t, ok)
|
||||
require.Equal(t, http.StatusMethodNotAllowed, value)
|
||||
}
|
||||
|
||||
func TestBody(t *testing.T) {
|
||||
@@ -215,12 +225,13 @@ func TestBody(t *testing.T) {
|
||||
},
|
||||
FollowRedirects: true,
|
||||
}
|
||||
fields, err := h.HTTPGather()
|
||||
var acc testutil.Accumulator
|
||||
err := h.Gather(&acc)
|
||||
require.NoError(t, err)
|
||||
assert.NotEmpty(t, fields)
|
||||
if assert.NotNil(t, fields["http_response_code"]) {
|
||||
assert.Equal(t, http.StatusOK, fields["http_response_code"])
|
||||
}
|
||||
|
||||
value, ok := acc.IntField("http_response", "http_response_code")
|
||||
require.True(t, ok)
|
||||
require.Equal(t, http.StatusOK, value)
|
||||
|
||||
h = &HTTPResponse{
|
||||
Address: ts.URL + "/musthaveabody",
|
||||
@@ -231,12 +242,13 @@ func TestBody(t *testing.T) {
|
||||
},
|
||||
FollowRedirects: true,
|
||||
}
|
||||
fields, err = h.HTTPGather()
|
||||
acc = testutil.Accumulator{}
|
||||
err = h.Gather(&acc)
|
||||
require.NoError(t, err)
|
||||
assert.NotEmpty(t, fields)
|
||||
if assert.NotNil(t, fields["http_response_code"]) {
|
||||
assert.Equal(t, http.StatusBadRequest, fields["http_response_code"])
|
||||
}
|
||||
|
||||
value, ok = acc.IntField("http_response", "http_response_code")
|
||||
require.True(t, ok)
|
||||
require.Equal(t, http.StatusBadRequest, value)
|
||||
}
|
||||
|
||||
func TestStringMatch(t *testing.T) {
|
||||
@@ -255,15 +267,18 @@ func TestStringMatch(t *testing.T) {
|
||||
},
|
||||
FollowRedirects: true,
|
||||
}
|
||||
fields, err := h.HTTPGather()
|
||||
var acc testutil.Accumulator
|
||||
err := h.Gather(&acc)
|
||||
require.NoError(t, err)
|
||||
assert.NotEmpty(t, fields)
|
||||
if assert.NotNil(t, fields["http_response_code"]) {
|
||||
assert.Equal(t, http.StatusOK, fields["http_response_code"])
|
||||
}
|
||||
assert.Equal(t, 1, fields["response_string_match"])
|
||||
assert.NotNil(t, fields["response_time"])
|
||||
|
||||
value, ok := acc.IntField("http_response", "http_response_code")
|
||||
require.True(t, ok)
|
||||
require.Equal(t, http.StatusOK, value)
|
||||
value, ok = acc.IntField("http_response", "response_string_match")
|
||||
require.True(t, ok)
|
||||
require.Equal(t, 1, value)
|
||||
_, ok = acc.FloatField("http_response", "response_time")
|
||||
require.True(t, ok)
|
||||
}
|
||||
|
||||
func TestStringMatchJson(t *testing.T) {
|
||||
@@ -282,15 +297,18 @@ func TestStringMatchJson(t *testing.T) {
|
||||
},
|
||||
FollowRedirects: true,
|
||||
}
|
||||
fields, err := h.HTTPGather()
|
||||
var acc testutil.Accumulator
|
||||
err := h.Gather(&acc)
|
||||
require.NoError(t, err)
|
||||
assert.NotEmpty(t, fields)
|
||||
if assert.NotNil(t, fields["http_response_code"]) {
|
||||
assert.Equal(t, http.StatusOK, fields["http_response_code"])
|
||||
}
|
||||
assert.Equal(t, 1, fields["response_string_match"])
|
||||
assert.NotNil(t, fields["response_time"])
|
||||
|
||||
value, ok := acc.IntField("http_response", "http_response_code")
|
||||
require.True(t, ok)
|
||||
require.Equal(t, http.StatusOK, value)
|
||||
value, ok = acc.IntField("http_response", "response_string_match")
|
||||
require.True(t, ok)
|
||||
require.Equal(t, 1, value)
|
||||
_, ok = acc.FloatField("http_response", "response_time")
|
||||
require.True(t, ok)
|
||||
}
|
||||
|
||||
func TestStringMatchFail(t *testing.T) {
|
||||
@@ -309,18 +327,26 @@ func TestStringMatchFail(t *testing.T) {
|
||||
},
|
||||
FollowRedirects: true,
|
||||
}
|
||||
fields, err := h.HTTPGather()
|
||||
require.NoError(t, err)
|
||||
assert.NotEmpty(t, fields)
|
||||
if assert.NotNil(t, fields["http_response_code"]) {
|
||||
assert.Equal(t, http.StatusOK, fields["http_response_code"])
|
||||
}
|
||||
assert.Equal(t, 0, fields["response_string_match"])
|
||||
assert.NotNil(t, fields["response_time"])
|
||||
|
||||
var acc testutil.Accumulator
|
||||
err := h.Gather(&acc)
|
||||
require.NoError(t, err)
|
||||
|
||||
value, ok := acc.IntField("http_response", "http_response_code")
|
||||
require.True(t, ok)
|
||||
require.Equal(t, http.StatusOK, value)
|
||||
value, ok = acc.IntField("http_response", "response_string_match")
|
||||
require.True(t, ok)
|
||||
require.Equal(t, 0, value)
|
||||
_, ok = acc.FloatField("http_response", "response_time")
|
||||
require.True(t, ok)
|
||||
}
|
||||
|
||||
func TestTimeout(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("Skipping test with sleep in short mode.")
|
||||
}
|
||||
|
||||
mux := setUpTestMux()
|
||||
ts := httptest.NewServer(mux)
|
||||
defer ts.Close()
|
||||
@@ -335,6 +361,10 @@ func TestTimeout(t *testing.T) {
|
||||
},
|
||||
FollowRedirects: true,
|
||||
}
|
||||
_, err := h.HTTPGather()
|
||||
require.Error(t, err)
|
||||
var acc testutil.Accumulator
|
||||
err := h.Gather(&acc)
|
||||
require.NoError(t, err)
|
||||
|
||||
ok := acc.HasIntField("http_response", "http_response_code")
|
||||
require.False(t, ok)
|
||||
}
|
||||
|
||||
@@ -118,11 +118,18 @@ func (p *Parser) Compile() error {
|
||||
|
||||
// Give Patterns fake names so that they can be treated as named
|
||||
// "custom patterns"
|
||||
p.namedPatterns = make([]string, len(p.Patterns))
|
||||
p.namedPatterns = make([]string, 0, len(p.Patterns))
|
||||
for i, pattern := range p.Patterns {
|
||||
if pattern == "" {
|
||||
continue
|
||||
}
|
||||
name := fmt.Sprintf("GROK_INTERNAL_PATTERN_%d", i)
|
||||
p.CustomPatterns += "\n" + name + " " + pattern + "\n"
|
||||
p.namedPatterns[i] = "%{" + name + "}"
|
||||
p.namedPatterns = append(p.namedPatterns, "%{"+name+"}")
|
||||
}
|
||||
|
||||
if len(p.namedPatterns) == 0 {
|
||||
return fmt.Errorf("pattern required")
|
||||
}
|
||||
|
||||
// Combine user-supplied CustomPatterns with DEFAULT_PATTERNS and parse
|
||||
|
||||
@@ -117,16 +117,11 @@ func (l *LogParserPlugin) Start(acc telegraf.Accumulator) error {
|
||||
}
|
||||
|
||||
// compile log parser patterns:
|
||||
var haveError bool
|
||||
for _, parser := range l.parsers {
|
||||
if err := parser.Compile(); err != nil {
|
||||
acc.AddError(err)
|
||||
haveError = true
|
||||
return err
|
||||
}
|
||||
}
|
||||
if haveError {
|
||||
return nil
|
||||
}
|
||||
|
||||
l.wg.Add(1)
|
||||
go l.parser()
|
||||
|
||||
@@ -38,12 +38,8 @@ func TestGrokParseLogFilesNonExistPattern(t *testing.T) {
|
||||
}
|
||||
|
||||
acc := testutil.Accumulator{}
|
||||
logparser.Start(&acc)
|
||||
if assert.NotEmpty(t, acc.Errors) {
|
||||
assert.Error(t, acc.Errors[0])
|
||||
}
|
||||
|
||||
logparser.Stop()
|
||||
err := logparser.Start(&acc)
|
||||
assert.Error(t, err)
|
||||
}
|
||||
|
||||
func TestGrokParseLogFiles(t *testing.T) {
|
||||
|
||||
@@ -564,7 +564,7 @@ func NewStatLine(oldMongo, newMongo MongoStatus, key string, all bool, sampleSec
|
||||
// BEGIN code modification
|
||||
if newStat.Repl.IsMaster.(bool) {
|
||||
returnVal.NodeType = "PRI"
|
||||
} else if newStat.Repl.Secondary.(bool) {
|
||||
} else if newStat.Repl.Secondary != nil && newStat.Repl.Secondary.(bool) {
|
||||
returnVal.NodeType = "SEC"
|
||||
} else if newStat.Repl.ArbiterOnly != nil && newStat.Repl.ArbiterOnly.(bool) {
|
||||
returnVal.NodeType = "ARB"
|
||||
|
||||
@@ -4,7 +4,6 @@ import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net"
|
||||
"net/http"
|
||||
"sync"
|
||||
"time"
|
||||
@@ -32,6 +31,8 @@ type Prometheus struct {
|
||||
SSLKey string `toml:"ssl_key"`
|
||||
// Use SSL but skip chain & host verification
|
||||
InsecureSkipVerify bool
|
||||
|
||||
client *http.Client
|
||||
}
|
||||
|
||||
var sampleConfig = `
|
||||
@@ -65,6 +66,14 @@ var ErrProtocolError = errors.New("prometheus protocol error")
|
||||
// Reads stats from all configured servers accumulates stats.
|
||||
// Returns one of the errors encountered while gather stats (if any).
|
||||
func (p *Prometheus) Gather(acc telegraf.Accumulator) error {
|
||||
if p.client == nil {
|
||||
client, err := p.createHttpClient()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
p.client = client
|
||||
}
|
||||
|
||||
var wg sync.WaitGroup
|
||||
|
||||
for _, serv := range p.Urls {
|
||||
@@ -89,29 +98,30 @@ var client = &http.Client{
|
||||
Timeout: time.Duration(4 * time.Second),
|
||||
}
|
||||
|
||||
func (p *Prometheus) createHttpClient() (*http.Client, error) {
|
||||
tlsCfg, err := internal.GetTLSConfig(
|
||||
p.SSLCert, p.SSLKey, p.SSLCA, p.InsecureSkipVerify)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
client := &http.Client{
|
||||
Transport: &http.Transport{
|
||||
TLSClientConfig: tlsCfg,
|
||||
DisableKeepAlives: true,
|
||||
},
|
||||
Timeout: p.ResponseTimeout.Duration,
|
||||
}
|
||||
|
||||
return client, nil
|
||||
}
|
||||
|
||||
func (p *Prometheus) gatherURL(url string, acc telegraf.Accumulator) error {
|
||||
var req, err = http.NewRequest("GET", url, nil)
|
||||
req.Header.Add("Accept", acceptHeader)
|
||||
var token []byte
|
||||
var resp *http.Response
|
||||
|
||||
tlsCfg, err := internal.GetTLSConfig(
|
||||
p.SSLCert, p.SSLKey, p.SSLCA, p.InsecureSkipVerify)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var rt http.RoundTripper = &http.Transport{
|
||||
Dial: (&net.Dialer{
|
||||
Timeout: 5 * time.Second,
|
||||
KeepAlive: 30 * time.Second,
|
||||
}).Dial,
|
||||
TLSHandshakeTimeout: 5 * time.Second,
|
||||
TLSClientConfig: tlsCfg,
|
||||
ResponseHeaderTimeout: p.ResponseTimeout.Duration,
|
||||
DisableKeepAlives: true,
|
||||
}
|
||||
|
||||
if p.BearerToken != "" {
|
||||
token, err = ioutil.ReadFile(p.BearerToken)
|
||||
if err != nil {
|
||||
@@ -120,7 +130,7 @@ func (p *Prometheus) gatherURL(url string, acc telegraf.Accumulator) error {
|
||||
req.Header.Set("Authorization", "Bearer "+string(token))
|
||||
}
|
||||
|
||||
resp, err = rt.RoundTrip(req)
|
||||
resp, err = p.client.Do(req)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error making HTTP request to %s: %s", url, err)
|
||||
}
|
||||
|
||||
121
plugins/inputs/rabbitmq/README.md
Normal file
121
plugins/inputs/rabbitmq/README.md
Normal file
@@ -0,0 +1,121 @@
|
||||
# RabbitMQ Input Plugin
|
||||
|
||||
Reads metrics from RabbitMQ servers via the [Management Plugin](https://www.rabbitmq.com/management.html).
|
||||
|
||||
For additional details reference the [RabbitMQ Management HTTP Stats](https://cdn.rawgit.com/rabbitmq/rabbitmq-management/master/priv/www/doc/stats.html).
|
||||
|
||||
### Configuration:
|
||||
|
||||
```toml
|
||||
[[inputs.rabbitmq]]
|
||||
## Management Plugin url. (default: http://localhost:15672)
|
||||
# url = "http://localhost:15672"
|
||||
## Tag added to rabbitmq_overview series; deprecated: use tags
|
||||
# name = "rmq-server-1"
|
||||
## Credentials
|
||||
# username = "guest"
|
||||
# password = "guest"
|
||||
|
||||
## Optional SSL Config
|
||||
# ssl_ca = "/etc/telegraf/ca.pem"
|
||||
# ssl_cert = "/etc/telegraf/cert.pem"
|
||||
# ssl_key = "/etc/telegraf/key.pem"
|
||||
## Use SSL but skip chain & host verification
|
||||
# insecure_skip_verify = false
|
||||
|
||||
## Optional request timeouts
|
||||
##
|
||||
## ResponseHeaderTimeout, if non-zero, specifies the amount of time to wait
|
||||
## for a server's response headers after fully writing the request.
|
||||
# header_timeout = "3s"
|
||||
##
|
||||
## client_timeout specifies a time limit for requests made by this client.
|
||||
## Includes connection time, any redirects, and reading the response body.
|
||||
# client_timeout = "4s"
|
||||
|
||||
## A list of nodes to pull metrics about. If not specified, metrics for
|
||||
## all nodes are gathered.
|
||||
# nodes = ["rabbit@node1", "rabbit@node2"]
|
||||
```
|
||||
|
||||
### Measurements & Fields:
|
||||
|
||||
- rabbitmq_overview
|
||||
- channels (int, channels)
|
||||
- connections (int, connections)
|
||||
- consumers (int, consumers)
|
||||
- exchanges (int, exchanges)
|
||||
- messages (int, messages)
|
||||
- messages_acked (int, messages)
|
||||
- messages_delivered (int, messages)
|
||||
- messages_published (int, messages)
|
||||
- messages_ready (int, messages)
|
||||
- messages_unacked (int, messages)
|
||||
- queues (int, queues)
|
||||
|
||||
- rabbitmq_node
|
||||
- disk_free (int, bytes)
|
||||
- disk_free_limit (int, bytes)
|
||||
- fd_total (int, file descriptors)
|
||||
- fd_used (int, file descriptors)
|
||||
- mem_limit (int, bytes)
|
||||
- mem_used (int, bytes)
|
||||
- proc_total (int, erlang processes)
|
||||
- proc_used (int, erlang processes)
|
||||
- run_queue (int, erlang processes)
|
||||
- sockets_total (int, sockets)
|
||||
- sockets_used (int, sockets)
|
||||
|
||||
- rabbitmq_queue
|
||||
- consumer_utilisation (float, percent)
|
||||
- consumers (int, int)
|
||||
- idle_since (string, time - e.g., "2006-01-02 15:04:05")
|
||||
- memory (int, bytes)
|
||||
- message_bytes (int, bytes)
|
||||
- message_bytes_persist (int, bytes)
|
||||
- message_bytes_ram (int, bytes)
|
||||
- message_bytes_ready (int, bytes)
|
||||
- message_bytes_unacked (int, bytes)
|
||||
- messages (int, count)
|
||||
- messages_ack (int, count)
|
||||
- messages_ack_rate (float, messages per second)
|
||||
- messages_deliver (int, count)
|
||||
- messages_deliver_rate (float, messages per second)
|
||||
- messages_deliver_get (int, count)
|
||||
- messages_deliver_get_rate (float, messages per second)
|
||||
- messages_publish (int, count)
|
||||
- messages_publish_rate (float, messages per second)
|
||||
- messages_ready (int, count)
|
||||
- messages_redeliver (int, count)
|
||||
- messages_redeliver_rate (float, messages per second)
|
||||
- messages_unack (integer, count)
|
||||
|
||||
### Tags:
|
||||
|
||||
- All measurements have the following tags:
|
||||
- url
|
||||
|
||||
- rabbitmq_overview
|
||||
- name
|
||||
|
||||
- rabbitmq_node
|
||||
- node
|
||||
|
||||
- rabbitmq_queue
|
||||
- url
|
||||
- queue
|
||||
- vhost
|
||||
- node
|
||||
- durable
|
||||
- auto_delete
|
||||
|
||||
### Sample Queries:
|
||||
|
||||
|
||||
### Example Output:
|
||||
|
||||
```
|
||||
rabbitmq_queue,url=http://amqp.example.org:15672,queue=telegraf,vhost=influxdb,node=rabbit@amqp.example.org,durable=true,auto_delete=false,host=amqp.example.org messages_deliver_get=0i,messages_publish=329i,messages_publish_rate=0.2,messages_redeliver_rate=0,message_bytes_ready=0i,message_bytes_unacked=0i,messages_deliver=329i,messages_unack=0i,consumers=1i,idle_since="",messages=0i,messages_deliver_rate=0.2,messages_deliver_get_rate=0.2,messages_redeliver=0i,memory=43032i,message_bytes_ram=0i,messages_ack=329i,messages_ready=0i,messages_ack_rate=0.2,consumer_utilisation=1,message_bytes=0i,message_bytes_persist=0i 1493684035000000000
|
||||
rabbitmq_overview,url=http://amqp.example.org:15672,host=amqp.example.org channels=2i,consumers=1i,exchanges=17i,messages_acked=329i,messages=0i,messages_ready=0i,messages_unacked=0i,connections=2i,queues=1i,messages_delivered=329i,messages_published=329i 1493684035000000000
|
||||
rabbitmq_node,url=http://amqp.example.org:15672,node=rabbit@amqp.example.org,host=amqp.example.org fd_total=1024i,fd_used=32i,mem_limit=8363329126i,sockets_total=829i,disk_free=8175935488i,disk_free_limit=50000000i,mem_used=58771080i,proc_total=1048576i,proc_used=267i,run_queue=0i,sockets_used=2i 149368403500000000
|
||||
```
|
||||
@@ -140,8 +140,11 @@ type gatherFunc func(r *RabbitMQ, acc telegraf.Accumulator)
|
||||
var gatherFunctions = []gatherFunc{gatherOverview, gatherNodes, gatherQueues}
|
||||
|
||||
var sampleConfig = `
|
||||
## Management Plugin url. (default: http://localhost:15672)
|
||||
# url = "http://localhost:15672"
|
||||
# name = "rmq-server-1" # optional tag
|
||||
## Tag added to rabbitmq_overview series; deprecated: use tags
|
||||
# name = "rmq-server-1"
|
||||
## Credentials
|
||||
# username = "guest"
|
||||
# password = "guest"
|
||||
|
||||
@@ -174,7 +177,7 @@ func (r *RabbitMQ) SampleConfig() string {
|
||||
|
||||
// Description ...
|
||||
func (r *RabbitMQ) Description() string {
|
||||
return "Read metrics from one or many RabbitMQ servers via the management API"
|
||||
return "Reads metrics from RabbitMQ servers via the Management Plugin"
|
||||
}
|
||||
|
||||
// Gather ...
|
||||
|
||||
@@ -18,47 +18,103 @@
|
||||
|
||||
### Measurements & Fields:
|
||||
|
||||
- Measurement
|
||||
- uptime_in_seconds
|
||||
- connected_clients
|
||||
- used_memory
|
||||
- used_memory_rss
|
||||
- used_memory_peak
|
||||
- used_memory_lua
|
||||
- rdb_changes_since_last_save
|
||||
- total_connections_received
|
||||
- total_commands_processed
|
||||
- instantaneous_ops_per_sec
|
||||
- instantaneous_input_kbps
|
||||
- instantaneous_output_kbps
|
||||
- sync_full
|
||||
- sync_partial_ok
|
||||
- sync_partial_err
|
||||
- expired_keys
|
||||
- evicted_keys
|
||||
- keyspace_hits
|
||||
- keyspace_misses
|
||||
- pubsub_channels
|
||||
- pubsub_patterns
|
||||
- latest_fork_usec
|
||||
- connected_slaves
|
||||
- master_repl_offset
|
||||
- master_last_io_seconds_ago
|
||||
- repl_backlog_active
|
||||
- repl_backlog_size
|
||||
- repl_backlog_histlen
|
||||
- mem_fragmentation_ratio
|
||||
- used_cpu_sys
|
||||
- used_cpu_user
|
||||
- used_cpu_sys_children
|
||||
- used_cpu_user_children
|
||||
The plugin gathers the results of the [INFO](https://redis.io/commands/info) redis command.
|
||||
There are two separate measurements: _redis_ and _redis\_keyspace_, the latter is used for gathering database related statistics.
|
||||
|
||||
Additionally the plugin also calculates the hit/miss ratio (keyspace\_hitrate) and the elapsed time since the last rdb save (rdb\_last\_save\_time\_elapsed).
|
||||
|
||||
- redis
|
||||
- keyspace_hitrate(float, number)
|
||||
- rdb_last_save_time_elapsed(int, seconds)
|
||||
|
||||
**Server**
|
||||
- uptime(int, seconds)
|
||||
- lru_clock(int, number)
|
||||
|
||||
**Clients**
|
||||
- clients(int, number)
|
||||
- client_longest_output_list(int, number)
|
||||
- client_biggest_input_buf(int, number)
|
||||
- blocked_clients(int, number)
|
||||
|
||||
**Memory**
|
||||
- used_memory(int, bytes)
|
||||
- used_memory_rss(int, bytes)
|
||||
- used_memory_peak(int, bytes)
|
||||
- total_system_memory(int, bytes)
|
||||
- used_memory_lua(int, bytes)
|
||||
- maxmemory(int, bytes)
|
||||
- maxmemory_policy(string)
|
||||
- mem_fragmentation_ratio(float, number)
|
||||
|
||||
**Persistance**
|
||||
- loading(int,flag)
|
||||
- rdb_changes_since_last_save(int, number)
|
||||
- rdb_bgsave_in_progress(int, flag)
|
||||
- rdb_last_save_time(int, seconds)
|
||||
- rdb_last_bgsave_status(string)
|
||||
- rdb_last_bgsave_time_sec(int, seconds)
|
||||
- rdb_current_bgsave_time_sec(int, seconds)
|
||||
- aof_enabled(int, flag)
|
||||
- aof_rewrite_in_progress(int, flag)
|
||||
- aof_rewrite_scheduled(int, flag)
|
||||
- aof_last_rewrite_time_sec(int, seconds)
|
||||
- aof_current_rewrite_time_sec(int, seconds)
|
||||
- aof_last_bgrewrite_status(string)
|
||||
- aof_last_write_status(string)
|
||||
|
||||
**Stats**
|
||||
- total_connections_received(int, number)
|
||||
- total_commands_processed(int, number)
|
||||
- instantaneous_ops_per_sec(int, number)
|
||||
- total_net_input_bytes(int, bytes)
|
||||
- total_net_output_bytes(int, bytes)
|
||||
- instantaneous_input_kbps(float, bytes)
|
||||
- instantaneous_output_kbps(float, bytes)
|
||||
- rejected_connections(int, number)
|
||||
- sync_full(int, number)
|
||||
- sync_partial_ok(int, number)
|
||||
- sync_partial_err(int, number)
|
||||
- expired_keys(int, number)
|
||||
- evicted_keys(int, number)
|
||||
- keyspace_hits(int, number)
|
||||
- keyspace_misses(int, number)
|
||||
- pubsub_channels(int, number)
|
||||
- pubsub_patterns(int, number)
|
||||
- latest_fork_usec(int, microseconds)
|
||||
- migrate_cached_sockets(int, number)
|
||||
|
||||
**Replication**
|
||||
- connected_slaves(int, number)
|
||||
- master_repl_offset(int, number)
|
||||
- repl_backlog_active(int, number)
|
||||
- repl_backlog_size(int, bytes)
|
||||
- repl_backlog_first_byte_offset(int, number)
|
||||
- repl_backlog_histlen(int, bytes)
|
||||
|
||||
**CPU**
|
||||
- used_cpu_sys(float, number)
|
||||
- used_cpu_user(float, number)
|
||||
- used_cpu_sys_children(float, number)
|
||||
- used_cpu_user_children(float, number)
|
||||
|
||||
**Cluster**
|
||||
- cluster_enabled(int, flag)
|
||||
|
||||
- redis_keyspace
|
||||
- keys(int, number)
|
||||
- expires(int, number)
|
||||
- avg_ttl(int, number)
|
||||
|
||||
### Tags:
|
||||
|
||||
- All measurements have the following tags:
|
||||
- port
|
||||
- server
|
||||
- replication role
|
||||
- replication_role
|
||||
|
||||
- The redis_keyspace measurement has an additional database tag:
|
||||
- database
|
||||
|
||||
### Example Output:
|
||||
|
||||
@@ -84,5 +140,10 @@ When run with:
|
||||
It produces:
|
||||
```
|
||||
* Plugin: redis, Collection 1
|
||||
> redis,port=6379,server=localhost clients=1i,connected_slaves=0i,evicted_keys=0i,expired_keys=0i,instantaneous_ops_per_sec=0i,keyspace_hitrate=0,keyspace_hits=0i,keyspace_misses=2i,latest_fork_usec=0i,master_repl_offset=0i,mem_fragmentation_ratio=3.58,pubsub_channels=0i,pubsub_patterns=0i,rdb_changes_since_last_save=0i,repl_backlog_active=0i,repl_backlog_histlen=0i,repl_backlog_size=1048576i,sync_full=0i,sync_partial_err=0i,sync_partial_ok=0i,total_commands_processed=4i,total_connections_received=2i,uptime=869i,used_cpu_sys=0.07,used_cpu_sys_children=0,used_cpu_user=0.1,used_cpu_user_children=0,used_memory=502048i,used_memory_lua=33792i,used_memory_peak=501128i,used_memory_rss=1798144i 1457052084987848383
|
||||
> redis,server=localhost,port=6379,replication_role=master,host=host keyspace_hitrate=1,clients=2i,blocked_clients=0i,instantaneous_input_kbps=0,sync_full=0i,pubsub_channels=0i,pubsub_patterns=0i,total_net_output_bytes=6659253i,used_memory=842448i,total_system_memory=8351916032i,aof_current_rewrite_time_sec=-1i,rdb_changes_since_last_save=0i,sync_partial_err=0i,latest_fork_usec=508i,instantaneous_output_kbps=0,expired_keys=0i,used_memory_peak=843416i,aof_rewrite_in_progress=0i,aof_last_bgrewrite_status="ok",migrate_cached_sockets=0i,connected_slaves=0i,maxmemory_policy="noeviction",aof_rewrite_scheduled=0i,total_net_input_bytes=3125i,used_memory_rss=9564160i,repl_backlog_histlen=0i,rdb_last_bgsave_status="ok",aof_last_rewrite_time_sec=-1i,keyspace_misses=0i,client_biggest_input_buf=5i,used_cpu_user=1.33,maxmemory=0i,rdb_current_bgsave_time_sec=-1i,total_commands_processed=271i,repl_backlog_size=1048576i,used_cpu_sys=3,uptime=2822i,lru_clock=16706281i,used_memory_lua=37888i,rejected_connections=0i,sync_partial_ok=0i,evicted_keys=0i,rdb_last_save_time_elapsed=1922i,rdb_last_save_time=1493099368i,instantaneous_ops_per_sec=0i,used_cpu_user_children=0,client_longest_output_list=0i,master_repl_offset=0i,repl_backlog_active=0i,keyspace_hits=2i,used_cpu_sys_children=0,cluster_enabled=0i,rdb_last_bgsave_time_sec=0i,aof_last_write_status="ok",total_connections_received=263i,aof_enabled=0i,repl_backlog_first_byte_offset=0i,mem_fragmentation_ratio=11.35,loading=0i,rdb_bgsave_in_progress=0i 1493101290000000000
|
||||
```
|
||||
|
||||
redis_keyspace:
|
||||
```
|
||||
> redis_keyspace,database=db1,host=host,server=localhost,port=6379,replication_role=master keys=1i,expires=0i,avg_ttl=0i 1493101350000000000
|
||||
```
|
||||
|
||||
@@ -843,7 +843,7 @@ FROM (SELECT DISTINCT DatabaseName FROM #Databases) AS bl
|
||||
SET @DynamicPivotQuery = N'
|
||||
SELECT measurement = Measurement, servername = REPLACE(@@SERVERNAME, ''\'', '':'')
|
||||
, type = ''Database properties''
|
||||
, ' + @ColumnName + ', total FROM
|
||||
, ' + @ColumnName + ', Total FROM
|
||||
(
|
||||
SELECT Measurement, DatabaseName, Value
|
||||
, Total = (SELECT SUM(Value) FROM #Databases WHERE Measurement = d.Measurement)
|
||||
@@ -856,7 +856,7 @@ UNION ALL
|
||||
|
||||
SELECT measurement = Measurement, servername = REPLACE(@@SERVERNAME, ''\'', '':'')
|
||||
, type = ''Database properties''
|
||||
, ' + @ColumnName + ', total FROM
|
||||
, ' + @ColumnName + ', Total FROM
|
||||
(
|
||||
SELECT Measurement, DatabaseName, Value
|
||||
, Total = (SELECT SUM(Value) FROM #Databases WHERE Measurement = d.Measurement)
|
||||
@@ -869,7 +869,7 @@ UNION ALL
|
||||
|
||||
SELECT measurement = Measurement, servername = REPLACE(@@SERVERNAME, ''\'', '':'')
|
||||
, type = ''Database properties''
|
||||
, ' + @ColumnName + ', total FROM
|
||||
, ' + @ColumnName + ', Total FROM
|
||||
(
|
||||
SELECT Measurement, DatabaseName, Value
|
||||
, Total = (SELECT SUM(Value) FROM #Databases WHERE Measurement = d.Measurement)
|
||||
@@ -883,7 +883,7 @@ UNION ALL
|
||||
|
||||
SELECT measurement = Measurement, servername = REPLACE(@@SERVERNAME, ''\'', '':'')
|
||||
, type = ''Database properties''
|
||||
, ' + @ColumnName + ', total FROM
|
||||
, ' + @ColumnName + ', Total FROM
|
||||
(
|
||||
SELECT Measurement, DatabaseName, Value
|
||||
, Total = (SELECT SUM(Value) FROM #Databases WHERE Measurement = d.Measurement)
|
||||
@@ -896,7 +896,7 @@ UNION ALL
|
||||
|
||||
SELECT measurement = Measurement, servername = REPLACE(@@SERVERNAME, ''\'', '':'')
|
||||
, type = ''Database properties''
|
||||
, ' + @ColumnName + ', total FROM
|
||||
, ' + @ColumnName + ', Total FROM
|
||||
(
|
||||
SELECT Measurement, DatabaseName, Value
|
||||
, Total = (SELECT SUM(Value) FROM #Databases WHERE Measurement = d.Measurement)
|
||||
@@ -909,7 +909,7 @@ UNION ALL
|
||||
|
||||
SELECT measurement = Measurement, servername = REPLACE(@@SERVERNAME, ''\'', '':'')
|
||||
, type = ''Database properties''
|
||||
, ' + @ColumnName + ', total FROM
|
||||
, ' + @ColumnName + ', Total FROM
|
||||
(
|
||||
SELECT Measurement, DatabaseName, Value
|
||||
, Total = (SELECT SUM(Value) FROM #Databases WHERE Measurement = d.Measurement)
|
||||
@@ -922,7 +922,7 @@ UNION ALL
|
||||
|
||||
SELECT measurement = Measurement, servername = REPLACE(@@SERVERNAME, ''\'', '':'')
|
||||
, type = ''Database properties''
|
||||
, ' + @ColumnName + ', total FROM
|
||||
, ' + @ColumnName + ', Total FROM
|
||||
(
|
||||
SELECT Measurement, DatabaseName, Value
|
||||
, Total = (SELECT SUM(Value) FROM #Databases WHERE Measurement = d.Measurement)
|
||||
@@ -935,7 +935,7 @@ UNION ALL
|
||||
|
||||
SELECT measurement = Measurement, servername = REPLACE(@@SERVERNAME, ''\'', '':'')
|
||||
, type = ''Database properties''
|
||||
, ' + @ColumnName + ', total FROM
|
||||
, ' + @ColumnName + ', Total FROM
|
||||
(
|
||||
SELECT Measurement, DatabaseName, Value
|
||||
, Total = (SELECT SUM(Value) FROM #Databases WHERE Measurement = d.Measurement)
|
||||
@@ -948,7 +948,7 @@ UNION ALL
|
||||
|
||||
SELECT measurement = Measurement, servername = REPLACE(@@SERVERNAME, ''\'', '':'')
|
||||
, type = ''Database properties''
|
||||
, ' + @ColumnName + ', total FROM
|
||||
, ' + @ColumnName + ', Total FROM
|
||||
(
|
||||
SELECT Measurement, DatabaseName, Value
|
||||
, Total = (SELECT SUM(Value) FROM #Databases WHERE Measurement = d.Measurement)
|
||||
@@ -961,7 +961,7 @@ UNION ALL
|
||||
|
||||
SELECT measurement = Measurement, servername = REPLACE(@@SERVERNAME, ''\'', '':'')
|
||||
, type = ''Database properties''
|
||||
, ' + @ColumnName + ', total FROM
|
||||
, ' + @ColumnName + ', Total FROM
|
||||
(
|
||||
SELECT Measurement, DatabaseName, Value
|
||||
, Total = (SELECT SUM(Value) FROM #Databases WHERE Measurement = d.Measurement)
|
||||
|
||||
@@ -12,6 +12,7 @@ import (
|
||||
"path/filepath"
|
||||
"runtime"
|
||||
"strconv"
|
||||
"syscall"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/plugins/inputs"
|
||||
@@ -195,6 +196,13 @@ func readProcFile(filename string) ([]byte, error) {
|
||||
if os.IsNotExist(err) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// Reading from /proc/<PID> fails with ESRCH if the process has
|
||||
// been terminated between open() and read().
|
||||
if perr, ok := err.(*os.PathError); ok && perr.Err == syscall.ESRCH {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
||||
@@ -18,7 +18,8 @@ This plugin writes to [InfluxDB](https://www.influxdb.com) via HTTP or UDP.
|
||||
## The target database for metrics (telegraf will create it if not exists).
|
||||
database = "telegraf" # required
|
||||
|
||||
## Retention policy to write to. Empty string writes to the default rp.
|
||||
## Name of existing retention policy to write to. Empty string writes to
|
||||
## the default retention policy.
|
||||
retention_policy = ""
|
||||
## Write consistency (clusters only), can be: "any", "one", "quorum", "all"
|
||||
write_consistency = "any"
|
||||
@@ -52,7 +53,7 @@ to write to. Each URL should start with either `http://` or `udp://`
|
||||
### Optional parameters:
|
||||
|
||||
* `write_consistency`: Write consistency (clusters only), can be: "any", "one", "quorum", "all".
|
||||
* `retention_policy`: Retention policy to write to.
|
||||
* `retention_policy`: Name of existing retention policy to write to. Empty string writes to the default retention policy.
|
||||
* `timeout`: Write timeout (for the InfluxDB client), formatted as a string. If not provided, will default to 5s. 0s means no timeout (not recommended).
|
||||
* `username`: Username for influxdb
|
||||
* `password`: Password for influxdb
|
||||
|
||||
@@ -16,7 +16,6 @@ var (
|
||||
defaultRequestTimeout = time.Second * 5
|
||||
)
|
||||
|
||||
//
|
||||
func NewHTTP(config HTTPConfig, defaultWP WriteParams) (Client, error) {
|
||||
// validate required parameters:
|
||||
if len(config.URL) == 0 {
|
||||
|
||||
@@ -25,6 +25,7 @@ type UDPConfig struct {
|
||||
PayloadSize int
|
||||
}
|
||||
|
||||
// NewUDP will return an instance of the telegraf UDP output plugin for influxdb
|
||||
func NewUDP(config UDPConfig) (Client, error) {
|
||||
p, err := url.Parse(config.URL)
|
||||
if err != nil {
|
||||
@@ -55,20 +56,22 @@ type udpClient struct {
|
||||
buffer []byte
|
||||
}
|
||||
|
||||
// Query will send the provided query command to the client, returning an error if any issues arise
|
||||
func (c *udpClient) Query(command string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Write will send the byte stream to the given UDP client endpoint
|
||||
func (c *udpClient) Write(b []byte) (int, error) {
|
||||
return c.WriteStream(bytes.NewReader(b), -1)
|
||||
}
|
||||
|
||||
// write params are ignored by the UDP client
|
||||
// WriteWithParams are ignored by the UDP client, will forward to WriteStream
|
||||
func (c *udpClient) WriteWithParams(b []byte, wp WriteParams) (int, error) {
|
||||
return c.WriteStream(bytes.NewReader(b), -1)
|
||||
}
|
||||
|
||||
// contentLength is ignored by the UDP client.
|
||||
// WriteStream will send the provided data through to the client, contentLength is ignored by the UDP client
|
||||
func (c *udpClient) WriteStream(r io.Reader, contentLength int) (int, error) {
|
||||
var totaln int
|
||||
for {
|
||||
@@ -88,12 +91,13 @@ func (c *udpClient) WriteStream(r io.Reader, contentLength int) (int, error) {
|
||||
return totaln, nil
|
||||
}
|
||||
|
||||
// contentLength is ignored by the UDP client.
|
||||
// WriteStreamWithParams will forward the stream to the client backend, contentLength is ignored by the UDP client
|
||||
// write params are ignored by the UDP client
|
||||
func (c *udpClient) WriteStreamWithParams(r io.Reader, contentLength int, wp WriteParams) (int, error) {
|
||||
return c.WriteStream(r, -1)
|
||||
}
|
||||
|
||||
// Close will terminate the provided client connection
|
||||
func (c *udpClient) Close() error {
|
||||
return c.conn.Close()
|
||||
}
|
||||
|
||||
@@ -15,6 +15,12 @@ import (
|
||||
"github.com/influxdata/telegraf/plugins/outputs/influxdb/client"
|
||||
)
|
||||
|
||||
var (
|
||||
// Quote Ident replacer.
|
||||
qiReplacer = strings.NewReplacer("\n", `\n`, `\`, `\\`, `"`, `\"`)
|
||||
)
|
||||
|
||||
// InfluxDB struct is the primary data structure for the plugin
|
||||
type InfluxDB struct {
|
||||
// URL is only for backwards compatability
|
||||
URL string
|
||||
@@ -40,7 +46,8 @@ type InfluxDB struct {
|
||||
// Precision is only here for legacy support. It will be ignored.
|
||||
Precision string
|
||||
|
||||
clients []client.Client
|
||||
clients []client.Client
|
||||
splitPayload bool
|
||||
}
|
||||
|
||||
var sampleConfig = `
|
||||
@@ -55,7 +62,8 @@ var sampleConfig = `
|
||||
## The target database for metrics (telegraf will create it if not exists).
|
||||
database = "telegraf" # required
|
||||
|
||||
## Retention policy to write to. Empty string writes to the default rp.
|
||||
## Name of existing retention policy to write to. Empty string writes to
|
||||
## the default retention policy.
|
||||
retention_policy = ""
|
||||
## Write consistency (clusters only), can be: "any", "one", "quorum", "all"
|
||||
write_consistency = "any"
|
||||
@@ -78,11 +86,10 @@ var sampleConfig = `
|
||||
# insecure_skip_verify = false
|
||||
`
|
||||
|
||||
// Connect initiates the primary connection to the range of provided URLs
|
||||
func (i *InfluxDB) Connect() error {
|
||||
var urls []string
|
||||
for _, u := range i.URLs {
|
||||
urls = append(urls, u)
|
||||
}
|
||||
urls = append(urls, i.URLs...)
|
||||
|
||||
// Backward-compatability with single Influx URL config files
|
||||
// This could eventually be removed in favor of specifying the urls as a list
|
||||
@@ -108,6 +115,7 @@ func (i *InfluxDB) Connect() error {
|
||||
return fmt.Errorf("Error creating UDP Client [%s]: %s", u, err)
|
||||
}
|
||||
i.clients = append(i.clients, c)
|
||||
i.splitPayload = true
|
||||
default:
|
||||
// If URL doesn't start with "udp", assume HTTP client
|
||||
config := client.HTTPConfig{
|
||||
@@ -129,9 +137,11 @@ func (i *InfluxDB) Connect() error {
|
||||
}
|
||||
i.clients = append(i.clients, c)
|
||||
|
||||
err = c.Query("CREATE DATABASE " + i.Database)
|
||||
err = c.Query(fmt.Sprintf(`CREATE DATABASE "%s"`, qiReplacer.Replace(i.Database)))
|
||||
if err != nil {
|
||||
log.Println("E! Database creation failed: " + err.Error())
|
||||
if !strings.Contains(err.Error(), "Status Code [403]") {
|
||||
log.Println("I! Database creation failed: " + err.Error())
|
||||
}
|
||||
continue
|
||||
}
|
||||
}
|
||||
@@ -141,25 +151,43 @@ func (i *InfluxDB) Connect() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Close will terminate the session to the backend, returning error if an issue arises
|
||||
func (i *InfluxDB) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// SampleConfig returns the formatted sample configuration for the plugin
|
||||
func (i *InfluxDB) SampleConfig() string {
|
||||
return sampleConfig
|
||||
}
|
||||
|
||||
// Description returns the human-readable function definition of the plugin
|
||||
func (i *InfluxDB) Description() string {
|
||||
return "Configuration for influxdb server to send metrics to"
|
||||
}
|
||||
|
||||
// Choose a random server in the cluster to write to until a successful write
|
||||
func (i *InfluxDB) split(metrics []telegraf.Metric) []telegraf.Metric {
|
||||
if !i.splitPayload {
|
||||
return metrics
|
||||
}
|
||||
|
||||
split := make([]telegraf.Metric, 0)
|
||||
for _, m := range metrics {
|
||||
split = append(split, m.Split(i.UDPPayload)...)
|
||||
}
|
||||
return split
|
||||
}
|
||||
|
||||
// Write will choose a random server in the cluster to write to until a successful write
|
||||
// occurs, logging each unsuccessful. If all servers fail, return error.
|
||||
func (i *InfluxDB) Write(metrics []telegraf.Metric) error {
|
||||
metrics = i.split(metrics)
|
||||
|
||||
bufsize := 0
|
||||
for _, m := range metrics {
|
||||
bufsize += m.Len()
|
||||
}
|
||||
|
||||
r := metric.NewReader(metrics)
|
||||
|
||||
// This will get set to nil if a successful write occurs
|
||||
@@ -170,7 +198,8 @@ func (i *InfluxDB) Write(metrics []telegraf.Metric) error {
|
||||
if _, e := i.clients[n].WriteStream(r, bufsize); e != nil {
|
||||
// If the database was not found, try to recreate it:
|
||||
if strings.Contains(e.Error(), "database not found") {
|
||||
if errc := i.clients[n].Query("CREATE DATABASE " + i.Database); errc != nil {
|
||||
errc := i.clients[n].Query(fmt.Sprintf(`CREATE DATABASE "%s"`, qiReplacer.Replace(i.Database)))
|
||||
if errc != nil {
|
||||
log.Printf("E! Error: Database %s not found and failed to recreate\n",
|
||||
i.Database)
|
||||
}
|
||||
|
||||
@@ -2,15 +2,55 @@ package influxdb
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/metric"
|
||||
"github.com/influxdata/telegraf/plugins/outputs/influxdb/client"
|
||||
"github.com/influxdata/telegraf/testutil"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestIdentQuoting(t *testing.T) {
|
||||
var testCases = []struct {
|
||||
database string
|
||||
expected string
|
||||
}{
|
||||
{"x-y", `CREATE DATABASE "x-y"`},
|
||||
{`x"y`, `CREATE DATABASE "x\"y"`},
|
||||
{"x\ny", `CREATE DATABASE "x\ny"`},
|
||||
{`x\y`, `CREATE DATABASE "x\\y"`},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
r.ParseForm()
|
||||
q := r.Form.Get("q")
|
||||
assert.Equal(t, tc.expected, q)
|
||||
|
||||
w.WriteHeader(http.StatusOK)
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
fmt.Fprintln(w, `{"results":[{}]}`)
|
||||
}))
|
||||
defer ts.Close()
|
||||
|
||||
i := InfluxDB{
|
||||
URLs: []string{ts.URL},
|
||||
Database: tc.database,
|
||||
}
|
||||
|
||||
err := i.Connect()
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, i.Close())
|
||||
}
|
||||
}
|
||||
|
||||
func TestUDPInflux(t *testing.T) {
|
||||
i := InfluxDB{
|
||||
URLs: []string{"udp://localhost:8089"},
|
||||
@@ -23,6 +63,35 @@ func TestUDPInflux(t *testing.T) {
|
||||
require.NoError(t, i.Close())
|
||||
}
|
||||
|
||||
func TestBasicSplit(t *testing.T) {
|
||||
c := &MockClient{}
|
||||
i := InfluxDB{
|
||||
clients: []client.Client{c},
|
||||
UDPPayload: 50,
|
||||
splitPayload: true,
|
||||
}
|
||||
|
||||
// Input metrics:
|
||||
// test1,tag1=value1 value1=1 value2=2 1257894000000000000\n
|
||||
//
|
||||
// Split metrics:
|
||||
// test1,tag1=value1 value1=1 1257894000000000000\n
|
||||
// test1,tag1=value1 value2=2 1257894000000000000\n
|
||||
m, err := metric.New("test1",
|
||||
map[string]string{"tag1": "value1"},
|
||||
map[string]interface{}{"value1": 1.0, "value2": 2.0},
|
||||
time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC),
|
||||
)
|
||||
require.NoError(t, err)
|
||||
|
||||
metrics := []telegraf.Metric{m}
|
||||
err = i.Write(metrics)
|
||||
require.Equal(t, 1, c.writeStreamCalled)
|
||||
require.Equal(t, 94, c.contentLength)
|
||||
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
func TestHTTPInflux(t *testing.T) {
|
||||
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
switch r.URL.Path {
|
||||
@@ -164,3 +233,34 @@ func TestHTTPError_FieldTypeConflict(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, i.Close())
|
||||
}
|
||||
|
||||
type MockClient struct {
|
||||
writeStreamCalled int
|
||||
contentLength int
|
||||
}
|
||||
|
||||
func (m *MockClient) Query(command string) error {
|
||||
panic("not implemented")
|
||||
}
|
||||
|
||||
func (m *MockClient) Write(b []byte) (int, error) {
|
||||
panic("not implemented")
|
||||
}
|
||||
|
||||
func (m *MockClient) WriteWithParams(b []byte, params client.WriteParams) (int, error) {
|
||||
panic("not implemented")
|
||||
}
|
||||
|
||||
func (m *MockClient) WriteStream(b io.Reader, contentLength int) (int, error) {
|
||||
m.writeStreamCalled++
|
||||
m.contentLength = contentLength
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
func (m *MockClient) WriteStreamWithParams(b io.Reader, contentLength int, params client.WriteParams) (int, error) {
|
||||
panic("not implemented")
|
||||
}
|
||||
|
||||
func (m *MockClient) Close() error {
|
||||
panic("not implemented")
|
||||
}
|
||||
|
||||
@@ -124,6 +124,16 @@ func (sw *SocketWriter) Write(metrics []telegraf.Metric) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Close closes the connection. Noop if already closed.
|
||||
func (sw *SocketWriter) Close() error {
|
||||
if sw.Conn == nil {
|
||||
return nil
|
||||
}
|
||||
err := sw.Conn.Close()
|
||||
sw.Conn = nil
|
||||
return err
|
||||
}
|
||||
|
||||
func newSocketWriter() *SocketWriter {
|
||||
s, _ := serializers.NewInfluxSerializer()
|
||||
return &SocketWriter{
|
||||
|
||||
@@ -143,7 +143,7 @@ func TestSocketWriter_Write_err(t *testing.T) {
|
||||
|
||||
// close the socket to generate an error
|
||||
lconn.Close()
|
||||
sw.Close()
|
||||
sw.Conn.Close()
|
||||
err = sw.Write(metrics)
|
||||
require.Error(t, err)
|
||||
assert.Nil(t, sw.Conn)
|
||||
|
||||
@@ -499,13 +499,12 @@ def build(version=None,
|
||||
logging.info("Time taken: {}s".format((end_time - start_time).total_seconds()))
|
||||
return True
|
||||
|
||||
def generate_md5_from_file(path):
|
||||
"""Generate MD5 signature based on the contents of the file at path.
|
||||
def generate_sha256_from_file(path):
|
||||
"""Generate SHA256 hash signature based on the contents of the file at path.
|
||||
"""
|
||||
m = hashlib.md5()
|
||||
m = hashlib.sha256()
|
||||
with open(path, 'rb') as f:
|
||||
for chunk in iter(lambda: f.read(4096), b""):
|
||||
m.update(chunk)
|
||||
m.update(f.read())
|
||||
return m.hexdigest()
|
||||
|
||||
def generate_sig_from_file(path):
|
||||
@@ -636,6 +635,10 @@ def package(build_output, pkg_name, version, nightly=False, iteration=1, static=
|
||||
elif package_type not in ['zip', 'tar'] and static or "static_" in arch:
|
||||
logging.info("Skipping package type '{}' for static builds.".format(package_type))
|
||||
else:
|
||||
if package_type == 'rpm' and release and '~' in package_version:
|
||||
package_version, suffix = package_version.split('~', 1)
|
||||
# The ~ indicatees that this is a prerelease so we give it a leading 0.
|
||||
package_iteration = "0.%s" % suffix
|
||||
fpm_command = "fpm {} --name {} -a {} -t {} --version {} --iteration {} -C {} -p {} ".format(
|
||||
fpm_common_args,
|
||||
name,
|
||||
@@ -664,9 +667,6 @@ def package(build_output, pkg_name, version, nightly=False, iteration=1, static=
|
||||
if package_type == 'rpm':
|
||||
# rpm's convert any dashes to underscores
|
||||
package_version = package_version.replace("-", "_")
|
||||
new_outfile = outfile.replace("{}-{}".format(package_version, package_iteration), package_version)
|
||||
os.rename(outfile, new_outfile)
|
||||
outfile = new_outfile
|
||||
outfiles.append(os.path.join(os.getcwd(), outfile))
|
||||
logging.debug("Produced package files: {}".format(outfiles))
|
||||
return outfiles
|
||||
@@ -789,9 +789,10 @@ def main(args):
|
||||
if not upload_packages(packages, bucket_name=args.bucket, overwrite=args.upload_overwrite):
|
||||
return 1
|
||||
logging.info("Packages created:")
|
||||
for p in packages:
|
||||
logging.info("{} (MD5={})".format(p.split('/')[-1:][0],
|
||||
generate_md5_from_file(p)))
|
||||
for filename in packages:
|
||||
logging.info("%s (SHA256=%s)",
|
||||
os.path.basename(filename),
|
||||
generate_sha256_from_file(filename))
|
||||
if orig_branch != get_current_branch():
|
||||
logging.info("Moving back to original git branch: {}".format(args.branch))
|
||||
run("git checkout {}".format(orig_branch))
|
||||
|
||||
@@ -135,7 +135,9 @@ case $1 in
|
||||
fi
|
||||
|
||||
log_success_msg "Starting the process" "$name"
|
||||
if which start-stop-daemon > /dev/null 2>&1; then
|
||||
if command -v startproc >/dev/null; then
|
||||
startproc -u "$USER" -g "$GROUP" -p "$pidfile" -q -- "$daemon" -pidfile "$pidfile" -config "$config" -config-directory "$confdir" $TELEGRAF_OPTS
|
||||
elif which start-stop-daemon > /dev/null 2>&1; then
|
||||
start-stop-daemon --chuid $USER:$GROUP --start --quiet --pidfile $pidfile --exec $daemon -- -pidfile $pidfile -config $config -config-directory $confdir $TELEGRAF_OPTS >>$STDOUT 2>>$STDERR &
|
||||
else
|
||||
su -s /bin/sh -c "nohup $daemon -pidfile $pidfile -config $config -config-directory $confdir $TELEGRAF_OPTS >>$STDOUT 2>>$STDERR &" $USER
|
||||
|
||||
@@ -11,7 +11,7 @@ function install_init {
|
||||
}
|
||||
|
||||
function install_systemd {
|
||||
cp -f $SCRIPT_DIR/telegraf.service /lib/systemd/system/telegraf.service
|
||||
cp -f $SCRIPT_DIR/telegraf.service $1
|
||||
systemctl enable telegraf || true
|
||||
systemctl daemon-reload || true
|
||||
}
|
||||
@@ -24,12 +24,12 @@ function install_chkconfig {
|
||||
chkconfig --add telegraf
|
||||
}
|
||||
|
||||
if ! grep "^telegraf:" /etc/group &>/dev/null; then
|
||||
groupadd -r telegraf
|
||||
fi
|
||||
|
||||
if ! id telegraf &>/dev/null; then
|
||||
if ! grep "^telegraf:" /etc/group &>/dev/null; then
|
||||
useradd -r -K USERGROUPS_ENAB=yes -M telegraf -s /bin/false -d /etc/telegraf
|
||||
else
|
||||
useradd -r -K USERGROUPS_ENAB=yes -M telegraf -s /bin/false -d /etc/telegraf -g telegraf
|
||||
fi
|
||||
useradd -r -M telegraf -s /bin/false -d /etc/telegraf -g telegraf
|
||||
fi
|
||||
|
||||
test -d $LOG_DIR || mkdir -p $LOG_DIR
|
||||
@@ -56,10 +56,10 @@ if [[ ! -d /etc/telegraf/telegraf.d ]]; then
|
||||
fi
|
||||
|
||||
# Distribution-specific logic
|
||||
if [[ -f /etc/redhat-release ]]; then
|
||||
if [[ -f /etc/redhat-release ]] || [[ -f /etc/SuSE-release ]]; then
|
||||
# RHEL-variant logic
|
||||
if [[ "$(readlink /proc/1/exe)" == */systemd ]]; then
|
||||
install_systemd
|
||||
install_systemd /usr/lib/systemd/system/telegraf.service
|
||||
else
|
||||
# Assuming SysVinit
|
||||
install_init
|
||||
@@ -73,10 +73,10 @@ if [[ -f /etc/redhat-release ]]; then
|
||||
elif [[ -f /etc/debian_version ]]; then
|
||||
# Debian/Ubuntu logic
|
||||
if [[ "$(readlink /proc/1/exe)" == */systemd ]]; then
|
||||
install_systemd
|
||||
install_systemd /lib/systemd/system/telegraf.service
|
||||
systemctl restart telegraf || echo "WARNING: systemd not running."
|
||||
else
|
||||
# Assuming SysVinit
|
||||
# Assuming SysVinit
|
||||
install_init
|
||||
# Run update-rc.d or fallback to chkconfig if not available
|
||||
if which update-rc.d &>/dev/null; then
|
||||
@@ -89,7 +89,7 @@ elif [[ -f /etc/debian_version ]]; then
|
||||
elif [[ -f /etc/os-release ]]; then
|
||||
source /etc/os-release
|
||||
if [[ $ID = "amzn" ]]; then
|
||||
# Amazon Linux logic
|
||||
# Amazon Linux logic
|
||||
install_init
|
||||
# Run update-rc.d or fallback to chkconfig if not available
|
||||
if which update-rc.d &>/dev/null; then
|
||||
@@ -97,5 +97,6 @@ elif [[ -f /etc/os-release ]]; then
|
||||
else
|
||||
install_chkconfig
|
||||
fi
|
||||
/etc/init.d/telegraf restart
|
||||
fi
|
||||
fi
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
|
||||
function disable_systemd {
|
||||
systemctl disable telegraf
|
||||
rm -f /lib/systemd/system/telegraf.service
|
||||
rm -f $1
|
||||
}
|
||||
|
||||
function disable_update_rcd {
|
||||
@@ -15,14 +15,14 @@ function disable_chkconfig {
|
||||
rm -f /etc/init.d/telegraf
|
||||
}
|
||||
|
||||
if [[ -f /etc/redhat-release ]]; then
|
||||
if [[ -f /etc/redhat-release ]] || [[ -f /etc/SuSE-release ]]; then
|
||||
# RHEL-variant logic
|
||||
if [[ "$1" = "0" ]]; then
|
||||
# InfluxDB is no longer installed, remove from init system
|
||||
rm -f /etc/default/telegraf
|
||||
|
||||
if [[ "$(readlink /proc/1/exe)" == */systemd ]]; then
|
||||
disable_systemd
|
||||
disable_systemd /usr/lib/systemd/system/telegraf.service
|
||||
else
|
||||
# Assuming sysv
|
||||
disable_chkconfig
|
||||
@@ -35,7 +35,7 @@ elif [[ -f /etc/debian_version ]]; then
|
||||
rm -f /etc/default/telegraf
|
||||
|
||||
if [[ "$(readlink /proc/1/exe)" == */systemd ]]; then
|
||||
disable_systemd
|
||||
disable_systemd /lib/systemd/system/telegraf.service
|
||||
else
|
||||
# Assuming sysv
|
||||
# Run update-rc.d or fallback to chkconfig if not available
|
||||
|
||||
@@ -417,3 +417,53 @@ func (a *Accumulator) HasMeasurement(measurement string) bool {
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (a *Accumulator) IntField(measurement string, field string) (int, bool) {
|
||||
a.Lock()
|
||||
defer a.Unlock()
|
||||
for _, p := range a.Metrics {
|
||||
if p.Measurement == measurement {
|
||||
for fieldname, value := range p.Fields {
|
||||
if fieldname == field {
|
||||
v, ok := value.(int)
|
||||
return v, ok
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return 0, false
|
||||
}
|
||||
|
||||
func (a *Accumulator) FloatField(measurement string, field string) (float64, bool) {
|
||||
a.Lock()
|
||||
defer a.Unlock()
|
||||
for _, p := range a.Metrics {
|
||||
if p.Measurement == measurement {
|
||||
for fieldname, value := range p.Fields {
|
||||
if fieldname == field {
|
||||
v, ok := value.(float64)
|
||||
return v, ok
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return 0.0, false
|
||||
}
|
||||
|
||||
func (a *Accumulator) StringField(measurement string, field string) (string, bool) {
|
||||
a.Lock()
|
||||
defer a.Unlock()
|
||||
for _, p := range a.Metrics {
|
||||
if p.Measurement == measurement {
|
||||
for fieldname, value := range p.Fields {
|
||||
if fieldname == field {
|
||||
v, ok := value.(string)
|
||||
return v, ok
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return "", false
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user