Merge remote-tracking branch 'origin/master' into pagerduty

This commit is contained in:
Ranjib Dey 2016-06-03 15:25:56 -07:00
commit a0b096ba05
56 changed files with 3231 additions and 657 deletions

4
.gitattributes vendored
View File

@ -1,2 +1,4 @@
CHANGELOG.md merge=union
README.md merge=union
plugins/inputs/all/all.go merge=union
plugins/outputs/all/all.go merge=union

View File

@ -2,6 +2,10 @@
### Release Notes
- `flush_jitter` behavior has been changed. The random jitter will now be
evaluated at every flush interval, rather than once at startup. This makes it
consistent with the behavior of `collection_jitter`.
- All AWS plugins now utilize a standard mechanism for evaluating credentials.
This allows all AWS plugins to support environment variables, shared credential
files & profiles, and role assumptions. See the specific plugin README for
@ -18,7 +22,10 @@ in conjunction with wildcard dimension values as it will control the amount of
time before a new metric is included by the plugin.
### Features
- [#1262](https://github.com/influxdata/telegraf/pull/1261): Add graylog input pluging.
- [#1294](https://github.com/influxdata/telegraf/pull/1294): consul input plugin. Thanks @harnash
- [#1164](https://github.com/influxdata/telegraf/pull/1164): conntrack input plugin. Thanks @robinpercy!
- [#1165](https://github.com/influxdata/telegraf/pull/1165): vmstat input plugin. Thanks @jshim-xm!
- [#1247](https://github.com/influxdata/telegraf/pull/1247): rollbar input plugin. Thanks @francois2metz and @cduez!
- [#1208](https://github.com/influxdata/telegraf/pull/1208): Standardized AWS credentials evaluation & wildcard CloudWatch dimensions. Thanks @johnrengelman!
- [#1264](https://github.com/influxdata/telegraf/pull/1264): Add SSL config options to http_response plugin.
@ -27,6 +34,10 @@ time before a new metric is included by the plugin.
- [#1275](https://github.com/influxdata/telegraf/pull/1275): Allow wildcard filtering of varnish stats.
- [#1142](https://github.com/influxdata/telegraf/pull/1142): Support for glob patterns in exec plugin commands configuration.
- [#1278](https://github.com/influxdata/telegraf/pull/1278): RabbitMQ input: made url parameter optional by using DefaultURL (http://localhost:15672) if not specified
- [#1197](https://github.com/influxdata/telegraf/pull/1197): Limit AWS GetMetricStatistics requests to 10 per second.
- [#1278](https://github.com/influxdata/telegraf/pull/1278) & [#1288](https://github.com/influxdata/telegraf/pull/1288) & [#1295](https://github.com/influxdata/telegraf/pull/1295): RabbitMQ/Apache/InfluxDB inputs: made url(s) parameter optional by using reasonable input defaults if not specified
- [#1296](https://github.com/influxdata/telegraf/issues/1296): Refactor of flush_jitter argument.
- [#1213](https://github.com/influxdata/telegraf/issues/1213): Add inactive & active memory to mem plugin.
### Bugfixes
@ -35,6 +46,10 @@ time before a new metric is included by the plugin.
- [#1258](https://github.com/influxdata/telegraf/pull/1258): Fix potential kernel plugin integer parse error.
- [#1268](https://github.com/influxdata/telegraf/pull/1268): Fix potential influxdb input type assertion panic.
- [#1283](https://github.com/influxdata/telegraf/pull/1283): Still send processes metrics if a process exited during metric collection.
- [#1297](https://github.com/influxdata/telegraf/issues/1297): disk plugin panic when usage grab fails.
- [#1316](https://github.com/influxdata/telegraf/pull/1316): Removed leaked "database" tag on redis metrics. Thanks @PierreF!
- [#1323](https://github.com/influxdata/telegraf/issues/1323): Processes plugin: fix potential error with /proc/net/stat directory.
- [#1322](https://github.com/influxdata/telegraf/issues/1322): Fix rare RHEL 5.2 panic in gopsutil diskio gathering function.
## v0.13.1 [2016-05-24]

View File

@ -212,8 +212,8 @@ func (s *Simple) Close() error {
}
func (s *Simple) Write(metrics []telegraf.Metric) error {
for _, pt := range points {
// write `pt` to the output sink here
for _, metric := range metrics {
// write `metric` to the output sink here
}
return nil
}

3
Godeps
View File

@ -23,6 +23,7 @@ github.com/gonuts/go-shellquote e842a11b24c6abfb3dd27af69a17f482e4b483c2
github.com/gorilla/context 1ea25387ff6f684839d82767c1733ff4d4d15d0a
github.com/gorilla/mux c9e326e2bdec29039a3761c07bece13133863e1e
github.com/hailocab/go-hostpool e80d13ce29ede4452c43dea11e79b9bc8a15b478
github.com/hashicorp/consul 5aa90455ce78d4d41578bafc86305e6e6b28d7d2
github.com/hpcloud/tail b2940955ab8b26e19d43a43c4da0475dd81bdb56
github.com/influxdata/config b79f6829346b8d6e78ba73544b1e1038f1f1c9da
github.com/influxdata/influxdb e094138084855d444195b252314dfee9eae34cab
@ -42,7 +43,7 @@ github.com/prometheus/client_model fa8ad6fec33561be4280a8f0514318c79d7f6cb6
github.com/prometheus/common e8eabff8812b05acf522b45fdcd725a785188e37
github.com/prometheus/procfs 406e5b7bfd8201a36e2bb5f7bdae0b03380c2ce8
github.com/samuel/go-zookeeper 218e9c81c0dd8b3b18172b2bbfad92cc7d6db55f
github.com/shirou/gopsutil 83c6e72cbdef6e8ada934549abf700ff0ba96776
github.com/shirou/gopsutil 586bb697f3ec9f8ec08ffefe18f521a64534037c
github.com/soniah/gosnmp b1b4f885b12c5dcbd021c5cee1c904110de6db7d
github.com/streadway/amqp b4f3ceab0337f013208d31348b578d83c0064744
github.com/stretchr/testify 1f4a1643a57e798696635ea4c126e9127adb7d3c

View File

@ -145,6 +145,8 @@ Currently implemented sources:
* [cassandra](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/cassandra)
* [ceph](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/ceph)
* [chrony](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/chrony)
* [consul](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/consul)
* [conntrack](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/conntrack)
* [couchbase](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/couchbase)
* [couchdb](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/couchdb)
* [disque](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/disque)
@ -205,6 +207,7 @@ Currently implemented sources:
* swap
* processes
* kernel (/proc/stat)
* kernel (/proc/vmstat)
Telegraf can also collect metrics via the following service plugins:
@ -231,6 +234,7 @@ want to add support for another service or third-party API.
* [datadog](https://github.com/influxdata/telegraf/tree/master/plugins/outputs/datadog)
* [file](https://github.com/influxdata/telegraf/tree/master/plugins/outputs/file)
* [graphite](https://github.com/influxdata/telegraf/tree/master/plugins/outputs/graphite)
* [graylog](https://github.com/influxdata/telegraf/tree/master/plugins/outputs/graylog)
* [instrumental](https://github.com/influxdata/telegraf/tree/master/plugins/outputs/instrumental)
* [kafka](https://github.com/influxdata/telegraf/tree/master/plugins/outputs/kafka)
* [librato](https://github.com/influxdata/telegraf/tree/master/plugins/outputs/librato)

View File

@ -1,17 +1,15 @@
package agent
import (
cryptorand "crypto/rand"
"fmt"
"log"
"math/big"
"math/rand"
"os"
"runtime"
"sync"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/internal/config"
"github.com/influxdata/telegraf/internal/models"
)
@ -115,27 +113,16 @@ func (a *Agent) gatherer(
ticker := time.NewTicker(interval)
defer ticker.Stop()
jitter := a.Config.Agent.CollectionJitter.Duration.Nanoseconds()
for {
var outerr error
start := time.Now()
acc := NewAccumulator(input.Config, metricC)
acc.SetDebug(a.Config.Agent.Debug)
acc.setDefaultTags(a.Config.Tags)
if jitter != 0 {
nanoSleep := rand.Int63n(jitter)
d, err := time.ParseDuration(fmt.Sprintf("%dns", nanoSleep))
if err != nil {
log.Printf("Jittering collection interval failed for plugin %s",
input.Name)
} else {
time.Sleep(d)
}
}
internal.RandomSleep(a.Config.Agent.CollectionJitter.Duration, shutdown)
start := time.Now()
gatherWithTimeout(shutdown, input, acc, interval)
elapsed := time.Since(start)
@ -274,6 +261,7 @@ func (a *Agent) flusher(shutdown chan struct{}, metricC chan telegraf.Metric) er
a.flush()
return nil
case <-ticker.C:
internal.RandomSleep(a.Config.Agent.FlushJitter.Duration, shutdown)
a.flush()
case m := <-metricC:
for _, o := range a.Config.Outputs {
@ -283,35 +271,10 @@ func (a *Agent) flusher(shutdown chan struct{}, metricC chan telegraf.Metric) er
}
}
// jitterInterval applies the the interval jitter to the flush interval using
// crypto/rand number generator
func jitterInterval(ininterval, injitter time.Duration) time.Duration {
var jitter int64
outinterval := ininterval
if injitter.Nanoseconds() != 0 {
maxjitter := big.NewInt(injitter.Nanoseconds())
if j, err := cryptorand.Int(cryptorand.Reader, maxjitter); err == nil {
jitter = j.Int64()
}
outinterval = time.Duration(jitter + ininterval.Nanoseconds())
}
if outinterval.Nanoseconds() < time.Duration(500*time.Millisecond).Nanoseconds() {
log.Printf("Flush interval %s too low, setting to 500ms\n", outinterval)
outinterval = time.Duration(500 * time.Millisecond)
}
return outinterval
}
// Run runs the agent daemon, gathering every Interval
func (a *Agent) Run(shutdown chan struct{}) error {
var wg sync.WaitGroup
a.Config.Agent.FlushInterval.Duration = jitterInterval(
a.Config.Agent.FlushInterval.Duration,
a.Config.Agent.FlushJitter.Duration)
log.Printf("Agent Config: Interval:%s, Debug:%#v, Quiet:%#v, Hostname:%#v, "+
"Flush Interval:%s \n",
a.Config.Agent.Interval.Duration, a.Config.Agent.Debug, a.Config.Agent.Quiet,

View File

@ -2,7 +2,6 @@ package agent
import (
"testing"
"time"
"github.com/influxdata/telegraf/internal/config"
@ -110,75 +109,3 @@ func TestAgent_LoadOutput(t *testing.T) {
a, _ = NewAgent(c)
assert.Equal(t, 3, len(a.Config.Outputs))
}
func TestAgent_ZeroJitter(t *testing.T) {
flushinterval := jitterInterval(time.Duration(10*time.Second),
time.Duration(0*time.Second))
actual := flushinterval.Nanoseconds()
exp := time.Duration(10 * time.Second).Nanoseconds()
if actual != exp {
t.Errorf("Actual %v, expected %v", actual, exp)
}
}
func TestAgent_ZeroInterval(t *testing.T) {
min := time.Duration(500 * time.Millisecond).Nanoseconds()
max := time.Duration(5 * time.Second).Nanoseconds()
for i := 0; i < 1000; i++ {
flushinterval := jitterInterval(time.Duration(0*time.Second),
time.Duration(5*time.Second))
actual := flushinterval.Nanoseconds()
if actual > max {
t.Errorf("Didn't expect interval %d to be > %d", actual, max)
break
}
if actual < min {
t.Errorf("Didn't expect interval %d to be < %d", actual, min)
break
}
}
}
func TestAgent_ZeroBoth(t *testing.T) {
flushinterval := jitterInterval(time.Duration(0*time.Second),
time.Duration(0*time.Second))
actual := flushinterval
exp := time.Duration(500 * time.Millisecond)
if actual != exp {
t.Errorf("Actual %v, expected %v", actual, exp)
}
}
func TestAgent_JitterMax(t *testing.T) {
max := time.Duration(32 * time.Second).Nanoseconds()
for i := 0; i < 1000; i++ {
flushinterval := jitterInterval(time.Duration(30*time.Second),
time.Duration(2*time.Second))
actual := flushinterval.Nanoseconds()
if actual > max {
t.Errorf("Didn't expect interval %d to be > %d", actual, max)
break
}
}
}
func TestAgent_JitterMin(t *testing.T) {
min := time.Duration(30 * time.Second).Nanoseconds()
for i := 0; i < 1000; i++ {
flushinterval := jitterInterval(time.Duration(30*time.Second),
time.Duration(2*time.Second))
actual := flushinterval.Nanoseconds()
if actual < min {
t.Errorf("Didn't expect interval %d to be < %d", actual, min)
break
}
}
}

View File

@ -154,12 +154,18 @@
#
# ## Amazon Credentials
# ## Credentials are loaded in the following order
# ## 1) explicit credentials from 'access_key' and 'secret_key'
# ## 2) environment variables
# ## 3) shared credentials file
# ## 4) EC2 Instance Profile
# ## 1) Assumed credentials via STS if role_arn is specified
# ## 2) explicit credentials from 'access_key' and 'secret_key'
# ## 3) shared profile from 'profile'
# ## 4) environment variables
# ## 5) shared credentials file
# ## 6) EC2 Instance Profile
# #access_key = ""
# #secret_key = ""
# #token = ""
# #role_arn = ""
# #profile = ""
# #shared_credential_file = ""
#
# ## Namespace for the CloudWatch MetricDatums
# namespace = 'InfluxData/Telegraf'
@ -199,6 +205,12 @@
# timeout = 2
# # Send telegraf metrics to graylog(s)
# [[outputs.graylog]]
# ## Udp endpoint for your graylog instance.
# servers = ["127.0.0.1:12201", "192.168.1.1:12201"]
# # Configuration for sending metrics to an Instrumental project
# [[outputs.instrumental]]
# ## Project API Token (required)
@ -271,12 +283,18 @@
#
# ## Amazon Credentials
# ## Credentials are loaded in the following order
# ## 1) explicit credentials from 'access_key' and 'secret_key'
# ## 2) environment variables
# ## 3) shared credentials file
# ## 4) EC2 Instance Profile
# ## 1) Assumed credentials via STS if role_arn is specified
# ## 2) explicit credentials from 'access_key' and 'secret_key'
# ## 3) shared profile from 'profile'
# ## 4) environment variables
# ## 5) shared credentials file
# ## 6) EC2 Instance Profile
# #access_key = ""
# #secret_key = ""
# #token = ""
# #role_arn = ""
# #profile = ""
# #shared_credential_file = ""
#
# ## Kinesis StreamName must exist prior to starting telegraf.
# streamname = "StreamName"
@ -513,12 +531,18 @@
#
# ## Amazon Credentials
# ## Credentials are loaded in the following order
# ## 1) explicit credentials from 'access_key' and 'secret_key'
# ## 2) environment variables
# ## 3) shared credentials file
# ## 4) EC2 Instance Profile
# ## 1) Assumed credentials via STS if role_arn is specified
# ## 2) explicit credentials from 'access_key' and 'secret_key'
# ## 3) shared profile from 'profile'
# ## 4) environment variables
# ## 5) shared credentials file
# ## 6) EC2 Instance Profile
# #access_key = ""
# #secret_key = ""
# #token = ""
# #role_arn = ""
# #profile = ""
# #shared_credential_file = ""
#
# ## Requested CloudWatch aggregation Period (required - must be a multiple of 60s)
# period = '1m'
@ -530,6 +554,10 @@
# ## gaps or overlap in pulled data
# interval = '1m'
#
# ## Configure the TTL for the internal cache of metrics.
# ## Defaults to 1 hr if not specified
# #cache_ttl = '10m'
#
# ## Metric Statistic Namespace (required)
# namespace = 'AWS/ELB'
#
@ -545,6 +573,23 @@
# # value = 'p-example'
# # Gather health check statuses from services registered in Consul
# [[inputs.consul]]
# ## Most of these values defaults to the one configured on a Consul's agent level.
# ## Optional Consul server address (default: "localhost")
# # address = "localhost"
# ## Optional URI scheme for the Consul server (default: "http")
# # scheme = "http"
# ## Optional ACL token used in every request (default: "")
# # token = ""
# ## Optional username used for request HTTP Basic Authentication (default: "")
# # username = ""
# ## Optional password used for HTTP Basic Authentication (default: "")
# # password = ""
# ## Optional data centre to query the health checks from (default: "")
# # datacentre = ""
# # Read metrics from one or many couchbase clusters
# [[inputs.couchbase]]
# ## specify servers via a url matching:
@ -637,7 +682,11 @@
# # Read metrics from one or more commands that can output to stdout
# [[inputs.exec]]
# ## Commands array
# commands = ["/tmp/test.sh", "/usr/bin/mycollector --foo=bar"]
# commands = [
# "/tmp/test.sh",
# "/usr/bin/mycollector --foo=bar",
# "/tmp/collect_*.sh"
# ]
#
# ## Timeout for each command to complete.
# timeout = "5s"
@ -668,6 +717,43 @@
# md5 = false
# # Read flattened metrics from one or more GrayLog HTTP endpoints
# [[inputs.graylog]]
# ## API endpoint, currently supported API:
# ##
# ## - multiple (Ex http://<host>:12900/system/metrics/multiple)
# ## - namespace (Ex http://<host>:12900/system/metrics/namespace/{namespace})
# ##
# ## For namespace endpoint, the metrics array will be ignored for that call.
# ## Endpoint can contain namespace and multiple type calls.
# ##
# ## Please check http://[graylog-server-ip]:12900/api-browser for full list
# ## of endpoints
# servers = [
# "http://[graylog-server-ip]:12900/system/metrics/multiple",
# ]
#
# ## Metrics list
# ## List of metrics can be found on Graylog webservice documentation.
# ## Or by hitting the the web service api at:
# ## http://[graylog-host]:12900/system/metrics
# metrics = [
# "jvm.cl.loaded",
# "jvm.memory.pools.Metaspace.committed"
# ]
#
# ## Username and password
# username = ""
# password = ""
#
# ## Optional SSL Config
# # ssl_ca = "/etc/telegraf/ca.pem"
# # ssl_cert = "/etc/telegraf/cert.pem"
# # ssl_key = "/etc/telegraf/key.pem"
# ## Use SSL but skip chain & host verification
# # insecure_skip_verify = false
# # Read metrics of haproxy, via socket or csv stats page
# [[inputs.haproxy]]
# ## An array of address to gather stats about. Specify an ip on hostname
@ -696,6 +782,13 @@
# # body = '''
# # {'fake':'data'}
# # '''
#
# ## Optional SSL Config
# # ssl_ca = "/etc/telegraf/ca.pem"
# # ssl_cert = "/etc/telegraf/cert.pem"
# # ssl_key = "/etc/telegraf/key.pem"
# ## Use SSL but skip chain & host verification
# # insecure_skip_verify = false
# # Read flattened metrics from one or more JSON HTTP endpoints
@ -746,6 +839,7 @@
# ## See the influxdb plugin's README for more details.
#
# ## Multiple URLs from which to read InfluxDB-formatted JSON
# ## Default is "http://localhost:8086/debug/vars".
# urls = [
# "http://localhost:8086/debug/vars"
# ]
@ -1360,22 +1454,23 @@
#
# ## By default, telegraf gather stats for 3 metric points.
# ## Setting stats will override the defaults shown below.
# ## stats may also be set to ["all"], which will collect all stats
# ## Glob matching can be used, ie, stats = ["MAIN.*"]
# ## stats may also be set to ["*"], which will collect all stats
# stats = ["MAIN.cache_hit", "MAIN.cache_miss", "MAIN.uptime"]
# # Read metrics of ZFS from arcstats, zfetchstats and vdev_cache_stats
# # Read metrics of ZFS from arcstats, zfetchstats, vdev_cache_stats, and pools
# [[inputs.zfs]]
# ## ZFS kstat path
# ## ZFS kstat path. Ignored on FreeBSD
# ## If not specified, then default is:
# kstatPath = "/proc/spl/kstat/zfs"
# # kstatPath = "/proc/spl/kstat/zfs"
#
# ## By default, telegraf gather all zfs stats
# ## If not specified, then default is:
# kstatMetrics = ["arcstats", "zfetchstats", "vdev_cache_stats"]
# # kstatMetrics = ["arcstats", "zfetchstats", "vdev_cache_stats"]
#
# ## By default, don't gather zpool stats
# poolMetrics = false
# # poolMetrics = false
# # Reads 'mntr' stats from one or many zookeeper servers

View File

@ -58,7 +58,6 @@ func NewConfig() *Config {
Interval: internal.Duration{Duration: 10 * time.Second},
RoundInterval: true,
FlushInterval: internal.Duration{Duration: 10 * time.Second},
FlushJitter: internal.Duration{Duration: 5 * time.Second},
},
Tags: make(map[string]string),
@ -357,7 +356,7 @@ func printConfig(name string, p printer, op string, commented bool) {
fmt.Print("\n")
continue
}
fmt.Print(comment + line + "\n")
fmt.Print(strings.TrimRight(comment+line, " ") + "\n")
}
}
}

View File

@ -0,0 +1,37 @@
package errchan
import (
"fmt"
"strings"
)
type ErrChan struct {
C chan error
}
// New returns an error channel of max length 'n'
// errors can be sent to the ErrChan.C channel, and will be returned when
// ErrChan.Error() is called.
func New(n int) *ErrChan {
return &ErrChan{
C: make(chan error, n),
}
}
// Error closes the ErrChan.C channel and returns an error if there are any
// non-nil errors, otherwise returns nil.
func (e *ErrChan) Error() error {
close(e.C)
var out string
for err := range e.C {
if err != nil {
out += "[" + err.Error() + "], "
}
}
if out != "" {
return fmt.Errorf("Errors encountered: " + strings.TrimRight(out, ", "))
}
return nil
}

View File

@ -10,6 +10,7 @@ import (
"fmt"
"io/ioutil"
"log"
"math/big"
"os"
"os/exec"
"strconv"
@ -228,3 +229,27 @@ func CompileFilter(filters []string) (glob.Glob, error) {
}
return out, err
}
// RandomSleep will sleep for a random amount of time up to max.
// If the shutdown channel is closed, it will return before it has finished
// sleeping.
func RandomSleep(max time.Duration, shutdown chan struct{}) {
if max == 0 {
return
}
maxSleep := big.NewInt(max.Nanoseconds())
var sleepns int64
if j, err := rand.Int(rand.Reader, maxSleep); err == nil {
sleepns = j.Int64()
}
t := time.NewTimer(time.Nanosecond * time.Duration(sleepns))
select {
case <-t.C:
return
case <-shutdown:
t.Stop()
return
}
}

View File

@ -137,3 +137,28 @@ func TestCompileFilter(t *testing.T) {
assert.True(t, f.Match("mem"))
assert.True(t, f.Match("network"))
}
func TestRandomSleep(t *testing.T) {
// test that zero max returns immediately
s := time.Now()
RandomSleep(time.Duration(0), make(chan struct{}))
elapsed := time.Since(s)
assert.True(t, elapsed < time.Millisecond)
// test that max sleep is respected
s = time.Now()
RandomSleep(time.Millisecond*50, make(chan struct{}))
elapsed = time.Since(s)
assert.True(t, elapsed < time.Millisecond*50)
// test that shutdown is respected
s = time.Now()
shutdown := make(chan struct{})
go func() {
time.Sleep(time.Millisecond * 100)
close(shutdown)
}()
RandomSleep(time.Second, shutdown)
elapsed = time.Since(s)
assert.True(t, elapsed < time.Millisecond*150)
}

View File

@ -0,0 +1,59 @@
package limiter
import (
"sync"
"time"
)
// NewRateLimiter returns a rate limiter that will will emit from the C
// channel only 'n' times every 'rate' seconds.
func NewRateLimiter(n int, rate time.Duration) *rateLimiter {
r := &rateLimiter{
C: make(chan bool),
rate: rate,
n: n,
shutdown: make(chan bool),
}
r.wg.Add(1)
go r.limiter()
return r
}
type rateLimiter struct {
C chan bool
rate time.Duration
n int
shutdown chan bool
wg sync.WaitGroup
}
func (r *rateLimiter) Stop() {
close(r.shutdown)
r.wg.Wait()
close(r.C)
}
func (r *rateLimiter) limiter() {
defer r.wg.Done()
ticker := time.NewTicker(r.rate)
defer ticker.Stop()
counter := 0
for {
select {
case <-r.shutdown:
return
case <-ticker.C:
counter = 0
default:
if counter < r.n {
select {
case r.C <- true:
counter++
case <-r.shutdown:
return
}
}
}
}
}

View File

@ -0,0 +1,54 @@
package limiter
import (
"testing"
"time"
"github.com/stretchr/testify/assert"
)
func TestRateLimiter(t *testing.T) {
r := NewRateLimiter(5, time.Second)
ticker := time.NewTicker(time.Millisecond * 75)
// test that we can only get 5 receives from the rate limiter
counter := 0
outer:
for {
select {
case <-r.C:
counter++
case <-ticker.C:
break outer
}
}
assert.Equal(t, 5, counter)
r.Stop()
// verify that the Stop function closes the channel.
_, ok := <-r.C
assert.False(t, ok)
}
func TestRateLimiterMultipleIterations(t *testing.T) {
r := NewRateLimiter(5, time.Millisecond*50)
ticker := time.NewTicker(time.Millisecond * 250)
// test that we can get 15 receives from the rate limiter
counter := 0
outer:
for {
select {
case <-ticker.C:
break outer
case <-r.C:
counter++
}
}
assert.True(t, counter > 10)
r.Stop()
// verify that the Stop function closes the channel.
_, ok := <-r.C
assert.False(t, ok)
}

View File

@ -8,6 +8,8 @@ import (
_ "github.com/influxdata/telegraf/plugins/inputs/ceph"
_ "github.com/influxdata/telegraf/plugins/inputs/chrony"
_ "github.com/influxdata/telegraf/plugins/inputs/cloudwatch"
_ "github.com/influxdata/telegraf/plugins/inputs/conntrack"
_ "github.com/influxdata/telegraf/plugins/inputs/consul"
_ "github.com/influxdata/telegraf/plugins/inputs/couchbase"
_ "github.com/influxdata/telegraf/plugins/inputs/couchdb"
_ "github.com/influxdata/telegraf/plugins/inputs/disque"
@ -18,6 +20,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/inputs/exec"
_ "github.com/influxdata/telegraf/plugins/inputs/filestat"
_ "github.com/influxdata/telegraf/plugins/inputs/github_webhooks"
_ "github.com/influxdata/telegraf/plugins/inputs/graylog"
_ "github.com/influxdata/telegraf/plugins/inputs/haproxy"
_ "github.com/influxdata/telegraf/plugins/inputs/http_response"
_ "github.com/influxdata/telegraf/plugins/inputs/httpjson"

View File

@ -3,6 +3,7 @@ package cloudwatch
import (
"fmt"
"strings"
"sync"
"time"
"github.com/aws/aws-sdk-go/aws"
@ -12,6 +13,8 @@ import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
internalaws "github.com/influxdata/telegraf/internal/config/aws"
"github.com/influxdata/telegraf/internal/errchan"
"github.com/influxdata/telegraf/internal/limiter"
"github.com/influxdata/telegraf/plugins/inputs"
)
@ -165,25 +168,27 @@ func (c *CloudWatch) Gather(acc telegraf.Accumulator) error {
}
metricCount := len(metrics)
var errChan = make(chan error, metricCount)
errChan := errchan.New(metricCount)
now := time.Now()
// limit concurrency or we can easily exhaust user connection limit
semaphore := make(chan byte, 64)
// see cloudwatch API request limits:
// http://docs.aws.amazon.com/AmazonCloudWatch/latest/DeveloperGuide/cloudwatch_limits.html
lmtr := limiter.NewRateLimiter(10, time.Second)
defer lmtr.Stop()
var wg sync.WaitGroup
wg.Add(len(metrics))
for _, m := range metrics {
semaphore <- 0x1
go c.gatherMetric(acc, m, now, semaphore, errChan)
<-lmtr.C
go func(inm *cloudwatch.Metric) {
defer wg.Done()
c.gatherMetric(acc, inm, now, errChan.C)
}(m)
}
wg.Wait()
for i := 1; i <= metricCount; i++ {
err := <-errChan
if err != nil {
return err
}
}
return nil
return errChan.Error()
}
func init() {
@ -257,12 +262,16 @@ func (c *CloudWatch) fetchNamespaceMetrics() (metrics []*cloudwatch.Metric, err
/*
* Gather given Metric and emit any error
*/
func (c *CloudWatch) gatherMetric(acc telegraf.Accumulator, metric *cloudwatch.Metric, now time.Time, semaphore chan byte, errChan chan error) {
func (c *CloudWatch) gatherMetric(
acc telegraf.Accumulator,
metric *cloudwatch.Metric,
now time.Time,
errChan chan error,
) {
params := c.getStatisticsInput(metric, now)
resp, err := c.client.GetMetricStatistics(params)
if err != nil {
errChan <- err
<-semaphore
return
}
@ -299,7 +308,6 @@ func (c *CloudWatch) gatherMetric(acc telegraf.Accumulator, metric *cloudwatch.M
}
errChan <- nil
<-semaphore
}
/*

View File

@ -0,0 +1,56 @@
# Conntrack Plugin
Collects stats from Netfilter's conntrack-tools.
The conntrack-tools provide a mechanism for tracking various aspects of
network connections as they are processed by netfilter. At runtime,
conntrack exposes many of those connection statistics within /proc/sys/net.
Depending on your kernel version, these files can be found in either
/proc/sys/net/ipv4/netfilter or /proc/sys/net/netfilter and will be
prefixed with either ip_ or nf_. This plugin reads the files specified
in its configuration and publishes each one as a field, with the prefix
normalized to ip_.
In order to simplify configuration in a heterogeneous environment, a superset
of directory and filenames can be specified. Any locations that don't exist
will be ignored.
For more information on conntrack-tools, see the
[Netfilter Documentation](http://conntrack-tools.netfilter.org/).
### Configuration:
```toml
# Collects conntrack stats from the configured directories and files.
[[inputs.conntrack]]
## The following defaults would work with multiple versions of conntrack.
## Note the nf_ and ip_ filename prefixes are mutually exclusive across
## kernel versions, as are the directory locations.
## Superset of filenames to look for within the conntrack dirs.
## Missing files will be ignored.
files = ["ip_conntrack_count","ip_conntrack_max",
"nf_conntrack_count","nf_conntrack_max"]
## Directories to search within for the conntrack files above.
## Missing directrories will be ignored.
dirs = ["/proc/sys/net/ipv4/netfilter","/proc/sys/net/netfilter"]
```
### Measurements & Fields:
- conntrack
- ip_conntrack_count (int, count): the number of entries in the conntrack table
- ip_conntrack_max (int, size): the max capacity of the conntrack table
### Tags:
This input does not use tags.
### Example Output:
```
$ ./telegraf -config telegraf.conf -input-filter conntrack -test
conntrack,host=myhost ip_conntrack_count=2,ip_conntrack_max=262144 1461620427667995735
```

View File

@ -0,0 +1,119 @@
// +build linux
package conntrack
import (
"fmt"
"io/ioutil"
"os"
"strconv"
"strings"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
"log"
"path/filepath"
)
type Conntrack struct {
Path string
Dirs []string
Files []string
}
const (
inputName = "conntrack"
)
var dfltDirs = []string{
"/proc/sys/net/ipv4/netfilter",
"/proc/sys/net/netfilter",
}
var dfltFiles = []string{
"ip_conntrack_count",
"ip_conntrack_max",
"nf_conntrack_count",
"nf_conntrack_max",
}
func (c *Conntrack) setDefaults() {
if len(c.Dirs) == 0 {
c.Dirs = dfltDirs
}
if len(c.Files) == 0 {
c.Files = dfltFiles
}
}
func (c *Conntrack) Description() string {
return "Collects conntrack stats from the configured directories and files."
}
var sampleConfig = `
## The following defaults would work with multiple versions of conntrack.
## Note the nf_ and ip_ filename prefixes are mutually exclusive across
## kernel versions, as are the directory locations.
## Superset of filenames to look for within the conntrack dirs.
## Missing files will be ignored.
files = ["ip_conntrack_count","ip_conntrack_max",
"nf_conntrack_count","nf_conntrack_max"]
## Directories to search within for the conntrack files above.
## Missing directrories will be ignored.
dirs = ["/proc/sys/net/ipv4/netfilter","/proc/sys/net/netfilter"]
`
func (c *Conntrack) SampleConfig() string {
return sampleConfig
}
func (c *Conntrack) Gather(acc telegraf.Accumulator) error {
c.setDefaults()
var metricKey string
fields := make(map[string]interface{})
for _, dir := range c.Dirs {
for _, file := range c.Files {
// NOTE: no system will have both nf_ and ip_ prefixes,
// so we're safe to branch on suffix only.
parts := strings.SplitN(file, "_", 2)
if len(parts) < 2 {
continue
}
metricKey = "ip_" + parts[1]
fName := filepath.Join(dir, file)
if _, err := os.Stat(fName); err != nil {
continue
}
contents, err := ioutil.ReadFile(fName)
if err != nil {
log.Printf("failed to read file '%s': %v", fName, err)
}
v := strings.TrimSpace(string(contents))
fields[metricKey], err = strconv.ParseFloat(v, 64)
if err != nil {
log.Printf("failed to parse metric, expected number but "+
" found '%s': %v", v, err)
}
}
}
if len(fields) == 0 {
return fmt.Errorf("Conntrack input failed to collect metrics. " +
"Is the conntrack kernel module loaded?")
}
acc.AddFields(inputName, fields, nil)
return nil
}
func init() {
inputs.Add(inputName, func() telegraf.Input { return &Conntrack{} })
}

View File

@ -0,0 +1,3 @@
// +build !linux
package conntrack

View File

@ -0,0 +1,90 @@
// +build linux
package conntrack
import (
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert"
"io/ioutil"
"os"
"path"
"strconv"
"strings"
"testing"
)
func restoreDflts(savedFiles, savedDirs []string) {
dfltFiles = savedFiles
dfltDirs = savedDirs
}
func TestNoFilesFound(t *testing.T) {
defer restoreDflts(dfltFiles, dfltDirs)
dfltFiles = []string{"baz.txt"}
dfltDirs = []string{"./foo/bar"}
c := &Conntrack{}
acc := &testutil.Accumulator{}
err := c.Gather(acc)
assert.EqualError(t, err, "Conntrack input failed to collect metrics. "+
"Is the conntrack kernel module loaded?")
}
func TestDefaultsUsed(t *testing.T) {
defer restoreDflts(dfltFiles, dfltDirs)
tmpdir, err := ioutil.TempDir("", "tmp1")
assert.NoError(t, err)
defer os.Remove(tmpdir)
tmpFile, err := ioutil.TempFile(tmpdir, "ip_conntrack_count")
assert.NoError(t, err)
dfltDirs = []string{tmpdir}
fname := path.Base(tmpFile.Name())
dfltFiles = []string{fname}
count := 1234321
ioutil.WriteFile(tmpFile.Name(), []byte(strconv.Itoa(count)), 0660)
c := &Conntrack{}
acc := &testutil.Accumulator{}
c.Gather(acc)
acc.AssertContainsFields(t, inputName, map[string]interface{}{
fname: float64(count)})
}
func TestConfigsUsed(t *testing.T) {
defer restoreDflts(dfltFiles, dfltDirs)
tmpdir, err := ioutil.TempDir("", "tmp1")
assert.NoError(t, err)
defer os.Remove(tmpdir)
cntFile, err := ioutil.TempFile(tmpdir, "nf_conntrack_count")
maxFile, err := ioutil.TempFile(tmpdir, "nf_conntrack_max")
assert.NoError(t, err)
dfltDirs = []string{tmpdir}
cntFname := path.Base(cntFile.Name())
maxFname := path.Base(maxFile.Name())
dfltFiles = []string{cntFname, maxFname}
count := 1234321
max := 9999999
ioutil.WriteFile(cntFile.Name(), []byte(strconv.Itoa(count)), 0660)
ioutil.WriteFile(maxFile.Name(), []byte(strconv.Itoa(max)), 0660)
c := &Conntrack{}
acc := &testutil.Accumulator{}
c.Gather(acc)
fix := func(s string) string {
return strings.Replace(s, "nf_", "ip_", 1)
}
acc.AssertContainsFields(t, inputName,
map[string]interface{}{
fix(cntFname): float64(count),
fix(maxFname): float64(max),
})
}

View File

@ -0,0 +1,46 @@
# Telegraf Input Plugin: Consul
This plugin will collect statistics about all helath checks registered in the Consul. It uses [Consul API](https://www.consul.io/docs/agent/http/health.html#health_state)
to query the data. It will not report the [telemetry](https://www.consul.io/docs/agent/telemetry.html) but Consul can report those stats already using StatsD protocol if needed.
## Configuration:
```
# Gather health check statuses from services registered in Consul
[[inputs.consul]]
## Most of these values defaults to the one configured on a Consul's agent level.
## Optional Consul server address (default: "")
# address = ""
## Optional URI scheme for the Consul server (default: "")
# scheme = ""
## Optional ACL token used in every request (default: "")
# token = ""
## Optional username used for request HTTP Basic Authentication (default: "")
# username = ""
## Optional password used for HTTP Basic Authentication (default: "")
# password = ""
## Optional data centre to query the health checks from (default: "")
# datacentre = ""
```
## Measurements:
### Consul:
Tags:
- node: on which node check/service is registered on
- service_name: name of the service (this is the service name not the service ID)
Fields:
- check_id
- check_name
- service_id
- status
## Example output
```
$ telegraf --config ./telegraf.conf -input-filter consul -test
* Plugin: consul, Collection 1
> consul_health_checks,host=wolfpit,node=consul-server-node check_id="serfHealth",check_name="Serf Health Status",service_id="",status="passing" 1464698464486439902
> consul_health_checks,host=wolfpit,node=consul-server-node,service_name=www.example.com check_id="service:www-example-com.test01",check_name="Service 'www.example.com' check",service_id="www-example-com.test01",status="critical" 1464698464486519036
```

View File

@ -0,0 +1,136 @@
package consul
import (
"net/http"
"github.com/hashicorp/consul/api"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/inputs"
)
type Consul struct {
Address string
Scheme string
Token string
Username string
Password string
Datacentre string
// Path to CA file
SSLCA string `toml:"ssl_ca"`
// Path to host cert file
SSLCert string `toml:"ssl_cert"`
// Path to cert key file
SSLKey string `toml:"ssl_key"`
// Use SSL but skip chain & host verification
InsecureSkipVerify bool
// client used to connect to Consul agnet
client *api.Client
}
var sampleConfig = `
## Most of these values defaults to the one configured on a Consul's agent level.
## Optional Consul server address (default: "localhost")
# address = "localhost"
## Optional URI scheme for the Consul server (default: "http")
# scheme = "http"
## Optional ACL token used in every request (default: "")
# token = ""
## Optional username used for request HTTP Basic Authentication (default: "")
# username = ""
## Optional password used for HTTP Basic Authentication (default: "")
# password = ""
## Optional data centre to query the health checks from (default: "")
# datacentre = ""
`
func (c *Consul) Description() string {
return "Gather health check statuses from services registered in Consul"
}
func (c *Consul) SampleConfig() string {
return sampleConfig
}
func (c *Consul) createAPIClient() (*api.Client, error) {
config := api.DefaultConfig()
if c.Address != "" {
config.Address = c.Address
}
if c.Scheme != "" {
config.Scheme = c.Scheme
}
if c.Datacentre != "" {
config.Datacenter = c.Datacentre
}
if c.Username != "" {
config.HttpAuth = &api.HttpBasicAuth{
Username: c.Username,
Password: c.Password,
}
}
tlsCfg, err := internal.GetTLSConfig(
c.SSLCert, c.SSLKey, c.SSLCA, c.InsecureSkipVerify)
if err != nil {
return nil, err
}
config.HttpClient.Transport = &http.Transport{
TLSClientConfig: tlsCfg,
}
return api.NewClient(config)
}
func (c *Consul) GatherHealthCheck(acc telegraf.Accumulator, checks []*api.HealthCheck) {
for _, check := range checks {
record := make(map[string]interface{})
tags := make(map[string]string)
record["check_id"] = check.CheckID
record["check_name"] = check.Name
record["service_id"] = check.ServiceID
record["status"] = check.Status
tags["node"] = check.Node
tags["service_name"] = check.ServiceName
acc.AddFields("consul_health_checks", record, tags)
}
}
func (c *Consul) Gather(acc telegraf.Accumulator) error {
if c.client == nil {
newClient, err := c.createAPIClient()
if err != nil {
return err
}
c.client = newClient
}
checks, _, err := c.client.Health().State("any", nil)
if err != nil {
return err
}
c.GatherHealthCheck(acc, checks)
return nil
}
func init() {
inputs.Add("consul", func() telegraf.Input {
return &Consul{}
})
}

View File

@ -0,0 +1,42 @@
package consul
import (
"testing"
"github.com/hashicorp/consul/api"
"github.com/influxdata/telegraf/testutil"
)
var sampleChecks = []*api.HealthCheck{
&api.HealthCheck{
Node: "localhost",
CheckID: "foo.health123",
Name: "foo.health",
Status: "passing",
Notes: "lorem ipsum",
Output: "OK",
ServiceID: "foo.123",
ServiceName: "foo",
},
}
func TestGatherHealtCheck(t *testing.T) {
expectedFields := map[string]interface{}{
"check_id": "foo.health123",
"check_name": "foo.health",
"status": "passing",
"service_id": "foo.123",
}
expectedTags := map[string]string{
"node": "localhost",
"service_name": "foo",
}
var acc testutil.Accumulator
consul := &Consul{}
consul.GatherHealthCheck(&acc, sampleChecks)
acc.AssertContainsTaggedFields(t, "consul_health_checks", expectedFields, expectedTags)
}

View File

@ -2,14 +2,13 @@ package elasticsearch
import (
"encoding/json"
"errors"
"fmt"
"net/http"
"strings"
"sync"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal/errchan"
"github.com/influxdata/telegraf/plugins/inputs"
jsonparser "github.com/influxdata/telegraf/plugins/parsers/json"
)
@ -102,7 +101,7 @@ func (e *Elasticsearch) Description() string {
// Gather reads the stats from Elasticsearch and writes it to the
// Accumulator.
func (e *Elasticsearch) Gather(acc telegraf.Accumulator) error {
errChan := make(chan error, len(e.Servers))
errChan := errchan.New(len(e.Servers))
var wg sync.WaitGroup
wg.Add(len(e.Servers))
@ -116,7 +115,7 @@ func (e *Elasticsearch) Gather(acc telegraf.Accumulator) error {
url = s + statsPath
}
if err := e.gatherNodeStats(url, acc); err != nil {
errChan <- err
errChan.C <- err
return
}
if e.ClusterHealth {
@ -126,17 +125,7 @@ func (e *Elasticsearch) Gather(acc telegraf.Accumulator) error {
}
wg.Wait()
close(errChan)
// Get all errors and return them as one giant error
errStrings := []string{}
for err := range errChan {
errStrings = append(errStrings, err.Error())
}
if len(errStrings) == 0 {
return nil
}
return errors.New(strings.Join(errStrings, "\n"))
return errChan.Error()
}
func (e *Elasticsearch) gatherNodeStats(url string, acc telegraf.Accumulator) error {

View File

@ -11,39 +11,22 @@ all scripts matching glob pattern ```/tmp/collect_*.sh``` are configured for ```
in JSON format. Glob patterns are matched on every run, so adding new scripts that match the pattern
will cause them to be picked up immediately.
```
```toml
# Read flattened metrics from one or more commands that output JSON to stdout
[[inputs.exec]]
# Shell/commands array
# Full command line to executable with parameters, or a glob pattern to run all matching files.
commands = ["/tmp/test.sh", "/tmp/test2.sh", "/tmp/collect_*.sh"]
## Timeout for each command to complete.
timeout = "5s"
# Data format to consume.
# NOTE json only reads numerical measurements, strings and booleans are ignored.
data_format = "json"
# measurement name suffix (for separating different commands)
name_suffix = "_mycollector"
## Below configuration will be used for data_format = "graphite", can be ignored for other data_format
## If matching multiple measurement files, this string will be used to join the matched values.
#separator = "."
## Each template line requires a template pattern. It can have an optional
## filter before the template and separated by spaces. It can also have optional extra
## tags following the template. Multiple tags should be separated by commas and no spaces
## similar to the line protocol format. The can be only one default template.
## Templates support below format:
## 1. filter + template
## 2. filter + template + extra tag
## 3. filter + template with field key
## 4. default template
#templates = [
# "*.app env.service.resource.measurement",
# "stats.* .host.measurement* region=us-west,agent=sensu",
# "stats2.* .host.measurement.field",
# "measurement*"
#]
```
Other options for modifying the measurement names are:
@ -82,7 +65,7 @@ in influx line-protocol format.
#### Configuration
```
```toml
[[inputs.exec]]
# Shell/commands array
# compatible with old version
@ -90,6 +73,9 @@ in influx line-protocol format.
# command = "/usr/bin/line_protocol_collector"
commands = ["/usr/bin/line_protocol_collector","/tmp/test2.sh"]
## Timeout for each command to complete.
timeout = "5s"
# Data format to consume.
# NOTE json only reads numerical measurements, strings and booleans are ignored.
data_format = "influx"
@ -123,12 +109,16 @@ We can also change the data_format to "graphite" to use the metrics collecting s
In this example a script called /tmp/test.sh and a script called /tmp/test2.sh are configured for [[inputs.exec]] in graphite format.
#### Configuration
```
```toml
# Read flattened metrics from one or more commands that output JSON to stdout
[[inputs.exec]]
# Shell/commands array
commands = ["/tmp/test.sh","/tmp/test2.sh"]
## Timeout for each command to complete.
timeout = "5s"
# Data format to consume.
# NOTE json only reads numerical measurements, strings and booleans are ignored.
data_format = "graphite"

View File

@ -14,6 +14,7 @@ import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/internal/errchan"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/plugins/parsers/nagios"
@ -182,23 +183,15 @@ func (e *Exec) Gather(acc telegraf.Accumulator) error {
}
}
e.errChan = make(chan error, len(commands))
errChan := errchan.New(len(commands))
e.errChan = errChan.C
e.wg.Add(len(commands))
for _, command := range commands {
go e.ProcessCommand(command, acc)
}
e.wg.Wait()
select {
default:
close(e.errChan)
return nil
case err := <-e.errChan:
close(e.errChan)
return err
}
return errChan.Error()
}
func init() {

View File

@ -0,0 +1,55 @@
# GrayLog plugin
The Graylog plugin can collect data from remote Graylog service URLs.
Plugin currently support two type of end points:-
- multiple (Ex http://[graylog-server-ip]:12900/system/metrics/multiple)
- namespace (Ex http://[graylog-server-ip]:12900/system/metrics/namespace/{namespace})
End Point can be a mixe of one multiple end point and several namespaces end points
Note: if namespace end point specified metrics array will be ignored for that call.
### Configuration:
```toml
# Read flattened metrics from one or more GrayLog HTTP endpoints
[[inputs.graylog]]
## API endpoint, currently supported API:
##
## - multiple (Ex http://<host>:12900/system/metrics/multiple)
## - namespace (Ex http://<host>:12900/system/metrics/namespace/{namespace})
##
## For namespace endpoint, the metrics array will be ignored for that call.
## Endpoint can contain namespace and multiple type calls.
##
## Please check http://[graylog-server-ip]:12900/api-browser for full list
## of endpoints
servers = [
"http://[graylog-server-ip]:12900/system/metrics/multiple",
]
## Metrics list
## List of metrics can be found on Graylog webservice documentation.
## Or by hitting the the web service api at:
## http://[graylog-host]:12900/system/metrics
metrics = [
"jvm.cl.loaded",
"jvm.memory.pools.Metaspace.committed"
]
## Username and password
username = ""
password = ""
## Optional SSL Config
# ssl_ca = "/etc/telegraf/ca.pem"
# ssl_cert = "/etc/telegraf/cert.pem"
# ssl_key = "/etc/telegraf/key.pem"
## Use SSL but skip chain & host verification
# insecure_skip_verify = false
```
Please refer to GrayLog metrics api browser for full metric end points http://host:12900/api-browser

View File

@ -0,0 +1,312 @@
package graylog
import (
"bytes"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"net"
"net/http"
"net/url"
"strings"
"sync"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/inputs"
)
type ResponseMetrics struct {
total int
Metrics []Metric `json:"metrics"`
}
type Metric struct {
FullName string `json:"full_name"`
Name string `json:"name"`
Type string `json:"type"`
Fields map[string]interface{} `json:"metric"`
}
type GrayLog struct {
Servers []string
Metrics []string
Username string
Password string
// Path to CA file
SSLCA string `toml:"ssl_ca"`
// Path to host cert file
SSLCert string `toml:"ssl_cert"`
// Path to cert key file
SSLKey string `toml:"ssl_key"`
// Use SSL but skip chain & host verification
InsecureSkipVerify bool
client HTTPClient
}
type HTTPClient interface {
// Returns the result of an http request
//
// Parameters:
// req: HTTP request object
//
// Returns:
// http.Response: HTTP respons object
// error : Any error that may have occurred
MakeRequest(req *http.Request) (*http.Response, error)
SetHTTPClient(client *http.Client)
HTTPClient() *http.Client
}
type Messagebody struct {
Metrics []string `json:"metrics"`
}
type RealHTTPClient struct {
client *http.Client
}
func (c *RealHTTPClient) MakeRequest(req *http.Request) (*http.Response, error) {
return c.client.Do(req)
}
func (c *RealHTTPClient) SetHTTPClient(client *http.Client) {
c.client = client
}
func (c *RealHTTPClient) HTTPClient() *http.Client {
return c.client
}
var sampleConfig = `
## API endpoint, currently supported API:
##
## - multiple (Ex http://<host>:12900/system/metrics/multiple)
## - namespace (Ex http://<host>:12900/system/metrics/namespace/{namespace})
##
## For namespace endpoint, the metrics array will be ignored for that call.
## Endpoint can contain namespace and multiple type calls.
##
## Please check http://[graylog-server-ip]:12900/api-browser for full list
## of endpoints
servers = [
"http://[graylog-server-ip]:12900/system/metrics/multiple",
]
## Metrics list
## List of metrics can be found on Graylog webservice documentation.
## Or by hitting the the web service api at:
## http://[graylog-host]:12900/system/metrics
metrics = [
"jvm.cl.loaded",
"jvm.memory.pools.Metaspace.committed"
]
## Username and password
username = ""
password = ""
## Optional SSL Config
# ssl_ca = "/etc/telegraf/ca.pem"
# ssl_cert = "/etc/telegraf/cert.pem"
# ssl_key = "/etc/telegraf/key.pem"
## Use SSL but skip chain & host verification
# insecure_skip_verify = false
`
func (h *GrayLog) SampleConfig() string {
return sampleConfig
}
func (h *GrayLog) Description() string {
return "Read flattened metrics from one or more GrayLog HTTP endpoints"
}
// Gathers data for all servers.
func (h *GrayLog) Gather(acc telegraf.Accumulator) error {
var wg sync.WaitGroup
if h.client.HTTPClient() == nil {
tlsCfg, err := internal.GetTLSConfig(
h.SSLCert, h.SSLKey, h.SSLCA, h.InsecureSkipVerify)
if err != nil {
return err
}
tr := &http.Transport{
ResponseHeaderTimeout: time.Duration(3 * time.Second),
TLSClientConfig: tlsCfg,
}
client := &http.Client{
Transport: tr,
Timeout: time.Duration(4 * time.Second),
}
h.client.SetHTTPClient(client)
}
errorChannel := make(chan error, len(h.Servers))
for _, server := range h.Servers {
wg.Add(1)
go func(server string) {
defer wg.Done()
if err := h.gatherServer(acc, server); err != nil {
errorChannel <- err
}
}(server)
}
wg.Wait()
close(errorChannel)
// Get all errors and return them as one giant error
errorStrings := []string{}
for err := range errorChannel {
errorStrings = append(errorStrings, err.Error())
}
if len(errorStrings) == 0 {
return nil
}
return errors.New(strings.Join(errorStrings, "\n"))
}
// Gathers data from a particular server
// Parameters:
// acc : The telegraf Accumulator to use
// serverURL: endpoint to send request to
// service : the service being queried
//
// Returns:
// error: Any error that may have occurred
func (h *GrayLog) gatherServer(
acc telegraf.Accumulator,
serverURL string,
) error {
resp, _, err := h.sendRequest(serverURL)
if err != nil {
return err
}
requestURL, err := url.Parse(serverURL)
host, port, _ := net.SplitHostPort(requestURL.Host)
var dat ResponseMetrics
if err != nil {
return err
}
if err := json.Unmarshal([]byte(resp), &dat); err != nil {
return err
}
for _, m_item := range dat.Metrics {
fields := make(map[string]interface{})
tags := map[string]string{
"server": host,
"port": port,
"name": m_item.Name,
"type": m_item.Type,
}
h.flatten(m_item.Fields, fields, "")
acc.AddFields(m_item.FullName, fields, tags)
}
return nil
}
// Flatten JSON hierarchy to produce field name and field value
// Parameters:
// item: Item map to flatten
// fields: Map to store generated fields.
// id: Prefix for top level metric (empty string "")
// Returns:
// void
func (h *GrayLog) flatten(item map[string]interface{}, fields map[string]interface{}, id string) {
if id != "" {
id = id + "_"
}
for k, i := range item {
switch i.(type) {
case int:
fields[id+k] = i.(float64)
case float64:
fields[id+k] = i.(float64)
case map[string]interface{}:
h.flatten(i.(map[string]interface{}), fields, id+k)
default:
}
}
}
// Sends an HTTP request to the server using the GrayLog object's HTTPClient.
// Parameters:
// serverURL: endpoint to send request to
//
// Returns:
// string: body of the response
// error : Any error that may have occurred
func (h *GrayLog) sendRequest(serverURL string) (string, float64, error) {
headers := map[string]string{
"Content-Type": "application/json",
"Accept": "application/json",
}
method := "GET"
content := bytes.NewBufferString("")
headers["Authorization"] = "Basic " + base64.URLEncoding.EncodeToString([]byte(h.Username+":"+h.Password))
// Prepare URL
requestURL, err := url.Parse(serverURL)
if err != nil {
return "", -1, fmt.Errorf("Invalid server URL \"%s\"", serverURL)
}
if strings.Contains(requestURL.String(), "multiple") {
m := &Messagebody{Metrics: h.Metrics}
http_body, err := json.Marshal(m)
if err != nil {
return "", -1, fmt.Errorf("Invalid list of Metrics %s", h.Metrics)
}
method = "POST"
content = bytes.NewBuffer(http_body)
}
req, err := http.NewRequest(method, requestURL.String(), content)
if err != nil {
return "", -1, err
}
// Add header parameters
for k, v := range headers {
req.Header.Add(k, v)
}
start := time.Now()
resp, err := h.client.MakeRequest(req)
if err != nil {
return "", -1, err
}
defer resp.Body.Close()
responseTime := time.Since(start).Seconds()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return string(body), responseTime, err
}
// Process response
if resp.StatusCode != http.StatusOK {
err = fmt.Errorf("Response from url \"%s\" has status code %d (%s), expected %d (%s)",
requestURL.String(),
resp.StatusCode,
http.StatusText(resp.StatusCode),
http.StatusOK,
http.StatusText(http.StatusOK))
return string(body), responseTime, err
}
return string(body), responseTime, err
}
func init() {
inputs.Add("graylog", func() telegraf.Input {
return &GrayLog{
client: &RealHTTPClient{},
}
})
}

View File

@ -0,0 +1,199 @@
package graylog
import (
"io/ioutil"
"net/http"
"strings"
"testing"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
const validJSON = `
{
"total": 3,
"metrics": [
{
"full_name": "jvm.cl.loaded",
"metric": {
"value": 18910
},
"name": "loaded",
"type": "gauge"
},
{
"full_name": "jvm.memory.pools.Metaspace.committed",
"metric": {
"value": 108040192
},
"name": "committed",
"type": "gauge"
},
{
"full_name": "org.graylog2.shared.journal.KafkaJournal.writeTime",
"metric": {
"time": {
"min": 99
},
"rate": {
"total": 10,
"mean": 2
},
"duration_unit": "microseconds",
"rate_unit": "events/second"
},
"name": "writeTime",
"type": "hdrtimer"
}
]
}`
var validTags = map[string]map[string]string{
"jvm.cl.loaded": {
"name": "loaded",
"type": "gauge",
"port": "12900",
"server": "localhost",
},
"jvm.memory.pools.Metaspace.committed": {
"name": "committed",
"type": "gauge",
"port": "12900",
"server": "localhost",
},
"org.graylog2.shared.journal.KafkaJournal.writeTime": {
"name": "writeTime",
"type": "hdrtimer",
"port": "12900",
"server": "localhost",
},
}
var expectedFields = map[string]map[string]interface{}{
"jvm.cl.loaded": {
"value": float64(18910),
},
"jvm.memory.pools.Metaspace.committed": {
"value": float64(108040192),
},
"org.graylog2.shared.journal.KafkaJournal.writeTime": {
"time_min": float64(99),
"rate_total": float64(10),
"rate_mean": float64(2),
},
}
const invalidJSON = "I don't think this is JSON"
const empty = ""
type mockHTTPClient struct {
responseBody string
statusCode int
}
// Mock implementation of MakeRequest. Usually returns an http.Response with
// hard-coded responseBody and statusCode. However, if the request uses a
// nonstandard method, it uses status code 405 (method not allowed)
func (c *mockHTTPClient) MakeRequest(req *http.Request) (*http.Response, error) {
resp := http.Response{}
resp.StatusCode = c.statusCode
// basic error checking on request method
allowedMethods := []string{"GET", "HEAD", "POST", "PUT", "DELETE", "TRACE", "CONNECT"}
methodValid := false
for _, method := range allowedMethods {
if req.Method == method {
methodValid = true
break
}
}
if !methodValid {
resp.StatusCode = 405 // Method not allowed
}
resp.Body = ioutil.NopCloser(strings.NewReader(c.responseBody))
return &resp, nil
}
func (c *mockHTTPClient) SetHTTPClient(_ *http.Client) {
}
func (c *mockHTTPClient) HTTPClient() *http.Client {
return nil
}
// Generates a pointer to an HttpJson object that uses a mock HTTP client.
// Parameters:
// response : Body of the response that the mock HTTP client should return
// statusCode: HTTP status code the mock HTTP client should return
//
// Returns:
// *HttpJson: Pointer to an HttpJson object that uses the generated mock HTTP client
func genMockGrayLog(response string, statusCode int) []*GrayLog {
return []*GrayLog{
&GrayLog{
client: &mockHTTPClient{responseBody: response, statusCode: statusCode},
Servers: []string{
"http://localhost:12900/system/metrics/multiple",
},
Metrics: []string{
"jvm.memory.pools.Metaspace.committed",
"jvm.cl.loaded",
"org.graylog2.shared.journal.KafkaJournal.writeTime",
},
Username: "test",
Password: "test",
},
}
}
// Test that the proper values are ignored or collected
func TestNormalResponse(t *testing.T) {
graylog := genMockGrayLog(validJSON, 200)
for _, service := range graylog {
var acc testutil.Accumulator
err := service.Gather(&acc)
require.NoError(t, err)
for k, v := range expectedFields {
acc.AssertContainsTaggedFields(t, k, v, validTags[k])
}
}
}
// Test response to HTTP 500
func TestHttpJson500(t *testing.T) {
graylog := genMockGrayLog(validJSON, 500)
var acc testutil.Accumulator
err := graylog[0].Gather(&acc)
assert.NotNil(t, err)
assert.Equal(t, 0, acc.NFields())
}
// Test response to malformed JSON
func TestHttpJsonBadJson(t *testing.T) {
graylog := genMockGrayLog(invalidJSON, 200)
var acc testutil.Accumulator
err := graylog[0].Gather(&acc)
assert.NotNil(t, err)
assert.Equal(t, 0, acc.NFields())
}
// Test response to empty string as response objectgT
func TestHttpJsonEmptyResponse(t *testing.T) {
graylog := genMockGrayLog(empty, 200)
var acc testutil.Accumulator
err := graylog[0].Gather(&acc)
assert.NotNil(t, err)
assert.Equal(t, 0, acc.NFields())
}

View File

@ -3,8 +3,6 @@ package haproxy
import (
"encoding/csv"
"fmt"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
"io"
"net"
"net/http"
@ -13,6 +11,10 @@ import (
"strings"
"sync"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal/errchan"
"github.com/influxdata/telegraf/plugins/inputs"
)
//CSV format: https://cbonte.github.io/haproxy-dconv/configuration-1.5.html#9.1
@ -113,20 +115,17 @@ func (g *haproxy) Gather(acc telegraf.Accumulator) error {
}
var wg sync.WaitGroup
var outerr error
for _, serv := range g.Servers {
wg.Add(1)
errChan := errchan.New(len(g.Servers))
wg.Add(len(g.Servers))
for _, server := range g.Servers {
go func(serv string) {
defer wg.Done()
outerr = g.gatherServer(serv, acc)
}(serv)
errChan.C <- g.gatherServer(serv, acc)
}(server)
}
wg.Wait()
return outerr
return errChan.Error()
}
func (g *haproxy) gatherServerSocket(addr string, acc telegraf.Accumulator) error {

View File

@ -15,6 +15,7 @@ InfluxDB-formatted endpoints. See below for more information.
## See the influxdb plugin's README for more details.
## Multiple URLs from which to read InfluxDB-formatted JSON
## Default is "http://localhost:8086/debug/vars".
urls = [
"http://localhost:8086/debug/vars"
]

View File

@ -28,6 +28,7 @@ func (*InfluxDB) SampleConfig() string {
## See the influxdb plugin's README for more details.
## Multiple URLs from which to read InfluxDB-formatted JSON
## Default is "http://localhost:8086/debug/vars".
urls = [
"http://localhost:8086/debug/vars"
]
@ -35,6 +36,9 @@ func (*InfluxDB) SampleConfig() string {
}
func (i *InfluxDB) Gather(acc telegraf.Accumulator) error {
if len(i.URLs) == 0 {
i.URLs = []string{"http://localhost:8086/debug/vars"}
}
errorChannel := make(chan error, len(i.URLs))
var wg sync.WaitGroup

View File

@ -5,9 +5,11 @@ import (
"fmt"
"net/http"
"strconv"
"sync"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal/errchan"
"github.com/influxdata/telegraf/plugins/inputs"
)
@ -129,20 +131,18 @@ func (r *RabbitMQ) Gather(acc telegraf.Accumulator) error {
}
}
var errChan = make(chan error, len(gatherFunctions))
var wg sync.WaitGroup
wg.Add(len(gatherFunctions))
errChan := errchan.New(len(gatherFunctions))
for _, f := range gatherFunctions {
go f(r, acc, errChan)
go func(gf gatherFunc) {
defer wg.Done()
gf(r, acc, errChan.C)
}(f)
}
wg.Wait()
for i := 1; i <= len(gatherFunctions); i++ {
err := <-errChan
if err != nil {
return err
}
}
return nil
return errChan.Error()
}
func (r *RabbitMQ) requestJSON(u string, target interface{}) error {

View File

@ -241,10 +241,14 @@ func gatherKeyspaceLine(
name string,
line string,
acc telegraf.Accumulator,
tags map[string]string,
global_tags map[string]string,
) {
if strings.Contains(line, "keys=") {
fields := make(map[string]interface{})
tags := make(map[string]string)
for k, v := range global_tags {
tags[k] = v
}
tags["database"] = name
dbparts := strings.Split(line, ",")
for _, dbp := range dbparts {

View File

@ -35,6 +35,7 @@ func TestRedis_ParseMetrics(t *testing.T) {
err := gatherInfoOutput(rdr, &acc, tags)
require.NoError(t, err)
tags = map[string]string{"host": "redis.net", "role": "master"}
fields := map[string]interface{}{
"uptime": uint64(238),
"clients": uint64(1),
@ -70,13 +71,14 @@ func TestRedis_ParseMetrics(t *testing.T) {
"used_cpu_user_children": float64(0.00),
"keyspace_hitrate": float64(0.50),
}
keyspaceTags := map[string]string{"host": "redis.net", "role": "master", "database": "db0"}
keyspaceFields := map[string]interface{}{
"avg_ttl": uint64(0),
"expires": uint64(0),
"keys": uint64(2),
}
acc.AssertContainsTaggedFields(t, "redis", fields, tags)
acc.AssertContainsTaggedFields(t, "redis_keyspace", keyspaceFields, tags)
acc.AssertContainsTaggedFields(t, "redis_keyspace", keyspaceFields, keyspaceTags)
}
const testOutput = `# Server

View File

@ -0,0 +1,225 @@
# Kernel VMStat Input Plugin
The kernel_vmstat plugin gathers virtual memory statistics
by reading /proc/vmstat. For a full list of available fields see the
/proc/vmstat section of the [proc man page](http://man7.org/linux/man-pages/man5/proc.5.html).
For a better idea of what each field represents, see the
[vmstat man page](http://linux.die.net/man/8/vmstat).
```
/proc/vmstat
kernel/system statistics. Common entries include (from http://www.linuxinsight.com/proc_vmstat.html):
Number of pages that are dirty, under writeback or unstable:
nr_dirty 1550
nr_writeback 0
nr_unstable 0
Number of pages allocated to page tables, mapped by files or allocated by the kernel slab allocator:
nr_page_table_pages 699
nr_mapped 139596
nr_slab 42723
Number of pageins and pageouts (since the last boot):
pgpgin 33754195
pgpgout 38985992
Number of swapins and swapouts (since the last boot):
pswpin 2473
pswpout 2995
Number of page allocations per zone (since the last boot):
pgalloc_high 0
pgalloc_normal 110123213
pgalloc_dma32 0
pgalloc_dma 415219
Number of page frees, activations and deactivations (since the last boot):
pgfree 110549163
pgactivate 4509729
pgdeactivate 2136215
Number of minor and major page faults (since the last boot):
pgfault 80663722
pgmajfault 49813
Number of page refills (per zone, since the last boot):
pgrefill_high 0
pgrefill_normal 5817500
pgrefill_dma32 0
pgrefill_dma 149176
Number of page steals (per zone, since the last boot):
pgsteal_high 0
pgsteal_normal 10421346
pgsteal_dma32 0
pgsteal_dma 142196
Number of pages scanned by the kswapd daemon (per zone, since the last boot):
pgscan_kswapd_high 0
pgscan_kswapd_normal 10491424
pgscan_kswapd_dma32 0
pgscan_kswapd_dma 156130
Number of pages reclaimed directly (per zone, since the last boot):
pgscan_direct_high 0
pgscan_direct_normal 11904
pgscan_direct_dma32 0
pgscan_direct_dma 225
Number of pages reclaimed via inode freeing (since the last boot):
pginodesteal 11
Number of slab objects scanned (since the last boot):
slabs_scanned 8926976
Number of pages reclaimed by kswapd (since the last boot):
kswapd_steal 10551674
Number of pages reclaimed by kswapd via inode freeing (since the last boot):
kswapd_inodesteal 338730
Number of kswapd's calls to page reclaim (since the last boot):
pageoutrun 181908
Number of direct reclaim calls (since the last boot):
allocstall 160
Miscellaneous statistics:
pgrotated 3781
nr_bounce 0
```
### Configuration:
```toml
# Get kernel statistics from /proc/vmstat
[[inputs.kernel_vmstat]]
# no configuration
```
### Measurements & Fields:
- kernel_vmstat
- nr_free_pages (integer, `nr_free_pages`)
- nr_inactive_anon (integer, `nr_inactive_anon`)
- nr_active_anon (integer, `nr_active_anon`)
- nr_inactive_file (integer, `nr_inactive_file`)
- nr_active_file (integer, `nr_active_file`)
- nr_unevictable (integer, `nr_unevictable`)
- nr_mlock (integer, `nr_mlock`)
- nr_anon_pages (integer, `nr_anon_pages`)
- nr_mapped (integer, `nr_mapped`)
- nr_file_pages (integer, `nr_file_pages`)
- nr_dirty (integer, `nr_dirty`)
- nr_writeback (integer, `nr_writeback`)
- nr_slab_reclaimable (integer, `nr_slab_reclaimable`)
- nr_slab_unreclaimable (integer, `nr_slab_unreclaimable`)
- nr_page_table_pages (integer, `nr_page_table_pages`)
- nr_kernel_stack (integer, `nr_kernel_stack`)
- nr_unstable (integer, `nr_unstable`)
- nr_bounce (integer, `nr_bounce`)
- nr_vmscan_write (integer, `nr_vmscan_write`)
- nr_writeback_temp (integer, `nr_writeback_temp`)
- nr_isolated_anon (integer, `nr_isolated_anon`)
- nr_isolated_file (integer, `nr_isolated_file`)
- nr_shmem (integer, `nr_shmem`)
- numa_hit (integer, `numa_hit`)
- numa_miss (integer, `numa_miss`)
- numa_foreign (integer, `numa_foreign`)
- numa_interleave (integer, `numa_interleave`)
- numa_local (integer, `numa_local`)
- numa_other (integer, `numa_other`)
- nr_anon_transparent_hugepages (integer, `nr_anon_transparent_hugepages`)
- pgpgin (integer, `pgpgin`)
- pgpgout (integer, `pgpgout`)
- pswpin (integer, `pswpin`)
- pswpout (integer, `pswpout`)
- pgalloc_dma (integer, `pgalloc_dma`)
- pgalloc_dma32 (integer, `pgalloc_dma32`)
- pgalloc_normal (integer, `pgalloc_normal`)
- pgalloc_movable (integer, `pgalloc_movable`)
- pgfree (integer, `pgfree`)
- pgactivate (integer, `pgactivate`)
- pgdeactivate (integer, `pgdeactivate`)
- pgfault (integer, `pgfault`)
- pgmajfault (integer, `pgmajfault`)
- pgrefill_dma (integer, `pgrefill_dma`)
- pgrefill_dma32 (integer, `pgrefill_dma32`)
- pgrefill_normal (integer, `pgrefill_normal`)
- pgrefill_movable (integer, `pgrefill_movable`)
- pgsteal_dma (integer, `pgsteal_dma`)
- pgsteal_dma32 (integer, `pgsteal_dma32`)
- pgsteal_normal (integer, `pgsteal_normal`)
- pgsteal_movable (integer, `pgsteal_movable`)
- pgscan_kswapd_dma (integer, `pgscan_kswapd_dma`)
- pgscan_kswapd_dma32 (integer, `pgscan_kswapd_dma32`)
- pgscan_kswapd_normal (integer, `pgscan_kswapd_normal`)
- pgscan_kswapd_movable (integer, `pgscan_kswapd_movable`)
- pgscan_direct_dma (integer, `pgscan_direct_dma`)
- pgscan_direct_dma32 (integer, `pgscan_direct_dma32`)
- pgscan_direct_normal (integer, `pgscan_direct_normal`)
- pgscan_direct_movable (integer, `pgscan_direct_movable`)
- zone_reclaim_failed (integer, `zone_reclaim_failed`)
- pginodesteal (integer, `pginodesteal`)
- slabs_scanned (integer, `slabs_scanned`)
- kswapd_steal (integer, `kswapd_steal`)
- kswapd_inodesteal (integer, `kswapd_inodesteal`)
- kswapd_low_wmark_hit_quickly (integer, `kswapd_low_wmark_hit_quickly`)
- kswapd_high_wmark_hit_quickly (integer, `kswapd_high_wmark_hit_quickly`)
- kswapd_skip_congestion_wait (integer, `kswapd_skip_congestion_wait`)
- pageoutrun (integer, `pageoutrun`)
- allocstall (integer, `allocstall`)
- pgrotated (integer, `pgrotated`)
- compact_blocks_moved (integer, `compact_blocks_moved`)
- compact_pages_moved (integer, `compact_pages_moved`)
- compact_pagemigrate_failed (integer, `compact_pagemigrate_failed`)
- compact_stall (integer, `compact_stall`)
- compact_fail (integer, `compact_fail`)
- compact_success (integer, `compact_success`)
- htlb_buddy_alloc_success (integer, `htlb_buddy_alloc_success`)
- htlb_buddy_alloc_fail (integer, `htlb_buddy_alloc_fail`)
- unevictable_pgs_culled (integer, `unevictable_pgs_culled`)
- unevictable_pgs_scanned (integer, `unevictable_pgs_scanned`)
- unevictable_pgs_rescued (integer, `unevictable_pgs_rescued`)
- unevictable_pgs_mlocked (integer, `unevictable_pgs_mlocked`)
- unevictable_pgs_munlocked (integer, `unevictable_pgs_munlocked`)
- unevictable_pgs_cleared (integer, `unevictable_pgs_cleared`)
- unevictable_pgs_stranded (integer, `unevictable_pgs_stranded`)
- unevictable_pgs_mlockfreed (integer, `unevictable_pgs_mlockfreed`)
- thp_fault_alloc (integer, `thp_fault_alloc`)
- thp_fault_fallback (integer, `thp_fault_fallback`)
- thp_collapse_alloc (integer, `thp_collapse_alloc`)
- thp_collapse_alloc_failed (integer, `thp_collapse_alloc_failed`)
- thp_split (integer, `thp_split`)
### Tags:
None
### Example Output:
```
$ telegraf -config ~/ws/telegraf.conf -input-filter kernel_vmstat -test
* Plugin: kernel_vmstat, Collection 1
> kernel_vmstat allocstall=81496i,compact_blocks_moved=238196i,compact_fail=135220i,compact_pagemigrate_failed=0i,compact_pages_moved=6370588i,compact_stall=142092i,compact_success=6872i,htlb_buddy_alloc_fail=0i,htlb_buddy_alloc_success=0i,kswapd_high_wmark_hit_quickly=25439i,kswapd_inodesteal=29770874i,kswapd_low_wmark_hit_quickly=8756i,kswapd_skip_congestion_wait=0i,kswapd_steal=291534428i,nr_active_anon=2515657i,nr_active_file=2244914i,nr_anon_pages=1358675i,nr_anon_transparent_hugepages=2034i,nr_bounce=0i,nr_dirty=5690i,nr_file_pages=5153546i,nr_free_pages=78730i,nr_inactive_anon=426259i,nr_inactive_file=2366791i,nr_isolated_anon=0i,nr_isolated_file=0i,nr_kernel_stack=579i,nr_mapped=558821i,nr_mlock=0i,nr_page_table_pages=11115i,nr_shmem=541689i,nr_slab_reclaimable=459806i,nr_slab_unreclaimable=47859i,nr_unevictable=0i,nr_unstable=0i,nr_vmscan_write=6206i,nr_writeback=0i,nr_writeback_temp=0i,numa_foreign=0i,numa_hit=5113399878i,numa_interleave=35793i,numa_local=5113399878i,numa_miss=0i,numa_other=0i,pageoutrun=505006i,pgactivate=375664931i,pgalloc_dma=0i,pgalloc_dma32=122480220i,pgalloc_movable=0i,pgalloc_normal=5233176719i,pgdeactivate=122735906i,pgfault=8699921410i,pgfree=5359765021i,pginodesteal=9188431i,pgmajfault=122210i,pgpgin=219717626i,pgpgout=3495885510i,pgrefill_dma=0i,pgrefill_dma32=1180010i,pgrefill_movable=0i,pgrefill_normal=119866676i,pgrotated=60620i,pgscan_direct_dma=0i,pgscan_direct_dma32=12256i,pgscan_direct_movable=0i,pgscan_direct_normal=31501600i,pgscan_kswapd_dma=0i,pgscan_kswapd_dma32=4480608i,pgscan_kswapd_movable=0i,pgscan_kswapd_normal=287857984i,pgsteal_dma=0i,pgsteal_dma32=4466436i,pgsteal_movable=0i,pgsteal_normal=318463755i,pswpin=2092i,pswpout=6206i,slabs_scanned=93775616i,thp_collapse_alloc=24857i,thp_collapse_alloc_failed=102214i,thp_fault_alloc=346219i,thp_fault_fallback=895453i,thp_split=9817i,unevictable_pgs_cleared=0i,unevictable_pgs_culled=1531i,unevictable_pgs_mlocked=6988i,unevictable_pgs_mlockfreed=0i,unevictable_pgs_munlocked=6988i,unevictable_pgs_rescued=5426i,unevictable_pgs_scanned=0i,unevictable_pgs_stranded=0i,zone_reclaim_failed=0i 1459455200071462843
```

View File

@ -18,7 +18,7 @@ It is supposed to be used to monitor actual memory usage in a cross platform fas
designed for informational purposes only.
- **free**: memory not being used at all (zeroed) that is readily available; note
that this doesn't reflect the actual memory available (use 'available' instead).
- **used_percent**: the percentage usage calculated as `(total - used) / total * 100`
- **used_percent**: the percentage usage calculated as `used / total * 100`
## Measurements:
#### Raw Memory measurements:

View File

@ -0,0 +1,78 @@
// +build linux
package system
import (
"bytes"
"fmt"
"io/ioutil"
"os"
"strconv"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
)
type KernelVmstat struct {
statFile string
}
func (k *KernelVmstat) Description() string {
return "Get kernel statistics from /proc/vmstat"
}
func (k *KernelVmstat) SampleConfig() string {
return ""
}
func (k *KernelVmstat) Gather(acc telegraf.Accumulator) error {
data, err := k.getProcVmstat()
if err != nil {
return err
}
fields := make(map[string]interface{})
dataFields := bytes.Fields(data)
for i, field := range dataFields {
// dataFields is an array of {"stat1_name", "stat1_value", "stat2_name",
// "stat2_value", ...}
// We only want the even number index as that contain the stat name.
if i%2 == 0 {
// Convert the stat value into an integer.
m, err := strconv.Atoi(string(dataFields[i+1]))
if err != nil {
return err
}
fields[string(field)] = int64(m)
}
}
acc.AddFields("kernel_vmstat", fields, map[string]string{})
return nil
}
func (k *KernelVmstat) getProcVmstat() ([]byte, error) {
if _, err := os.Stat(k.statFile); os.IsNotExist(err) {
return nil, fmt.Errorf("kernel_vmstat: %s does not exist!", k.statFile)
} else if err != nil {
return nil, err
}
data, err := ioutil.ReadFile(k.statFile)
if err != nil {
return nil, err
}
return data, nil
}
func init() {
inputs.Add("kernel_vmstat", func() telegraf.Input {
return &KernelVmstat{
statFile: "/proc/vmstat",
}
})
}

View File

@ -0,0 +1,315 @@
// +build linux
package system
import (
"io/ioutil"
"os"
"testing"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert"
)
func TestFullVmStatProcFile(t *testing.T) {
tmpfile := makeFakeStatFile([]byte(vmStatFile_Full))
defer os.Remove(tmpfile)
k := KernelVmstat{
statFile: tmpfile,
}
acc := testutil.Accumulator{}
err := k.Gather(&acc)
assert.NoError(t, err)
fields := map[string]interface{}{
"nr_free_pages": int64(78730),
"nr_inactive_anon": int64(426259),
"nr_active_anon": int64(2515657),
"nr_inactive_file": int64(2366791),
"nr_active_file": int64(2244914),
"nr_unevictable": int64(0),
"nr_mlock": int64(0),
"nr_anon_pages": int64(1358675),
"nr_mapped": int64(558821),
"nr_file_pages": int64(5153546),
"nr_dirty": int64(5690),
"nr_writeback": int64(0),
"nr_slab_reclaimable": int64(459806),
"nr_slab_unreclaimable": int64(47859),
"nr_page_table_pages": int64(11115),
"nr_kernel_stack": int64(579),
"nr_unstable": int64(0),
"nr_bounce": int64(0),
"nr_vmscan_write": int64(6206),
"nr_writeback_temp": int64(0),
"nr_isolated_anon": int64(0),
"nr_isolated_file": int64(0),
"nr_shmem": int64(541689),
"numa_hit": int64(5113399878),
"numa_miss": int64(0),
"numa_foreign": int64(0),
"numa_interleave": int64(35793),
"numa_local": int64(5113399878),
"numa_other": int64(0),
"nr_anon_transparent_hugepages": int64(2034),
"pgpgin": int64(219717626),
"pgpgout": int64(3495885510),
"pswpin": int64(2092),
"pswpout": int64(6206),
"pgalloc_dma": int64(0),
"pgalloc_dma32": int64(122480220),
"pgalloc_normal": int64(5233176719),
"pgalloc_movable": int64(0),
"pgfree": int64(5359765021),
"pgactivate": int64(375664931),
"pgdeactivate": int64(122735906),
"pgfault": int64(8699921410),
"pgmajfault": int64(122210),
"pgrefill_dma": int64(0),
"pgrefill_dma32": int64(1180010),
"pgrefill_normal": int64(119866676),
"pgrefill_movable": int64(0),
"pgsteal_dma": int64(0),
"pgsteal_dma32": int64(4466436),
"pgsteal_normal": int64(318463755),
"pgsteal_movable": int64(0),
"pgscan_kswapd_dma": int64(0),
"pgscan_kswapd_dma32": int64(4480608),
"pgscan_kswapd_normal": int64(287857984),
"pgscan_kswapd_movable": int64(0),
"pgscan_direct_dma": int64(0),
"pgscan_direct_dma32": int64(12256),
"pgscan_direct_normal": int64(31501600),
"pgscan_direct_movable": int64(0),
"zone_reclaim_failed": int64(0),
"pginodesteal": int64(9188431),
"slabs_scanned": int64(93775616),
"kswapd_steal": int64(291534428),
"kswapd_inodesteal": int64(29770874),
"kswapd_low_wmark_hit_quickly": int64(8756),
"kswapd_high_wmark_hit_quickly": int64(25439),
"kswapd_skip_congestion_wait": int64(0),
"pageoutrun": int64(505006),
"allocstall": int64(81496),
"pgrotated": int64(60620),
"compact_blocks_moved": int64(238196),
"compact_pages_moved": int64(6370588),
"compact_pagemigrate_failed": int64(0),
"compact_stall": int64(142092),
"compact_fail": int64(135220),
"compact_success": int64(6872),
"htlb_buddy_alloc_success": int64(0),
"htlb_buddy_alloc_fail": int64(0),
"unevictable_pgs_culled": int64(1531),
"unevictable_pgs_scanned": int64(0),
"unevictable_pgs_rescued": int64(5426),
"unevictable_pgs_mlocked": int64(6988),
"unevictable_pgs_munlocked": int64(6988),
"unevictable_pgs_cleared": int64(0),
"unevictable_pgs_stranded": int64(0),
"unevictable_pgs_mlockfreed": int64(0),
"thp_fault_alloc": int64(346219),
"thp_fault_fallback": int64(895453),
"thp_collapse_alloc": int64(24857),
"thp_collapse_alloc_failed": int64(102214),
"thp_split": int64(9817),
}
acc.AssertContainsFields(t, "kernel_vmstat", fields)
}
func TestPartialVmStatProcFile(t *testing.T) {
tmpfile := makeFakeStatFile([]byte(vmStatFile_Partial))
defer os.Remove(tmpfile)
k := KernelVmstat{
statFile: tmpfile,
}
acc := testutil.Accumulator{}
err := k.Gather(&acc)
assert.NoError(t, err)
fields := map[string]interface{}{
"unevictable_pgs_culled": int64(1531),
"unevictable_pgs_scanned": int64(0),
"unevictable_pgs_rescued": int64(5426),
"unevictable_pgs_mlocked": int64(6988),
"unevictable_pgs_munlocked": int64(6988),
"unevictable_pgs_cleared": int64(0),
"unevictable_pgs_stranded": int64(0),
"unevictable_pgs_mlockfreed": int64(0),
"thp_fault_alloc": int64(346219),
"thp_fault_fallback": int64(895453),
"thp_collapse_alloc": int64(24857),
"thp_collapse_alloc_failed": int64(102214),
"thp_split": int64(9817),
}
acc.AssertContainsFields(t, "kernel_vmstat", fields)
}
func TestInvalidVmStatProcFile1(t *testing.T) {
tmpfile := makeFakeStatFile([]byte(vmStatFile_Invalid))
defer os.Remove(tmpfile)
k := KernelVmstat{
statFile: tmpfile,
}
acc := testutil.Accumulator{}
err := k.Gather(&acc)
assert.Error(t, err)
}
func TestNoVmStatProcFile(t *testing.T) {
tmpfile := makeFakeStatFile([]byte(vmStatFile_Invalid))
os.Remove(tmpfile)
k := KernelVmstat{
statFile: tmpfile,
}
acc := testutil.Accumulator{}
err := k.Gather(&acc)
assert.Error(t, err)
assert.Contains(t, err.Error(), "does not exist")
}
const vmStatFile_Full = `nr_free_pages 78730
nr_inactive_anon 426259
nr_active_anon 2515657
nr_inactive_file 2366791
nr_active_file 2244914
nr_unevictable 0
nr_mlock 0
nr_anon_pages 1358675
nr_mapped 558821
nr_file_pages 5153546
nr_dirty 5690
nr_writeback 0
nr_slab_reclaimable 459806
nr_slab_unreclaimable 47859
nr_page_table_pages 11115
nr_kernel_stack 579
nr_unstable 0
nr_bounce 0
nr_vmscan_write 6206
nr_writeback_temp 0
nr_isolated_anon 0
nr_isolated_file 0
nr_shmem 541689
numa_hit 5113399878
numa_miss 0
numa_foreign 0
numa_interleave 35793
numa_local 5113399878
numa_other 0
nr_anon_transparent_hugepages 2034
pgpgin 219717626
pgpgout 3495885510
pswpin 2092
pswpout 6206
pgalloc_dma 0
pgalloc_dma32 122480220
pgalloc_normal 5233176719
pgalloc_movable 0
pgfree 5359765021
pgactivate 375664931
pgdeactivate 122735906
pgfault 8699921410
pgmajfault 122210
pgrefill_dma 0
pgrefill_dma32 1180010
pgrefill_normal 119866676
pgrefill_movable 0
pgsteal_dma 0
pgsteal_dma32 4466436
pgsteal_normal 318463755
pgsteal_movable 0
pgscan_kswapd_dma 0
pgscan_kswapd_dma32 4480608
pgscan_kswapd_normal 287857984
pgscan_kswapd_movable 0
pgscan_direct_dma 0
pgscan_direct_dma32 12256
pgscan_direct_normal 31501600
pgscan_direct_movable 0
zone_reclaim_failed 0
pginodesteal 9188431
slabs_scanned 93775616
kswapd_steal 291534428
kswapd_inodesteal 29770874
kswapd_low_wmark_hit_quickly 8756
kswapd_high_wmark_hit_quickly 25439
kswapd_skip_congestion_wait 0
pageoutrun 505006
allocstall 81496
pgrotated 60620
compact_blocks_moved 238196
compact_pages_moved 6370588
compact_pagemigrate_failed 0
compact_stall 142092
compact_fail 135220
compact_success 6872
htlb_buddy_alloc_success 0
htlb_buddy_alloc_fail 0
unevictable_pgs_culled 1531
unevictable_pgs_scanned 0
unevictable_pgs_rescued 5426
unevictable_pgs_mlocked 6988
unevictable_pgs_munlocked 6988
unevictable_pgs_cleared 0
unevictable_pgs_stranded 0
unevictable_pgs_mlockfreed 0
thp_fault_alloc 346219
thp_fault_fallback 895453
thp_collapse_alloc 24857
thp_collapse_alloc_failed 102214
thp_split 9817`
const vmStatFile_Partial = `unevictable_pgs_culled 1531
unevictable_pgs_scanned 0
unevictable_pgs_rescued 5426
unevictable_pgs_mlocked 6988
unevictable_pgs_munlocked 6988
unevictable_pgs_cleared 0
unevictable_pgs_stranded 0
unevictable_pgs_mlockfreed 0
thp_fault_alloc 346219
thp_fault_fallback 895453
thp_collapse_alloc 24857
thp_collapse_alloc_failed 102214
thp_split 9817`
// invalid thp_split measurement
const vmStatFile_Invalid = `unevictable_pgs_culled 1531
unevictable_pgs_scanned 0
unevictable_pgs_rescued 5426
unevictable_pgs_mlocked 6988
unevictable_pgs_munlocked 6988
unevictable_pgs_cleared 0
unevictable_pgs_stranded 0
unevictable_pgs_mlockfreed 0
thp_fault_alloc 346219
thp_fault_fallback 895453
thp_collapse_alloc 24857
thp_collapse_alloc_failed 102214
thp_split abcd`
func makeFakeVmStatFile(content []byte) string {
tmpfile, err := ioutil.TempFile("", "kernel_vmstat_test")
if err != nil {
panic(err)
}
if _, err := tmpfile.Write(content); err != nil {
panic(err)
}
if err := tmpfile.Close(); err != nil {
panic(err)
}
return tmpfile.Name()
}

View File

@ -30,6 +30,8 @@ func (s *MemStats) Gather(acc telegraf.Accumulator) error {
"free": vm.Free,
"cached": vm.Cached,
"buffered": vm.Buffers,
"active": vm.Active,
"inactive": vm.Inactive,
"used_percent": 100 * float64(vm.Used) / float64(vm.Total),
"available_percent": 100 * float64(vm.Available) / float64(vm.Total),
}

View File

@ -19,8 +19,8 @@ func TestMemStats(t *testing.T) {
Available: 7600,
Used: 5000,
Free: 1235,
// Active: 8134,
// Inactive: 1124,
Active: 8134,
Inactive: 1124,
// Buffers: 771,
// Cached: 4312,
// Wired: 134,
@ -52,6 +52,8 @@ func TestMemStats(t *testing.T) {
"free": uint64(1235),
"cached": uint64(0),
"buffered": uint64(0),
"active": uint64(8134),
"inactive": uint64(1124),
}
acc.AssertContainsTaggedFields(t, "mem", memfields, make(map[string]string))

View File

@ -9,7 +9,7 @@ import (
"log"
"os"
"os/exec"
"path"
"path/filepath"
"runtime"
"strconv"
@ -19,7 +19,7 @@ import (
type Processes struct {
execPS func() ([]byte, error)
readProcFile func(statFile string) ([]byte, error)
readProcFile func(filename string) ([]byte, error)
forcePS bool
forceProc bool
@ -128,22 +128,16 @@ func (p *Processes) gatherFromPS(fields map[string]interface{}) error {
// get process states from /proc/(pid)/stat files
func (p *Processes) gatherFromProc(fields map[string]interface{}) error {
files, err := ioutil.ReadDir("/proc")
filenames, err := filepath.Glob("/proc/[0-9]*/stat")
if err != nil {
return err
}
for _, file := range files {
if !file.IsDir() {
continue
}
for _, filename := range filenames {
_, err := os.Stat(filename)
statFile := path.Join("/proc", file.Name(), "stat")
data, err := p.readProcFile(statFile)
data, err := p.readProcFile(filename)
if err != nil {
if !file.IsDir() {
continue
}
return err
}
if data == nil {
@ -159,7 +153,7 @@ func (p *Processes) gatherFromProc(fields map[string]interface{}) error {
stats := bytes.Fields(data)
if len(stats) < 3 {
return fmt.Errorf("Something is terribly wrong with %s", statFile)
return fmt.Errorf("Something is terribly wrong with %s", filename)
}
switch stats[0][0] {
case 'R':
@ -176,7 +170,7 @@ func (p *Processes) gatherFromProc(fields map[string]interface{}) error {
fields["paging"] = fields["paging"].(int64) + int64(1)
default:
log.Printf("processes: Unknown state [ %s ] in file %s",
string(stats[0][0]), statFile)
string(stats[0][0]), filename)
}
fields["total"] = fields["total"].(int64) + int64(1)
@ -190,15 +184,12 @@ func (p *Processes) gatherFromProc(fields map[string]interface{}) error {
return nil
}
func readProcFile(statFile string) ([]byte, error) {
if _, err := os.Stat(statFile); os.IsNotExist(err) {
return nil, nil
} else if err != nil {
return nil, err
}
data, err := ioutil.ReadFile(statFile)
func readProcFile(filename string) ([]byte, error) {
data, err := ioutil.ReadFile(filename)
if err != nil {
if os.IsNotExist(err) {
return nil, nil
}
return nil, err
}

View File

@ -84,10 +84,10 @@ func (s *systemPS) DiskUsage(
mountpoint := os.Getenv("HOST_MOUNT_PREFIX") + p.Mountpoint
if _, err := os.Stat(mountpoint); err == nil {
du, err := disk.Usage(mountpoint)
du.Path = p.Mountpoint
if err != nil {
return nil, err
}
du.Path = p.Mountpoint
// If the mount point is a member of the exclude set,
// don't gather info on it.
_, ok := fstypeExcludeSet[p.Fstype]

View File

@ -32,7 +32,8 @@ func TestTailFromBeginning(t *testing.T) {
_, err = tmpfile.WriteString("cpu,mytag=foo usage_idle=100\n")
require.NoError(t, err)
require.NoError(t, tt.Gather(&acc))
time.Sleep(time.Millisecond * 50)
// arbitrary sleep to wait for message to show up
time.Sleep(time.Millisecond * 250)
acc.AssertContainsTaggedFields(t, "cpu",
map[string]interface{}{

View File

@ -1,227 +1,294 @@
# Telegraf plugin: zfs
# ZFS plugin
Get ZFS stat from /proc/spl/kstat/zfs
This ZFS plugin provides metrics from your ZFS filesystems. It supports ZFS on
Linux and FreeBSD. It gets ZFS stat from `/proc/spl/kstat/zfs` on Linux and
from `sysctl` and `zpool` on FreeBSD.
# Measurements
### Configuration:
Meta:
```toml
[[inputs.zfs]]
## ZFS kstat path. Ignored on FreeBSD
## If not specified, then default is:
# kstatPath = "/proc/spl/kstat/zfs"
- tags: `pools=POOL1::POOL2`
## By default, telegraf gather all zfs stats
## If not specified, then default is:
# kstatMetrics = ["arcstats", "zfetchstats", "vdev_cache_stats"]
Measurement names:
## By default, don't gather zpool stats
# poolMetrics = false
```
- arcstats_hits
- arcstats_misses
### Measurements & Fields:
By default this plugin collects metrics about **Arc**, **Zfetch**, and
**Vdev cache**. All these metrics are either counters or measure sizes
in bytes. These metrics will be in the `zfs` measurement with the field
names listed bellow.
If `poolMetrics` is enabled then additional metrics will be gathered for
each pool.
- zfs
With fields listed bellow.
#### Arc Stats
- arcstats_allocated (FreeBSD only)
- arcstats_anon_evict_data (Linux only)
- arcstats_anon_evict_metadata (Linux only)
- arcstats_anon_evictable_data (FreeBSD only)
- arcstats_anon_evictable_metadata (FreeBSD only)
- arcstats_anon_size
- arcstats_arc_loaned_bytes (Linux only)
- arcstats_arc_meta_limit
- arcstats_arc_meta_max
- arcstats_arc_meta_min (FreeBSD only)
- arcstats_arc_meta_used
- arcstats_arc_no_grow (Linux only)
- arcstats_arc_prune (Linux only)
- arcstats_arc_tempreserve (Linux only)
- arcstats_c
- arcstats_c_max
- arcstats_c_min
- arcstats_data_size
- arcstats_deleted
- arcstats_demand_data_hits
- arcstats_demand_data_misses
- arcstats_demand_hit_predictive_prefetch (FreeBSD only)
- arcstats_demand_metadata_hits
- arcstats_demand_metadata_misses
- arcstats_duplicate_buffers
- arcstats_duplicate_buffers_size
- arcstats_duplicate_reads
- arcstats_evict_l2_cached
- arcstats_evict_l2_eligible
- arcstats_evict_l2_ineligible
- arcstats_evict_l2_skip (FreeBSD only)
- arcstats_evict_not_enough (FreeBSD only)
- arcstats_evict_skip
- arcstats_hash_chain_max
- arcstats_hash_chains
- arcstats_hash_collisions
- arcstats_hash_elements
- arcstats_hash_elements_max
- arcstats_hdr_size
- arcstats_hits
- arcstats_l2_abort_lowmem
- arcstats_l2_asize
- arcstats_l2_cdata_free_on_write
- arcstats_l2_cksum_bad
- arcstats_l2_compress_failures
- arcstats_l2_compress_successes
- arcstats_l2_compress_zeros
- arcstats_l2_evict_l1cached (FreeBSD only)
- arcstats_l2_evict_lock_retry
- arcstats_l2_evict_reading
- arcstats_l2_feeds
- arcstats_l2_free_on_write
- arcstats_l2_hdr_size
- arcstats_l2_hits
- arcstats_l2_io_error
- arcstats_l2_misses
- arcstats_l2_read_bytes
- arcstats_l2_rw_clash
- arcstats_l2_size
- arcstats_l2_write_buffer_bytes_scanned (FreeBSD only)
- arcstats_l2_write_buffer_iter (FreeBSD only)
- arcstats_l2_write_buffer_list_iter (FreeBSD only)
- arcstats_l2_write_buffer_list_null_iter (FreeBSD only)
- arcstats_l2_write_bytes
- arcstats_l2_write_full (FreeBSD only)
- arcstats_l2_write_in_l2 (FreeBSD only)
- arcstats_l2_write_io_in_progress (FreeBSD only)
- arcstats_l2_write_not_cacheable (FreeBSD only)
- arcstats_l2_write_passed_headroom (FreeBSD only)
- arcstats_l2_write_pios (FreeBSD only)
- arcstats_l2_write_spa_mismatch (FreeBSD only)
- arcstats_l2_write_trylock_fail (FreeBSD only)
- arcstats_l2_writes_done
- arcstats_l2_writes_error
- arcstats_l2_writes_hdr_miss (Linux only)
- arcstats_l2_writes_lock_retry (FreeBSD only)
- arcstats_l2_writes_sent
- arcstats_memory_direct_count (Linux only)
- arcstats_memory_indirect_count (Linux only)
- arcstats_memory_throttle_count
- arcstats_meta_size (Linux only)
- arcstats_mfu_evict_data (Linux only)
- arcstats_mfu_evict_metadata (Linux only)
- arcstats_mfu_ghost_evict_data (Linux only)
- arcstats_mfu_ghost_evict_metadata (Linux only)
- arcstats_metadata_size (FreeBSD only)
- arcstats_mfu_evictable_data (FreeBSD only)
- arcstats_mfu_evictable_metadata (FreeBSD only)
- arcstats_mfu_ghost_evictable_data (FreeBSD only)
- arcstats_mfu_ghost_evictable_metadata (FreeBSD only)
- arcstats_mfu_ghost_hits
- arcstats_mfu_ghost_size
- arcstats_mfu_hits
- arcstats_mfu_size
- arcstats_misses
- arcstats_mru_evict_data (Linux only)
- arcstats_mru_evict_metadata (Linux only)
- arcstats_mru_ghost_evict_data (Linux only)
- arcstats_mru_ghost_evict_metadata (Linux only)
- arcstats_mru_evictable_data (FreeBSD only)
- arcstats_mru_evictable_metadata (FreeBSD only)
- arcstats_mru_ghost_evictable_data (FreeBSD only)
- arcstats_mru_ghost_evictable_metadata (FreeBSD only)
- arcstats_mru_ghost_hits
- arcstats_mru_ghost_size
- arcstats_mru_hits
- arcstats_mru_size
- arcstats_mutex_miss
- arcstats_other_size
- arcstats_p
- arcstats_prefetch_data_hits
- arcstats_prefetch_data_misses
- arcstats_prefetch_metadata_hits
- arcstats_prefetch_metadata_misses
- arcstats_mru_hits
- arcstats_mru_ghost_hits
- arcstats_mfu_hits
- arcstats_mfu_ghost_hits
- arcstats_deleted
- arcstats_recycle_miss
- arcstats_mutex_miss
- arcstats_evict_skip
- arcstats_evict_l2_cached
- arcstats_evict_l2_eligible
- arcstats_evict_l2_ineligible
- arcstats_hash_elements
- arcstats_hash_elements_max
- arcstats_hash_collisions
- arcstats_hash_chains
- arcstats_hash_chain_max
- arcstats_p
- arcstats_c
- arcstats_c_min
- arcstats_c_max
- arcstats_recycle_miss (Linux only)
- arcstats_size
- arcstats_hdr_size
- arcstats_data_size
- arcstats_meta_size
- arcstats_other_size
- arcstats_anon_size
- arcstats_anon_evict_data
- arcstats_anon_evict_metadata
- arcstats_mru_size
- arcstats_mru_evict_data
- arcstats_mru_evict_metadata
- arcstats_mru_ghost_size
- arcstats_mru_ghost_evict_data
- arcstats_mru_ghost_evict_metadata
- arcstats_mfu_size
- arcstats_mfu_evict_data
- arcstats_mfu_evict_metadata
- arcstats_mfu_ghost_size
- arcstats_mfu_ghost_evict_data
- arcstats_mfu_ghost_evict_metadata
- arcstats_l2_hits
- arcstats_l2_misses
- arcstats_l2_feeds
- arcstats_l2_rw_clash
- arcstats_l2_read_bytes
- arcstats_l2_write_bytes
- arcstats_l2_writes_sent
- arcstats_l2_writes_done
- arcstats_l2_writes_error
- arcstats_l2_writes_hdr_miss
- arcstats_l2_evict_lock_retry
- arcstats_l2_evict_reading
- arcstats_l2_free_on_write
- arcstats_l2_cdata_free_on_write
- arcstats_l2_abort_lowmem
- arcstats_l2_cksum_bad
- arcstats_l2_io_error
- arcstats_l2_size
- arcstats_l2_asize
- arcstats_l2_hdr_size
- arcstats_l2_compress_successes
- arcstats_l2_compress_zeros
- arcstats_l2_compress_failures
- arcstats_memory_throttle_count
- arcstats_duplicate_buffers
- arcstats_duplicate_buffers_size
- arcstats_duplicate_reads
- arcstats_memory_direct_count
- arcstats_memory_indirect_count
- arcstats_arc_no_grow
- arcstats_arc_tempreserve
- arcstats_arc_loaned_bytes
- arcstats_arc_prune
- arcstats_arc_meta_used
- arcstats_arc_meta_limit
- arcstats_arc_meta_max
- arcstats_sync_wait_for_async (FreeBSD only)
#### Zfetch Stats
- zfetchstats_bogus_streams (Linux only)
- zfetchstats_colinear_hits (Linux only)
- zfetchstats_colinear_misses (Linux only)
- zfetchstats_hits
- zfetchstats_max_streams (FreeBSD only)
- zfetchstats_misses
- zfetchstats_colinear_hits
- zfetchstats_colinear_misses
- zfetchstats_stride_hits
- zfetchstats_stride_misses
- zfetchstats_reclaim_successes
- zfetchstats_reclaim_failures
- zfetchstats_streams_resets
- zfetchstats_streams_noresets
- zfetchstats_bogus_streams
- zfetchstats_reclaim_failures (Linux only)
- zfetchstats_reclaim_successes (Linux only)
- zfetchstats_streams_noresets (Linux only)
- zfetchstats_streams_resets (Linux only)
- zfetchstats_stride_hits (Linux only)
- zfetchstats_stride_misses (Linux only)
#### Vdev Cache Stats
- vdev_cache_stats_delegations
- vdev_cache_stats_hits
- vdev_cache_stats_misses
#### Pool Metrics (optional)
On Linux:
- zfs_pool
- nread (integer, )
- nwritten (integer, )
- reads (integer, )
- writes (integer, )
- wtime (integer, )
- wlentime (integer, )
- wupdate (integer, )
- rtime (integer, )
- rlentime (integer, )
- rupdate (integer, )
- wcnt (integer, )
- rcnt (integer, )
On FreeBSD:
- zfs_pool
- allocated (integer, bytes)
- capacity (integer, bytes)
- dedupratio (float, ratio)
- free (integer, bytes)
- size (integer, bytes)
- fragmentation (integer, percent)
### Tags:
- ZFS stats (`zfs`) will have the following tag:
- pools - A `::` concatenated list of all ZFS pools on the machine.
- Pool metrics (`zfs_pool`) will have the following tag:
- pool - with the name of the pool which the metrics are for.
- health - the health status of the pool. (FreeBSD only)
### Example Output:
```
$ ./telegraf -config telegraf.conf -input-filter zfs -test
* Plugin: zfs, Collection 1
> zfs_pool,health=ONLINE,pool=zroot allocated=1578590208i,capacity=2i,dedupratio=1,fragmentation=1i,free=64456531968i,size=66035122176i 1464473103625653908
> zfs,pools=zroot arcstats_allocated=4167764i,arcstats_anon_evictable_data=0i,arcstats_anon_evictable_metadata=0i,arcstats_anon_size=16896i,arcstats_arc_meta_limit=10485760i,arcstats_arc_meta_max=115269568i,arcstats_arc_meta_min=8388608i,arcstats_arc_meta_used=51977456i,arcstats_c=16777216i,arcstats_c_max=41943040i,arcstats_c_min=16777216i,arcstats_data_size=0i,arcstats_deleted=1699340i,arcstats_demand_data_hits=14836131i,arcstats_demand_data_misses=2842945i,arcstats_demand_hit_predictive_prefetch=0i,arcstats_demand_metadata_hits=1655006i,arcstats_demand_metadata_misses=830074i,arcstats_duplicate_buffers=0i,arcstats_duplicate_buffers_size=0i,arcstats_duplicate_reads=123i,arcstats_evict_l2_cached=0i,arcstats_evict_l2_eligible=332172623872i,arcstats_evict_l2_ineligible=6168576i,arcstats_evict_l2_skip=0i,arcstats_evict_not_enough=12189444i,arcstats_evict_skip=195190764i,arcstats_hash_chain_max=2i,arcstats_hash_chains=10i,arcstats_hash_collisions=43134i,arcstats_hash_elements=2268i,arcstats_hash_elements_max=6136i,arcstats_hdr_size=565632i,arcstats_hits=16515778i,arcstats_l2_abort_lowmem=0i,arcstats_l2_asize=0i,arcstats_l2_cdata_free_on_write=0i,arcstats_l2_cksum_bad=0i,arcstats_l2_compress_failures=0i,arcstats_l2_compress_successes=0i,arcstats_l2_compress_zeros=0i,arcstats_l2_evict_l1cached=0i,arcstats_l2_evict_lock_retry=0i,arcstats_l2_evict_reading=0i,arcstats_l2_feeds=0i,arcstats_l2_free_on_write=0i,arcstats_l2_hdr_size=0i,arcstats_l2_hits=0i,arcstats_l2_io_error=0i,arcstats_l2_misses=0i,arcstats_l2_read_bytes=0i,arcstats_l2_rw_clash=0i,arcstats_l2_size=0i,arcstats_l2_write_buffer_bytes_scanned=0i,arcstats_l2_write_buffer_iter=0i,arcstats_l2_write_buffer_list_iter=0i,arcstats_l2_write_buffer_list_null_iter=0i,arcstats_l2_write_bytes=0i,arcstats_l2_write_full=0i,arcstats_l2_write_in_l2=0i,arcstats_l2_write_io_in_progress=0i,arcstats_l2_write_not_cacheable=380i,arcstats_l2_write_passed_headroom=0i,arcstats_l2_write_pios=0i,arcstats_l2_write_spa_mismatch=0i,arcstats_l2_write_trylock_fail=0i,arcstats_l2_writes_done=0i,arcstats_l2_writes_error=0i,arcstats_l2_writes_lock_retry=0i,arcstats_l2_writes_sent=0i,arcstats_memory_throttle_count=0i,arcstats_metadata_size=17014784i,arcstats_mfu_evictable_data=0i,arcstats_mfu_evictable_metadata=16384i,arcstats_mfu_ghost_evictable_data=5723648i,arcstats_mfu_ghost_evictable_metadata=10709504i,arcstats_mfu_ghost_hits=1315619i,arcstats_mfu_ghost_size=16433152i,arcstats_mfu_hits=7646611i,arcstats_mfu_size=305152i,arcstats_misses=3676993i,arcstats_mru_evictable_data=0i,arcstats_mru_evictable_metadata=0i,arcstats_mru_ghost_evictable_data=0i,arcstats_mru_ghost_evictable_metadata=80896i,arcstats_mru_ghost_hits=324250i,arcstats_mru_ghost_size=80896i,arcstats_mru_hits=8844526i,arcstats_mru_size=16693248i,arcstats_mutex_miss=354023i,arcstats_other_size=34397040i,arcstats_p=4172800i,arcstats_prefetch_data_hits=0i,arcstats_prefetch_data_misses=0i,arcstats_prefetch_metadata_hits=24641i,arcstats_prefetch_metadata_misses=3974i,arcstats_size=51977456i,arcstats_sync_wait_for_async=0i,vdev_cache_stats_delegations=779i,vdev_cache_stats_hits=323123i,vdev_cache_stats_misses=59929i,zfetchstats_hits=0i,zfetchstats_max_streams=0i,zfetchstats_misses=0i 1464473103634124908
```
### Description
```
arcstats_hits
Total amount of cache hits in the arc.
A short description for some of the metrics.
arcstats_misses
Total amount of cache misses in the arc.
#### Arc Stats
arcstats_demand_data_hits
Amount of cache hits for demand data, this is what matters (is good) for your application/share.
`arcstats_hits` Total amount of cache hits in the arc.
arcstats_demand_data_misses
Amount of cache misses for demand data, this is what matters (is bad) for your application/share.
`arcstats_misses` Total amount of cache misses in the arc.
arcstats_demand_metadata_hits
Ammount of cache hits for demand metadata, this matters (is good) for getting filesystem data (ls,find,…)
`arcstats_demand_data_hits` Amount of cache hits for demand data, this is what matters (is good) for your application/share.
arcstats_demand_metadata_misses
Ammount of cache misses for demand metadata, this matters (is bad) for getting filesystem data (ls,find,…)
`arcstats_demand_data_misses` Amount of cache misses for demand data, this is what matters (is bad) for your application/share.
arcstats_prefetch_data_hits
The zfs prefetcher tried to prefetch somethin, but it was allready cached (boring)
`arcstats_demand_metadata_hits` Amount of cache hits for demand metadata, this matters (is good) for getting filesystem data (ls,find,…)
arcstats_prefetch_data_misses
The zfs prefetcher prefetched something which was not in the cache (good job, could become a demand hit in the future)
`arcstats_demand_metadata_misses` Amount of cache misses for demand metadata, this matters (is bad) for getting filesystem data (ls,find,…)
arcstats_prefetch_metadata_hits
Same as above, but for metadata
`arcstats_prefetch_data_hits` The zfs prefetcher tried to prefetch something, but it was already cached (boring)
arcstats_prefetch_metadata_misses
Same as above, but for metadata
`arcstats_prefetch_data_misses` The zfs prefetcher prefetched something which was not in the cache (good job, could become a demand hit in the future)
arcstats_mru_hits
Cache hit in the “most recently used cache”, we move this to the mfu cache.
`arcstats_prefetch_metadata_hits` Same as above, but for metadata
arcstats_mru_ghost_hits
Cache hit in the “most recently used ghost list” we had this item in the cache, but evicted it, maybe we should increase the mru cache size.
`arcstats_prefetch_metadata_misses` Same as above, but for metadata
arcstats_mfu_hits
Cache hit in the “most freqently used cache” we move this to the begining of the mfu cache.
`arcstats_mru_hits` Cache hit in the “most recently used cache”, we move this to the mfu cache.
arcstats_mfu_ghost_hits
Cache hit in the “most frequently used ghost list” we had this item in the cache, but evicted it, maybe we should increase the mfu cache size.
`arcstats_mru_ghost_hits` Cache hit in the “most recently used ghost list” we had this item in the cache, but evicted it, maybe we should increase the mru cache size.
arcstats_allocated
New data is written to the cache.
`arcstats_mfu_hits` Cache hit in the “most frequently used cache” we move this to the beginning of the mfu cache.
arcstats_deleted
Old data is evicted (deleted) from the cache.
`arcstats_mfu_ghost_hits` Cache hit in the “most frequently used ghost list” we had this item in the cache, but evicted it, maybe we should increase the mfu cache size.
arcstats_evict_l2_cached
We evicted something from the arc, but its still cached in the l2 if we need it.
`arcstats_allocated` New data is written to the cache.
arcstats_evict_l2_eligible
We evicted something from the arc, and its not in the l2 this is sad. (maybe we hadnt had enough time to store it there)
`arcstats_deleted` Old data is evicted (deleted) from the cache.
arcstats_evict_l2_ineligible
We evicted something which cannot be stored in the l2.
`arcstats_evict_l2_cached` We evicted something from the arc, but its still cached in the l2 if we need it.
`arcstats_evict_l2_eligible` We evicted something from the arc, and its not in the l2 this is sad. (maybe we hadnt had enough time to store it there)
`arcstats_evict_l2_ineligible` We evicted something which cannot be stored in the l2.
Reasons could be:
We have multiple pools, we evicted something from a pool whithot an l2 device.
The zfs property secondarycache.
- We have multiple pools, we evicted something from a pool whithout an l2 device.
- The zfs property secondary cache.
arcstats_c
Arc target size, this is the size the system thinks the arc should have.
`arcstats_c` Arc target size, this is the size the system thinks the arc should have.
arcstats_size
Total size of the arc.
`arcstats_size` Total size of the arc.
arcstats_l2_hits
Hits to the L2 cache. (It was not in the arc, but in the l2 cache)
`arcstats_l2_hits` Hits to the L2 cache. (It was not in the arc, but in the l2 cache)
arcstats_l2_misses
Miss to the L2 cache. (It was not in the arc, and not in the l2 cache)
`arcstats_l2_misses` Miss to the L2 cache. (It was not in the arc, and not in the l2 cache)
arcstats_l2_size
Size of the l2 cache.
`arcstats_l2_size` Size of the l2 cache.
arcstats_l2_hdr_size
Size of the metadata in the arc (ram) used to manage (lookup if someting is in the l2) the l2 cache.
`arcstats_l2_hdr_size` Size of the metadata in the arc (ram) used to manage (lookup if something is in the l2) the l2 cache.
#### Zfetch Stats
`zfetchstats_hits` Counts the number of cache hits, to items which are in the cache because of the prefetcher.
zfetchstats_hits
Counts the number of cache hits, to items wich are in the cache because of the prefetcher.
`zfetchstats_colinear_hits` Counts the number of cache hits, to items which are in the cache because of the prefetcher (prefetched linear reads)
zfetchstats_colinear_hits
Counts the number of cache hits, to items wich are in the cache because of the prefetcher (prefetched linear reads)
`zfetchstats_stride_hits` Counts the number of cache hits, to items which are in the cache because of the prefetcher (prefetched stride reads)
zfetchstats_stride_hits
Counts the number of cache hits, to items wich are in the cache because of the prefetcher (prefetched stride reads)
#### Vdev Cache Stats
`vdev_cache_stats_hits` Hits to the vdev (device level) cache.
vdev_cache_stats_hits
Hits to the vdev (device level) cache.
vdev_cache_stats_misses
Misses to the vdev (device level) cache.
```
# Default config
```
[zfs]
# ZFS kstat path
# If not specified, then default is:
# kstatPath = "/proc/spl/kstat/zfs"
#
# By default, telegraf gather all zfs stats
# If not specified, then default is:
# kstatMetrics = ["arcstats", "zfetchstats", "vdev_cache_stats"]
```
`vdev_cache_stats_misses` Misses to the vdev (device level) cache.

View File

@ -1,38 +1,27 @@
package zfs
import (
"fmt"
"path/filepath"
"strconv"
"strings"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/inputs"
)
type Sysctl func(metric string) ([]string, error)
type Zpool func() ([]string, error)
type Zfs struct {
KstatPath string
KstatMetrics []string
PoolMetrics bool
}
type poolInfo struct {
name string
ioFilename string
sysctl Sysctl
zpool Zpool
}
var sampleConfig = `
## ZFS kstat path
## ZFS kstat path. Ignored on FreeBSD
## If not specified, then default is:
kstatPath = "/proc/spl/kstat/zfs"
# kstatPath = "/proc/spl/kstat/zfs"
## By default, telegraf gather all zfs stats
## If not specified, then default is:
kstatMetrics = ["arcstats", "zfetchstats", "vdev_cache_stats"]
# kstatMetrics = ["arcstats", "zfetchstats", "vdev_cache_stats"]
## By default, don't gather zpool stats
poolMetrics = false
# poolMetrics = false
`
func (z *Zfs) SampleConfig() string {
@ -40,117 +29,5 @@ func (z *Zfs) SampleConfig() string {
}
func (z *Zfs) Description() string {
return "Read metrics of ZFS from arcstats, zfetchstats and vdev_cache_stats"
}
func getPools(kstatPath string) []poolInfo {
pools := make([]poolInfo, 0)
poolsDirs, _ := filepath.Glob(kstatPath + "/*/io")
for _, poolDir := range poolsDirs {
poolDirSplit := strings.Split(poolDir, "/")
pool := poolDirSplit[len(poolDirSplit)-2]
pools = append(pools, poolInfo{name: pool, ioFilename: poolDir})
}
return pools
}
func getTags(pools []poolInfo) map[string]string {
var poolNames string
for _, pool := range pools {
if len(poolNames) != 0 {
poolNames += "::"
}
poolNames += pool.name
}
return map[string]string{"pools": poolNames}
}
func gatherPoolStats(pool poolInfo, acc telegraf.Accumulator) error {
lines, err := internal.ReadLines(pool.ioFilename)
if err != nil {
return err
}
if len(lines) != 3 {
return err
}
keys := strings.Fields(lines[1])
values := strings.Fields(lines[2])
keyCount := len(keys)
if keyCount != len(values) {
return fmt.Errorf("Key and value count don't match Keys:%v Values:%v", keys, values)
}
tag := map[string]string{"pool": pool.name}
fields := make(map[string]interface{})
for i := 0; i < keyCount; i++ {
value, err := strconv.ParseInt(values[i], 10, 64)
if err != nil {
return err
}
fields[keys[i]] = value
}
acc.AddFields("zfs_pool", fields, tag)
return nil
}
func (z *Zfs) Gather(acc telegraf.Accumulator) error {
kstatMetrics := z.KstatMetrics
if len(kstatMetrics) == 0 {
kstatMetrics = []string{"arcstats", "zfetchstats", "vdev_cache_stats"}
}
kstatPath := z.KstatPath
if len(kstatPath) == 0 {
kstatPath = "/proc/spl/kstat/zfs"
}
pools := getPools(kstatPath)
tags := getTags(pools)
if z.PoolMetrics {
for _, pool := range pools {
err := gatherPoolStats(pool, acc)
if err != nil {
return err
}
}
}
fields := make(map[string]interface{})
for _, metric := range kstatMetrics {
lines, err := internal.ReadLines(kstatPath + "/" + metric)
if err != nil {
return err
}
for i, line := range lines {
if i == 0 || i == 1 {
continue
}
if len(line) < 1 {
continue
}
rawData := strings.Split(line, " ")
key := metric + "_" + rawData[0]
rawValue := rawData[len(rawData)-1]
value, _ := strconv.ParseInt(rawValue, 10, 64)
fields[key] = value
}
}
acc.AddFields("zfs", fields, tags)
return nil
}
func init() {
inputs.Add("zfs", func() telegraf.Input {
return &Zfs{}
})
return "Read metrics of ZFS from arcstats, zfetchstats, vdev_cache_stats, and pools"
}

View File

@ -0,0 +1,140 @@
// +build freebsd
package zfs
import (
"bytes"
"fmt"
"os/exec"
"strconv"
"strings"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
)
func (z *Zfs) gatherPoolStats(acc telegraf.Accumulator) (string, error) {
lines, err := z.zpool()
if err != nil {
return "", err
}
pools := []string{}
for _, line := range lines {
col := strings.Split(line, "\t")
pools = append(pools, col[0])
}
if z.PoolMetrics {
for _, line := range lines {
col := strings.Split(line, "\t")
tags := map[string]string{"pool": col[0], "health": col[8]}
fields := map[string]interface{}{}
size, err := strconv.ParseInt(col[1], 10, 64)
if err != nil {
return "", fmt.Errorf("Error parsing size: %s", err)
}
fields["size"] = size
alloc, err := strconv.ParseInt(col[2], 10, 64)
if err != nil {
return "", fmt.Errorf("Error parsing allocation: %s", err)
}
fields["allocated"] = alloc
free, err := strconv.ParseInt(col[3], 10, 64)
if err != nil {
return "", fmt.Errorf("Error parsing free: %s", err)
}
fields["free"] = free
frag, err := strconv.ParseInt(strings.TrimSuffix(col[5], "%"), 10, 0)
if err != nil { // This might be - for RO devs
frag = 0
}
fields["fragmentation"] = frag
capval, err := strconv.ParseInt(col[6], 10, 0)
if err != nil {
return "", fmt.Errorf("Error parsing capacity: %s", err)
}
fields["capacity"] = capval
dedup, err := strconv.ParseFloat(strings.TrimSuffix(col[7], "x"), 32)
if err != nil {
return "", fmt.Errorf("Error parsing dedupratio: %s", err)
}
fields["dedupratio"] = dedup
acc.AddFields("zfs_pool", fields, tags)
}
}
return strings.Join(pools, "::"), nil
}
func (z *Zfs) Gather(acc telegraf.Accumulator) error {
kstatMetrics := z.KstatMetrics
if len(kstatMetrics) == 0 {
kstatMetrics = []string{"arcstats", "zfetchstats", "vdev_cache_stats"}
}
tags := map[string]string{}
poolNames, err := z.gatherPoolStats(acc)
if err != nil {
return err
}
tags["pools"] = poolNames
fields := make(map[string]interface{})
for _, metric := range kstatMetrics {
stdout, err := z.sysctl(metric)
if err != nil {
return err
}
for _, line := range stdout {
rawData := strings.Split(line, ": ")
key := metric + "_" + strings.Split(rawData[0], ".")[4]
value, _ := strconv.ParseInt(rawData[1], 10, 64)
fields[key] = value
}
}
acc.AddFields("zfs", fields, tags)
return nil
}
func run(command string, args ...string) ([]string, error) {
cmd := exec.Command(command, args...)
var outbuf, errbuf bytes.Buffer
cmd.Stdout = &outbuf
cmd.Stderr = &errbuf
err := cmd.Run()
stdout := strings.TrimSpace(outbuf.String())
stderr := strings.TrimSpace(errbuf.String())
if _, ok := err.(*exec.ExitError); ok {
return nil, fmt.Errorf("%s error: %s", command, stderr)
}
return strings.Split(stdout, "\n"), nil
}
func zpool() ([]string, error) {
return run("zpool", []string{"list", "-Hp"}...)
}
func sysctl(metric string) ([]string, error) {
return run("sysctl", []string{"-q", fmt.Sprintf("kstat.zfs.misc.%s", metric)}...)
}
func init() {
inputs.Add("zfs", func() telegraf.Input {
return &Zfs{
sysctl: sysctl,
zpool: zpool,
}
})
}

View File

@ -0,0 +1,148 @@
// +build freebsd
package zfs
import (
"fmt"
"testing"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
)
// $ zpool list -Hp
var zpool_output = []string{
"freenas-boot 30601641984 2022177280 28579464704 - - 6 1.00x ONLINE -",
"red1 8933531975680 1126164848640 7807367127040 - 8% 12 1.83x ONLINE /mnt",
"temp1 2989297238016 1626309320704 1362987917312 - 38% 54 1.28x ONLINE /mnt",
"temp2 2989297238016 626958278656 2362338959360 - 12% 20 1.00x ONLINE /mnt",
}
func mock_zpool() ([]string, error) {
return zpool_output, nil
}
// sysctl -q kstat.zfs.misc.arcstats
// sysctl -q kstat.zfs.misc.vdev_cache_stats
var kstat_vdev_cache_stats_output = []string{
"kstat.zfs.misc.vdev_cache_stats.misses: 87789",
"kstat.zfs.misc.vdev_cache_stats.hits: 465583",
"kstat.zfs.misc.vdev_cache_stats.delegations: 6952",
}
// sysctl -q kstat.zfs.misc.zfetchstats
var kstat_zfetchstats_output = []string{
"kstat.zfs.misc.zfetchstats.max_streams: 0",
"kstat.zfs.misc.zfetchstats.misses: 0",
"kstat.zfs.misc.zfetchstats.hits: 0",
}
func mock_sysctl(metric string) ([]string, error) {
if metric == "vdev_cache_stats" {
return kstat_vdev_cache_stats_output, nil
}
if metric == "zfetchstats" {
return kstat_zfetchstats_output, nil
}
return []string{}, fmt.Errorf("Invalid arg")
}
func TestZfsPoolMetrics(t *testing.T) {
var acc testutil.Accumulator
z := &Zfs{
KstatMetrics: []string{"vdev_cache_stats"},
sysctl: mock_sysctl,
zpool: mock_zpool,
}
err := z.Gather(&acc)
require.NoError(t, err)
require.False(t, acc.HasMeasurement("zfs_pool"))
acc.Metrics = nil
z = &Zfs{
KstatMetrics: []string{"vdev_cache_stats"},
PoolMetrics: true,
sysctl: mock_sysctl,
zpool: mock_zpool,
}
err = z.Gather(&acc)
require.NoError(t, err)
//one pool, all metrics
tags := map[string]string{
"pool": "freenas-boot",
"health": "ONLINE",
}
poolMetrics := getFreeNasBootPoolMetrics()
acc.AssertContainsTaggedFields(t, "zfs_pool", poolMetrics, tags)
}
func TestZfsGeneratesMetrics(t *testing.T) {
var acc testutil.Accumulator
z := &Zfs{
KstatMetrics: []string{"vdev_cache_stats"},
sysctl: mock_sysctl,
zpool: mock_zpool,
}
err := z.Gather(&acc)
require.NoError(t, err)
//four pool, vdev_cache_stats metrics
tags := map[string]string{
"pools": "freenas-boot::red1::temp1::temp2",
}
intMetrics := getKstatMetricsVdevOnly()
acc.AssertContainsTaggedFields(t, "zfs", intMetrics, tags)
acc.Metrics = nil
z = &Zfs{
KstatMetrics: []string{"zfetchstats", "vdev_cache_stats"},
sysctl: mock_sysctl,
zpool: mock_zpool,
}
err = z.Gather(&acc)
require.NoError(t, err)
//four pool, vdev_cache_stats and zfetchstatus metrics
intMetrics = getKstatMetricsVdevAndZfetch()
acc.AssertContainsTaggedFields(t, "zfs", intMetrics, tags)
}
func getFreeNasBootPoolMetrics() map[string]interface{} {
return map[string]interface{}{
"allocated": int64(2022177280),
"capacity": int64(6),
"dedupratio": float64(1),
"free": int64(28579464704),
"size": int64(30601641984),
"fragmentation": int64(0),
}
}
func getKstatMetricsVdevOnly() map[string]interface{} {
return map[string]interface{}{
"vdev_cache_stats_misses": int64(87789),
"vdev_cache_stats_hits": int64(465583),
"vdev_cache_stats_delegations": int64(6952),
}
}
func getKstatMetricsVdevAndZfetch() map[string]interface{} {
return map[string]interface{}{
"vdev_cache_stats_misses": int64(87789),
"vdev_cache_stats_hits": int64(465583),
"vdev_cache_stats_delegations": int64(6952),
"zfetchstats_max_streams": int64(0),
"zfetchstats_misses": int64(0),
"zfetchstats_hits": int64(0),
}
}

View File

@ -0,0 +1,131 @@
// +build linux
package zfs
import (
"fmt"
"path/filepath"
"strconv"
"strings"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/inputs"
)
type poolInfo struct {
name string
ioFilename string
}
func getPools(kstatPath string) []poolInfo {
pools := make([]poolInfo, 0)
poolsDirs, _ := filepath.Glob(kstatPath + "/*/io")
for _, poolDir := range poolsDirs {
poolDirSplit := strings.Split(poolDir, "/")
pool := poolDirSplit[len(poolDirSplit)-2]
pools = append(pools, poolInfo{name: pool, ioFilename: poolDir})
}
return pools
}
func getTags(pools []poolInfo) map[string]string {
var poolNames string
for _, pool := range pools {
if len(poolNames) != 0 {
poolNames += "::"
}
poolNames += pool.name
}
return map[string]string{"pools": poolNames}
}
func gatherPoolStats(pool poolInfo, acc telegraf.Accumulator) error {
lines, err := internal.ReadLines(pool.ioFilename)
if err != nil {
return err
}
if len(lines) != 3 {
return err
}
keys := strings.Fields(lines[1])
values := strings.Fields(lines[2])
keyCount := len(keys)
if keyCount != len(values) {
return fmt.Errorf("Key and value count don't match Keys:%v Values:%v", keys, values)
}
tag := map[string]string{"pool": pool.name}
fields := make(map[string]interface{})
for i := 0; i < keyCount; i++ {
value, err := strconv.ParseInt(values[i], 10, 64)
if err != nil {
return err
}
fields[keys[i]] = value
}
acc.AddFields("zfs_pool", fields, tag)
return nil
}
func (z *Zfs) Gather(acc telegraf.Accumulator) error {
kstatMetrics := z.KstatMetrics
if len(kstatMetrics) == 0 {
kstatMetrics = []string{"arcstats", "zfetchstats", "vdev_cache_stats"}
}
kstatPath := z.KstatPath
if len(kstatPath) == 0 {
kstatPath = "/proc/spl/kstat/zfs"
}
pools := getPools(kstatPath)
tags := getTags(pools)
if z.PoolMetrics {
for _, pool := range pools {
err := gatherPoolStats(pool, acc)
if err != nil {
return err
}
}
}
fields := make(map[string]interface{})
for _, metric := range kstatMetrics {
lines, err := internal.ReadLines(kstatPath + "/" + metric)
if err != nil {
return err
}
for i, line := range lines {
if i == 0 || i == 1 {
continue
}
if len(line) < 1 {
continue
}
rawData := strings.Split(line, " ")
key := metric + "_" + rawData[0]
rawValue := rawData[len(rawData)-1]
value, _ := strconv.ParseInt(rawValue, 10, 64)
fields[key] = value
}
}
acc.AddFields("zfs", fields, tags)
return nil
}
func init() {
inputs.Add("zfs", func() telegraf.Input {
return &Zfs{}
})
}

View File

@ -1,3 +1,5 @@
// +build linux
package zfs
import (

View File

@ -0,0 +1,18 @@
// +build !linux,!freebsd
package zfs
import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
)
func (z *Zfs) Gather(acc telegraf.Accumulator) error {
return nil
}
func init() {
inputs.Add("zfs", func() telegraf.Input {
return &Zfs{}
})
}

View File

@ -7,6 +7,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/outputs/datadog"
_ "github.com/influxdata/telegraf/plugins/outputs/file"
_ "github.com/influxdata/telegraf/plugins/outputs/graphite"
_ "github.com/influxdata/telegraf/plugins/outputs/graylog"
_ "github.com/influxdata/telegraf/plugins/outputs/influxdb"
_ "github.com/influxdata/telegraf/plugins/outputs/instrumental"
_ "github.com/influxdata/telegraf/plugins/outputs/kafka"

View File

@ -0,0 +1,14 @@
# Graylog Output Plugin
This plugin writes to a Graylog instance using the "gelf" format.
It requires a `servers` name.
### Configuration:
```toml
# Send telegraf metrics to graylog(s)
[[outputs.graylog]]
## Udp endpoint for your graylog instance.
servers = ["127.0.0.1:12201", "192.168.1.1:12201"]
```

View File

@ -0,0 +1,247 @@
package graylog
import (
"bytes"
"compress/zlib"
"crypto/rand"
"encoding/binary"
ejson "encoding/json"
"fmt"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/outputs"
"io"
"math"
"net"
"os"
)
const (
defaultGraylogEndpoint = "127.0.0.1:12201"
defaultConnection = "wan"
defaultMaxChunkSizeWan = 1420
defaultMaxChunkSizeLan = 8154
)
type GelfConfig struct {
GraylogEndpoint string
Connection string
MaxChunkSizeWan int
MaxChunkSizeLan int
}
type Gelf struct {
GelfConfig
}
func NewGelfWriter(config GelfConfig) *Gelf {
if config.GraylogEndpoint == "" {
config.GraylogEndpoint = defaultGraylogEndpoint
}
if config.Connection == "" {
config.Connection = defaultConnection
}
if config.MaxChunkSizeWan == 0 {
config.MaxChunkSizeWan = defaultMaxChunkSizeWan
}
if config.MaxChunkSizeLan == 0 {
config.MaxChunkSizeLan = defaultMaxChunkSizeLan
}
g := &Gelf{GelfConfig: config}
return g
}
func (g *Gelf) Write(message []byte) (n int, err error) {
compressed := g.compress(message)
chunksize := g.GelfConfig.MaxChunkSizeWan
length := compressed.Len()
if length > chunksize {
chunkCountInt := int(math.Ceil(float64(length) / float64(chunksize)))
id := make([]byte, 8)
rand.Read(id)
for i, index := 0, 0; i < length; i, index = i+chunksize, index+1 {
packet := g.createChunkedMessage(index, chunkCountInt, id, &compressed)
_, err = g.send(packet.Bytes())
if err != nil {
return 0, err
}
}
} else {
_, err = g.send(compressed.Bytes())
if err != nil {
return 0, err
}
}
n = len(message)
return
}
func (g *Gelf) createChunkedMessage(index int, chunkCountInt int, id []byte, compressed *bytes.Buffer) bytes.Buffer {
var packet bytes.Buffer
chunksize := g.getChunksize()
packet.Write(g.intToBytes(30))
packet.Write(g.intToBytes(15))
packet.Write(id)
packet.Write(g.intToBytes(index))
packet.Write(g.intToBytes(chunkCountInt))
packet.Write(compressed.Next(chunksize))
return packet
}
func (g *Gelf) getChunksize() int {
if g.GelfConfig.Connection == "wan" {
return g.GelfConfig.MaxChunkSizeWan
}
if g.GelfConfig.Connection == "lan" {
return g.GelfConfig.MaxChunkSizeLan
}
return g.GelfConfig.MaxChunkSizeWan
}
func (g *Gelf) intToBytes(i int) []byte {
buf := new(bytes.Buffer)
binary.Write(buf, binary.LittleEndian, int8(i))
return buf.Bytes()
}
func (g *Gelf) compress(b []byte) bytes.Buffer {
var buf bytes.Buffer
comp := zlib.NewWriter(&buf)
comp.Write(b)
comp.Close()
return buf
}
func (g *Gelf) send(b []byte) (n int, err error) {
udpAddr, err := net.ResolveUDPAddr("udp", g.GelfConfig.GraylogEndpoint)
if err != nil {
return
}
conn, err := net.DialUDP("udp", nil, udpAddr)
if err != nil {
return
}
n, err = conn.Write(b)
return
}
type Graylog struct {
Servers []string
writer io.Writer
}
var sampleConfig = `
## Udp endpoint for your graylog instance.
servers = ["127.0.0.1:12201", "192.168.1.1:12201"]
`
func (g *Graylog) Connect() error {
writers := []io.Writer{}
if len(g.Servers) == 0 {
g.Servers = append(g.Servers, "localhost:12201")
}
for _, server := range g.Servers {
w := NewGelfWriter(GelfConfig{GraylogEndpoint: server})
writers = append(writers, w)
}
g.writer = io.MultiWriter(writers...)
return nil
}
func (g *Graylog) Close() error {
return nil
}
func (g *Graylog) SampleConfig() string {
return sampleConfig
}
func (g *Graylog) Description() string {
return "Send telegraf metrics to graylog(s)"
}
func (g *Graylog) Write(metrics []telegraf.Metric) error {
if len(metrics) == 0 {
return nil
}
for _, metric := range metrics {
values, err := serialize(metric)
if err != nil {
return err
}
for _, value := range values {
_, err := g.writer.Write([]byte(value))
if err != nil {
return fmt.Errorf("FAILED to write message: %s, %s", value, err)
}
}
}
return nil
}
func serialize(metric telegraf.Metric) ([]string, error) {
out := []string{}
m := make(map[string]interface{})
m["version"] = "1.1"
m["timestamp"] = metric.UnixNano() / 1000000000
m["short_message"] = " "
m["name"] = metric.Name()
if host, ok := metric.Tags()["host"]; ok {
m["host"] = host
} else {
host, err := os.Hostname()
if err != nil {
return []string{}, err
}
m["host"] = host
}
for key, value := range metric.Fields() {
nkey := fmt.Sprintf("_%s", key)
m[nkey] = value
}
serialized, err := ejson.Marshal(m)
if err != nil {
return []string{}, err
}
out = append(out, string(serialized))
return out, nil
}
func init() {
outputs.Add("graylog", func() telegraf.Output {
return &Graylog{}
})
}

View File

@ -0,0 +1,58 @@
package graylog
import (
"bytes"
"compress/zlib"
"encoding/json"
"io"
"net"
"sync"
"testing"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert"
)
func TestWrite(t *testing.T) {
var wg sync.WaitGroup
var wg2 sync.WaitGroup
wg.Add(1)
wg2.Add(1)
go UDPServer(t, &wg, &wg2)
wg2.Wait()
i := Graylog{
Servers: []string{"127.0.0.1:12201"},
}
i.Connect()
metrics := testutil.MockMetrics()
i.Write(metrics)
wg.Wait()
i.Close()
}
type GelfObject map[string]interface{}
func UDPServer(t *testing.T, wg *sync.WaitGroup, wg2 *sync.WaitGroup) {
serverAddr, _ := net.ResolveUDPAddr("udp", "127.0.0.1:12201")
udpServer, _ := net.ListenUDP("udp", serverAddr)
defer wg.Done()
bufR := make([]byte, 1024)
wg2.Done()
n, _, _ := udpServer.ReadFromUDP(bufR)
b := bytes.NewReader(bufR[0:n])
r, _ := zlib.NewReader(b)
bufW := bytes.NewBuffer(nil)
io.Copy(bufW, r)
r.Close()
var obj GelfObject
json.Unmarshal(bufW.Bytes(), &obj)
assert.Equal(t, obj["_value"], float64(1))
}

View File

@ -85,7 +85,7 @@ targets = {
supported_builds = {
"darwin": [ "amd64" ],
"windows": [ "amd64" ],
"linux": [ "amd64", "i386", "armhf", "armel", "arm64" ],
"linux": [ "amd64", "i386", "armhf", "armel", "arm64", "static_amd64" ],
"freebsd": [ "amd64" ]
}
@ -553,7 +553,7 @@ def package(build_output, pkg_name, version, nightly=False, iteration=1, static=
build_root = os.path.join(tmp_build_dir,
platform,
arch,
'{}-{}-{}'.format(PACKAGE_NAME, version, iteration))
PACKAGE_NAME)
os.makedirs(build_root)
# Copy packaging scripts to build directory