This commit is contained in:
ncohensm 2016-06-24 08:13:31 -07:00
commit 5866acd48d
41 changed files with 1205 additions and 29 deletions

View File

@ -32,11 +32,16 @@ should now look like:
- [#1247](https://github.com/influxdata/telegraf/pull/1247): rollbar webhook plugin.
- [#1402](https://github.com/influxdata/telegraf/pull/1402): docker-machine/boot2docker no longer required for unit tests.
- [#1407](https://github.com/influxdata/telegraf/pull/1407): HTTP service listener input plugin.
- [#1350](https://github.com/influxdata/telegraf/pull/1350): cgroup input plugin.
- [#1369](https://github.com/influxdata/telegraf/pull/1369): Add input plugin for consuming metrics from NSQD.
### Bugfixes
- [#1384](https://github.com/influxdata/telegraf/pull/1384): Fix datarace in apache input plugin.
- [#1399](https://github.com/influxdata/telegraf/issues/1399): Add `read_repairs` statistics to riak plugin.
- [#1405](https://github.com/influxdata/telegraf/issues/1405): Fix memory/connection leak in prometheus input plugin.
- [#1378](https://github.com/influxdata/telegraf/issues/1378): Trim BOM from config file for Windows support.
- [#1339](https://github.com/influxdata/telegraf/issues/1339): Prometheus client output panic on service reload.
## v1.0 beta 2 [2016-06-21]
@ -56,6 +61,7 @@ should now look like:
- [#1335](https://github.com/influxdata/telegraf/issues/1335): Fix overall ping timeout to be calculated based on per-ping timeout.
- [#1374](https://github.com/influxdata/telegraf/pull/1374): Change "default" retention policy to "".
- [#1377](https://github.com/influxdata/telegraf/issues/1377): Graphite output mangling '%' character.
- [#1396](https://github.com/influxdata/telegraf/pull/1396): Prometheus input plugin now supports x509 certificate authentication.
## v1.0 beta 1 [2016-06-07]

View File

@ -221,6 +221,9 @@ Telegraf can also collect metrics via the following service plugins:
* [webhooks](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/webhooks)
  * [github](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/webhooks/github)
  * [rollbar](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/webhooks/rollbar)
* [nsq_consumer](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/nsq_consumer)
* [github_webhooks](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/github_webhooks)
* [rollbar_webhooks](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/rollbar_webhooks)
We'll be adding support for many more over the coming months. Read on if you
want to add support for another service or third-party API.

View File

@ -526,6 +526,19 @@
# socket_suffix = "asok"
# # Read specific statistics per cgroup
# [[inputs.cgroup]]
# ## Directories in which to look for files, globs are supported.
# # paths = [
# # "/cgroup/memory",
# # "/cgroup/memory/child1",
# # "/cgroup/memory/child2/*",
# # ]
# ## cgroup stat fields, as file names, globs are supported.
# ## these file names are appended to each path from above.
# # files = ["memory.*usage*", "memory.limit_in_bytes"]
# # Pull Metric Statistics from Amazon CloudWatch
# [[inputs.cloudwatch]]
# ## Amazon Region
@ -679,6 +692,13 @@
#
# ## set cluster_health to true when you want to also obtain cluster level stats
# cluster_health = false
#
# ## Optional SSL Config
# # ssl_ca = "/etc/telegraf/ca.pem"
# # ssl_cert = "/etc/telegraf/cert.pem"
# # ssl_key = "/etc/telegraf/key.pem"
# ## Use SSL but skip chain & host verification
# # insecure_skip_verify = false
# # Read metrics from one or more commands that can output to stdout
@ -1259,10 +1279,15 @@
# ## An array of urls to scrape metrics from.
# urls = ["http://localhost:9100/metrics"]
#
# ## Use SSL but skip chain & host verification
# # insecure_skip_verify = false
# ## Use bearer token for authorization
# # bearer_token = /path/to/bearer/token
#
# ## Optional SSL Config
# # ssl_ca = /path/to/cafile
# # ssl_cert = /path/to/certfile
# # ssl_key = /path/to/keyfile
# ## Use SSL but skip chain & host verification
# # insecure_skip_verify = false
# # Reads last_run_summary.yaml file and converts to measurements

View File

@ -539,6 +539,13 @@ func (c *Config) LoadConfig(path string) error {
return nil
}
// trimBOM trims the Byte-Order-Marks from the beginning of the file.
// This is for Windows compatibility only.
// see https://github.com/influxdata/telegraf/issues/1378
func trimBOM(f []byte) []byte {
return bytes.TrimPrefix(f, []byte("\xef\xbb\xbf"))
}
// parseFile loads a TOML configuration from a provided path and
// returns the AST produced from the TOML parser. When loading the file, it
// will find environment variables and replace them.
@ -547,6 +554,8 @@ func parseFile(fpath string) (*ast.Table, error) {
if err != nil {
return nil, err
}
// Windows editors may prepend a UTF-8 BOM; trim it before parsing.
contents = trimBOM(contents)
env_vars := envVarRe.FindAll(contents, -1)
for _, env_var := range env_vars {

View File

@ -6,6 +6,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/inputs/bcache"
_ "github.com/influxdata/telegraf/plugins/inputs/cassandra"
_ "github.com/influxdata/telegraf/plugins/inputs/ceph"
_ "github.com/influxdata/telegraf/plugins/inputs/cgroup"
_ "github.com/influxdata/telegraf/plugins/inputs/chrony"
_ "github.com/influxdata/telegraf/plugins/inputs/cloudwatch"
_ "github.com/influxdata/telegraf/plugins/inputs/conntrack"
@ -41,6 +42,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/inputs/net_response"
_ "github.com/influxdata/telegraf/plugins/inputs/nginx"
_ "github.com/influxdata/telegraf/plugins/inputs/nsq"
_ "github.com/influxdata/telegraf/plugins/inputs/nsq_consumer"
_ "github.com/influxdata/telegraf/plugins/inputs/nstat"
_ "github.com/influxdata/telegraf/plugins/inputs/ntpq"
_ "github.com/influxdata/telegraf/plugins/inputs/passenger"

View File

@ -0,0 +1,58 @@
# CGroup Input Plugin For Telegraf Agent
This input plugin will capture specific statistics per cgroup.
The following file formats are supported:
* Single value
```
VAL\n
```
* New line separated values
```
VAL0\n
VAL1\n
```
* Space separated values
```
VAL0 VAL1 ...\n
```
* New line separated key-space-value pairs
```
KEY0 VAL0\n
KEY1 VAL1\n
```
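For example, the key-space-value layout is flattened into one field per key, named `<file name>.<key>` (see the parser in this commit). A `memory.stat` file containing:
```
cache 1739362304123123123
rss 1775325184
```
produces the fields `memory.stat.cache = 1739362304123123123` and `memory.stat.rss = 1775325184` on the `cgroup` measurement.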
### Tags:
All measurements have the following tags:
- path
### Configuration:
```
# [[inputs.cgroup]]
# paths = [
# "/cgroup/memory", # root cgroup
# "/cgroup/memory/child1", # container cgroup
# "/cgroup/memory/child2/*", # all children cgroups under child2, but not child2 itself
# ]
# files = ["memory.*usage*", "memory.limit_in_bytes"]
# [[inputs.cgroup]]
# paths = [
# "/cgroup/cpu", # root cgroup
# "/cgroup/cpu/*", # all container cgroups
# "/cgroup/cpu/*/*", # all children cgroups under each container cgroup
# ]
# files = ["cpuacct.usage", "cpu.cfs_period_us", "cpu.cfs_quota_us"]
```
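Each matched directory becomes one point tagged with its `path`. In InfluxDB line protocol, output from the first block above looks roughly like this (field value and timestamp are illustrative):
```
cgroup,path=/cgroup/memory memory.limit_in_bytes=223372036854771712i 1466779362000000000
```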

View File

@ -0,0 +1,35 @@
package cgroup
import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
)
type CGroup struct {
Paths []string `toml:"paths"`
Files []string `toml:"files"`
}
var sampleConfig = `
## Directories in which to look for files, globs are supported.
# paths = [
# "/cgroup/memory",
# "/cgroup/memory/child1",
# "/cgroup/memory/child2/*",
# ]
## cgroup stat fields, as file names, globs are supported.
## these file names are appended to each path from above.
# files = ["memory.*usage*", "memory.limit_in_bytes"]
`
func (g *CGroup) SampleConfig() string {
return sampleConfig
}
func (g *CGroup) Description() string {
return "Read specific statistics per cgroup"
}
func init() {
inputs.Add("cgroup", func() telegraf.Input { return &CGroup{} })
}

View File

@ -0,0 +1,244 @@
// +build linux
package cgroup
import (
"fmt"
"io/ioutil"
"os"
"path"
"path/filepath"
"regexp"
"strconv"
"github.com/influxdata/telegraf"
)
const metricName = "cgroup"
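// Gather expands every configured path glob and emits one point per matching
// cgroup directory, with one field per stat read from that directory.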
func (g *CGroup) Gather(acc telegraf.Accumulator) error {
list := make(chan pathInfo)
go g.generateDirs(list)
for dir := range list {
if dir.err != nil {
return dir.err
}
if err := g.gatherDir(dir.path, acc); err != nil {
return err
}
}
return nil
}
func (g *CGroup) gatherDir(dir string, acc telegraf.Accumulator) error {
fields := make(map[string]interface{})
list := make(chan pathInfo)
go g.generateFiles(dir, list)
for file := range list {
if file.err != nil {
return file.err
}
raw, err := ioutil.ReadFile(file.path)
if err != nil {
return err
}
if len(raw) == 0 {
continue
}
fd := fileData{data: raw, path: file.path}
if err := fd.parse(fields); err != nil {
return err
}
}
tags := map[string]string{"path": dir}
acc.AddFields(metricName, fields, tags)
return nil
}
// ======================================================================
type pathInfo struct {
path string
err error
}
func isDir(path string) (bool, error) {
result, err := os.Stat(path)
if err != nil {
return false, err
}
return result.IsDir(), nil
}
func (g *CGroup) generateDirs(list chan<- pathInfo) {
for _, dir := range g.Paths {
// getting all dirs that match the pattern 'dir'
items, err := filepath.Glob(dir)
if err != nil {
list <- pathInfo{err: err}
return
}
for _, item := range items {
ok, err := isDir(item)
if err != nil {
list <- pathInfo{err: err}
return
}
// supply only dirs
if ok {
list <- pathInfo{path: item}
}
}
}
close(list)
}
func (g *CGroup) generateFiles(dir string, list chan<- pathInfo) {
for _, file := range g.Files {
// getting all file paths that match the pattern 'dir + file'
// path.Base makes sure that the file variable does not contain part of a path
items, err := filepath.Glob(path.Join(dir, path.Base(file)))
if err != nil {
list <- pathInfo{err: err}
return
}
for _, item := range items {
ok, err := isDir(item)
if err != nil {
list <- pathInfo{err: err}
return
}
// supply only files not dirs
if !ok {
list <- pathInfo{path: item}
}
}
}
close(list)
}
// ======================================================================
type fileData struct {
data []byte
path string
}
func (fd *fileData) format() (*fileFormat, error) {
for _, ff := range fileFormats {
ok, err := ff.match(fd.data)
if err != nil {
return nil, err
}
if ok {
return &ff, nil
}
}
return nil, fmt.Errorf("%v: unknown file format", fd.path)
}
func (fd *fileData) parse(fields map[string]interface{}) error {
format, err := fd.format()
if err != nil {
return err
}
format.parser(filepath.Base(fd.path), fields, fd.data)
return nil
}
// ======================================================================
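// fileFormat pairs a regexp pattern that recognizes one of the supported
// stat-file layouts with a parser that flattens that layout into fields.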
type fileFormat struct {
name string
pattern string
parser func(measurement string, fields map[string]interface{}, b []byte)
}
const keyPattern = "[[:alpha:]_]+"
const valuePattern = "[\\d-]+"
var fileFormats = [...]fileFormat{
// VAL\n
fileFormat{
name: "Single value",
pattern: "^" + valuePattern + "\n$",
parser: func(measurement string, fields map[string]interface{}, b []byte) {
re := regexp.MustCompile("^(" + valuePattern + ")\n$")
matches := re.FindAllStringSubmatch(string(b), -1)
fields[measurement] = numberOrString(matches[0][1])
},
},
// VAL0\n
// VAL1\n
// ...
fileFormat{
name: "New line separated values",
pattern: "^(" + valuePattern + "\n){2,}$",
parser: func(measurement string, fields map[string]interface{}, b []byte) {
re := regexp.MustCompile("(" + valuePattern + ")\n")
matches := re.FindAllStringSubmatch(string(b), -1)
for i, v := range matches {
fields[measurement+"."+strconv.Itoa(i)] = numberOrString(v[1])
}
},
},
// VAL0 VAL1 ...\n
fileFormat{
name: "Space separated values",
pattern: "^(" + valuePattern + " )+\n$",
parser: func(measurement string, fields map[string]interface{}, b []byte) {
re := regexp.MustCompile("(" + valuePattern + ") ")
matches := re.FindAllStringSubmatch(string(b), -1)
for i, v := range matches {
fields[measurement+"."+strconv.Itoa(i)] = numberOrString(v[1])
}
},
},
// KEY0 VAL0\n
// KEY1 VAL1\n
// ...
fileFormat{
name: "New line separated key-space-value's",
pattern: "^(" + keyPattern + " " + valuePattern + "\n)+$",
parser: func(measurement string, fields map[string]interface{}, b []byte) {
re := regexp.MustCompile("(" + keyPattern + ") (" + valuePattern + ")\n")
matches := re.FindAllStringSubmatch(string(b), -1)
for _, v := range matches {
fields[measurement+"."+v[1]] = numberOrString(v[2])
}
},
},
}
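// numberOrString returns s as an int when it parses as a decimal integer,
// and as the raw string otherwise (e.g. a "12-781" range value).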
func numberOrString(s string) interface{} {
i, err := strconv.Atoi(s)
if err == nil {
return i
}
return s
}
// match reports whether b conforms to this file format's pattern.
func (f fileFormat) match(b []byte) (bool, error) {
return regexp.Match(f.pattern, b)
}

View File

@ -0,0 +1,11 @@
// +build !linux
package cgroup
import (
"github.com/influxdata/telegraf"
)
func (g *CGroup) Gather(acc telegraf.Accumulator) error {
return nil
}

View File

@ -0,0 +1,182 @@
// +build linux
package cgroup
import (
"testing"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
)
var cg1 = &CGroup{
Paths: []string{"testdata/memory"},
Files: []string{
"memory.empty",
"memory.max_usage_in_bytes",
"memory.limit_in_bytes",
"memory.stat",
"memory.use_hierarchy",
"notify_on_release",
},
}
func TestCgroupStatistics_1(t *testing.T) {
var acc testutil.Accumulator
err := cg1.Gather(&acc)
require.NoError(t, err)
tags := map[string]string{
"path": "testdata/memory",
}
fields := map[string]interface{}{
"memory.stat.cache": 1739362304123123123,
"memory.stat.rss": 1775325184,
"memory.stat.rss_huge": 778043392,
"memory.stat.mapped_file": 421036032,
"memory.stat.dirty": -307200,
"memory.max_usage_in_bytes.0": 0,
"memory.max_usage_in_bytes.1": -1,
"memory.max_usage_in_bytes.2": 2,
"memory.limit_in_bytes": 223372036854771712,
"memory.use_hierarchy": "12-781",
"notify_on_release": 0,
}
acc.AssertContainsTaggedFields(t, "cgroup", fields, tags)
}
// ======================================================================
var cg2 = &CGroup{
Paths: []string{"testdata/cpu"},
Files: []string{"cpuacct.usage_percpu"},
}
func TestCgroupStatistics_2(t *testing.T) {
var acc testutil.Accumulator
err := cg2.Gather(&acc)
require.NoError(t, err)
tags := map[string]string{
"path": "testdata/cpu",
}
fields := map[string]interface{}{
"cpuacct.usage_percpu.0": -1452543795404,
"cpuacct.usage_percpu.1": 1376681271659,
"cpuacct.usage_percpu.2": 1450950799997,
"cpuacct.usage_percpu.3": -1473113374257,
}
acc.AssertContainsTaggedFields(t, "cgroup", fields, tags)
}
// ======================================================================
var cg3 = &CGroup{
Paths: []string{"testdata/memory/*"},
Files: []string{"memory.limit_in_bytes"},
}
func TestCgroupStatistics_3(t *testing.T) {
var acc testutil.Accumulator
err := cg3.Gather(&acc)
require.NoError(t, err)
tags := map[string]string{
"path": "testdata/memory/group_1",
}
fields := map[string]interface{}{
"memory.limit_in_bytes": 223372036854771712,
}
acc.AssertContainsTaggedFields(t, "cgroup", fields, tags)
tags = map[string]string{
"path": "testdata/memory/group_2",
}
acc.AssertContainsTaggedFields(t, "cgroup", fields, tags)
}
// ======================================================================
var cg4 = &CGroup{
Paths: []string{"testdata/memory/*/*", "testdata/memory/group_2"},
Files: []string{"memory.limit_in_bytes"},
}
func TestCgroupStatistics_4(t *testing.T) {
var acc testutil.Accumulator
err := cg4.Gather(&acc)
require.NoError(t, err)
tags := map[string]string{
"path": "testdata/memory/group_1/group_1_1",
}
fields := map[string]interface{}{
"memory.limit_in_bytes": 223372036854771712,
}
acc.AssertContainsTaggedFields(t, "cgroup", fields, tags)
tags = map[string]string{
"path": "testdata/memory/group_1/group_1_2",
}
acc.AssertContainsTaggedFields(t, "cgroup", fields, tags)
tags = map[string]string{
"path": "testdata/memory/group_2",
}
acc.AssertContainsTaggedFields(t, "cgroup", fields, tags)
}
// ======================================================================
var cg5 = &CGroup{
Paths: []string{"testdata/memory/*/group_1_1"},
Files: []string{"memory.limit_in_bytes"},
}
func TestCgroupStatistics_5(t *testing.T) {
var acc testutil.Accumulator
err := cg5.Gather(&acc)
require.NoError(t, err)
tags := map[string]string{
"path": "testdata/memory/group_1/group_1_1",
}
fields := map[string]interface{}{
"memory.limit_in_bytes": 223372036854771712,
}
acc.AssertContainsTaggedFields(t, "cgroup", fields, tags)
tags = map[string]string{
"path": "testdata/memory/group_2/group_1_1",
}
acc.AssertContainsTaggedFields(t, "cgroup", fields, tags)
}
// ======================================================================
var cg6 = &CGroup{
Paths: []string{"testdata/memory"},
Files: []string{"memory.us*", "*/memory.kmem.*"},
}
func TestCgroupStatistics_6(t *testing.T) {
var acc testutil.Accumulator
err := cg6.Gather(&acc)
require.NoError(t, err)
tags := map[string]string{
"path": "testdata/memory",
}
fields := map[string]interface{}{
"memory.usage_in_bytes": 3513667584,
"memory.use_hierarchy": "12-781",
"memory.kmem.limit_in_bytes": 9223372036854771712,
}
acc.AssertContainsTaggedFields(t, "cgroup", fields, tags)
}

View File

@ -0,0 +1 @@
Total 0

View File

@ -0,0 +1,131 @@
11:0 Read 0
11:0 Write 0
11:0 Sync 0
11:0 Async 0
11:0 Total 0
8:0 Read 49134
8:0 Write 216703
8:0 Sync 177906
8:0 Async 87931
8:0 Total 265837
7:7 Read 0
7:7 Write 0
7:7 Sync 0
7:7 Async 0
7:7 Total 0
7:6 Read 0
7:6 Write 0
7:6 Sync 0
7:6 Async 0
7:6 Total 0
7:5 Read 0
7:5 Write 0
7:5 Sync 0
7:5 Async 0
7:5 Total 0
7:4 Read 0
7:4 Write 0
7:4 Sync 0
7:4 Async 0
7:4 Total 0
7:3 Read 0
7:3 Write 0
7:3 Sync 0
7:3 Async 0
7:3 Total 0
7:2 Read 0
7:2 Write 0
7:2 Sync 0
7:2 Async 0
7:2 Total 0
7:1 Read 0
7:1 Write 0
7:1 Sync 0
7:1 Async 0
7:1 Total 0
7:0 Read 0
7:0 Write 0
7:0 Sync 0
7:0 Async 0
7:0 Total 0
1:15 Read 3
1:15 Write 0
1:15 Sync 0
1:15 Async 3
1:15 Total 3
1:14 Read 3
1:14 Write 0
1:14 Sync 0
1:14 Async 3
1:14 Total 3
1:13 Read 3
1:13 Write 0
1:13 Sync 0
1:13 Async 3
1:13 Total 3
1:12 Read 3
1:12 Write 0
1:12 Sync 0
1:12 Async 3
1:12 Total 3
1:11 Read 3
1:11 Write 0
1:11 Sync 0
1:11 Async 3
1:11 Total 3
1:10 Read 3
1:10 Write 0
1:10 Sync 0
1:10 Async 3
1:10 Total 3
1:9 Read 3
1:9 Write 0
1:9 Sync 0
1:9 Async 3
1:9 Total 3
1:8 Read 3
1:8 Write 0
1:8 Sync 0
1:8 Async 3
1:8 Total 3
1:7 Read 3
1:7 Write 0
1:7 Sync 0
1:7 Async 3
1:7 Total 3
1:6 Read 3
1:6 Write 0
1:6 Sync 0
1:6 Async 3
1:6 Total 3
1:5 Read 3
1:5 Write 0
1:5 Sync 0
1:5 Async 3
1:5 Total 3
1:4 Read 3
1:4 Write 0
1:4 Sync 0
1:4 Async 3
1:4 Total 3
1:3 Read 3
1:3 Write 0
1:3 Sync 0
1:3 Async 3
1:3 Total 3
1:2 Read 3
1:2 Write 0
1:2 Sync 0
1:2 Async 3
1:2 Total 3
1:1 Read 3
1:1 Write 0
1:1 Sync 0
1:1 Async 3
1:1 Total 3
1:0 Read 3
1:0 Write 0
1:0 Sync 0
1:0 Async 3
1:0 Total 3
Total 265885

View File

@ -0,0 +1 @@
-1

View File

@ -0,0 +1 @@
-1452543795404 1376681271659 1450950799997 -1473113374257

View File

@ -0,0 +1 @@
223372036854771712

View File

@ -0,0 +1,5 @@
cache 1739362304123123123
rss 1775325184
rss_huge 778043392
mapped_file 421036032
dirty -307200

View File

@ -0,0 +1 @@
223372036854771712

View File

@ -0,0 +1,5 @@
cache 1739362304123123123
rss 1775325184
rss_huge 778043392
mapped_file 421036032
dirty -307200

View File

@ -0,0 +1 @@
9223372036854771712

View File

@ -0,0 +1 @@
0

View File

@ -0,0 +1 @@
223372036854771712

View File

@ -0,0 +1,5 @@
cache 1739362304123123123
rss 1775325184
rss_huge 778043392
mapped_file 421036032
dirty -307200

View File

@ -0,0 +1 @@
223372036854771712

View File

@ -0,0 +1,5 @@
cache 1739362304123123123
rss 1775325184
rss_huge 778043392
mapped_file 421036032
dirty -307200

View File

@ -0,0 +1 @@
223372036854771712

View File

@ -0,0 +1,5 @@
cache 1739362304123123123
rss 1775325184
rss_huge 778043392
mapped_file 421036032
dirty -307200

View File

View File

@ -0,0 +1 @@
9223372036854771712

View File

@ -0,0 +1 @@
223372036854771712

View File

@ -0,0 +1,3 @@
0
-1
2

View File

@ -0,0 +1,8 @@
total=858067 N0=858067
file=406254 N0=406254
anon=451792 N0=451792
unevictable=21 N0=21
hierarchical_total=858067 N0=858067
hierarchical_file=406254 N0=406254
hierarchical_anon=451792 N0=451792
hierarchical_unevictable=21 N0=21

View File

@ -0,0 +1,5 @@
cache 1739362304123123123
rss 1775325184
rss_huge 778043392
mapped_file 421036032
dirty -307200

View File

@ -0,0 +1 @@
3513667584

View File

@ -0,0 +1 @@
12-781

View File

@ -0,0 +1 @@
0

View File

@ -0,0 +1,25 @@
# NSQ Consumer Input Plugin
The [NSQ](http://nsq.io/) consumer plugin reads messages from a specified NSQD
topic and parses them into InfluxDB metrics. A message may be in any of the supported `data_format` types.
## Configuration
```toml
# Read metrics from NSQD topic(s)
[[inputs.nsq_consumer]]
## A string representing the NSQD TCP endpoint
server = "localhost:4150"
topic = "telegraf"
channel = "consumer"
max_in_flight = 100
## Data format to consume.
## Each data format has its own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "influx"
```
## Testing
The `nsq_consumer_test` suite mocks out the interaction with `nsqd`; it requires no outside dependencies.
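For ad-hoc testing against a real `nsqd`, a small producer can publish a line-protocol message to the topic. This is a minimal sketch, assuming a local `nsqd` on the default TCP port 4150 and the config above; the metric itself is made up:

```go
package main

import (
	"log"

	"github.com/nsqio/go-nsq"
)

func main() {
	// Connect a producer to the local nsqd TCP endpoint.
	producer, err := nsq.NewProducer("127.0.0.1:4150", nsq.NewConfig())
	if err != nil {
		log.Fatal(err)
	}
	defer producer.Stop()

	// Publish one metric in InfluxDB line protocol to the "telegraf" topic.
	line := []byte("cpu_load_short,host=server01 value=23422 1422568543702900257")
	if err := producer.Publish("telegraf", line); err != nil {
		log.Fatal(err)
	}
}
```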

View File

@ -0,0 +1,99 @@
package nsq_consumer
import (
"log"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/nsqio/go-nsq"
)
//NSQConsumer represents the configuration of the plugin
type NSQConsumer struct {
Server string
Topic string
Channel string
MaxInFlight int
parser parsers.Parser
consumer *nsq.Consumer
acc telegraf.Accumulator
}
var sampleConfig = `
## A string representing the NSQD TCP endpoint
server = "localhost:4150"
topic = "telegraf"
channel = "consumer"
max_in_flight = 100
## Data format to consume.
## Each data format has its own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "influx"
`
func init() {
inputs.Add("nsq_consumer", func() telegraf.Input {
return &NSQConsumer{}
})
}
// SetParser takes the data_format from the config and finds the right parser for that format
func (n *NSQConsumer) SetParser(parser parsers.Parser) {
n.parser = parser
}
// SampleConfig returns config values for generating a sample configuration file
func (n *NSQConsumer) SampleConfig() string {
return sampleConfig
}
// Description returns a one-line description of the plugin
func (n *NSQConsumer) Description() string {
return "Read NSQ topic for metrics."
}
// Start pulls data from nsq
func (n *NSQConsumer) Start(acc telegraf.Accumulator) error {
n.acc = acc
if err := n.connect(); err != nil {
return err
}
n.consumer.AddConcurrentHandlers(nsq.HandlerFunc(func(message *nsq.Message) error {
metrics, err := n.parser.Parse(message.Body)
if err != nil {
log.Printf("NSQConsumer Parse Error\nmessage:%s\nerror:%s", string(message.Body), err.Error())
return nil
}
for _, metric := range metrics {
n.acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), metric.Time())
}
message.Finish()
return nil
}), n.MaxInFlight)
return n.consumer.ConnectToNSQD(n.Server)
}
// Stop processing messages
func (n *NSQConsumer) Stop() {
n.consumer.Stop()
}
// Gather is a noop
func (n *NSQConsumer) Gather(acc telegraf.Accumulator) error {
return nil
}
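// connect lazily creates the underlying go-nsq consumer on first use;
// later calls are no-ops.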
func (n *NSQConsumer) connect() error {
if n.consumer == nil {
config := nsq.NewConfig()
config.MaxInFlight = n.MaxInFlight
consumer, err := nsq.NewConsumer(n.Topic, n.Channel, config)
if err != nil {
return err
}
n.consumer = consumer
}
return nil
}

View File

@ -0,0 +1,245 @@
package nsq_consumer
import (
"bufio"
"bytes"
"encoding/binary"
"io"
"log"
"net"
"strconv"
"testing"
"time"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/testutil"
"github.com/nsqio/go-nsq"
"github.com/stretchr/testify/assert"
)
// This test is modeled after the kafka consumer integration test
func TestReadsMetricsFromNSQ(t *testing.T) {
msgID := nsq.MessageID{'1', '2', '3', '4', '5', '6', '7', '8', '9', '0', 'a', 's', 'd', 'f', 'g', 'h'}
msg := nsq.NewMessage(msgID, []byte("cpu_load_short,direction=in,host=server01,region=us-west value=23422.0 1422568543702900257"))
script := []instruction{
// SUB
instruction{0, nsq.FrameTypeResponse, []byte("OK")},
// IDENTIFY
instruction{0, nsq.FrameTypeResponse, []byte("OK")},
instruction{20 * time.Millisecond, nsq.FrameTypeMessage, frameMessage(msg)},
// needed to exit test
instruction{100 * time.Millisecond, -1, []byte("exit")},
}
addr, _ := net.ResolveTCPAddr("tcp", "127.0.0.1:4155")
newMockNSQD(script, addr.String())
consumer := &NSQConsumer{
Server: "127.0.0.1:4155",
Topic: "telegraf",
Channel: "consume",
MaxInFlight: 1,
}
p, _ := parsers.NewInfluxParser()
consumer.SetParser(p)
var acc testutil.Accumulator
assert.Equal(t, 0, len(acc.Metrics), "There should not be any points")
if err := consumer.Start(&acc); err != nil {
t.Fatal(err.Error())
} else {
defer consumer.Stop()
}
waitForPoint(&acc, t)
if len(acc.Metrics) == 1 {
point := acc.Metrics[0]
assert.Equal(t, "cpu_load_short", point.Measurement)
assert.Equal(t, map[string]interface{}{"value": 23422.0}, point.Fields)
assert.Equal(t, map[string]string{
"host": "server01",
"direction": "in",
"region": "us-west",
}, point.Tags)
assert.Equal(t, time.Unix(0, 1422568543702900257).Unix(), point.Time.Unix())
} else {
t.Errorf("No points found in accumulator, expected 1")
}
}
// Waits for the metric that was sent to the mock NSQD to arrive at the
// consumer
func waitForPoint(acc *testutil.Accumulator, t *testing.T) {
// Give the mock NSQD up to 5 seconds to get the point to the consumer
ticker := time.NewTicker(5 * time.Millisecond)
defer ticker.Stop()
counter := 0
for {
select {
case <-ticker.C:
counter++
if counter > 1000 {
t.Fatal("Waited for 5s, point never arrived to consumer")
} else if acc.NFields() == 1 {
return
}
}
}
}
func newMockNSQD(script []instruction, addr string) *mockNSQD {
n := &mockNSQD{
script: script,
exitChan: make(chan int),
}
tcpListener, err := net.Listen("tcp", addr)
if err != nil {
log.Fatalf("FATAL: listen (%s) failed - %s", addr, err)
}
n.tcpListener = tcpListener
n.tcpAddr = tcpListener.Addr().(*net.TCPAddr)
go n.listen()
return n
}
// The code below allows us to mock the interactions with nsqd. This is taken from:
// https://github.com/nsqio/go-nsq/blob/master/mock_test.go
type instruction struct {
delay time.Duration
frameType int32
body []byte
}
type mockNSQD struct {
script []instruction
got [][]byte
tcpAddr *net.TCPAddr
tcpListener net.Listener
exitChan chan int
}
func (n *mockNSQD) listen() {
for {
conn, err := n.tcpListener.Accept()
if err != nil {
break
}
go n.handle(conn)
}
close(n.exitChan)
}
func (n *mockNSQD) handle(conn net.Conn) {
var idx int
buf := make([]byte, 4)
_, err := io.ReadFull(conn, buf)
if err != nil {
log.Fatalf("ERROR: failed to read protocol version - %s", err)
}
readChan := make(chan []byte)
readDoneChan := make(chan int)
scriptTime := time.After(n.script[0].delay)
rdr := bufio.NewReader(conn)
go func() {
for {
line, err := rdr.ReadBytes('\n')
if err != nil {
return
}
// trim the '\n'
line = line[:len(line)-1]
readChan <- line
<-readDoneChan
}
}()
var rdyCount int
for idx < len(n.script) {
select {
case line := <-readChan:
n.got = append(n.got, line)
params := bytes.Split(line, []byte(" "))
switch {
case bytes.Equal(params[0], []byte("IDENTIFY")):
l := make([]byte, 4)
_, err := io.ReadFull(rdr, l)
if err != nil {
log.Printf(err.Error())
goto exit
}
size := int32(binary.BigEndian.Uint32(l))
b := make([]byte, size)
_, err = io.ReadFull(rdr, b)
if err != nil {
log.Printf(err.Error())
goto exit
}
case bytes.Equal(params[0], []byte("RDY")):
rdy, _ := strconv.Atoi(string(params[1]))
rdyCount = rdy
case bytes.Equal(params[0], []byte("FIN")):
case bytes.Equal(params[0], []byte("REQ")):
}
readDoneChan <- 1
case <-scriptTime:
inst := n.script[idx]
if bytes.Equal(inst.body, []byte("exit")) {
goto exit
}
if inst.frameType == nsq.FrameTypeMessage {
if rdyCount == 0 {
scriptTime = time.After(n.script[idx+1].delay)
continue
}
rdyCount--
}
_, err := conn.Write(framedResponse(inst.frameType, inst.body))
if err != nil {
log.Printf(err.Error())
goto exit
}
scriptTime = time.After(n.script[idx+1].delay)
idx++
}
}
exit:
n.tcpListener.Close()
conn.Close()
}
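// framedResponse encodes an NSQ protocol frame: a big-endian length covering
// the frame type plus body, the 4-byte frame type, then the body itself.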
func framedResponse(frameType int32, data []byte) []byte {
var w bytes.Buffer
beBuf := make([]byte, 4)
size := uint32(len(data)) + 4
binary.BigEndian.PutUint32(beBuf, size)
_, err := w.Write(beBuf)
if err != nil {
return nil
}
binary.BigEndian.PutUint32(beBuf, uint32(frameType))
_, err = w.Write(beBuf)
if err != nil {
return nil
}
w.Write(data)
return w.Bytes()
}
func frameMessage(m *nsq.Message) []byte {
var b bytes.Buffer
m.WriteTo(&b)
return b.Bytes()
}

View File

@ -30,6 +30,26 @@ to filter and some tags
kubeservice = "kube-apiserver"
```
```toml
# Authorize with a bearer token skipping cert verification
[[inputs.prometheus]]
# An array of urls to scrape metrics from.
urls = ["http://my-kube-apiserver:8080/metrics"]
bearer_token = '/path/to/bearer/token'
insecure_skip_verify = true
```
```toml
# Authorize using x509 certs
[[inputs.prometheus]]
# An array of urls to scrape metrics from.
urls = ["https://my-kube-apiserver:8080/metrics"]
ssl_ca = '/path/to/cafile'
ssl_cert = '/path/to/certfile'
ssl_key = '/path/to/keyfile'
```
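Internally these options are handed to Telegraf's TLS helper (see the plugin change in this commit). As a rough standard-library sketch of what the `ssl_*` settings amount to, reusing the hypothetical paths from the example above:

```go
package main

import (
	"crypto/tls"
	"crypto/x509"
	"io/ioutil"
	"log"
	"net/http"
)

func main() {
	// Client certificate/key pair presented to the server (ssl_cert/ssl_key).
	cert, err := tls.LoadX509KeyPair("/path/to/certfile", "/path/to/keyfile")
	if err != nil {
		log.Fatal(err)
	}

	// CA bundle used to verify the server's certificate chain (ssl_ca).
	caCert, err := ioutil.ReadFile("/path/to/cafile")
	if err != nil {
		log.Fatal(err)
	}
	pool := x509.NewCertPool()
	pool.AppendCertsFromPEM(caCert)

	tlsCfg := &tls.Config{
		Certificates: []tls.Certificate{cert},
		RootCAs:      pool,
	}

	// Wire the config into the HTTP transport used for scraping.
	client := &http.Client{Transport: &http.Transport{TLSClientConfig: tlsCfg}}
	_ = client
}
```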
### Measurements & Fields & Tags:
Measurements and fields could be anything.

View File

@ -1,10 +1,10 @@
package prometheus
import (
"crypto/tls"
"errors"
"fmt"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/inputs"
"io/ioutil"
"net"
@ -16,20 +16,32 @@ import (
type Prometheus struct {
Urls []string
// Use SSL but skip chain & host verification
InsecureSkipVerify bool
// Bearer Token authorization file path
BearerToken string `toml:"bearer_token"`
// Path to CA file
SSLCA string `toml:"ssl_ca"`
// Path to host cert file
SSLCert string `toml:"ssl_cert"`
// Path to cert key file
SSLKey string `toml:"ssl_key"`
// Use SSL but skip chain & host verification
InsecureSkipVerify bool
}
var sampleConfig = `
## An array of urls to scrape metrics from.
urls = ["http://localhost:9100/metrics"]
## Use SSL but skip chain & host verification
# insecure_skip_verify = false
## Use bearer token for authorization
# bearer_token = /path/to/bearer/token
## Optional SSL Config
# ssl_ca = /path/to/cafile
# ssl_cert = /path/to/certfile
# ssl_key = /path/to/keyfile
## Use SSL but skip chain & host verification
# insecure_skip_verify = false
`
func (p *Prometheus) SampleConfig() string {
@ -78,16 +90,21 @@ func (p *Prometheus) gatherURL(url string, acc telegraf.Accumulator) error {
var token []byte
var resp *http.Response
tlsCfg, err := internal.GetTLSConfig(
p.SSLCert, p.SSLKey, p.SSLCA, p.InsecureSkipVerify)
if err != nil {
return err
}
var rt http.RoundTripper = &http.Transport{
Dial: (&net.Dialer{
Timeout: 5 * time.Second,
KeepAlive: 30 * time.Second,
}).Dial,
TLSHandshakeTimeout: 5 * time.Second,
TLSClientConfig: &tls.Config{
TLSClientConfig: tlsCfg,
InsecureSkipVerify: p.InsecureSkipVerify,
},
ResponseHeaderTimeout: time.Duration(3 * time.Second),
DisableKeepAlives: true,
}
if p.BearerToken != "" {

View File

@ -26,7 +26,6 @@ var (
type PrometheusClient struct {
Listen string
metrics map[string]*prometheus.UntypedVec
}
var sampleConfig = `
@ -35,6 +34,14 @@ var sampleConfig = `
`
func (p *PrometheusClient) Start() error {
defer func() {
if r := recover(); r != nil {
// recovering from panic here because there is no way to stop a
// running http go server except by a kill signal. Since the server
// does not stop on SIGHUP, Start() will panic when the process
// is reloaded.
}
}()
if p.Listen == "" {
p.Listen = "localhost:9126"
}
@ -44,7 +51,6 @@ func (p *PrometheusClient) Start() error {
Addr: p.Listen,
}
p.metrics = make(map[string]*prometheus.UntypedVec)
go server.ListenAndServe()
return nil
}
@ -118,24 +124,26 @@ func (p *PrometheusClient) Write(metrics []telegraf.Metric) error {
continue
}
// Create a new metric if it hasn't been created yet.
mVec := prometheus.NewUntypedVec(
if _, ok := p.metrics[mname]; !ok {
p.metrics[mname] = prometheus.NewUntypedVec(
prometheus.UntypedOpts{
Name: mname,
Help: "Telegraf collected metric",
},
labels,
)
if err := prometheus.Register(p.metrics[mname]); err != nil {
collector, err := prometheus.RegisterOrGet(mVec)
if err != nil {
log.Printf("prometheus_client: Metric failed to register with prometheus, %s", err)
continue
}
mVec, ok := collector.(*prometheus.UntypedVec)
if !ok {
continue
} }
switch val := val.(type) {
case int64:
m, err := p.metrics[mname].GetMetricWith(l)
m, err := mVec.GetMetricWith(l)
if err != nil {
log.Printf("ERROR Getting metric in Prometheus output, "+
"key: %s, labels: %v,\nerr: %s\n",
@ -144,7 +152,7 @@ func (p *PrometheusClient) Write(metrics []telegraf.Metric) error {
}
m.Set(float64(val))
case float64:
m, err := p.metrics[mname].GetMetricWith(l)
m, err := mVec.GetMetricWith(l)
if err != nil {
log.Printf("ERROR Getting metric in Prometheus output, "+
"key: %s, labels: %v,\nerr: %s\n",