Merge remote-tracking branch 'upstream/master'

This commit is contained in:
Miki
2016-02-24 11:30:32 +01:00
41 changed files with 2031 additions and 192 deletions

View File

@@ -6,6 +6,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/inputs/bcache"
_ "github.com/influxdata/telegraf/plugins/inputs/couchdb"
_ "github.com/influxdata/telegraf/plugins/inputs/disque"
_ "github.com/influxdata/telegraf/plugins/inputs/dns_query"
_ "github.com/influxdata/telegraf/plugins/inputs/docker"
_ "github.com/influxdata/telegraf/plugins/inputs/dovecot"
_ "github.com/influxdata/telegraf/plugins/inputs/elasticsearch"
@@ -40,6 +41,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/inputs/raindrops"
_ "github.com/influxdata/telegraf/plugins/inputs/redis"
_ "github.com/influxdata/telegraf/plugins/inputs/rethinkdb"
_ "github.com/influxdata/telegraf/plugins/inputs/riak"
_ "github.com/influxdata/telegraf/plugins/inputs/sensors"
_ "github.com/influxdata/telegraf/plugins/inputs/snmp"
_ "github.com/influxdata/telegraf/plugins/inputs/sqlserver"

View File

@@ -0,0 +1,51 @@
# DNS Query Input Plugin
The DNS plugin gathers DNS query times in milliseconds - like [Dig](https://en.wikipedia.org/wiki/Dig_\(command\))
### Configuration:
```
# Sample Config:
[[inputs.dns_query]]
## servers to query
servers = ["8.8.8.8"] # required
## Domains or subdomains to query. "." (root) is default
domains = ["."] # optional
## Query record type. Possible values: A, AAAA, ANY, CNAME, MX, NS, PTR, SOA, SPF, SRV, TXT. Default is "NS"
record_type = "A" # optional
## Dns server port. 53 is default
port = 53 # optional
## Query timeout in seconds. Default is 2 seconds
timeout = 2 # optional
```
For querying more than one record type make:
```
[[inputs.dns_query]]
domains = ["mjasion.pl"]
servers = ["8.8.8.8", "8.8.4.4"]
record_type = "A"
[[inputs.dns_query]]
domains = ["mjasion.pl"]
servers = ["8.8.8.8", "8.8.4.4"]
record_type = "MX"
```
### Tags:
- server
- domain
- record_type
### Example output:
```
./telegraf -config telegraf.conf -test -input-filter dns_query -test
> dns_query,domain=mjasion.pl,record_type=A,server=8.8.8.8 query_time_ms=67.189842 1456082743585760680
```

View File

@@ -0,0 +1,159 @@
package dns_query
import (
"errors"
"fmt"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/miekg/dns"
"net"
"strconv"
"time"
)
// DnsQuery is a telegraf input that measures DNS lookup latency by issuing
// queries directly to the configured servers (similar to `dig`).
type DnsQuery struct {
// Domains or subdomains to query
Domains []string
// Server to query
Servers []string
// Record type
RecordType string `toml:"record_type"`
// DNS server port number
Port int
// Dns query timeout in seconds. 0 selects the default of 2 seconds (see setDefaultValues)
Timeout int
}
// sampleConfig is shown by `telegraf -sample-config`. Keep the record-type
// list in sync with parseRecordType (ANY and TXT were previously missing /
// misplaced, and "Posible" was a typo).
var sampleConfig = `
## servers to query
servers = ["8.8.8.8"] # required
## Domains or subdomains to query. "."(root) is default
domains = ["."] # optional
## Query record type. Possible values: A, AAAA, ANY, CNAME, MX, NS, PTR, SOA, SPF, SRV, TXT. Default is "NS"
record_type = "A" # optional
## Dns server port. 53 is default
port = 53 # optional
## Query timeout in seconds. Default is 2 seconds
timeout = 2 # optional
`
// SampleConfig returns the default TOML configuration snippet for the plugin.
func (d *DnsQuery) SampleConfig() string {
return sampleConfig
}
// Description returns a one-line summary shown in generated config files.
func (d *DnsQuery) Description() string {
return "Query given DNS server and gives statistics"
}
// Gather queries every configured server for every configured domain and
// records the measured query time (in milliseconds) for each combination.
// It returns on the first query that fails.
func (d *DnsQuery) Gather(acc telegraf.Accumulator) error {
	d.setDefaultValues()

	for _, domain := range d.Domains {
		for _, server := range d.Servers {
			queryTime, err := d.getDnsQueryTime(domain, server)
			if err != nil {
				return err
			}

			acc.AddFields("dns_query",
				map[string]interface{}{"query_time_ms": queryTime},
				map[string]string{
					"server":      server,
					"domain":      domain,
					"record_type": d.RecordType,
				})
		}
	}

	return nil
}
// setDefaultValues fills in any unset configuration fields: record type NS,
// the root domain ".", port 53 and a 2 second timeout.
func (d *DnsQuery) setDefaultValues() {
	if d.RecordType == "" {
		d.RecordType = "NS"
	}

	// When no domains were configured, query the root zone; only NS makes
	// sense there, so the record type is forced back to NS as well.
	if len(d.Domains) == 0 {
		d.Domains = []string{"."}
		d.RecordType = "NS"
	}

	if d.Port == 0 {
		d.Port = 53
	}

	if d.Timeout == 0 {
		d.Timeout = 2
	}
}
// getDnsQueryTime performs a single DNS query for domain against server and
// returns the round-trip time in milliseconds.
//
// An error is returned when the configured record type is unknown, the
// exchange itself fails (network error / timeout), or the server answers
// with a non-success rcode.
func (d *DnsQuery) getDnsQueryTime(domain string, server string) (float64, error) {
	dnsQueryTime := float64(0)

	c := new(dns.Client)
	c.ReadTimeout = time.Duration(d.Timeout) * time.Second

	m := new(dns.Msg)
	recordType, err := d.parseRecordType()
	if err != nil {
		return dnsQueryTime, err
	}
	m.SetQuestion(dns.Fqdn(domain), recordType)
	m.RecursionDesired = true

	r, rtt, err := c.Exchange(m, net.JoinHostPort(server, strconv.Itoa(d.Port)))
	if err != nil {
		return dnsQueryTime, err
	}
	if r.Rcode != dns.RcodeSuccess {
		// fmt.Errorf replaces errors.New(fmt.Sprintf(...)), which go vet /
		// staticcheck flag (S1028); the message text is unchanged.
		return dnsQueryTime, fmt.Errorf("Invalid answer name %s after %s query for %s\n", domain, d.RecordType, domain)
	}

	// rtt is a time.Duration (nanoseconds); convert to milliseconds.
	dnsQueryTime = float64(rtt.Nanoseconds()) / 1e6
	return dnsQueryTime, nil
}
// parseRecordType maps the configured RecordType string to the matching
// dns.Type* constant. An error is returned for unrecognized record types.
func (d *DnsQuery) parseRecordType() (uint16, error) {
	var recordType uint16
	// The original declared `var error error`, shadowing the builtin error
	// type inside this function; renamed to the conventional err.
	var err error

	switch d.RecordType {
	case "A":
		recordType = dns.TypeA
	case "AAAA":
		recordType = dns.TypeAAAA
	case "ANY":
		recordType = dns.TypeANY
	case "CNAME":
		recordType = dns.TypeCNAME
	case "MX":
		recordType = dns.TypeMX
	case "NS":
		recordType = dns.TypeNS
	case "PTR":
		recordType = dns.TypePTR
	case "SOA":
		recordType = dns.TypeSOA
	case "SPF":
		recordType = dns.TypeSPF
	case "SRV":
		recordType = dns.TypeSRV
	case "TXT":
		recordType = dns.TypeTXT
	default:
		// Concatenation keeps the exact original message while avoiding
		// errors.New(fmt.Sprintf(...)) (flagged by go vet as S1028).
		err = errors.New("Record type " + d.RecordType + " not recognized")
	}

	return recordType, err
}
// init registers the plugin with telegraf's input registry under "dns_query".
func init() {
inputs.Add("dns_query", func() telegraf.Input {
return &DnsQuery{}
})
}

View File

@@ -0,0 +1,192 @@
package dns_query
import (
"github.com/influxdata/telegraf/testutil"
"github.com/miekg/dns"
"github.com/stretchr/testify/assert"
"testing"
"time"
)
// Shared test fixtures: a public resolver and a domain expected to resolve.
// NOTE(review): these tests require live network access to 8.8.8.8.
var servers = []string{"8.8.8.8"}
var domains = []string{"google.com"}
// TestGathering checks that a default (NS) query against the fixture server
// produces a dns_query metric with a non-zero query time.
func TestGathering(t *testing.T) {
	var dnsConfig = DnsQuery{
		Servers: servers,
		Domains: domains,
	}
	var acc testutil.Accumulator

	err := dnsConfig.Gather(&acc)
	assert.NoError(t, err)
	metric, ok := acc.Get("dns_query")
	assert.True(t, ok)
	queryTime, _ := metric.Fields["query_time_ms"].(float64)

	// Compare against float64(0): testify's NotEqual also compares types,
	// so the untyped int 0 used previously could never equal a float64 and
	// the assertion always passed regardless of the value.
	assert.NotEqual(t, float64(0), queryTime)
}
// TestGatheringMxRecord checks that an MX query produces a dns_query metric
// with a non-zero query time.
func TestGatheringMxRecord(t *testing.T) {
	var dnsConfig = DnsQuery{
		Servers: servers,
		Domains: domains,
	}
	var acc testutil.Accumulator
	dnsConfig.RecordType = "MX"

	err := dnsConfig.Gather(&acc)
	assert.NoError(t, err)
	metric, ok := acc.Get("dns_query")
	assert.True(t, ok)
	queryTime, _ := metric.Fields["query_time_ms"].(float64)

	// float64(0), not 0: testify's NotEqual compares types as well, so the
	// previous int-vs-float64 comparison always passed.
	assert.NotEqual(t, float64(0), queryTime)
}
// TestGatheringRootDomain verifies the tag set attached to a metric gathered
// for the root domain (".") with an explicit MX record type.
func TestGatheringRootDomain(t *testing.T) {
	dnsConfig := DnsQuery{
		Servers:    servers,
		Domains:    []string{"."},
		RecordType: "MX",
	}
	var acc testutil.Accumulator

	assert.NoError(t, dnsConfig.Gather(&acc))

	metric, found := acc.Get("dns_query")
	assert.True(t, found)

	// The query time is non-deterministic, so read it back from the metric
	// and assert on the full tagged-field set around it.
	queryTime, _ := metric.Fields["query_time_ms"].(float64)
	expectedTags := map[string]string{
		"server":      "8.8.8.8",
		"domain":      ".",
		"record_type": "MX",
	}
	expectedFields := map[string]interface{}{"query_time_ms": queryTime}
	acc.AssertContainsTaggedFields(t, "dns_query", expectedFields, expectedTags)
}
// TestMetricContainsServerAndDomainAndRecordTypeTags verifies that a default
// (NS) query tags its metric with server, domain and record_type.
func TestMetricContainsServerAndDomainAndRecordTypeTags(t *testing.T) {
	dnsConfig := DnsQuery{
		Servers: servers,
		Domains: domains,
	}
	var acc testutil.Accumulator

	assert.NoError(t, dnsConfig.Gather(&acc))

	metric, found := acc.Get("dns_query")
	assert.True(t, found)

	// Read the non-deterministic query time back so the tagged-field
	// assertion can include it verbatim.
	queryTime, _ := metric.Fields["query_time_ms"].(float64)
	expectedTags := map[string]string{
		"server":      "8.8.8.8",
		"domain":      "google.com",
		"record_type": "NS",
	}
	expectedFields := map[string]interface{}{"query_time_ms": queryTime}
	acc.AssertContainsTaggedFields(t, "dns_query", expectedFields, expectedTags)
}
// TestGatheringTimeout points the plugin at a port nothing listens on and
// expects Gather to fail with an i/o timeout within its 1 second budget.
func TestGatheringTimeout(t *testing.T) {
	dnsConfig := DnsQuery{
		Servers: servers,
		Domains: domains,
	}
	var acc testutil.Accumulator
	dnsConfig.Port = 60054
	dnsConfig.Timeout = 1

	// Run Gather in a goroutine so a hung query cannot wedge the test; the
	// 2 second guard leaves err nil, which then fails the Error assertion.
	done := make(chan error, 1)
	go func() {
		done <- dnsConfig.Gather(&acc)
	}()

	var err error
	select {
	case err = <-done:
	case <-time.After(2 * time.Second):
	}

	assert.Error(t, err)
	assert.Contains(t, err.Error(), "i/o timeout")
}
// TestSettingDefaultValues verifies every default applied by
// setDefaultValues, including the record type forced to NS when only the
// root domain is configured.
func TestSettingDefaultValues(t *testing.T) {
	// Zero-value config: everything should be defaulted.
	cfg := DnsQuery{}
	cfg.setDefaultValues()
	assert.Equal(t, []string{"."}, cfg.Domains, "Default domain not equal \".\"")
	assert.Equal(t, "NS", cfg.RecordType, "Default record type not equal 'NS'")
	assert.Equal(t, 53, cfg.Port, "Default port number not equal 53")
	assert.Equal(t, 2, cfg.Timeout, "Default timeout not equal 2")

	// Explicit root domain with no record type still defaults to NS.
	cfg = DnsQuery{Domains: []string{"."}}
	cfg.setDefaultValues()
	assert.Equal(t, "NS", cfg.RecordType, "Default record type not equal 'NS'")
}
func TestRecordTypeParser(t *testing.T) {
var dnsConfig = DnsQuery{}
var recordType uint16
dnsConfig.RecordType = "A"
recordType, _ = dnsConfig.parseRecordType()
assert.Equal(t, dns.TypeA, recordType)
dnsConfig.RecordType = "AAAA"
recordType, _ = dnsConfig.parseRecordType()
assert.Equal(t, dns.TypeAAAA, recordType)
dnsConfig.RecordType = "ANY"
recordType, _ = dnsConfig.parseRecordType()
assert.Equal(t, dns.TypeANY, recordType)
dnsConfig.RecordType = "CNAME"
recordType, _ = dnsConfig.parseRecordType()
assert.Equal(t, dns.TypeCNAME, recordType)
dnsConfig.RecordType = "MX"
recordType, _ = dnsConfig.parseRecordType()
assert.Equal(t, dns.TypeMX, recordType)
dnsConfig.RecordType = "NS"
recordType, _ = dnsConfig.parseRecordType()
assert.Equal(t, dns.TypeNS, recordType)
dnsConfig.RecordType = "PTR"
recordType, _ = dnsConfig.parseRecordType()
assert.Equal(t, dns.TypePTR, recordType)
dnsConfig.RecordType = "SOA"
recordType, _ = dnsConfig.parseRecordType()
assert.Equal(t, dns.TypeSOA, recordType)
dnsConfig.RecordType = "SPF"
recordType, _ = dnsConfig.parseRecordType()
assert.Equal(t, dns.TypeSPF, recordType)
dnsConfig.RecordType = "SRV"
recordType, _ = dnsConfig.parseRecordType()
assert.Equal(t, dns.TypeSRV, recordType)
dnsConfig.RecordType = "TXT"
recordType, _ = dnsConfig.parseRecordType()
assert.Equal(t, dns.TypeTXT, recordType)
}
// TestRecordTypeParserError ensures an unrecognized record type is rejected.
func TestRecordTypeParserError(t *testing.T) {
	dnsConfig := DnsQuery{RecordType: "nil"}
	_, err := dnsConfig.parseRecordType()
	assert.Error(t, err)
}

View File

@@ -1,7 +1,6 @@
package httpjson
import (
"bytes"
"errors"
"fmt"
"io/ioutil"
@@ -23,7 +22,8 @@ type HttpJson struct {
TagKeys []string
Parameters map[string]string
Headers map[string]string
client HTTPClient
client HTTPClient
}
type HTTPClient interface {
@@ -182,15 +182,14 @@ func (h *HttpJson) sendRequest(serverURL string) (string, float64, error) {
return "", -1, fmt.Errorf("Invalid server URL \"%s\"", serverURL)
}
params := url.Values{}
data := url.Values{}
switch {
case h.Method == "GET":
requestURL.RawQuery = params.Encode()
params := requestURL.Query()
for k, v := range h.Parameters {
params.Add(k, v)
}
requestURL.RawQuery = params.Encode()
case h.Method == "POST":
requestURL.RawQuery = ""
@@ -200,7 +199,8 @@ func (h *HttpJson) sendRequest(serverURL string) (string, float64, error) {
}
// Create + send request
req, err := http.NewRequest(h.Method, requestURL.String(), bytes.NewBufferString(data.Encode()))
req, err := http.NewRequest(h.Method, requestURL.String(),
strings.NewReader(data.Encode()))
if err != nil {
return "", -1, err
}

View File

@@ -1,8 +1,10 @@
package httpjson
import (
"fmt"
"io/ioutil"
"net/http"
"net/http/httptest"
"strings"
"testing"
@@ -27,6 +29,75 @@ const validJSON = `
"another_list": [4]
}`
// validJSON2 is a realistic mining-pool stats payload served by the mock
// backends in the GET/POST tests; its numeric leaves are the values the
// tests expect to see flattened into "section_key" fields.
const validJSON2 = `{
"user":{
"hash_rate":0,
"expected_24h_rewards":0,
"total_rewards":0.000595109232,
"paid_rewards":0,
"unpaid_rewards":0.000595109232,
"past_24h_rewards":0,
"total_work":"5172625408",
"blocks_found":0
},
"workers":{
"brminer.1":{
"hash_rate":0,
"hash_rate_24h":0,
"valid_shares":"6176",
"stale_shares":"0",
"invalid_shares":"0",
"rewards":4.5506464e-5,
"rewards_24h":0,
"reset_time":1455409950
},
"brminer.2":{
"hash_rate":0,
"hash_rate_24h":0,
"valid_shares":"0",
"stale_shares":"0",
"invalid_shares":"0",
"rewards":0,
"rewards_24h":0,
"reset_time":1455936726
},
"brminer.3":{
"hash_rate":0,
"hash_rate_24h":0,
"valid_shares":"0",
"stale_shares":"0",
"invalid_shares":"0",
"rewards":0,
"rewards_24h":0,
"reset_time":1455936733
}
},
"pool":{
"hash_rate":114100000,
"active_users":843,
"total_work":"5015346808842682368",
"pps_ratio":1.04,
"pps_rate":7.655e-9
},
"network":{
"hash_rate":1426117703,
"block_number":944895,
"time_per_block":156,
"difficulty":51825.72835216,
"next_difficulty":51916.15249019,
"retarget_time":95053
},
"market":{
"ltc_btc":0.00798,
"ltc_usd":3.37801,
"ltc_eur":3.113,
"ltc_gbp":2.32807,
"ltc_rub":241.796,
"ltc_cny":21.3883,
"btc_usd":422.852
}
}`
const validJSONTags = `
{
"value": 15,
@@ -149,6 +220,222 @@ func TestHttpJson200(t *testing.T) {
}
}
// Test that GET Parameters from the url string are applied properly
//
// The mock backend asserts the api_key query parameter embedded in the
// server URL survives request construction; the gathered fields must match
// the flattened numeric values of validJSON2 exactly.
func TestHttpJsonGET_URL(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
key := r.FormValue("api_key")
assert.Equal(t, "mykey", key)
w.WriteHeader(http.StatusOK)
fmt.Fprintln(w, validJSON2)
}))
defer ts.Close()
a := HttpJson{
Servers: []string{ts.URL + "?api_key=mykey"},
Name: "",
Method: "GET",
client: RealHTTPClient{client: &http.Client{}},
}
var acc testutil.Accumulator
err := a.Gather(&acc)
require.NoError(t, err)
// remove response_time from gathered fields because it's non-deterministic
delete(acc.Metrics[0].Fields, "response_time")
fields := map[string]interface{}{
"market_btc_usd": float64(422.852),
"market_ltc_btc": float64(0.00798),
"market_ltc_cny": float64(21.3883),
"market_ltc_eur": float64(3.113),
"market_ltc_gbp": float64(2.32807),
"market_ltc_rub": float64(241.796),
"market_ltc_usd": float64(3.37801),
"network_block_number": float64(944895),
"network_difficulty": float64(51825.72835216),
"network_hash_rate": float64(1.426117703e+09),
"network_next_difficulty": float64(51916.15249019),
"network_retarget_time": float64(95053),
"network_time_per_block": float64(156),
"pool_active_users": float64(843),
"pool_hash_rate": float64(1.141e+08),
"pool_pps_rate": float64(7.655e-09),
"pool_pps_ratio": float64(1.04),
"user_blocks_found": float64(0),
"user_expected_24h_rewards": float64(0),
"user_hash_rate": float64(0),
"user_paid_rewards": float64(0),
"user_past_24h_rewards": float64(0),
"user_total_rewards": float64(0.000595109232),
"user_unpaid_rewards": float64(0.000595109232),
"workers_brminer.1_hash_rate": float64(0),
"workers_brminer.1_hash_rate_24h": float64(0),
"workers_brminer.1_reset_time": float64(1.45540995e+09),
"workers_brminer.1_rewards": float64(4.5506464e-05),
"workers_brminer.1_rewards_24h": float64(0),
"workers_brminer.2_hash_rate": float64(0),
"workers_brminer.2_hash_rate_24h": float64(0),
"workers_brminer.2_reset_time": float64(1.455936726e+09),
"workers_brminer.2_rewards": float64(0),
"workers_brminer.2_rewards_24h": float64(0),
"workers_brminer.3_hash_rate": float64(0),
"workers_brminer.3_hash_rate_24h": float64(0),
"workers_brminer.3_reset_time": float64(1.455936733e+09),
"workers_brminer.3_rewards": float64(0),
"workers_brminer.3_rewards_24h": float64(0),
}
acc.AssertContainsFields(t, "httpjson", fields)
}
// Test that GET Parameters are applied properly
//
// Unlike TestHttpJsonGET_URL, the api_key here travels via the Parameters
// map and must be appended to the query string by sendRequest.
func TestHttpJsonGET(t *testing.T) {
params := map[string]string{
"api_key": "mykey",
}
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
key := r.FormValue("api_key")
assert.Equal(t, "mykey", key)
w.WriteHeader(http.StatusOK)
fmt.Fprintln(w, validJSON2)
}))
defer ts.Close()
a := HttpJson{
Servers: []string{ts.URL},
Name: "",
Method: "GET",
Parameters: params,
client: RealHTTPClient{client: &http.Client{}},
}
var acc testutil.Accumulator
err := a.Gather(&acc)
require.NoError(t, err)
// remove response_time from gathered fields because it's non-deterministic
delete(acc.Metrics[0].Fields, "response_time")
fields := map[string]interface{}{
"market_btc_usd": float64(422.852),
"market_ltc_btc": float64(0.00798),
"market_ltc_cny": float64(21.3883),
"market_ltc_eur": float64(3.113),
"market_ltc_gbp": float64(2.32807),
"market_ltc_rub": float64(241.796),
"market_ltc_usd": float64(3.37801),
"network_block_number": float64(944895),
"network_difficulty": float64(51825.72835216),
"network_hash_rate": float64(1.426117703e+09),
"network_next_difficulty": float64(51916.15249019),
"network_retarget_time": float64(95053),
"network_time_per_block": float64(156),
"pool_active_users": float64(843),
"pool_hash_rate": float64(1.141e+08),
"pool_pps_rate": float64(7.655e-09),
"pool_pps_ratio": float64(1.04),
"user_blocks_found": float64(0),
"user_expected_24h_rewards": float64(0),
"user_hash_rate": float64(0),
"user_paid_rewards": float64(0),
"user_past_24h_rewards": float64(0),
"user_total_rewards": float64(0.000595109232),
"user_unpaid_rewards": float64(0.000595109232),
"workers_brminer.1_hash_rate": float64(0),
"workers_brminer.1_hash_rate_24h": float64(0),
"workers_brminer.1_reset_time": float64(1.45540995e+09),
"workers_brminer.1_rewards": float64(4.5506464e-05),
"workers_brminer.1_rewards_24h": float64(0),
"workers_brminer.2_hash_rate": float64(0),
"workers_brminer.2_hash_rate_24h": float64(0),
"workers_brminer.2_reset_time": float64(1.455936726e+09),
"workers_brminer.2_rewards": float64(0),
"workers_brminer.2_rewards_24h": float64(0),
"workers_brminer.3_hash_rate": float64(0),
"workers_brminer.3_hash_rate_24h": float64(0),
"workers_brminer.3_reset_time": float64(1.455936733e+09),
"workers_brminer.3_rewards": float64(0),
"workers_brminer.3_rewards_24h": float64(0),
}
acc.AssertContainsFields(t, "httpjson", fields)
}
// Test that POST Parameters are applied properly
//
// For POST the Parameters map must be form-encoded into the request body
// ("api_key=mykey"), which the mock backend asserts verbatim.
func TestHttpJsonPOST(t *testing.T) {
params := map[string]string{
"api_key": "mykey",
}
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
body, err := ioutil.ReadAll(r.Body)
assert.NoError(t, err)
assert.Equal(t, "api_key=mykey", string(body))
w.WriteHeader(http.StatusOK)
fmt.Fprintln(w, validJSON2)
}))
defer ts.Close()
a := HttpJson{
Servers: []string{ts.URL},
Name: "",
Method: "POST",
Parameters: params,
client: RealHTTPClient{client: &http.Client{}},
}
var acc testutil.Accumulator
err := a.Gather(&acc)
require.NoError(t, err)
// remove response_time from gathered fields because it's non-deterministic
delete(acc.Metrics[0].Fields, "response_time")
fields := map[string]interface{}{
"market_btc_usd": float64(422.852),
"market_ltc_btc": float64(0.00798),
"market_ltc_cny": float64(21.3883),
"market_ltc_eur": float64(3.113),
"market_ltc_gbp": float64(2.32807),
"market_ltc_rub": float64(241.796),
"market_ltc_usd": float64(3.37801),
"network_block_number": float64(944895),
"network_difficulty": float64(51825.72835216),
"network_hash_rate": float64(1.426117703e+09),
"network_next_difficulty": float64(51916.15249019),
"network_retarget_time": float64(95053),
"network_time_per_block": float64(156),
"pool_active_users": float64(843),
"pool_hash_rate": float64(1.141e+08),
"pool_pps_rate": float64(7.655e-09),
"pool_pps_ratio": float64(1.04),
"user_blocks_found": float64(0),
"user_expected_24h_rewards": float64(0),
"user_hash_rate": float64(0),
"user_paid_rewards": float64(0),
"user_past_24h_rewards": float64(0),
"user_total_rewards": float64(0.000595109232),
"user_unpaid_rewards": float64(0.000595109232),
"workers_brminer.1_hash_rate": float64(0),
"workers_brminer.1_hash_rate_24h": float64(0),
"workers_brminer.1_reset_time": float64(1.45540995e+09),
"workers_brminer.1_rewards": float64(4.5506464e-05),
"workers_brminer.1_rewards_24h": float64(0),
"workers_brminer.2_hash_rate": float64(0),
"workers_brminer.2_hash_rate_24h": float64(0),
"workers_brminer.2_reset_time": float64(1.455936726e+09),
"workers_brminer.2_rewards": float64(0),
"workers_brminer.2_rewards_24h": float64(0),
"workers_brminer.3_hash_rate": float64(0),
"workers_brminer.3_hash_rate_24h": float64(0),
"workers_brminer.3_reset_time": float64(1.455936733e+09),
"workers_brminer.3_rewards": float64(0),
"workers_brminer.3_rewards_24h": float64(0),
}
acc.AssertContainsFields(t, "httpjson", fields)
}
// Test response to HTTP 500
func TestHttpJson500(t *testing.T) {
httpjson := genMockHttpJson(validJSON, 500)

View File

@@ -7,7 +7,8 @@ individual process using their /proc data.
The plugin will tag processes by their PID and their process name.
Processes can be specified either by pid file or by executable name. Procstat
Processes can be specified either by pid file, by executable name, by command
line pattern matching, or by username (in this order of priority). The Procstat
plugin will use `pgrep` when executable name is provided to obtain the pid.
Procstat plugin will transmit IO, memory, cpu, file descriptor related
measurements for every process specified. A prefix can be set to isolate

View File

@@ -19,6 +19,7 @@ type Procstat struct {
Exe string
Pattern string
Prefix string
User string
pidmap map[int32]*process.Process
}
@@ -37,6 +38,8 @@ var sampleConfig = `
# exe = "nginx"
## pattern as argument for pgrep (ie, pgrep -f <pattern>)
# pattern = "nginx"
## user as argument for pgrep (ie, pgrep -u <user>)
# user = "nginx"
## Field name prefix
prefix = ""
@@ -53,8 +56,8 @@ func (_ *Procstat) Description() string {
func (p *Procstat) Gather(acc telegraf.Accumulator) error {
err := p.createProcesses()
if err != nil {
log.Printf("Error: procstat getting process, exe: [%s] pidfile: [%s] pattern: [%s] %s",
p.Exe, p.PidFile, p.Pattern, err.Error())
log.Printf("Error: procstat getting process, exe: [%s] pidfile: [%s] pattern: [%s] user: [%s] %s",
p.Exe, p.PidFile, p.Pattern, p.User, err.Error())
} else {
for _, proc := range p.pidmap {
p := NewSpecProcessor(p.Prefix, acc, proc)
@@ -103,6 +106,8 @@ func (p *Procstat) getAllPids() ([]int32, error) {
pids, err = pidsFromExe(p.Exe)
} else if p.Pattern != "" {
pids, err = pidsFromPattern(p.Pattern)
} else if p.User != "" {
pids, err = pidsFromUser(p.User)
} else {
err = fmt.Errorf("Either exe, pid_file or pattern has to be specified")
}
@@ -175,6 +180,30 @@ func pidsFromPattern(pattern string) ([]int32, error) {
return out, outerr
}
// pidsFromUser returns the PIDs of all processes owned by user, found by
// invoking `pgrep -u <user>`.
//
// Unparseable PID tokens are skipped; the last parse error (if any) is
// returned alongside the PIDs gathered so far — the same best-effort
// contract as the sibling pidsFromExe/pidsFromPattern helpers.
func pidsFromUser(user string) ([]int32, error) {
	var out []int32
	var outerr error

	bin, err := exec.LookPath("pgrep")
	if err != nil {
		return out, fmt.Errorf("Couldn't find pgrep binary: %s", err)
	}

	// Early return replaces the original else-after-return block (golint),
	// keeping the happy path left-aligned.
	pgrep, err := exec.Command(bin, "-u", user).Output()
	if err != nil {
		return out, fmt.Errorf("Failed to execute %s. Error: '%s'", bin, err)
	}

	for _, pid := range strings.Fields(string(pgrep)) {
		ipid, err := strconv.Atoi(pid)
		if err != nil {
			outerr = err
			continue
		}
		out = append(out, int32(ipid))
	}

	return out, outerr
}
func init() {
inputs.Add("procstat", func() telegraf.Input {
return NewProcstat()

View File

@@ -0,0 +1,76 @@
# Riak Plugin
The Riak plugin gathers metrics from one or more riak instances.
### Configuration:
```toml
# Description
[[inputs.riak]]
# Specify a list of one or more riak http servers
servers = ["http://localhost:8098"]
```
### Measurements & Fields:
Riak provides one measurement named "riak", with the following fields:
- cpu_avg1
- cpu_avg15
- cpu_avg5
- memory_code
- memory_ets
- memory_processes
- memory_system
- memory_total
- node_get_fsm_objsize_100
- node_get_fsm_objsize_95
- node_get_fsm_objsize_99
- node_get_fsm_objsize_mean
- node_get_fsm_objsize_median
- node_get_fsm_siblings_100
- node_get_fsm_siblings_95
- node_get_fsm_siblings_99
- node_get_fsm_siblings_mean
- node_get_fsm_siblings_median
- node_get_fsm_time_100
- node_get_fsm_time_95
- node_get_fsm_time_99
- node_get_fsm_time_mean
- node_get_fsm_time_median
- node_gets
- node_gets_total
- node_put_fsm_time_100
- node_put_fsm_time_95
- node_put_fsm_time_99
- node_put_fsm_time_mean
- node_put_fsm_time_median
- node_puts
- node_puts_total
- pbc_active
- pbc_connects
- pbc_connects_total
- vnode_gets
- vnode_gets_total
- vnode_index_reads
- vnode_index_reads_total
- vnode_index_writes
- vnode_index_writes_total
- vnode_puts
- vnode_puts_total
Measurements of time (such as node_get_fsm_time_mean) are measured in nanoseconds.
### Tags:
All measurements have the following tags:
- server (the host:port of the given server address, ex. `127.0.0.1:8087`)
- nodename (the internal node name received, ex. `riak@127.0.0.1`)
### Example Output:
```
$ ./telegraf -config telegraf.conf -input-filter riak -test
> riak,nodename=riak@127.0.0.1,server=localhost:8098 cpu_avg1=31i,cpu_avg15=69i,cpu_avg5=51i,memory_code=11563738i,memory_ets=5925872i,memory_processes=30236069i,memory_system=93074971i,memory_total=123311040i,node_get_fsm_objsize_100=0i,node_get_fsm_objsize_95=0i,node_get_fsm_objsize_99=0i,node_get_fsm_objsize_mean=0i,node_get_fsm_objsize_median=0i,node_get_fsm_siblings_100=0i,node_get_fsm_siblings_95=0i,node_get_fsm_siblings_99=0i,node_get_fsm_siblings_mean=0i,node_get_fsm_siblings_median=0i,node_get_fsm_time_100=0i,node_get_fsm_time_95=0i,node_get_fsm_time_99=0i,node_get_fsm_time_mean=0i,node_get_fsm_time_median=0i,node_gets=0i,node_gets_total=19i,node_put_fsm_time_100=0i,node_put_fsm_time_95=0i,node_put_fsm_time_99=0i,node_put_fsm_time_mean=0i,node_put_fsm_time_median=0i,node_puts=0i,node_puts_total=0i,pbc_active=0i,pbc_connects=0i,pbc_connects_total=20i,vnode_gets=0i,vnode_gets_total=57i,vnode_index_reads=0i,vnode_index_reads_total=0i,vnode_index_writes=0i,vnode_index_writes_total=0i,vnode_puts=0i,vnode_puts_total=0i 1455913392622482332
```

196
plugins/inputs/riak/riak.go Normal file
View File

@@ -0,0 +1,196 @@
package riak
import (
	"encoding/json"
	"fmt"
	"net/http"
	"net/url"
	"time"

	"github.com/influxdata/telegraf"
	"github.com/influxdata/telegraf/plugins/inputs"
)
// Riak gathers statistics from one or more Riak instances.
type Riak struct {
// Servers is a slice of servers as http addresses (ex. http://127.0.0.1:8098)
Servers []string
// client performs the HTTP requests against each server's /stats endpoint
client *http.Client
}
// NewRiak returns a new instance of Riak with a default http client.
// The client carries a request timeout so Gather cannot hang forever on an
// unresponsive server (http.DefaultClient, used previously, has none).
func NewRiak() *Riak {
	return &Riak{client: &http.Client{Timeout: 5 * time.Second}}
}
// riakStats represents the subset of the Riak /stats JSON payload that is
// decoded into metric fields (plus nodename, which becomes a tag).
type riakStats struct {
CpuAvg1 int64 `json:"cpu_avg1"`
CpuAvg15 int64 `json:"cpu_avg15"`
CpuAvg5 int64 `json:"cpu_avg5"`
MemoryCode int64 `json:"memory_code"`
MemoryEts int64 `json:"memory_ets"`
MemoryProcesses int64 `json:"memory_processes"`
MemorySystem int64 `json:"memory_system"`
MemoryTotal int64 `json:"memory_total"`
NodeGetFsmObjsize100 int64 `json:"node_get_fsm_objsize_100"`
NodeGetFsmObjsize95 int64 `json:"node_get_fsm_objsize_95"`
NodeGetFsmObjsize99 int64 `json:"node_get_fsm_objsize_99"`
NodeGetFsmObjsizeMean int64 `json:"node_get_fsm_objsize_mean"`
NodeGetFsmObjsizeMedian int64 `json:"node_get_fsm_objsize_median"`
NodeGetFsmSiblings100 int64 `json:"node_get_fsm_siblings_100"`
NodeGetFsmSiblings95 int64 `json:"node_get_fsm_siblings_95"`
NodeGetFsmSiblings99 int64 `json:"node_get_fsm_siblings_99"`
NodeGetFsmSiblingsMean int64 `json:"node_get_fsm_siblings_mean"`
NodeGetFsmSiblingsMedian int64 `json:"node_get_fsm_siblings_median"`
NodeGetFsmTime100 int64 `json:"node_get_fsm_time_100"`
NodeGetFsmTime95 int64 `json:"node_get_fsm_time_95"`
NodeGetFsmTime99 int64 `json:"node_get_fsm_time_99"`
NodeGetFsmTimeMean int64 `json:"node_get_fsm_time_mean"`
NodeGetFsmTimeMedian int64 `json:"node_get_fsm_time_median"`
NodeGets int64 `json:"node_gets"`
NodeGetsTotal int64 `json:"node_gets_total"`
Nodename string `json:"nodename"`
NodePutFsmTime100 int64 `json:"node_put_fsm_time_100"`
NodePutFsmTime95 int64 `json:"node_put_fsm_time_95"`
NodePutFsmTime99 int64 `json:"node_put_fsm_time_99"`
NodePutFsmTimeMean int64 `json:"node_put_fsm_time_mean"`
NodePutFsmTimeMedian int64 `json:"node_put_fsm_time_median"`
NodePuts int64 `json:"node_puts"`
NodePutsTotal int64 `json:"node_puts_total"`
PbcActive int64 `json:"pbc_active"`
PbcConnects int64 `json:"pbc_connects"`
PbcConnectsTotal int64 `json:"pbc_connects_total"`
VnodeGets int64 `json:"vnode_gets"`
VnodeGetsTotal int64 `json:"vnode_gets_total"`
VnodeIndexReads int64 `json:"vnode_index_reads"`
VnodeIndexReadsTotal int64 `json:"vnode_index_reads_total"`
VnodeIndexWrites int64 `json:"vnode_index_writes"`
VnodeIndexWritesTotal int64 `json:"vnode_index_writes_total"`
VnodePuts int64 `json:"vnode_puts"`
VnodePutsTotal int64 `json:"vnode_puts_total"`
}
// sampleConfig is the default TOML snippet: gather stats only from a single
// localhost instance on the default HTTP port.
const sampleConfig = `
# Specify a list of one or more riak http servers
servers = ["http://localhost:8098"]
`
// SampleConfig returns a sample configuration for the plugin.
func (r *Riak) SampleConfig() string {
return sampleConfig
}
// Description returns a one-line description of the plugin.
func (r *Riak) Description() string {
	// The original string read "Read metrics one or many Riak servers";
	// "from" was missing.
	return "Read metrics from one or many Riak servers"
}
// Gather reads stats from all configured servers, accumulating one "riak"
// measurement per server. It returns early on the first server that fails.
func (r *Riak) Gather(acc telegraf.Accumulator) error {
	// Fall back to a single local instance when nothing is configured.
	if len(r.Servers) == 0 {
		r.Servers = []string{"http://127.0.0.1:8098"}
	}

	for _, server := range r.Servers {
		if err := r.gatherServer(server, acc); err != nil {
			return err
		}
	}

	return nil
}
// gatherServer gathers stats from a single server s (an http address),
// adding them to the accumulator tagged with the server host and the
// riak node name reported by the endpoint.
func (r *Riak) gatherServer(s string, acc telegraf.Accumulator) error {
	// Parse the given URL to extract the server tag
	u, err := url.Parse(s)
	if err != nil {
		return fmt.Errorf("riak unable to parse given server url %s: %s", s, err)
	}

	// Perform the GET request to the riak /stats endpoint
	resp, err := r.client.Get(s + "/stats")
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	// Successful responses will always return status code 200
	if resp.StatusCode != http.StatusOK {
		// Fixed typo in the error message ("unexepcted" -> "unexpected").
		return fmt.Errorf("riak responded with unexpected status code %d", resp.StatusCode)
	}

	// Decode the response JSON into a new stats struct
	stats := &riakStats{}
	if err := json.NewDecoder(resp.Body).Decode(stats); err != nil {
		return fmt.Errorf("unable to decode riak response: %s", err)
	}

	// Build a map of tags
	tags := map[string]string{
		"nodename": stats.Nodename,
		"server":   u.Host,
	}

	// Build a map of field values
	fields := map[string]interface{}{
		"cpu_avg1":                     stats.CpuAvg1,
		"cpu_avg15":                    stats.CpuAvg15,
		"cpu_avg5":                     stats.CpuAvg5,
		"memory_code":                  stats.MemoryCode,
		"memory_ets":                   stats.MemoryEts,
		"memory_processes":             stats.MemoryProcesses,
		"memory_system":                stats.MemorySystem,
		"memory_total":                 stats.MemoryTotal,
		"node_get_fsm_objsize_100":     stats.NodeGetFsmObjsize100,
		"node_get_fsm_objsize_95":      stats.NodeGetFsmObjsize95,
		"node_get_fsm_objsize_99":      stats.NodeGetFsmObjsize99,
		"node_get_fsm_objsize_mean":    stats.NodeGetFsmObjsizeMean,
		"node_get_fsm_objsize_median":  stats.NodeGetFsmObjsizeMedian,
		"node_get_fsm_siblings_100":    stats.NodeGetFsmSiblings100,
		"node_get_fsm_siblings_95":     stats.NodeGetFsmSiblings95,
		"node_get_fsm_siblings_99":     stats.NodeGetFsmSiblings99,
		"node_get_fsm_siblings_mean":   stats.NodeGetFsmSiblingsMean,
		"node_get_fsm_siblings_median": stats.NodeGetFsmSiblingsMedian,
		"node_get_fsm_time_100":        stats.NodeGetFsmTime100,
		"node_get_fsm_time_95":         stats.NodeGetFsmTime95,
		"node_get_fsm_time_99":         stats.NodeGetFsmTime99,
		"node_get_fsm_time_mean":       stats.NodeGetFsmTimeMean,
		"node_get_fsm_time_median":     stats.NodeGetFsmTimeMedian,
		"node_gets":                    stats.NodeGets,
		"node_gets_total":              stats.NodeGetsTotal,
		"node_put_fsm_time_100":        stats.NodePutFsmTime100,
		"node_put_fsm_time_95":         stats.NodePutFsmTime95,
		"node_put_fsm_time_99":         stats.NodePutFsmTime99,
		"node_put_fsm_time_mean":       stats.NodePutFsmTimeMean,
		"node_put_fsm_time_median":     stats.NodePutFsmTimeMedian,
		"node_puts":                    stats.NodePuts,
		"node_puts_total":              stats.NodePutsTotal,
		"pbc_active":                   stats.PbcActive,
		"pbc_connects":                 stats.PbcConnects,
		"pbc_connects_total":           stats.PbcConnectsTotal,
		"vnode_gets":                   stats.VnodeGets,
		"vnode_gets_total":             stats.VnodeGetsTotal,
		"vnode_index_reads":            stats.VnodeIndexReads,
		"vnode_index_reads_total":      stats.VnodeIndexReadsTotal,
		"vnode_index_writes":           stats.VnodeIndexWrites,
		"vnode_index_writes_total":     stats.VnodeIndexWritesTotal,
		"vnode_puts":                   stats.VnodePuts,
		"vnode_puts_total":             stats.VnodePutsTotal,
	}

	// Accumulate the tags and values
	acc.AddFields("riak", fields, tags)

	return nil
}
// init registers the plugin with telegraf's input registry under "riak".
func init() {
inputs.Add("riak", func() telegraf.Input {
return NewRiak()
})
}

View File

@@ -0,0 +1,275 @@
package riak
import (
"fmt"
"net/http"
"net/http/httptest"
"net/url"
"testing"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
)
// TestRiak verifies that Gather parses a canned /stats JSON response and
// accumulates the expected fields and tags under the "riak" measurement.
func TestRiak(t *testing.T) {
	// Stand up a throwaway HTTP server that always returns the fixture JSON.
	handler := func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK)
		fmt.Fprintln(w, response)
	}
	server := httptest.NewServer(http.HandlerFunc(handler))
	defer server.Close()

	// The "server" tag is expected to carry the test server's host:port.
	parsed, err := url.Parse(server.URL)
	require.NoError(t, err)

	// Point a fresh Riak plugin instance at the test server.
	plugin := NewRiak()
	plugin.Servers = []string{server.URL}

	// Collect metrics into a test accumulator.
	var acc testutil.Accumulator
	require.NoError(t, plugin.Gather(&acc))

	// Every stat the plugin tracks must surface with its fixture value.
	expectFields := map[string]interface{}{
		"cpu_avg1":                     int64(504),
		"cpu_avg15":                    int64(294),
		"cpu_avg5":                     int64(325),
		"memory_code":                  int64(12329143),
		"memory_ets":                   int64(17330176),
		"memory_processes":             int64(58454730),
		"memory_system":                int64(120401678),
		"memory_total":                 int64(178856408),
		"node_get_fsm_objsize_100":     int64(73596),
		"node_get_fsm_objsize_95":      int64(36663),
		"node_get_fsm_objsize_99":      int64(51552),
		"node_get_fsm_objsize_mean":    int64(13241),
		"node_get_fsm_objsize_median":  int64(10365),
		"node_get_fsm_siblings_100":    int64(1),
		"node_get_fsm_siblings_95":     int64(1),
		"node_get_fsm_siblings_99":     int64(1),
		"node_get_fsm_siblings_mean":   int64(1),
		"node_get_fsm_siblings_median": int64(1),
		"node_get_fsm_time_100":        int64(230445),
		"node_get_fsm_time_95":         int64(24259),
		"node_get_fsm_time_99":         int64(96653),
		"node_get_fsm_time_mean":       int64(6851),
		"node_get_fsm_time_median":     int64(2368),
		"node_gets":                    int64(1116),
		"node_gets_total":              int64(1026058217),
		"node_put_fsm_time_100":        int64(267390),
		"node_put_fsm_time_95":         int64(38286),
		"node_put_fsm_time_99":         int64(84422),
		"node_put_fsm_time_mean":       int64(10832),
		"node_put_fsm_time_median":     int64(4085),
		"node_puts":                    int64(1155),
		"node_puts_total":              int64(444895769),
		"pbc_active":                   int64(360),
		"pbc_connects":                 int64(120),
		"pbc_connects_total":           int64(66793268),
		"vnode_gets":                   int64(14629),
		"vnode_gets_total":             int64(3748432761),
		"vnode_index_reads":            int64(20),
		"vnode_index_reads_total":      int64(3438296),
		"vnode_index_writes":           int64(4293),
		"vnode_index_writes_total":     int64(1515986619),
		"vnode_puts":                   int64(4308),
		"vnode_puts_total":             int64(1519062272),
	}

	// Both identifying tags must be present on the measurement.
	expectTags := map[string]string{
		"nodename": "riak@127.0.0.1",
		"server":   parsed.Host,
	}

	acc.AssertContainsTaggedFields(t, "riak", expectFields, expectTags)
}
// response is a canned Riak /stats JSON payload (as produced by a 1.4.x
// node) served by the test HTTP server in TestRiak. Only a subset of these
// keys is asserted via TestRiak's expectFields; the remaining keys are part
// of the captured response and are not read by the test's assertions.
var response = `
{
"riak_kv_stat_ts": 1455908558,
"vnode_gets": 14629,
"vnode_gets_total": 3748432761,
"vnode_puts": 4308,
"vnode_puts_total": 1519062272,
"vnode_index_refreshes": 0,
"vnode_index_refreshes_total": 0,
"vnode_index_reads": 20,
"vnode_index_reads_total": 3438296,
"vnode_index_writes": 4293,
"vnode_index_writes_total": 1515986619,
"vnode_index_writes_postings": 1,
"vnode_index_writes_postings_total": 265613,
"vnode_index_deletes": 0,
"vnode_index_deletes_total": 0,
"vnode_index_deletes_postings": 0,
"vnode_index_deletes_postings_total": 1,
"node_gets": 1116,
"node_gets_total": 1026058217,
"node_get_fsm_siblings_mean": 1,
"node_get_fsm_siblings_median": 1,
"node_get_fsm_siblings_95": 1,
"node_get_fsm_siblings_99": 1,
"node_get_fsm_siblings_100": 1,
"node_get_fsm_objsize_mean": 13241,
"node_get_fsm_objsize_median": 10365,
"node_get_fsm_objsize_95": 36663,
"node_get_fsm_objsize_99": 51552,
"node_get_fsm_objsize_100": 73596,
"node_get_fsm_time_mean": 6851,
"node_get_fsm_time_median": 2368,
"node_get_fsm_time_95": 24259,
"node_get_fsm_time_99": 96653,
"node_get_fsm_time_100": 230445,
"node_puts": 1155,
"node_puts_total": 444895769,
"node_put_fsm_time_mean": 10832,
"node_put_fsm_time_median": 4085,
"node_put_fsm_time_95": 38286,
"node_put_fsm_time_99": 84422,
"node_put_fsm_time_100": 267390,
"read_repairs": 2,
"read_repairs_total": 7918375,
"coord_redirs_total": 118238575,
"executing_mappers": 0,
"precommit_fail": 0,
"postcommit_fail": 0,
"index_fsm_create": 0,
"index_fsm_create_error": 0,
"index_fsm_active": 0,
"list_fsm_create": 0,
"list_fsm_create_error": 0,
"list_fsm_active": 0,
"pbc_active": 360,
"pbc_connects": 120,
"pbc_connects_total": 66793268,
"late_put_fsm_coordinator_ack": 152,
"node_get_fsm_active": 1,
"node_get_fsm_active_60s": 1029,
"node_get_fsm_in_rate": 21,
"node_get_fsm_out_rate": 21,
"node_get_fsm_rejected": 0,
"node_get_fsm_rejected_60s": 0,
"node_get_fsm_rejected_total": 0,
"node_put_fsm_active": 69,
"node_put_fsm_active_60s": 1053,
"node_put_fsm_in_rate": 30,
"node_put_fsm_out_rate": 31,
"node_put_fsm_rejected": 0,
"node_put_fsm_rejected_60s": 0,
"node_put_fsm_rejected_total": 0,
"read_repairs_primary_outofdate_one": 4,
"read_repairs_primary_outofdate_count": 14761552,
"read_repairs_primary_notfound_one": 0,
"read_repairs_primary_notfound_count": 65879,
"read_repairs_fallback_outofdate_one": 0,
"read_repairs_fallback_outofdate_count": 23761,
"read_repairs_fallback_notfound_one": 0,
"read_repairs_fallback_notfound_count": 455697,
"leveldb_read_block_error": 0,
"riak_pipe_stat_ts": 1455908558,
"pipeline_active": 0,
"pipeline_create_count": 0,
"pipeline_create_one": 0,
"pipeline_create_error_count": 0,
"pipeline_create_error_one": 0,
"cpu_nprocs": 362,
"cpu_avg1": 504,
"cpu_avg5": 325,
"cpu_avg15": 294,
"mem_total": 33695432704,
"mem_allocated": 33454874624,
"nodename": "riak@127.0.0.1",
"connected_nodes": [],
"sys_driver_version": "2.0",
"sys_global_heaps_size": 0,
"sys_heap_type": "private",
"sys_logical_processors": 8,
"sys_otp_release": "R15B01",
"sys_process_count": 2201,
"sys_smp_support": true,
"sys_system_version": "Erlang R15B01 (erts-5.9.1) [source] [64-bit] [smp:8:8] [async-threads:64] [kernel-poll:true]",
"sys_system_architecture": "x86_64-unknown-linux-gnu",
"sys_threads_enabled": true,
"sys_thread_pool_size": 64,
"sys_wordsize": 8,
"ring_members": [
"riak@127.0.0.1"
],
"ring_num_partitions": 256,
"ring_ownership": "[{'riak@127.0.0.1',256}]",
"ring_creation_size": 256,
"storage_backend": "riak_kv_eleveldb_backend",
"erlydtl_version": "0.7.0",
"riak_control_version": "1.4.12-0-g964c5db",
"cluster_info_version": "1.2.4",
"riak_search_version": "1.4.12-0-g7fe0e00",
"merge_index_version": "1.3.2-0-gcb38ee7",
"riak_kv_version": "1.4.12-0-gc6bbd66",
"sidejob_version": "0.2.0",
"riak_api_version": "1.4.12-0-gd9e1cc8",
"riak_pipe_version": "1.4.12-0-g986a226",
"riak_core_version": "1.4.10",
"bitcask_version": "1.6.8-0-gea14cb0",
"basho_stats_version": "1.0.3",
"webmachine_version": "1.10.4-0-gfcff795",
"mochiweb_version": "1.5.1p6",
"inets_version": "5.9",
"erlang_js_version": "1.2.2",
"runtime_tools_version": "1.8.8",
"os_mon_version": "2.2.9",
"riak_sysmon_version": "1.1.3",
"ssl_version": "5.0.1",
"public_key_version": "0.15",
"crypto_version": "2.1",
"sasl_version": "2.2.1",
"lager_version": "2.0.1",
"goldrush_version": "0.1.5",
"compiler_version": "4.8.1",
"syntax_tools_version": "1.6.8",
"stdlib_version": "1.18.1",
"kernel_version": "2.15.1",
"memory_total": 178856408,
"memory_processes": 58454730,
"memory_processes_used": 58371238,
"memory_system": 120401678,
"memory_atom": 586345,
"memory_atom_used": 563485,
"memory_binary": 48677920,
"memory_code": 12329143,
"memory_ets": 17330176,
"riak_core_stat_ts": 1455908559,
"ignored_gossip_total": 0,
"rings_reconciled_total": 5459,
"rings_reconciled": 0,
"gossip_received": 6,
"rejected_handoffs": 94,
"handoff_timeouts": 0,
"dropped_vnode_requests_total": 0,
"converge_delay_min": 0,
"converge_delay_max": 0,
"converge_delay_mean": 0,
"converge_delay_last": 0,
"rebalance_delay_min": 0,
"rebalance_delay_max": 0,
"rebalance_delay_mean": 0,
"rebalance_delay_last": 0,
"riak_kv_vnodes_running": 16,
"riak_kv_vnodeq_min": 0,
"riak_kv_vnodeq_median": 0,
"riak_kv_vnodeq_mean": 0,
"riak_kv_vnodeq_max": 0,
"riak_kv_vnodeq_total": 0,
"riak_pipe_vnodes_running": 16,
"riak_pipe_vnodeq_min": 0,
"riak_pipe_vnodeq_median": 0,
"riak_pipe_vnodeq_mean": 0,
"riak_pipe_vnodeq_max": 0,
"riak_pipe_vnodeq_total": 0
}
`

View File

@@ -69,6 +69,9 @@ func TestSNMPErrorBulk(t *testing.T) {
}
func TestSNMPGet1(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
get1 := Data{
Name: "oid1",
Unit: "octets",
@@ -104,6 +107,9 @@ func TestSNMPGet1(t *testing.T) {
}
func TestSNMPGet2(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
get1 := Data{
Name: "oid1",
Oid: "ifNumber",
@@ -139,6 +145,9 @@ func TestSNMPGet2(t *testing.T) {
}
func TestSNMPGet3(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
get1 := Data{
Name: "oid1",
Unit: "octets",
@@ -177,6 +186,9 @@ func TestSNMPGet3(t *testing.T) {
}
func TestSNMPEasyGet4(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
get1 := Data{
Name: "oid1",
Unit: "octets",
@@ -227,6 +239,9 @@ func TestSNMPEasyGet4(t *testing.T) {
}
func TestSNMPEasyGet5(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
get1 := Data{
Name: "oid1",
Unit: "octets",
@@ -277,6 +292,9 @@ func TestSNMPEasyGet5(t *testing.T) {
}
func TestSNMPEasyGet6(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
h := Host{
Address: testutil.GetLocalHost() + ":31161",
Community: "telegraf",
@@ -307,6 +325,9 @@ func TestSNMPEasyGet6(t *testing.T) {
}
func TestSNMPBulk1(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
bulk1 := Data{
Name: "oid1",
Unit: "octets",

View File

@@ -17,7 +17,11 @@ import (
"github.com/influxdata/telegraf/plugins/inputs"
)
const UDP_PACKET_SIZE int = 1500
const (
UDP_PACKET_SIZE int = 1500
defaultFieldName = "value"
)
var dropwarn = "ERROR: Message queue full. Discarding line [%s] " +
"You may want to increase allowed_pending_messages in the config\n"
@@ -113,9 +117,9 @@ type cachedcounter struct {
}
type cachedtimings struct {
name string
stats RunningStats
tags map[string]string
name string
fields map[string]RunningStats
tags map[string]string
}
func (_ *Statsd) Description() string {
@@ -169,16 +173,26 @@ func (s *Statsd) Gather(acc telegraf.Accumulator) error {
now := time.Now()
for _, metric := range s.timings {
// Defining a template to parse field names for timers allows us to split
// out multiple fields per timer. In this case we prefix each stat with the
// field name and store these all in a single measurement.
fields := make(map[string]interface{})
fields["mean"] = metric.stats.Mean()
fields["stddev"] = metric.stats.Stddev()
fields["upper"] = metric.stats.Upper()
fields["lower"] = metric.stats.Lower()
fields["count"] = metric.stats.Count()
for _, percentile := range s.Percentiles {
name := fmt.Sprintf("%v_percentile", percentile)
fields[name] = metric.stats.Percentile(percentile)
for fieldName, stats := range metric.fields {
var prefix string
if fieldName != defaultFieldName {
prefix = fieldName + "_"
}
fields[prefix+"mean"] = stats.Mean()
fields[prefix+"stddev"] = stats.Stddev()
fields[prefix+"upper"] = stats.Upper()
fields[prefix+"lower"] = stats.Lower()
fields[prefix+"count"] = stats.Count()
for _, percentile := range s.Percentiles {
name := fmt.Sprintf("%s%v_percentile", prefix, percentile)
fields[name] = stats.Percentile(percentile)
}
}
acc.AddFields(metric.name, fields, metric.tags, now)
}
if s.DeleteTimings {
@@ -370,11 +384,6 @@ func (s *Statsd) parseStatsdLine(line string) error {
// Parse the name & tags from bucket
m.name, m.field, m.tags = s.parseName(m.bucket)
// fields are not supported for timings, so if specified combine into
// the name
if (m.mtype == "ms" || m.mtype == "h") && m.field != "value" {
m.name += "_" + m.field
}
switch m.mtype {
case "c":
m.tags["metric_type"] = "counter"
@@ -433,7 +442,7 @@ func (s *Statsd) parseName(bucket string) (string, string, map[string]string) {
name = strings.Replace(name, "-", "__", -1)
}
if field == "" {
field = "value"
field = defaultFieldName
}
return name, field, tags
@@ -461,26 +470,32 @@ func parseKeyValue(keyvalue string) (string, string) {
func (s *Statsd) aggregate(m metric) {
switch m.mtype {
case "ms", "h":
// Check if the measurement exists
cached, ok := s.timings[m.hash]
if !ok {
cached = cachedtimings{
name: m.name,
tags: m.tags,
stats: RunningStats{
PercLimit: s.PercentileLimit,
},
name: m.name,
fields: make(map[string]RunningStats),
tags: m.tags,
}
}
// Check if the field exists. If we've not enabled multiple fields per timer
// this will be the default field name, eg. "value"
field, ok := cached.fields[m.field]
if !ok {
field = RunningStats{
PercLimit: s.PercentileLimit,
}
}
if m.samplerate > 0 {
for i := 0; i < int(1.0/m.samplerate); i++ {
cached.stats.AddValue(m.floatvalue)
field.AddValue(m.floatvalue)
}
s.timings[m.hash] = cached
} else {
cached.stats.AddValue(m.floatvalue)
s.timings[m.hash] = cached
field.AddValue(m.floatvalue)
}
cached.fields[m.field] = field
s.timings[m.hash] = cached
case "c":
// check if the measurement exists
_, ok := s.counters[m.hash]

View File

@@ -561,12 +561,12 @@ func TestParse_MeasurementsWithMultipleValues(t *testing.T) {
// A 0 with invalid samplerate will add a single 0,
// plus the last bit of value 1
// which adds up to 12 individual datapoints to be cached
if cachedtiming.stats.n != 12 {
t.Errorf("Expected 11 additions, got %d", cachedtiming.stats.n)
if cachedtiming.fields[defaultFieldName].n != 12 {
t.Errorf("Expected 11 additions, got %d", cachedtiming.fields[defaultFieldName].n)
}
if cachedtiming.stats.upper != 1 {
t.Errorf("Expected max input to be 1, got %f", cachedtiming.stats.upper)
if cachedtiming.fields[defaultFieldName].upper != 1 {
t.Errorf("Expected max input to be 1, got %f", cachedtiming.fields[defaultFieldName].upper)
}
}
@@ -842,7 +842,105 @@ func TestParse_Timings(t *testing.T) {
}
acc.AssertContainsFields(t, "test_timing", valid)
}
// TestParse_Timings_MultipleFieldsWithTemplate exercises timers when a
// measurement template ("measurement.field") splits the bucket into a
// measurement plus a field name, so every statistic is emitted into one
// measurement with a per-field prefix (e.g. "success_mean").
func TestParse_Timings_MultipleFieldsWithTemplate(t *testing.T) {
	plugin := NewStatsd()
	plugin.Templates = []string{"measurement.field"}
	plugin.Percentiles = []int{90}
	acc := &testutil.Accumulator{}

	// Two distinct fields ("success", "error") under the same measurement.
	inputLines := []string{
		"test_timing.success:1|ms",
		"test_timing.success:11|ms",
		"test_timing.success:1|ms",
		"test_timing.success:1|ms",
		"test_timing.success:1|ms",
		"test_timing.error:2|ms",
		"test_timing.error:22|ms",
		"test_timing.error:2|ms",
		"test_timing.error:2|ms",
		"test_timing.error:2|ms",
	}
	for _, line := range inputLines {
		if err := plugin.parseStatsdLine(line); err != nil {
			t.Errorf("Parsing line %s should not have resulted in an error\n", line)
		}
	}
	plugin.Gather(acc)

	// All statistics for both fields land in the single "test_timing"
	// measurement, each prefixed with its field name.
	valid := map[string]interface{}{
		"success_90_percentile": float64(11),
		"success_count":         int64(5),
		"success_lower":         float64(1),
		"success_mean":          float64(3),
		"success_stddev":        float64(4),
		"success_upper":         float64(11),

		"error_90_percentile": float64(22),
		"error_count":         int64(5),
		"error_lower":         float64(2),
		"error_mean":          float64(6),
		"error_stddev":        float64(8),
		"error_upper":         float64(22),
	}

	acc.AssertContainsFields(t, "test_timing", valid)
}
// TestParse_Timings_MultipleFieldsWithoutTemplate exercises timers when no
// measurement template is configured, so field names cannot be parsed from
// the bucket. The fallback behaviour folds the field into the measurement
// name (e.g. "test_timing_success") with unprefixed statistic names.
func TestParse_Timings_MultipleFieldsWithoutTemplate(t *testing.T) {
	plugin := NewStatsd()
	plugin.Templates = []string{}
	plugin.Percentiles = []int{90}
	acc := &testutil.Accumulator{}

	inputLines := []string{
		"test_timing.success:1|ms",
		"test_timing.success:11|ms",
		"test_timing.success:1|ms",
		"test_timing.success:1|ms",
		"test_timing.success:1|ms",
		"test_timing.error:2|ms",
		"test_timing.error:22|ms",
		"test_timing.error:2|ms",
		"test_timing.error:2|ms",
		"test_timing.error:2|ms",
	}
	for _, line := range inputLines {
		if err := plugin.parseStatsdLine(line); err != nil {
			t.Errorf("Parsing line %s should not have resulted in an error\n", line)
		}
	}
	plugin.Gather(acc)

	// With no template, each field produces its own measurement and the
	// statistic names carry no prefix.
	expectedSuccess := map[string]interface{}{
		"90_percentile": float64(11),
		"count":         int64(5),
		"lower":         float64(1),
		"mean":          float64(3),
		"stddev":        float64(4),
		"upper":         float64(11),
	}
	expectedError := map[string]interface{}{
		"90_percentile": float64(22),
		"count":         int64(5),
		"lower":         float64(2),
		"mean":          float64(6),
		"stddev":        float64(8),
		"upper":         float64(22),
	}

	acc.AssertContainsFields(t, "test_timing_success", expectedSuccess)
	acc.AssertContainsFields(t, "test_timing_error", expectedError)
}
func TestParse_Timings_Delete(t *testing.T) {

View File

@@ -33,7 +33,7 @@ var sampleConfig = `
## Whether to report total system cpu stats or not
totalcpu = true
## Comment this line if you want the raw CPU time metrics
drop = ["time_*"]
fielddrop = ["time_*"]
`
func (_ *CPUStats) SampleConfig() string {
@@ -113,6 +113,10 @@ func totalCpuTime(t cpu.CPUTimesStat) float64 {
func init() {
inputs.Add("cpu", func() telegraf.Input {
return &CPUStats{ps: &systemPS{}}
return &CPUStats{
PerCPU: true,
TotalCPU: true,
ps: &systemPS{},
}
})
}

View File

@@ -14,6 +14,7 @@ type DiskStats struct {
Mountpoints []string
MountPoints []string
IgnoreFS []string `toml:"ignore_fs"`
}
func (_ *DiskStats) Description() string {
@@ -24,6 +25,10 @@ var diskSampleConfig = `
## By default, telegraf gather stats for all mountpoints.
## Setting mountpoints will restrict the stats to the specified mountpoints.
# mount_points = ["/"]
## Ignore some mountpoints by filesystem type. For example (dev)tmpfs (usually
## present on /run, /var/run, /dev/shm or /dev).
ignore_fs = ["tmpfs", "devtmpfs"]
`
func (_ *DiskStats) SampleConfig() string {
@@ -36,12 +41,16 @@ func (s *DiskStats) Gather(acc telegraf.Accumulator) error {
s.MountPoints = s.Mountpoints
}
disks, err := s.ps.DiskUsage(s.MountPoints)
disks, err := s.ps.DiskUsage(s.MountPoints, s.IgnoreFS)
if err != nil {
return fmt.Errorf("error getting disk usage info: %s", err)
}
for _, du := range disks {
if du.Total == 0 {
// Skip dummy filesystem (procfs, cgroupfs, ...)
continue
}
tags := map[string]string{
"path": du.Path,
"fstype": du.Fstype,
@@ -79,11 +88,11 @@ func (_ *DiskIOStats) Description() string {
}
var diskIoSampleConfig = `
# By default, telegraf will gather stats for all devices including
# disk partitions.
# Setting devices will restrict the stats to the specified devices.
## By default, telegraf will gather stats for all devices including
## disk partitions.
## Setting devices will restrict the stats to the specified devices.
# devices = ["sda", "sdb"]
# Uncomment the following line if you do not need disk serial numbers.
## Uncomment the following line if you do not need disk serial numbers.
# skip_serial_number = true
`

View File

@@ -50,9 +50,9 @@ func TestDiskStats(t *testing.T) {
},
}
mps.On("DiskUsage", []string(nil)).Return(duAll, nil)
mps.On("DiskUsage", []string{"/", "/dev"}).Return(duFiltered, nil)
mps.On("DiskUsage", []string{"/", "/home"}).Return(duAll, nil)
mps.On("DiskUsage", []string(nil), []string(nil)).Return(duAll, nil)
mps.On("DiskUsage", []string{"/", "/dev"}, []string(nil)).Return(duFiltered, nil)
mps.On("DiskUsage", []string{"/", "/home"}, []string(nil)).Return(duAll, nil)
err = (&DiskStats{ps: &mps}).Gather(&acc)
require.NoError(t, err)

View File

@@ -33,8 +33,8 @@ func (m *MockPS) CPUTimes(perCPU, totalCPU bool) ([]cpu.CPUTimesStat, error) {
return r0, r1
}
func (m *MockPS) DiskUsage(mountPointFilter []string) ([]*disk.DiskUsageStat, error) {
ret := m.Called(mountPointFilter)
func (m *MockPS) DiskUsage(mountPointFilter []string, fstypeExclude []string) ([]*disk.DiskUsageStat, error) {
ret := m.Called(mountPointFilter, fstypeExclude)
r0 := ret.Get(0).([]*disk.DiskUsageStat)
r1 := ret.Error(1)

View File

@@ -14,7 +14,7 @@ import (
type PS interface {
CPUTimes(perCPU, totalCPU bool) ([]cpu.CPUTimesStat, error)
DiskUsage(mountPointFilter []string) ([]*disk.DiskUsageStat, error)
DiskUsage(mountPointFilter []string, fstypeExclude []string) ([]*disk.DiskUsageStat, error)
NetIO() ([]net.NetIOCountersStat, error)
NetProto() ([]net.NetProtoCountersStat, error)
DiskIO() (map[string]disk.DiskIOCountersStat, error)
@@ -53,6 +53,7 @@ func (s *systemPS) CPUTimes(perCPU, totalCPU bool) ([]cpu.CPUTimesStat, error) {
func (s *systemPS) DiskUsage(
mountPointFilter []string,
fstypeExclude []string,
) ([]*disk.DiskUsageStat, error) {
parts, err := disk.DiskPartitions(true)
if err != nil {
@@ -60,9 +61,13 @@ func (s *systemPS) DiskUsage(
}
// Make a "set" out of the filter slice
filterSet := make(map[string]bool)
mountPointFilterSet := make(map[string]bool)
for _, filter := range mountPointFilter {
filterSet[filter] = true
mountPointFilterSet[filter] = true
}
fstypeExcludeSet := make(map[string]bool)
for _, filter := range fstypeExclude {
fstypeExcludeSet[filter] = true
}
var usage []*disk.DiskUsageStat
@@ -71,7 +76,7 @@ func (s *systemPS) DiskUsage(
if len(mountPointFilter) > 0 {
// If the mount point is not a member of the filter set,
// don't gather info on it.
_, ok := filterSet[p.Mountpoint]
_, ok := mountPointFilterSet[p.Mountpoint]
if !ok {
continue
}
@@ -81,6 +86,12 @@ func (s *systemPS) DiskUsage(
if err != nil {
return nil, err
}
// If the mount point is a member of the exclude set,
// don't gather info on it.
_, ok := fstypeExcludeSet[p.Fstype]
if ok {
continue
}
du.Fstype = p.Fstype
usage = append(usage, du)
}

View File

@@ -36,6 +36,11 @@ func (f *File) SetSerializer(serializer serializers.Serializer) {
func (f *File) Connect() error {
writers := []io.Writer{}
if len(f.Files) == 0 {
f.Files = []string{"stdout"}
}
for _, file := range f.Files {
if file == "stdout" {
writers = append(writers, os.Stdout)

View File

@@ -52,9 +52,9 @@ var sampleConfig = `
## note: using "s" precision greatly improves InfluxDB compression
precision = "s"
## Connection timeout (for the connection with InfluxDB), formatted as a string.
## If not provided, will default to 0 (no timeout)
# timeout = "5s"
## Write timeout (for the InfluxDB client), formatted as a string.
## If not provided, will default to 5s. 0s means no timeout (not recommended).
timeout = "5s"
# username = "telegraf"
# password = "metricsmetricsmetricsmetrics"
## Set the user agent for HTTP POSTs (can be useful for log differentiation)
@@ -185,6 +185,8 @@ func (i *InfluxDB) Write(metrics []telegraf.Metric) error {
func init() {
outputs.Add("influxdb", func() telegraf.Output {
return &InfluxDB{}
return &InfluxDB{
Timeout: internal.Duration{Duration: time.Second * 5},
}
})
}