From c6706a86f138b3d9769d306aa48656448d7619ad Mon Sep 17 00:00:00 2001 From: arthtux Date: Thu, 3 Mar 2016 20:20:03 -0500 Subject: [PATCH 01/22] add README.md for redis --- plugins/inputs/redis/README.md | 86 ++++++++++++++++++++++++++++++++++ plugins/inputs/redis/redis.go | 1 + 2 files changed, 87 insertions(+) create mode 100644 plugins/inputs/redis/README.md diff --git a/plugins/inputs/redis/README.md b/plugins/inputs/redis/README.md new file mode 100644 index 000000000..d7d98ccc9 --- /dev/null +++ b/plugins/inputs/redis/README.md @@ -0,0 +1,86 @@ +# Telegraf Plugin: Redis + +### Configuration: + +``` +# Read Redis's basic status information +[[inputs.redis]] + ## specify servers via a url matching: + ## [protocol://][:password]@address[:port] + ## e.g. + ## tcp://localhost:6379 + ## tcp://:password@192.168.99.100 + ## + ## If no servers are specified, then localhost is used as the host. + ## If no port is specified, 6379 is used + servers = ["tcp://localhost:6379"] +``` + +### Measurements & Fields: + +- Measurement + - uptime_in_seconds + - connected_clients + - used_memory + - used_memory_rss + - used_memory_peak + - used_memory_lua + - rdb_changes_since_last_save + - total_connections_received + - total_commands_processed + - instantaneous_ops_per_sec + - instantaneous_input_kbps + - instantaneous_output_kbps + - sync_full + - sync_partial_ok + - sync_partial_err + - expired_keys + - evicted_keys + - keyspace_hits + - keyspace_misses + - pubsub_channels + - pubsub_patterns + - latest_fork_usec + - connected_slaves + - master_repl_offset + - repl_backlog_active + - repl_backlog_size + - repl_backlog_histlen + - mem_fragmentation_ratio + - used_cpu_sys + - used_cpu_user + - used_cpu_sys_children + - used_cpu_user_children + +### Tags: + +- All measurements have the following tags: + - port + - server + +### Example Output: + +Using this configuration: +``` +[[inputs.redis]] + ## specify servers via a url matching: + ## [protocol://][:password]@address[:port] + 
## e.g. + ## tcp://localhost:6379 + ## tcp://:password@192.168.99.100 + ## + ## If no servers are specified, then localhost is used as the host. + ## If no port is specified, 6379 is used + servers = ["tcp://localhost:6379"] +``` + +When run with: +``` +./telegraf -config telegraf.conf -input-filter redis -test +``` + +It produces: +``` +* Plugin: redis, Collection 1 +> redis,port=6379,server=localhost clients=1i,connected_slaves=0i,evicted_keys=0i,expired_keys=0i,instantaneous_ops_per_sec=0i,keyspace_hitrate=0,keyspace_hits=0i,keyspace_misses=2i,latest_fork_usec=0i,master_repl_offset=0i,mem_fragmentation_ratio=3.58,pubsub_channels=0i,pubsub_patterns=0i,rdb_changes_since_last_save=0i,repl_backlog_active=0i,repl_backlog_histlen=0i,repl_backlog_size=1048576i,sync_full=0i,sync_partial_err=0i,sync_partial_ok=0i,total_commands_processed=4i,total_connections_received=2i,uptime=869i,used_cpu_sys=0.07,used_cpu_sys_children=0,used_cpu_user=0.1,used_cpu_user_children=0,used_memory=502048i,used_memory_lua=33792i,used_memory_peak=501128i,used_memory_rss=1798144i 1457052084987848383 +``` diff --git a/plugins/inputs/redis/redis.go b/plugins/inputs/redis/redis.go index b8862f6bc..df8dfe2f2 100644 --- a/plugins/inputs/redis/redis.go +++ b/plugins/inputs/redis/redis.go @@ -19,6 +19,7 @@ type Redis struct { } var sampleConfig = ` +[[inputs.redis]] ## specify servers via a url matching: ## [protocol://][:password]@address[:port] ## e.g. 
From 20999979de2e31b0d56bd129ab62fdccdb7c2160 Mon Sep 17 00:00:00 2001 From: Arthur Deschamps Date: Fri, 4 Mar 2016 07:22:54 -0500 Subject: [PATCH 02/22] Update redis.go --- plugins/inputs/redis/redis.go | 1 - 1 file changed, 1 deletion(-) diff --git a/plugins/inputs/redis/redis.go b/plugins/inputs/redis/redis.go index df8dfe2f2..b8862f6bc 100644 --- a/plugins/inputs/redis/redis.go +++ b/plugins/inputs/redis/redis.go @@ -19,7 +19,6 @@ type Redis struct { } var sampleConfig = ` -[[inputs.redis]] ## specify servers via a url matching: ## [protocol://][:password]@address[:port] ## e.g. From 1c76d5d096f968a077ef4228a5a3dbe16ec33cfe Mon Sep 17 00:00:00 2001 From: Thibault Cohen Date: Tue, 23 Feb 2016 23:58:14 -0500 Subject: [PATCH 03/22] Improve docker input plugin closes #754 --- CHANGELOG.md | 1 + plugins/inputs/docker/README.md | 46 ++++++- plugins/inputs/docker/docker.go | 124 +++++++++++++++++- plugins/inputs/docker/docker_test.go | 185 +++++++++++++++++++++++++++ 4 files changed, 353 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index cf71e79aa..e02c3bb9c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ - [#758](https://github.com/influxdata/telegraf/pull/758): UDP Listener input plugin, thanks @whatyouhide! - [#769](https://github.com/influxdata/telegraf/issues/769): httpjson plugin: allow specifying SSL configuration. - [#735](https://github.com/influxdata/telegraf/pull/735): SNMP Table feature. Thanks @titilambert! +- [#754](https://github.com/influxdata/telegraf/pull/754): docker plugin: adding `docker info` metrics to output. Thanks @titilambert! ### Bugfixes - [#748](https://github.com/influxdata/telegraf/issues/748): Fix sensor plugin split on ":" diff --git a/plugins/inputs/docker/README.md b/plugins/inputs/docker/README.md index 6086c89e8..97450e2aa 100644 --- a/plugins/inputs/docker/README.md +++ b/plugins/inputs/docker/README.md @@ -95,18 +95,50 @@ on the availability of per-cpu stats on your system. 
- io_serviced_recursive_sync - io_serviced_recursive_total - io_serviced_recursive_write +- docker + - n_used_file_descriptors + - n_cpus + - n_containers + - n_images + - n_goroutines + - n_listener_events + - memory_total + - pool_blocksize +- docker_data + - available + - total + - used +- docker_metadata + - available + - total + - used + ### Tags: -- All stats have the following tags: +- docker (memory_total) + - unit=bytes +- docker (pool_blocksize) + - unit=bytes +- docker_data + - unit=bytes +- docker_metadata + - unit=bytes + +- docker_cpu specific: - cont_id (container ID) - cont_image (container image) - cont_name (container name) -- docker_cpu specific: - cpu - docker_net specific: + - cont_id (container ID) + - cont_image (container image) + - cont_name (container name) - network - docker_blkio specific: + - cont_id (container ID) + - cont_image (container image) + - cont_name (container name) - device ### Example Output: @@ -114,6 +146,16 @@ on the availability of per-cpu stats on your system. 
``` % ./telegraf -config ~/ws/telegraf.conf -input-filter docker -test * Plugin: docker, Collection 1 +> docker n_cpus=8i 1456926671065383978 +> docker n_used_file_descriptors=15i 1456926671065383978 +> docker n_containers=7i 1456926671065383978 +> docker n_images=152i 1456926671065383978 +> docker n_goroutines=36i 1456926671065383978 +> docker n_listener_events=0i 1456926671065383978 +> docker,unit=bytes memory_total=18935443456i 1456926671065383978 +> docker,unit=bytes pool_blocksize=65540i 1456926671065383978 +> docker_data,unit=bytes available=24340000000i,total=107400000000i,used=14820000000i 1456926671065383978 +> docker_metadata,unit=bytes available=2126999999i,total=2146999999i,used=20420000i 1456926671065383978 > docker_mem,cont_id=5705ba8ed8fb47527410653d60a8bb2f3af5e62372297c419022a3cc6d45d848,\ cont_image=spotify/kafka,cont_name=kafka \ active_anon=52568064i,active_file=6926336i,cache=12038144i,fail_count=0i,\ diff --git a/plugins/inputs/docker/docker.go b/plugins/inputs/docker/docker.go index 0d89979c1..cdc8ec1e5 100644 --- a/plugins/inputs/docker/docker.go +++ b/plugins/inputs/docker/docker.go @@ -1,8 +1,11 @@ package system import ( + "encoding/json" "fmt" "log" + "regexp" + "strconv" "strings" "sync" "time" @@ -17,9 +20,29 @@ type Docker struct { Endpoint string ContainerNames []string - client *docker.Client + client DockerClient } +type DockerClient interface { + // Docker Client wrapper + // Useful for test + Info() (*docker.Env, error) + ListContainers(opts docker.ListContainersOptions) ([]docker.APIContainers, error) + Stats(opts docker.StatsOptions) error +} + +const ( + KB = 1000 + MB = 1000 * KB + GB = 1000 * MB + TB = 1000 * GB + PB = 1000 * TB +) + +var ( + sizeRegex = regexp.MustCompile(`^(\d+(\.\d+)*) ?([kKmMgGtTpP])?[bB]?$`) +) + var sampleConfig = ` ## Docker Endpoint ## To use TCP, set endpoint = "tcp://[ip]:[port]" @@ -58,12 +81,20 @@ func (d *Docker) Gather(acc telegraf.Accumulator) error { d.client = c } + // Get daemon info + err := 
d.gatherInfo(acc) + if err != nil { + fmt.Println(err.Error()) + } + + // List containers opts := docker.ListContainersOptions{} containers, err := d.client.ListContainers(opts) if err != nil { return err } + // Get container data var wg sync.WaitGroup wg.Add(len(containers)) for _, container := range containers { @@ -81,6 +112,76 @@ func (d *Docker) Gather(acc telegraf.Accumulator) error { return nil } +func (d *Docker) gatherInfo(acc telegraf.Accumulator) error { + // Init vars + var driverStatus [][]string + dataFields := make(map[string]interface{}) + metadataFields := make(map[string]interface{}) + now := time.Now() + // Get info from docker daemon + info, err := d.client.Info() + if err != nil { + return err + } + + fields := map[string]interface{}{ + "n_cpus": info.GetInt64("NCPU"), + "n_used_file_descriptors": info.GetInt64("NFd"), + "n_containers": info.GetInt64("Containers"), + "n_images": info.GetInt64("Images"), + "n_goroutines": info.GetInt64("NGoroutines"), + "n_listener_events": info.GetInt64("NEventsListener"), + } + // Add metrics + acc.AddFields("docker", + fields, + nil, + now) + acc.AddFields("docker", + map[string]interface{}{"memory_total": info.GetInt64("MemTotal")}, + map[string]string{"unit": "bytes"}, + now) + // Get storage metrics + driverStatusRaw := []byte(info.Get("DriverStatus")) + json.Unmarshal(driverStatusRaw, &driverStatus) + for _, rawData := range driverStatus { + // Try to convert string to int (bytes) + value, err := parseSize(rawData[1]) + if err != nil { + continue + } + name := strings.ToLower(strings.Replace(rawData[0], " ", "_", -1)) + if name == "pool_blocksize" { + // pool blocksize + acc.AddFields("docker", + map[string]interface{}{"pool_blocksize": value}, + map[string]string{"unit": "bytes"}, + now) + } else if strings.HasPrefix(name, "data_space_") { + // data space + field_name := strings.TrimPrefix(name, "data_space_") + dataFields[field_name] = value + } else if strings.HasPrefix(name, "metadata_space_") { + // 
metadata space + field_name := strings.TrimPrefix(name, "metadata_space_") + metadataFields[field_name] = value + } + } + if len(dataFields) > 0 { + acc.AddFields("docker_data", + dataFields, + map[string]string{"unit": "bytes"}, + now) + } + if len(metadataFields) > 0 { + acc.AddFields("docker_metadata", + metadataFields, + map[string]string{"unit": "bytes"}, + now) + } + return nil +} + func (d *Docker) gatherContainer( container docker.APIContainers, acc telegraf.Accumulator, @@ -334,6 +435,27 @@ func sliceContains(in string, sl []string) bool { return false } +// Parses the human-readable size string into the amount it represents. +func parseSize(sizeStr string) (int64, error) { + matches := sizeRegex.FindStringSubmatch(sizeStr) + if len(matches) != 4 { + return -1, fmt.Errorf("invalid size: '%s'", sizeStr) + } + + size, err := strconv.ParseFloat(matches[1], 64) + if err != nil { + return -1, err + } + + uMap := map[string]int64{"k": KB, "m": MB, "g": GB, "t": TB, "p": PB} + unitPrefix := strings.ToLower(matches[3]) + if mul, ok := uMap[unitPrefix]; ok { + size *= float64(mul) + } + + return int64(size), nil +} + func init() { inputs.Add("docker", func() telegraf.Input { return &Docker{} diff --git a/plugins/inputs/docker/docker_test.go b/plugins/inputs/docker/docker_test.go index aebe8102e..23fd0bb34 100644 --- a/plugins/inputs/docker/docker_test.go +++ b/plugins/inputs/docker/docker_test.go @@ -1,12 +1,14 @@ package system import ( + "encoding/json" "testing" "time" "github.com/influxdata/telegraf/testutil" "github.com/fsouza/go-dockerclient" + "github.com/stretchr/testify/require" ) func TestDockerGatherContainerStats(t *testing.T) { @@ -194,3 +196,186 @@ func testStats() *docker.Stats { return stats } + +type FakeDockerClient struct { +} + +func (d FakeDockerClient) Info() (*docker.Env, error) { + env := docker.Env{"Containers=108", "OomKillDisable=false", "SystemTime=2016-02-24T00:55:09.15073105-05:00", "NEventsListener=0", 
"ID=5WQQ:TFWR:FDNG:OKQ3:37Y4:FJWG:QIKK:623T:R3ME:QTKB:A7F7:OLHD", "Debug=false", "LoggingDriver=json-file", "KernelVersion=4.3.0-1-amd64", "IndexServerAddress=https://index.docker.io/v1/", "MemTotal=3840757760", "Images=199", "CpuCfsQuota=true", "Name=absol", "SwapLimit=false", "IPv4Forwarding=true", "ExecutionDriver=native-0.2", "InitSha1=23a51f3c916d2b5a3bbb31caf301fd2d14edd518", "ExperimentalBuild=false", "CpuCfsPeriod=true", "RegistryConfig={\"IndexConfigs\":{\"docker.io\":{\"Mirrors\":null,\"Name\":\"docker.io\",\"Official\":true,\"Secure\":true}},\"InsecureRegistryCIDRs\":[\"127.0.0.0/8\"],\"Mirrors\":null}", "OperatingSystem=Linux Mint LMDE (containerized)", "BridgeNfIptables=true", "HttpsProxy=", "Labels=null", "MemoryLimit=false", "DriverStatus=[[\"Pool Name\",\"docker-8:1-1182287-pool\"],[\"Pool Blocksize\",\"65.54 kB\"],[\"Backing Filesystem\",\"extfs\"],[\"Data file\",\"/dev/loop0\"],[\"Metadata file\",\"/dev/loop1\"],[\"Data Space Used\",\"17.3 GB\"],[\"Data Space Total\",\"107.4 GB\"],[\"Data Space Available\",\"36.53 GB\"],[\"Metadata Space Used\",\"20.97 MB\"],[\"Metadata Space Total\",\"2.147 GB\"],[\"Metadata Space Available\",\"2.127 GB\"],[\"Udev Sync Supported\",\"true\"],[\"Deferred Removal Enabled\",\"false\"],[\"Data loop file\",\"/var/lib/docker/devicemapper/devicemapper/data\"],[\"Metadata loop file\",\"/var/lib/docker/devicemapper/devicemapper/metadata\"],[\"Library Version\",\"1.02.115 (2016-01-25)\"]]", "NFd=19", "HttpProxy=", "Driver=devicemapper", "NGoroutines=39", "InitPath=/usr/lib/docker.io/dockerinit", "NCPU=4", "DockerRootDir=/var/lib/docker", "NoProxy=", "BridgeNfIp6tables=true"} + return &env, nil +} + +func (d FakeDockerClient) ListContainers(opts docker.ListContainersOptions) ([]docker.APIContainers, error) { + container1 := docker.APIContainers{ + ID: "e2173b9478a6ae55e237d4d74f8bbb753f0817192b5081334dc78476296b7dfb", + Image: "quay.io/coreos/etcd:v2.2.2", + Command: "/etcd -name etcd0 -advertise-client-urls 
http://localhost:2379 -listen-client-urls http://0.0.0.0:2379", + Created: 1455941930, + Status: "Up 4 hours", + Ports: []docker.APIPort{ + docker.APIPort{ + PrivatePort: 7001, + PublicPort: 0, + Type: "tcp", + }, + docker.APIPort{ + PrivatePort: 4001, + PublicPort: 0, + Type: "tcp", + }, + docker.APIPort{ + PrivatePort: 2380, + PublicPort: 0, + Type: "tcp", + }, + docker.APIPort{ + PrivatePort: 2379, + PublicPort: 2379, + Type: "tcp", + IP: "0.0.0.0", + }, + }, + SizeRw: 0, + SizeRootFs: 0, + Names: []string{"/etcd"}, + } + container2 := docker.APIContainers{ + ID: "b7dfbb9478a6ae55e237d4d74f8bbb753f0817192b5081334dc78476296e2173", + Image: "quay.io/coreos/etcd:v2.2.2", + Command: "/etcd -name etcd2 -advertise-client-urls http://localhost:2379 -listen-client-urls http://0.0.0.0:2379", + Created: 1455941933, + Status: "Up 4 hours", + Ports: []docker.APIPort{ + docker.APIPort{ + PrivatePort: 7002, + PublicPort: 0, + Type: "tcp", + }, + docker.APIPort{ + PrivatePort: 4002, + PublicPort: 0, + Type: "tcp", + }, + docker.APIPort{ + PrivatePort: 2381, + PublicPort: 0, + Type: "tcp", + }, + docker.APIPort{ + PrivatePort: 2382, + PublicPort: 2382, + Type: "tcp", + IP: "0.0.0.0", + }, + }, + SizeRw: 0, + SizeRootFs: 0, + Names: []string{"/etcd2"}, + } + + containers := []docker.APIContainers{container1, container2} + return containers, nil + + //#{e6a96c84ca91a5258b7cb752579fb68826b68b49ff957487695cd4d13c343b44 titilambert/snmpsim /bin/sh -c 'snmpsimd --agent-udpv4-endpoint=0.0.0.0:31161 --process-user=root --process-group=user' 1455724831 Up 4 hours [{31161 31161 udp 0.0.0.0}] 0 0 [/snmp] map[]}]2016/02/24 01:05:01 Gathered metrics, (3s interval), from 1 inputs in 1.233836656s +} + +func (d FakeDockerClient) Stats(opts docker.StatsOptions) error { + jsonStat := 
`{"read":"2016-02-24T11:42:27.472459608-05:00","memory_stats":{"stats":{},"limit":18935443456},"blkio_stats":{"io_service_bytes_recursive":[{"major":252,"minor":1,"op":"Read","value":753664},{"major":252,"minor":1,"op":"Write"},{"major":252,"minor":1,"op":"Sync"},{"major":252,"minor":1,"op":"Async","value":753664},{"major":252,"minor":1,"op":"Total","value":753664}],"io_serviced_recursive":[{"major":252,"minor":1,"op":"Read","value":26},{"major":252,"minor":1,"op":"Write"},{"major":252,"minor":1,"op":"Sync"},{"major":252,"minor":1,"op":"Async","value":26},{"major":252,"minor":1,"op":"Total","value":26}]},"cpu_stats":{"cpu_usage":{"percpu_usage":[17871,4959158,1646137,1231652,11829401,244656,369972,0],"usage_in_usermode":10000000,"total_usage":20298847},"system_cpu_usage":24052607520000000,"throttling_data":{}},"precpu_stats":{"cpu_usage":{"percpu_usage":[17871,4959158,1646137,1231652,11829401,244656,369972,0],"usage_in_usermode":10000000,"total_usage":20298847},"system_cpu_usage":24052599550000000,"throttling_data":{}}}` + var stat docker.Stats + json.Unmarshal([]byte(jsonStat), &stat) + opts.Stats <- &stat + return nil +} + +func TestDockerGatherInfo(t *testing.T) { + var acc testutil.Accumulator + client := FakeDockerClient{} + d := Docker{client: client} + + err := d.Gather(&acc) + + require.NoError(t, err) + + acc.AssertContainsTaggedFields(t, + "docker", + map[string]interface{}{ + "n_listener_events": int64(0), + "n_cpus": int64(4), + "n_used_file_descriptors": int64(19), + "n_containers": int64(108), + "n_images": int64(199), + "n_goroutines": int64(39), + }, + map[string]string{}, + ) + + acc.AssertContainsTaggedFields(t, + "docker_data", + map[string]interface{}{ + "used": int64(17300000000), + "total": int64(107400000000), + "available": int64(36530000000), + }, + map[string]string{ + "unit": "bytes", + }, + ) + acc.AssertContainsTaggedFields(t, + "docker_cpu", + map[string]interface{}{ + "usage_total": uint64(1231652), + }, + map[string]string{ + 
"cont_id": "b7dfbb9478a6ae55e237d4d74f8bbb753f0817192b5081334dc78476296e2173", + "cont_name": "etcd2", + "cont_image": "quay.io/coreos/etcd:v2.2.2", + "cpu": "cpu3", + }, + ) + acc.AssertContainsTaggedFields(t, + "docker_mem", + map[string]interface{}{ + "total_pgpgout": uint64(0), + "usage_percent": float64(0), + "rss": uint64(0), + "total_writeback": uint64(0), + "active_anon": uint64(0), + "total_pgmafault": uint64(0), + "total_rss": uint64(0), + "total_unevictable": uint64(0), + "active_file": uint64(0), + "total_mapped_file": uint64(0), + "pgpgin": uint64(0), + "total_active_file": uint64(0), + "total_active_anon": uint64(0), + "total_cache": uint64(0), + "inactive_anon": uint64(0), + "pgmajfault": uint64(0), + "total_inactive_anon": uint64(0), + "total_rss_huge": uint64(0), + "rss_huge": uint64(0), + "hierarchical_memory_limit": uint64(0), + "pgpgout": uint64(0), + "unevictable": uint64(0), + "total_inactive_file": uint64(0), + "writeback": uint64(0), + "total_pgfault": uint64(0), + "total_pgpgin": uint64(0), + "cache": uint64(0), + "mapped_file": uint64(0), + "inactive_file": uint64(0), + "max_usage": uint64(0), + "fail_count": uint64(0), + "pgfault": uint64(0), + "usage": uint64(0), + "limit": uint64(18935443456), + }, + map[string]string{ + "cont_id": "b7dfbb9478a6ae55e237d4d74f8bbb753f0817192b5081334dc78476296e2173", + "cont_name": "etcd2", + "cont_image": "quay.io/coreos/etcd:v2.2.2", + }, + ) + + //fmt.Print(info) +} From 8f98c20c51b4b93cb6f33c9bc385a4c3db242bc9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E5=85=89=E6=9D=83?= Date: Fri, 4 Mar 2016 00:09:49 +0800 Subject: [PATCH 04/22] Add flags -usage-list to print all plugins inputs for telegraf --- cmd/telegraf/telegraf.go | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/cmd/telegraf/telegraf.go b/cmd/telegraf/telegraf.go index a65c5607c..ec8e6315a 100644 --- a/cmd/telegraf/telegraf.go +++ b/cmd/telegraf/telegraf.go @@ -11,7 +11,7 @@ import ( 
"github.com/influxdata/telegraf/agent" "github.com/influxdata/telegraf/internal/config" - + "github.com/influxdata/telegraf/plugins/inputs" _ "github.com/influxdata/telegraf/plugins/inputs/all" _ "github.com/influxdata/telegraf/plugins/outputs/all" ) @@ -34,6 +34,7 @@ var fOutputFilters = flag.String("output-filter", "", "filter the outputs to enable, separator is :") var fUsage = flag.String("usage", "", "print usage for a plugin, ie, 'telegraf -usage mysql'") +var fUsageList = flag.Bool("usage-list", false, "print all the plugins inputs") var fInputFiltersLegacy = flag.String("filter", "", "filter the inputs to enable, separator is :") @@ -61,6 +62,7 @@ The flags are: -input-filter filter the input plugins to enable, separator is : -output-filter filter the output plugins to enable, separator is : -usage print usage for a plugin, ie, 'telegraf -usage mysql' + -usage-list print all the plugins input -debug print metrics as they're generated to stdout -quiet run in quiet mode -version print the version to stdout @@ -135,6 +137,13 @@ func main() { return } + if *fUsageList { + fmt.Println("The plugin inputs avaiable:") + for k, _ := range inputs.Inputs { + fmt.Printf(" %s\n", k) + } + } + var ( c *config.Config err error From 3249030257e1382d02ed899de9c2752a66fd3a26 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E5=85=89=E6=9D=83?= Date: Sun, 6 Mar 2016 20:08:51 +0800 Subject: [PATCH 05/22] add flags '-input-list' and '-output-list' for telegraf command --- cmd/telegraf/telegraf.go | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/cmd/telegraf/telegraf.go b/cmd/telegraf/telegraf.go index ec8e6315a..b07f0d303 100644 --- a/cmd/telegraf/telegraf.go +++ b/cmd/telegraf/telegraf.go @@ -13,6 +13,7 @@ import ( "github.com/influxdata/telegraf/internal/config" "github.com/influxdata/telegraf/plugins/inputs" _ "github.com/influxdata/telegraf/plugins/inputs/all" + "github.com/influxdata/telegraf/plugins/outputs" _ 
"github.com/influxdata/telegraf/plugins/outputs/all" ) @@ -30,12 +31,13 @@ var fSampleConfig = flag.Bool("sample-config", false, var fPidfile = flag.String("pidfile", "", "file to write our pid to") var fInputFilters = flag.String("input-filter", "", "filter the inputs to enable, separator is :") +var fInpuList = flag.Bool("input-list", false, "print all the plugins inputs") var fOutputFilters = flag.String("output-filter", "", "filter the outputs to enable, separator is :") +var fOutputList = flag.Bool("output-list", false, + "print all the available outputs") var fUsage = flag.String("usage", "", "print usage for a plugin, ie, 'telegraf -usage mysql'") -var fUsageList = flag.Bool("usage-list", false, "print all the plugins inputs") - var fInputFiltersLegacy = flag.String("filter", "", "filter the inputs to enable, separator is :") var fOutputFiltersLegacy = flag.String("outputfilter", "", @@ -60,9 +62,10 @@ The flags are: -sample-config print out full sample configuration to stdout -config-directory directory containing additional *.conf files -input-filter filter the input plugins to enable, separator is : + -input-list print all the plugins inputs -output-filter filter the output plugins to enable, separator is : + -output-list print all the available outputs -usage print usage for a plugin, ie, 'telegraf -usage mysql' - -usage-list print all the plugins input -debug print metrics as they're generated to stdout -quiet run in quiet mode -version print the version to stdout @@ -117,6 +120,13 @@ func main() { outputFilters = strings.Split(":"+outputFilter+":", ":") } + if *fOutputList { + fmt.Println("The outputs available:") + for k, _ := range outputs.Outputs { + fmt.Printf(" %s\n", k) + } + } + if *fVersion { v := fmt.Sprintf("Telegraf - Version %s", Version) fmt.Println(v) @@ -137,8 +147,8 @@ func main() { return } - if *fUsageList { - fmt.Println("The plugin inputs avaiable:") + if *fInpuList { + fmt.Println("The plugin inputs available:") for k, _ := range 
inputs.Inputs { fmt.Printf(" %s\n", k) } From fe44fa648a98f5fb58c7c111481fda65d7140989 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E5=85=89=E6=9D=83?= Date: Mon, 7 Mar 2016 17:41:57 +0800 Subject: [PATCH 06/22] Fix the incorrect indent of input-list help message --- cmd/telegraf/telegraf.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/telegraf/telegraf.go b/cmd/telegraf/telegraf.go index b07f0d303..dea80cde3 100644 --- a/cmd/telegraf/telegraf.go +++ b/cmd/telegraf/telegraf.go @@ -62,7 +62,7 @@ The flags are: -sample-config print out full sample configuration to stdout -config-directory directory containing additional *.conf files -input-filter filter the input plugins to enable, separator is : - -input-list print all the plugins inputs + -input-list print all the plugins inputs -output-filter filter the output plugins to enable, separator is : -output-list print all the available outputs -usage print usage for a plugin, ie, 'telegraf -usage mysql' From 7e312797eca487d744b2f887d43459e1934dd75a Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Mon, 7 Mar 2016 11:42:01 +0100 Subject: [PATCH 07/22] Grammar corrections and consistency for output-list, input-list closes #788 --- CHANGELOG.md | 1 + cmd/telegraf/telegraf.go | 25 ++++++++++++++----------- 2 files changed, 15 insertions(+), 11 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e02c3bb9c..73a942730 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ - [#769](https://github.com/influxdata/telegraf/issues/769): httpjson plugin: allow specifying SSL configuration. - [#735](https://github.com/influxdata/telegraf/pull/735): SNMP Table feature. Thanks @titilambert! - [#754](https://github.com/influxdata/telegraf/pull/754): docker plugin: adding `docker info` metrics to output. Thanks @titilambert! +- [#788](https://github.com/influxdata/telegraf/pull/788): -input-list and -output-list command-line options. Thanks @ebookbug! 
### Bugfixes - [#748](https://github.com/influxdata/telegraf/issues/748): Fix sensor plugin split on ":" diff --git a/cmd/telegraf/telegraf.go b/cmd/telegraf/telegraf.go index dea80cde3..d54aaa4e3 100644 --- a/cmd/telegraf/telegraf.go +++ b/cmd/telegraf/telegraf.go @@ -31,11 +31,12 @@ var fSampleConfig = flag.Bool("sample-config", false, var fPidfile = flag.String("pidfile", "", "file to write our pid to") var fInputFilters = flag.String("input-filter", "", "filter the inputs to enable, separator is :") -var fInpuList = flag.Bool("input-list", false, "print all the plugins inputs") +var fInputList = flag.Bool("input-list", false, + "print available output plugins.") var fOutputFilters = flag.String("output-filter", "", "filter the outputs to enable, separator is :") var fOutputList = flag.Bool("output-list", false, - "print all the available outputs") + "print available output plugins.") var fUsage = flag.String("usage", "", "print usage for a plugin, ie, 'telegraf -usage mysql'") var fInputFiltersLegacy = flag.String("filter", "", @@ -64,7 +65,7 @@ The flags are: -input-filter filter the input plugins to enable, separator is : -input-list print all the plugins inputs -output-filter filter the output plugins to enable, separator is : - -output-list print all the available outputs + -output-list print all the available outputs -usage print usage for a plugin, ie, 'telegraf -usage mysql' -debug print metrics as they're generated to stdout -quiet run in quiet mode @@ -121,10 +122,19 @@ func main() { } if *fOutputList { - fmt.Println("The outputs available:") + fmt.Println("Available Output Plugins:") for k, _ := range outputs.Outputs { fmt.Printf(" %s\n", k) } + return + } + + if *fInputList { + fmt.Println("Available Input Plugins:") + for k, _ := range inputs.Inputs { + fmt.Printf(" %s\n", k) + } + return } if *fVersion { @@ -147,13 +157,6 @@ func main() { return } - if *fInpuList { - fmt.Println("The plugin inputs available:") - for k, _ := range inputs.Inputs { - 
fmt.Printf(" %s\n", k) - } - } - var ( c *config.Config err error From 3cca312e61cde1c68a692c780edd8777b0e17a9d Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Wed, 2 Mar 2016 15:31:46 +0000 Subject: [PATCH 08/22] Adding a TCP input listener closes #481 --- CHANGELOG.md | 1 + plugins/inputs/all/all.go | 1 + plugins/inputs/tcp_listener/README.md | 30 ++ plugins/inputs/tcp_listener/tcp_listener.go | 264 ++++++++++++++++++ .../inputs/tcp_listener/tcp_listener_test.go | 259 +++++++++++++++++ 5 files changed, 555 insertions(+) create mode 100644 plugins/inputs/tcp_listener/README.md create mode 100644 plugins/inputs/tcp_listener/tcp_listener.go create mode 100644 plugins/inputs/tcp_listener/tcp_listener_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 73a942730..fe87e41dc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ - [#735](https://github.com/influxdata/telegraf/pull/735): SNMP Table feature. Thanks @titilambert! - [#754](https://github.com/influxdata/telegraf/pull/754): docker plugin: adding `docker info` metrics to output. Thanks @titilambert! - [#788](https://github.com/influxdata/telegraf/pull/788): -input-list and -output-list command-line options. Thanks @ebookbug! +- [#778](https://github.com/influxdata/telegraf/pull/778): Adding a TCP input listener. 
### Bugfixes - [#748](https://github.com/influxdata/telegraf/issues/748): Fix sensor plugin split on ":" diff --git a/plugins/inputs/all/all.go b/plugins/inputs/all/all.go index 262de37ac..2808ce2b5 100644 --- a/plugins/inputs/all/all.go +++ b/plugins/inputs/all/all.go @@ -47,6 +47,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/inputs/sqlserver" _ "github.com/influxdata/telegraf/plugins/inputs/statsd" _ "github.com/influxdata/telegraf/plugins/inputs/system" + _ "github.com/influxdata/telegraf/plugins/inputs/tcp_listener" _ "github.com/influxdata/telegraf/plugins/inputs/trig" _ "github.com/influxdata/telegraf/plugins/inputs/twemproxy" _ "github.com/influxdata/telegraf/plugins/inputs/udp_listener" diff --git a/plugins/inputs/tcp_listener/README.md b/plugins/inputs/tcp_listener/README.md new file mode 100644 index 000000000..63a7dea3c --- /dev/null +++ b/plugins/inputs/tcp_listener/README.md @@ -0,0 +1,30 @@ +# TCP listener service input plugin + +The TCP listener is a service input plugin that listens for messages on a TCP +socket and adds those messages to InfluxDB. +The plugin expects messages in the +[Telegraf Input Data Formats](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md). + +### Configuration: + +This is a sample configuration for the plugin. + +```toml +# Generic TCP listener +[[inputs.tcp_listener]] + ## Address and port to host TCP listener on + service_address = ":8094" + + ## Number of TCP messages allowed to queue up. Once filled, the + ## TCP listener will start dropping packets. + allowed_pending_messages = 10000 + + ## Maximum number of concurrent TCP connections to allow + max_tcp_connections = 250 + + ## Data format to consume. 
This can be "json", "influx" or "graphite" + ## Each data format has it's own unique set of configuration options, read + ## more about them here: + ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md + data_format = "influx" +``` diff --git a/plugins/inputs/tcp_listener/tcp_listener.go b/plugins/inputs/tcp_listener/tcp_listener.go new file mode 100644 index 000000000..dd239fedf --- /dev/null +++ b/plugins/inputs/tcp_listener/tcp_listener.go @@ -0,0 +1,264 @@ +package tcp_listener + +import ( + "bufio" + "fmt" + "log" + "net" + "sync" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/plugins/inputs" + "github.com/influxdata/telegraf/plugins/parsers" +) + +type TcpListener struct { + ServiceAddress string + AllowedPendingMessages int + MaxTCPConnections int `toml:"max_tcp_connections"` + + sync.Mutex + // Lock for preventing a data race during resource cleanup + cleanup sync.Mutex + wg sync.WaitGroup + + in chan []byte + done chan struct{} + // accept channel tracks how many active connections there are, if there + // is an available bool in accept, then we are below the maximum and can + // accept the connection + accept chan bool + + // track the listener here so we can close it in Stop() + listener *net.TCPListener + // track current connections so we can close them in Stop() + conns map[string]*net.TCPConn + + parser parsers.Parser + acc telegraf.Accumulator +} + +var dropwarn = "ERROR: Message queue full. Discarding line [%s] " + + "You may want to increase allowed_pending_messages in the config\n" + +const sampleConfig = ` + ## Address and port to host TCP listener on + service_address = ":8094" + + ## Number of TCP messages allowed to queue up. Once filled, the + ## TCP listener will start dropping packets. + allowed_pending_messages = 10000 + + ## Maximum number of concurrent TCP connections to allow + max_tcp_connections = 250 + + ## Data format to consume. 
This can be "json", "influx" or "graphite" + ## Each data format has it's own unique set of configuration options, read + ## more about them here: + ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md + data_format = "influx" +` + +func (t *TcpListener) SampleConfig() string { + return sampleConfig +} + +func (t *TcpListener) Description() string { + return "Generic TCP listener" +} + +// All the work is done in the Start() function, so this is just a dummy +// function. +func (t *TcpListener) Gather(_ telegraf.Accumulator) error { + return nil +} + +func (t *TcpListener) SetParser(parser parsers.Parser) { + t.parser = parser +} + +// Start starts the tcp listener service. +func (t *TcpListener) Start(acc telegraf.Accumulator) error { + t.Lock() + defer t.Unlock() + + t.acc = acc + t.in = make(chan []byte, t.AllowedPendingMessages) + t.done = make(chan struct{}) + t.accept = make(chan bool, t.MaxTCPConnections) + t.conns = make(map[string]*net.TCPConn) + for i := 0; i < t.MaxTCPConnections; i++ { + t.accept <- true + } + + // Start listener + var err error + address, _ := net.ResolveTCPAddr("tcp", t.ServiceAddress) + t.listener, err = net.ListenTCP("tcp", address) + if err != nil { + log.Fatalf("ERROR: ListenUDP - %s", err) + return err + } + log.Println("TCP server listening on: ", t.listener.Addr().String()) + + t.wg.Add(2) + go t.tcpListen() + go t.tcpParser() + + log.Printf("Started TCP listener service on %s\n", t.ServiceAddress) + return nil +} + +// Stop cleans up all resources +func (t *TcpListener) Stop() { + t.Lock() + defer t.Unlock() + close(t.done) + t.listener.Close() + + // Close all open TCP connections + // - get all conns from the t.conns map and put into slice + // - this is so the forget() function doesnt conflict with looping + // over the t.conns map + var conns []*net.TCPConn + t.cleanup.Lock() + for _, conn := range t.conns { + conns = append(conns, conn) + } + t.cleanup.Unlock() + for _, conn := range conns { + 
conn.Close() + } + + t.wg.Wait() + close(t.in) + log.Println("Stopped TCP listener service on ", t.ServiceAddress) +} + +// tcpListen listens for incoming TCP connections. +func (t *TcpListener) tcpListen() error { + defer t.wg.Done() + + for { + select { + case <-t.done: + return nil + default: + // Accept connection: + conn, err := t.listener.AcceptTCP() + if err != nil { + return err + } + + log.Printf("Received TCP Connection from %s", conn.RemoteAddr()) + + select { + case <-t.accept: + // not over connection limit, handle the connection properly. + t.wg.Add(1) + // generate a random id for this TCPConn + id := internal.RandomString(6) + t.remember(id, conn) + go t.handler(conn, id) + default: + // We are over the connection limit, refuse & close. + t.refuser(conn) + } + } + } +} + +// refuser refuses a TCP connection +func (t *TcpListener) refuser(conn *net.TCPConn) { + // Tell the connection why we are closing. + fmt.Fprintf(conn, "Telegraf maximum concurrent TCP connections (%d)"+ + " reached, closing.\nYou may want to increase max_tcp_connections in"+ + " the Telegraf tcp listener configuration.\n", t.MaxTCPConnections) + conn.Close() + log.Printf("Refused TCP Connection from %s", conn.RemoteAddr()) + log.Printf("WARNING: Maximum TCP Connections reached, you may want to" + + " adjust max_tcp_connections") +} + +// handler handles a single TCP Connection +func (t *TcpListener) handler(conn *net.TCPConn, id string) { + // connection cleanup function + defer func() { + t.wg.Done() + conn.Close() + log.Printf("Closed TCP Connection from %s", conn.RemoteAddr()) + // Add one connection potential back to channel when this one closes + t.accept <- true + t.forget(id) + }() + + scanner := bufio.NewScanner(conn) + for { + select { + case <-t.done: + return + default: + if !scanner.Scan() { + return + } + buf := scanner.Bytes() + select { + case t.in <- buf: + default: + log.Printf(dropwarn, string(buf)) + } + } + } +} + +// tcpParser parses the incoming tcp byte 
packets +func (t *TcpListener) tcpParser() error { + defer t.wg.Done() + for { + select { + case <-t.done: + return nil + case packet := <-t.in: + if len(packet) == 0 { + continue + } + metrics, err := t.parser.Parse(packet) + if err == nil { + t.storeMetrics(metrics) + } else { + log.Printf("Malformed packet: [%s], Error: %s\n", + string(packet), err) + } + } + } +} + +func (t *TcpListener) storeMetrics(metrics []telegraf.Metric) error { + t.Lock() + defer t.Unlock() + for _, m := range metrics { + t.acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time()) + } + return nil +} + +// forget a TCP connection +func (t *TcpListener) forget(id string) { + t.cleanup.Lock() + defer t.cleanup.Unlock() + delete(t.conns, id) +} + +// remember a TCP connection +func (t *TcpListener) remember(id string, conn *net.TCPConn) { + t.cleanup.Lock() + defer t.cleanup.Unlock() + t.conns[id] = conn +} + +func init() { + inputs.Add("tcp_listener", func() telegraf.Input { + return &TcpListener{} + }) +} diff --git a/plugins/inputs/tcp_listener/tcp_listener_test.go b/plugins/inputs/tcp_listener/tcp_listener_test.go new file mode 100644 index 000000000..b4aec9dd2 --- /dev/null +++ b/plugins/inputs/tcp_listener/tcp_listener_test.go @@ -0,0 +1,259 @@ +package tcp_listener + +import ( + "fmt" + "net" + "testing" + "time" + + "github.com/influxdata/telegraf/plugins/parsers" + "github.com/influxdata/telegraf/testutil" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +const ( + testMsg = "cpu_load_short,host=server01 value=12.0 1422568543702900257\n" + + testMsgs = ` +cpu_load_short,host=server02 value=12.0 1422568543702900257 +cpu_load_short,host=server03 value=12.0 1422568543702900257 +cpu_load_short,host=server04 value=12.0 1422568543702900257 +cpu_load_short,host=server05 value=12.0 1422568543702900257 +cpu_load_short,host=server06 value=12.0 1422568543702900257 +` +) + +func newTestTcpListener() (*TcpListener, chan []byte) { + in := make(chan []byte, 
1500) + listener := &TcpListener{ + ServiceAddress: ":8194", + AllowedPendingMessages: 10000, + MaxTCPConnections: 250, + in: in, + done: make(chan struct{}), + } + return listener, in +} + +func TestConnectTCP(t *testing.T) { + listener := TcpListener{ + ServiceAddress: ":8194", + AllowedPendingMessages: 10000, + MaxTCPConnections: 250, + } + listener.parser, _ = parsers.NewInfluxParser() + + acc := &testutil.Accumulator{} + require.NoError(t, listener.Start(acc)) + defer listener.Stop() + + time.Sleep(time.Millisecond * 25) + conn, err := net.Dial("tcp", "127.0.0.1:8194") + require.NoError(t, err) + + // send single message to socket + fmt.Fprintf(conn, testMsg) + time.Sleep(time.Millisecond * 15) + acc.AssertContainsTaggedFields(t, "cpu_load_short", + map[string]interface{}{"value": float64(12)}, + map[string]string{"host": "server01"}, + ) + + // send multiple messages to socket + fmt.Fprintf(conn, testMsgs) + time.Sleep(time.Millisecond * 15) + hostTags := []string{"server02", "server03", + "server04", "server05", "server06"} + for _, hostTag := range hostTags { + acc.AssertContainsTaggedFields(t, "cpu_load_short", + map[string]interface{}{"value": float64(12)}, + map[string]string{"host": hostTag}, + ) + } +} + +// Test that MaxTCPConections is respected +func TestConcurrentConns(t *testing.T) { + listener := TcpListener{ + ServiceAddress: ":8195", + AllowedPendingMessages: 10000, + MaxTCPConnections: 2, + } + listener.parser, _ = parsers.NewInfluxParser() + + acc := &testutil.Accumulator{} + require.NoError(t, listener.Start(acc)) + defer listener.Stop() + + time.Sleep(time.Millisecond * 25) + _, err := net.Dial("tcp", "127.0.0.1:8195") + assert.NoError(t, err) + _, err = net.Dial("tcp", "127.0.0.1:8195") + assert.NoError(t, err) + + // Connection over the limit: + conn, err := net.Dial("tcp", "127.0.0.1:8195") + assert.NoError(t, err) + net.Dial("tcp", "127.0.0.1:8195") + buf := make([]byte, 1500) + n, err := conn.Read(buf) + assert.NoError(t, err) + 
assert.Equal(t, + "Telegraf maximum concurrent TCP connections (2) reached, closing.\n"+ + "You may want to increase max_tcp_connections in"+ + " the Telegraf tcp listener configuration.\n", + string(buf[:n])) + + _, err = conn.Write([]byte(testMsg)) + assert.NoError(t, err) + time.Sleep(time.Millisecond * 10) + assert.Zero(t, acc.NFields()) +} + +// Test that MaxTCPConections is respected when max==1 +func TestConcurrentConns1(t *testing.T) { + listener := TcpListener{ + ServiceAddress: ":8196", + AllowedPendingMessages: 10000, + MaxTCPConnections: 1, + } + listener.parser, _ = parsers.NewInfluxParser() + + acc := &testutil.Accumulator{} + require.NoError(t, listener.Start(acc)) + defer listener.Stop() + + time.Sleep(time.Millisecond * 25) + _, err := net.Dial("tcp", "127.0.0.1:8196") + assert.NoError(t, err) + + // Connection over the limit: + conn, err := net.Dial("tcp", "127.0.0.1:8196") + assert.NoError(t, err) + net.Dial("tcp", "127.0.0.1:8196") + buf := make([]byte, 1500) + n, err := conn.Read(buf) + assert.NoError(t, err) + assert.Equal(t, + "Telegraf maximum concurrent TCP connections (1) reached, closing.\n"+ + "You may want to increase max_tcp_connections in"+ + " the Telegraf tcp listener configuration.\n", + string(buf[:n])) + + _, err = conn.Write([]byte(testMsg)) + assert.NoError(t, err) + time.Sleep(time.Millisecond * 10) + assert.Zero(t, acc.NFields()) +} + +// Test that MaxTCPConections is respected +func TestCloseConcurrentConns(t *testing.T) { + listener := TcpListener{ + ServiceAddress: ":8195", + AllowedPendingMessages: 10000, + MaxTCPConnections: 2, + } + listener.parser, _ = parsers.NewInfluxParser() + + acc := &testutil.Accumulator{} + require.NoError(t, listener.Start(acc)) + + time.Sleep(time.Millisecond * 25) + _, err := net.Dial("tcp", "127.0.0.1:8195") + assert.NoError(t, err) + _, err = net.Dial("tcp", "127.0.0.1:8195") + assert.NoError(t, err) + + listener.Stop() +} + +func TestRunParser(t *testing.T) { + var testmsg = 
[]byte(testMsg) + + listener, in := newTestTcpListener() + acc := testutil.Accumulator{} + listener.acc = &acc + defer close(listener.done) + + listener.parser, _ = parsers.NewInfluxParser() + listener.wg.Add(1) + go listener.tcpParser() + + in <- testmsg + time.Sleep(time.Millisecond * 25) + listener.Gather(&acc) + + if a := acc.NFields(); a != 1 { + t.Errorf("got %v, expected %v", a, 1) + } + + acc.AssertContainsTaggedFields(t, "cpu_load_short", + map[string]interface{}{"value": float64(12)}, + map[string]string{"host": "server01"}, + ) +} + +func TestRunParserInvalidMsg(t *testing.T) { + var testmsg = []byte("cpu_load_short") + + listener, in := newTestTcpListener() + acc := testutil.Accumulator{} + listener.acc = &acc + defer close(listener.done) + + listener.parser, _ = parsers.NewInfluxParser() + listener.wg.Add(1) + go listener.tcpParser() + + in <- testmsg + time.Sleep(time.Millisecond * 25) + + if a := acc.NFields(); a != 0 { + t.Errorf("got %v, expected %v", a, 0) + } +} + +func TestRunParserGraphiteMsg(t *testing.T) { + var testmsg = []byte("cpu.load.graphite 12 1454780029") + + listener, in := newTestTcpListener() + acc := testutil.Accumulator{} + listener.acc = &acc + defer close(listener.done) + + listener.parser, _ = parsers.NewGraphiteParser("_", []string{}, nil) + listener.wg.Add(1) + go listener.tcpParser() + + in <- testmsg + time.Sleep(time.Millisecond * 25) + listener.Gather(&acc) + + acc.AssertContainsFields(t, "cpu_load_graphite", + map[string]interface{}{"value": float64(12)}) +} + +func TestRunParserJSONMsg(t *testing.T) { + var testmsg = []byte("{\"a\": 5, \"b\": {\"c\": 6}}\n") + + listener, in := newTestTcpListener() + acc := testutil.Accumulator{} + listener.acc = &acc + defer close(listener.done) + + listener.parser, _ = parsers.NewJSONParser("udp_json_test", []string{}, nil) + listener.wg.Add(1) + go listener.tcpParser() + + in <- testmsg + time.Sleep(time.Millisecond * 25) + listener.Gather(&acc) + + acc.AssertContainsFields(t, 
"udp_json_test", + map[string]interface{}{ + "a": float64(5), + "b_c": float64(6), + }) +} From 6139a69fa86e84961b8717984e3c947477daa5e7 Mon Sep 17 00:00:00 2001 From: Matt Morrison Date: Mon, 7 Mar 2016 17:13:29 +1300 Subject: [PATCH 09/22] [SNMP Input] SNMPMap() loops forever if table has more than 32 entries closes #800 closes #801 --- plugins/inputs/snmp/snmp.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/plugins/inputs/snmp/snmp.go b/plugins/inputs/snmp/snmp.go index 3d4827fc1..2af293d57 100644 --- a/plugins/inputs/snmp/snmp.go +++ b/plugins/inputs/snmp/snmp.go @@ -464,13 +464,14 @@ func (h *Host) SNMPMap(acc telegraf.Accumulator) error { // To get mapping between instance id // and instance name oid_asked := table.mappingTable + oid_next := oid_asked need_more_requests := true // Set max repetition maxRepetition := uint8(32) // Launch requests for need_more_requests { // Launch request - result, err3 := snmpClient.GetBulk([]string{oid_asked}, 0, maxRepetition) + result, err3 := snmpClient.GetBulk([]string{oid_next}, 0, maxRepetition) if err3 != nil { return err3 } @@ -572,6 +573,7 @@ func (h *Host) SNMPMap(acc telegraf.Accumulator) error { // Determine if we need more requests if strings.HasPrefix(lastOid, oid_asked) { need_more_requests = true + oid_next = lastOid } else { need_more_requests = false } From 41534c73f0af3512465d85a8606566225bcd2b14 Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Mon, 7 Mar 2016 13:56:10 +0100 Subject: [PATCH 10/22] mqtt_consumer: option to set persistent session and client ID closes #797 --- CHANGELOG.md | 1 + plugins/inputs/mqtt_consumer/mqtt_consumer.go | 22 ++++++++- .../mqtt_consumer/mqtt_consumer_test.go | 48 +++++++++++++++++++ 3 files changed, 70 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index fe87e41dc..2640d3f21 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ - [#754](https://github.com/influxdata/telegraf/pull/754): docker plugin: adding `docker 
info` metrics to output. Thanks @titilambert! - [#788](https://github.com/influxdata/telegraf/pull/788): -input-list and -output-list command-line options. Thanks @ebookbug! - [#778](https://github.com/influxdata/telegraf/pull/778): Adding a TCP input listener. +- [#797](https://github.com/influxdata/telegraf/issues/797): Provide option for persistent MQTT consumer client sessions. ### Bugfixes - [#748](https://github.com/influxdata/telegraf/issues/748): Fix sensor plugin split on ":" diff --git a/plugins/inputs/mqtt_consumer/mqtt_consumer.go b/plugins/inputs/mqtt_consumer/mqtt_consumer.go index 42cadfd60..e36889703 100644 --- a/plugins/inputs/mqtt_consumer/mqtt_consumer.go +++ b/plugins/inputs/mqtt_consumer/mqtt_consumer.go @@ -26,6 +26,9 @@ type MQTTConsumer struct { // Legacy metric buffer support MetricBuffer int + PersistentSession bool + ClientID string `toml:"client_id"` + // Path to CA file SSLCA string `toml:"ssl_ca"` // Path to host cert file @@ -57,6 +60,13 @@ var sampleConfig = ` "sensors/#", ] + # if true, messages that can't be delivered while the subscriber is offline + # will be delivered when it comes back (such as on service restart). + # NOTE: if true, client_id MUST be set + persistent_session = false + # If empty, a random client ID will be generated. + client_id = "" + ## username and password to connect MQTT server. 
# username = "telegraf" # password = "metricsmetricsmetricsmetrics" @@ -91,6 +101,11 @@ func (m *MQTTConsumer) Start(acc telegraf.Accumulator) error { m.Lock() defer m.Unlock() + if m.PersistentSession && m.ClientID == "" { + return fmt.Errorf("ERROR MQTT Consumer: When using persistent_session" + + " = true, you MUST also set client_id") + } + m.acc = acc if m.QoS > 2 || m.QoS < 0 { return fmt.Errorf("MQTT Consumer, invalid QoS value: %d", m.QoS) @@ -166,7 +181,11 @@ func (m *MQTTConsumer) Gather(acc telegraf.Accumulator) error { func (m *MQTTConsumer) createOpts() (*mqtt.ClientOptions, error) { opts := mqtt.NewClientOptions() - opts.SetClientID("Telegraf-Consumer-" + internal.RandomString(5)) + if m.ClientID == "" { + opts.SetClientID("Telegraf-Consumer-" + internal.RandomString(5)) + } else { + opts.SetClientID(m.ClientID) + } tlsCfg, err := internal.GetTLSConfig( m.SSLCert, m.SSLKey, m.SSLCA, m.InsecureSkipVerify) @@ -199,6 +218,7 @@ func (m *MQTTConsumer) createOpts() (*mqtt.ClientOptions, error) { } opts.SetAutoReconnect(true) opts.SetKeepAlive(time.Second * 60) + opts.SetCleanSession(!m.PersistentSession) return opts, nil } diff --git a/plugins/inputs/mqtt_consumer/mqtt_consumer_test.go b/plugins/inputs/mqtt_consumer/mqtt_consumer_test.go index b1dd59bcf..e926ebbb2 100644 --- a/plugins/inputs/mqtt_consumer/mqtt_consumer_test.go +++ b/plugins/inputs/mqtt_consumer/mqtt_consumer_test.go @@ -7,6 +7,8 @@ import ( "github.com/influxdata/telegraf/plugins/parsers" "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/assert" + "git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git" ) @@ -28,6 +30,52 @@ func newTestMQTTConsumer() (*MQTTConsumer, chan mqtt.Message) { return n, in } +// Test that default client has random ID +func TestRandomClientID(t *testing.T) { + m1 := &MQTTConsumer{ + Servers: []string{"localhost:1883"}} + opts, err := m1.createOpts() + assert.NoError(t, err) + + m2 := &MQTTConsumer{ + Servers: 
[]string{"localhost:1883"}} + opts2, err2 := m2.createOpts() + assert.NoError(t, err2) + + assert.NotEqual(t, opts.ClientID, opts2.ClientID) +} + +// Test that default client has random ID +func TestClientID(t *testing.T) { + m1 := &MQTTConsumer{ + Servers: []string{"localhost:1883"}, + ClientID: "telegraf-test", + } + opts, err := m1.createOpts() + assert.NoError(t, err) + + m2 := &MQTTConsumer{ + Servers: []string{"localhost:1883"}, + ClientID: "telegraf-test", + } + opts2, err2 := m2.createOpts() + assert.NoError(t, err2) + + assert.Equal(t, "telegraf-test", opts2.ClientID) + assert.Equal(t, "telegraf-test", opts.ClientID) +} + +// Test that Start() fails if client ID is not set but persistent is +func TestPersistentClientIDFail(t *testing.T) { + m1 := &MQTTConsumer{ + Servers: []string{"localhost:1883"}, + PersistentSession: true, + } + acc := testutil.Accumulator{} + err := m1.Start(&acc) + assert.Error(t, err) +} + // Test that the parser parses NATS messages into metrics func TestRunParser(t *testing.T) { n, in := newTestMQTTConsumer() From 240f99478ad847def48f0a4144fe095d50bb1c79 Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Mon, 7 Mar 2016 15:46:23 +0100 Subject: [PATCH 11/22] Prevent Inf and NaN from being added, and unit test Accumulator closes #803 --- agent/accumulator.go | 7 +- agent/accumulator_test.go | 302 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 308 insertions(+), 1 deletion(-) create mode 100644 agent/accumulator_test.go diff --git a/agent/accumulator.go b/agent/accumulator.go index b04ff2b53..7ec22cd7f 100644 --- a/agent/accumulator.go +++ b/agent/accumulator.go @@ -105,7 +105,6 @@ func (ac *accumulator) AddFields( continue } } - result[k] = v // Validate uint64 and float64 fields switch val := v.(type) { @@ -116,6 +115,7 @@ func (ac *accumulator) AddFields( } else { result[k] = int64(9223372036854775807) } + continue case float64: // NaNs are invalid values in influxdb, skip measurement if math.IsNaN(val) || math.IsInf(val, 
0) { @@ -127,6 +127,8 @@ func (ac *accumulator) AddFields( continue } } + + result[k] = v } fields = nil if len(result) == 0 { @@ -168,5 +170,8 @@ func (ac *accumulator) setDefaultTags(tags map[string]string) { } func (ac *accumulator) addDefaultTag(key, value string) { + if ac.defaultTags == nil { + ac.defaultTags = make(map[string]string) + } ac.defaultTags[key] = value } diff --git a/agent/accumulator_test.go b/agent/accumulator_test.go new file mode 100644 index 000000000..05f9b02aa --- /dev/null +++ b/agent/accumulator_test.go @@ -0,0 +1,302 @@ +package agent + +import ( + "fmt" + "math" + "testing" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal/models" + + "github.com/stretchr/testify/assert" +) + +func TestAdd(t *testing.T) { + a := accumulator{} + now := time.Now() + a.metrics = make(chan telegraf.Metric, 10) + defer close(a.metrics) + a.inputConfig = &internal_models.InputConfig{} + + a.Add("acctest", float64(101), map[string]string{}) + a.Add("acctest", float64(101), map[string]string{"acc": "test"}) + a.Add("acctest", float64(101), map[string]string{"acc": "test"}, now) + + testm := <-a.metrics + actual := testm.String() + assert.Contains(t, actual, "acctest value=101") + + testm = <-a.metrics + actual = testm.String() + assert.Contains(t, actual, "acctest,acc=test value=101") + + testm = <-a.metrics + actual = testm.String() + assert.Equal(t, + fmt.Sprintf("acctest,acc=test value=101 %d", now.UnixNano()), + actual) +} + +func TestAddDefaultTags(t *testing.T) { + a := accumulator{} + a.addDefaultTag("default", "tag") + now := time.Now() + a.metrics = make(chan telegraf.Metric, 10) + defer close(a.metrics) + a.inputConfig = &internal_models.InputConfig{} + + a.Add("acctest", float64(101), map[string]string{}) + a.Add("acctest", float64(101), map[string]string{"acc": "test"}) + a.Add("acctest", float64(101), map[string]string{"acc": "test"}, now) + + testm := <-a.metrics + actual := testm.String() + 
assert.Contains(t, actual, "acctest,default=tag value=101") + + testm = <-a.metrics + actual = testm.String() + assert.Contains(t, actual, "acctest,acc=test,default=tag value=101") + + testm = <-a.metrics + actual = testm.String() + assert.Equal(t, + fmt.Sprintf("acctest,acc=test,default=tag value=101 %d", now.UnixNano()), + actual) +} + +func TestAddFields(t *testing.T) { + a := accumulator{} + now := time.Now() + a.metrics = make(chan telegraf.Metric, 10) + defer close(a.metrics) + a.inputConfig = &internal_models.InputConfig{} + + fields := map[string]interface{}{ + "usage": float64(99), + } + a.AddFields("acctest", fields, map[string]string{}) + a.AddFields("acctest", fields, map[string]string{"acc": "test"}) + a.AddFields("acctest", fields, map[string]string{"acc": "test"}, now) + + testm := <-a.metrics + actual := testm.String() + assert.Contains(t, actual, "acctest usage=99") + + testm = <-a.metrics + actual = testm.String() + assert.Contains(t, actual, "acctest,acc=test usage=99") + + testm = <-a.metrics + actual = testm.String() + assert.Equal(t, + fmt.Sprintf("acctest,acc=test usage=99 %d", now.UnixNano()), + actual) +} + +// Test that all Inf fields get dropped, and not added to metrics channel +func TestAddInfFields(t *testing.T) { + inf := math.Inf(1) + ninf := math.Inf(-1) + + a := accumulator{} + now := time.Now() + a.metrics = make(chan telegraf.Metric, 10) + defer close(a.metrics) + a.inputConfig = &internal_models.InputConfig{} + + fields := map[string]interface{}{ + "usage": inf, + "nusage": ninf, + } + a.AddFields("acctest", fields, map[string]string{}) + a.AddFields("acctest", fields, map[string]string{"acc": "test"}) + a.AddFields("acctest", fields, map[string]string{"acc": "test"}, now) + + assert.Len(t, a.metrics, 0) + + // test that non-inf fields are kept and not dropped + fields["notinf"] = float64(100) + a.AddFields("acctest", fields, map[string]string{}) + testm := <-a.metrics + actual := testm.String() + assert.Contains(t, actual, 
"acctest notinf=100") +} + +// Test that nan fields are dropped and not added +func TestAddNaNFields(t *testing.T) { + nan := math.NaN() + + a := accumulator{} + now := time.Now() + a.metrics = make(chan telegraf.Metric, 10) + defer close(a.metrics) + a.inputConfig = &internal_models.InputConfig{} + + fields := map[string]interface{}{ + "usage": nan, + } + a.AddFields("acctest", fields, map[string]string{}) + a.AddFields("acctest", fields, map[string]string{"acc": "test"}) + a.AddFields("acctest", fields, map[string]string{"acc": "test"}, now) + + assert.Len(t, a.metrics, 0) + + // test that non-nan fields are kept and not dropped + fields["notnan"] = float64(100) + a.AddFields("acctest", fields, map[string]string{}) + testm := <-a.metrics + actual := testm.String() + assert.Contains(t, actual, "acctest notnan=100") +} + +func TestAddUint64Fields(t *testing.T) { + a := accumulator{} + now := time.Now() + a.metrics = make(chan telegraf.Metric, 10) + defer close(a.metrics) + a.inputConfig = &internal_models.InputConfig{} + + fields := map[string]interface{}{ + "usage": uint64(99), + } + a.AddFields("acctest", fields, map[string]string{}) + a.AddFields("acctest", fields, map[string]string{"acc": "test"}) + a.AddFields("acctest", fields, map[string]string{"acc": "test"}, now) + + testm := <-a.metrics + actual := testm.String() + assert.Contains(t, actual, "acctest usage=99i") + + testm = <-a.metrics + actual = testm.String() + assert.Contains(t, actual, "acctest,acc=test usage=99i") + + testm = <-a.metrics + actual = testm.String() + assert.Equal(t, + fmt.Sprintf("acctest,acc=test usage=99i %d", now.UnixNano()), + actual) +} + +func TestAddUint64Overflow(t *testing.T) { + a := accumulator{} + now := time.Now() + a.metrics = make(chan telegraf.Metric, 10) + defer close(a.metrics) + a.inputConfig = &internal_models.InputConfig{} + + fields := map[string]interface{}{ + "usage": uint64(9223372036854775808), + } + a.AddFields("acctest", fields, map[string]string{}) + 
a.AddFields("acctest", fields, map[string]string{"acc": "test"}) + a.AddFields("acctest", fields, map[string]string{"acc": "test"}, now) + + testm := <-a.metrics + actual := testm.String() + assert.Contains(t, actual, "acctest usage=9223372036854775807i") + + testm = <-a.metrics + actual = testm.String() + assert.Contains(t, actual, "acctest,acc=test usage=9223372036854775807i") + + testm = <-a.metrics + actual = testm.String() + assert.Equal(t, + fmt.Sprintf("acctest,acc=test usage=9223372036854775807i %d", now.UnixNano()), + actual) +} + +func TestAddInts(t *testing.T) { + a := accumulator{} + a.addDefaultTag("default", "tag") + now := time.Now() + a.metrics = make(chan telegraf.Metric, 10) + defer close(a.metrics) + a.inputConfig = &internal_models.InputConfig{} + + a.Add("acctest", int(101), map[string]string{}) + a.Add("acctest", int32(101), map[string]string{"acc": "test"}) + a.Add("acctest", int64(101), map[string]string{"acc": "test"}, now) + + testm := <-a.metrics + actual := testm.String() + assert.Contains(t, actual, "acctest,default=tag value=101i") + + testm = <-a.metrics + actual = testm.String() + assert.Contains(t, actual, "acctest,acc=test,default=tag value=101i") + + testm = <-a.metrics + actual = testm.String() + assert.Equal(t, + fmt.Sprintf("acctest,acc=test,default=tag value=101i %d", now.UnixNano()), + actual) +} + +func TestAddFloats(t *testing.T) { + a := accumulator{} + a.addDefaultTag("default", "tag") + now := time.Now() + a.metrics = make(chan telegraf.Metric, 10) + defer close(a.metrics) + a.inputConfig = &internal_models.InputConfig{} + + a.Add("acctest", float32(101), map[string]string{"acc": "test"}) + a.Add("acctest", float64(101), map[string]string{"acc": "test"}, now) + + testm := <-a.metrics + actual := testm.String() + assert.Contains(t, actual, "acctest,acc=test,default=tag value=101") + + testm = <-a.metrics + actual = testm.String() + assert.Equal(t, + fmt.Sprintf("acctest,acc=test,default=tag value=101 %d", now.UnixNano()), 
+ actual) +} + +func TestAddStrings(t *testing.T) { + a := accumulator{} + a.addDefaultTag("default", "tag") + now := time.Now() + a.metrics = make(chan telegraf.Metric, 10) + defer close(a.metrics) + a.inputConfig = &internal_models.InputConfig{} + + a.Add("acctest", "test", map[string]string{"acc": "test"}) + a.Add("acctest", "foo", map[string]string{"acc": "test"}, now) + + testm := <-a.metrics + actual := testm.String() + assert.Contains(t, actual, "acctest,acc=test,default=tag value=\"test\"") + + testm = <-a.metrics + actual = testm.String() + assert.Equal(t, + fmt.Sprintf("acctest,acc=test,default=tag value=\"foo\" %d", now.UnixNano()), + actual) +} + +func TestAddBools(t *testing.T) { + a := accumulator{} + a.addDefaultTag("default", "tag") + now := time.Now() + a.metrics = make(chan telegraf.Metric, 10) + defer close(a.metrics) + a.inputConfig = &internal_models.InputConfig{} + + a.Add("acctest", true, map[string]string{"acc": "test"}) + a.Add("acctest", false, map[string]string{"acc": "test"}, now) + + testm := <-a.metrics + actual := testm.String() + assert.Contains(t, actual, "acctest,acc=test,default=tag value=true") + + testm = <-a.metrics + actual = testm.String() + assert.Equal(t, + fmt.Sprintf("acctest,acc=test,default=tag value=false %d", now.UnixNano()), + actual) +} From cd66e203bd8e00d6cfa2e70b143562af27f2159f Mon Sep 17 00:00:00 2001 From: Thibault Cohen Date: Sun, 6 Mar 2016 01:42:14 -0500 Subject: [PATCH 12/22] Improve procstat closes #799 --- CHANGELOG.md | 1 + plugins/inputs/procstat/README.md | 4 ++++ plugins/inputs/procstat/spec_processor.go | 10 ++++++++++ 3 files changed, 15 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2640d3f21..8632e7cb6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ - [#788](https://github.com/influxdata/telegraf/pull/788): -input-list and -output-list command-line options. Thanks @ebookbug! - [#778](https://github.com/influxdata/telegraf/pull/778): Adding a TCP input listener. 
- [#797](https://github.com/influxdata/telegraf/issues/797): Provide option for persistent MQTT consumer client sessions. +- [#799](https://github.com/influxdata/telegraf/pull/799): Add number of threads for procstat input plugin. Thanks @titilambert! ### Bugfixes - [#748](https://github.com/influxdata/telegraf/issues/748): Fix sensor plugin split on ":" diff --git a/plugins/inputs/procstat/README.md b/plugins/inputs/procstat/README.md index 90552c2a6..ef96500a3 100644 --- a/plugins/inputs/procstat/README.md +++ b/plugins/inputs/procstat/README.md @@ -35,6 +35,10 @@ The above configuration would result in output like: # Measurements Note: prefix can be set by the user, per process. + +Threads related measurement names: +- procstat_[prefix_]num_threads value=5 + File descriptor related measurement names: - procstat_[prefix_]num_fds value=4 diff --git a/plugins/inputs/procstat/spec_processor.go b/plugins/inputs/procstat/spec_processor.go index b09ed4f21..bb248f003 100644 --- a/plugins/inputs/procstat/spec_processor.go +++ b/plugins/inputs/procstat/spec_processor.go @@ -52,6 +52,7 @@ func NewSpecProcessor( } func (p *SpecProcessor) pushMetrics() { + p.pushNThreadsStats() p.pushFDStats() p.pushCtxStats() p.pushIOStats() @@ -60,6 +61,15 @@ func (p *SpecProcessor) pushMetrics() { p.flush() } +func (p *SpecProcessor) pushNThreadsStats() error { + numThreads, err := p.proc.NumThreads() + if err != nil { + return fmt.Errorf("NumThreads error: %s\n", err) + } + p.add("num_threads", numThreads) + return nil +} + func (p *SpecProcessor) pushFDStats() error { fds, err := p.proc.NumFDs() if err != nil { From 0060df987752ad561b6c0684a684d326f89bf5e5 Mon Sep 17 00:00:00 2001 From: Prune Sebastien THOMAS Date: Tue, 1 Mar 2016 14:11:18 -0500 Subject: [PATCH 13/22] added zookeeper_chroot option added a plugin option zookeeper_chroot to set up the kafka endpoint in zookeeper, which may not be / (default). 
This chroot is then configured in the consumergroup config.Zookeeper.Chroot. This works around the fact that this plugin does not handle URLs like "zookeeper_server:port/chroot". As the peers are stored in an array, it makes no sense to have them be URLs. Peers should all be members of the same cluster, so they all have the same chroot. --- plugins/inputs/kafka_consumer/kafka_consumer.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/plugins/inputs/kafka_consumer/kafka_consumer.go b/plugins/inputs/kafka_consumer/kafka_consumer.go index bc0d225c6..0d2a49f89 100644 --- a/plugins/inputs/kafka_consumer/kafka_consumer.go +++ b/plugins/inputs/kafka_consumer/kafka_consumer.go @@ -17,6 +17,7 @@ type Kafka struct { ConsumerGroup string Topics []string ZookeeperPeers []string + ZookeeperChroot string Consumer *consumergroup.ConsumerGroup // Legacy metric buffer support @@ -48,6 +49,8 @@ var sampleConfig = ` topics = ["telegraf"] ## an array of Zookeeper connection strings zookeeper_peers = ["localhost:2181"] + ## Zookeeper Chroot + zookeeper_chroot = "/" ## the name of the consumer group consumer_group = "telegraf_metrics_consumers" ## Offset (must be either "oldest" or "newest") @@ -80,6 +83,7 @@ func (k *Kafka) Start(acc telegraf.Accumulator) error { k.acc = acc config := consumergroup.NewConfig() + config.Zookeeper.Chroot = k.ZookeeperChroot switch strings.ToLower(k.Offset) { case "oldest", "": config.Offsets.Initial = sarama.OffsetOldest From bd3d0c330f6284b4b47954bc53fb7546a711d3e0 Mon Sep 17 00:00:00 2001 From: Prune Sebastien THOMAS Date: Tue, 1 Mar 2016 14:38:12 -0500 Subject: [PATCH 14/22] parsed with gofmt closes #776 --- CHANGELOG.md | 1 + plugins/inputs/kafka_consumer/kafka_consumer.go | 8 ++++---- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8632e7cb6..2eb9fa652 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ - [#778](https://github.com/influxdata/telegraf/pull/778): Adding a TCP
input listener. - [#797](https://github.com/influxdata/telegraf/issues/797): Provide option for persistent MQTT consumer client sessions. - [#799](https://github.com/influxdata/telegraf/pull/799): Add number of threads for procstat input plugin. Thanks @titilambert! +- [#776](https://github.com/influxdata/telegraf/pull/776): Add Zookeeper chroot option to kafka_consumer. Thanks @prune998! ### Bugfixes - [#748](https://github.com/influxdata/telegraf/issues/748): Fix sensor plugin split on ":" diff --git a/plugins/inputs/kafka_consumer/kafka_consumer.go b/plugins/inputs/kafka_consumer/kafka_consumer.go index 0d2a49f89..07c87199f 100644 --- a/plugins/inputs/kafka_consumer/kafka_consumer.go +++ b/plugins/inputs/kafka_consumer/kafka_consumer.go @@ -14,11 +14,11 @@ import ( ) type Kafka struct { - ConsumerGroup string - Topics []string - ZookeeperPeers []string + ConsumerGroup string + Topics []string + ZookeeperPeers []string ZookeeperChroot string - Consumer *consumergroup.ConsumerGroup + Consumer *consumergroup.ConsumerGroup // Legacy metric buffer support MetricBuffer int From 805db7ca5071eaa2eb9275ca212dab10a230c7b9 Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Wed, 9 Mar 2016 13:11:59 +0100 Subject: [PATCH 15/22] Break out fcgi code into orig Go files, don't ignore errs closes #816 --- plugins/inputs/phpfpm/child.go | 331 ++++++++++++++++++ .../inputs/phpfpm/{phpfpm_fcgi.go => fcgi.go} | 76 ---- plugins/inputs/phpfpm/fcgi_client.go | 86 +++++ plugins/inputs/phpfpm/fcgi_test.go | 280 +++++++++++++++ plugins/inputs/phpfpm/phpfpm.go | 10 +- 5 files changed, 705 insertions(+), 78 deletions(-) create mode 100644 plugins/inputs/phpfpm/child.go rename plugins/inputs/phpfpm/{phpfpm_fcgi.go => fcgi.go} (79%) create mode 100644 plugins/inputs/phpfpm/fcgi_client.go create mode 100644 plugins/inputs/phpfpm/fcgi_test.go diff --git a/plugins/inputs/phpfpm/child.go b/plugins/inputs/phpfpm/child.go new file mode 100644 index 000000000..2ebdf2ffb --- /dev/null +++ 
b/plugins/inputs/phpfpm/child.go @@ -0,0 +1,331 @@ +// Copyright 2011 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package phpfpm + +// This file implements FastCGI from the perspective of a child process. + +import ( + "errors" + "fmt" + "io" + "io/ioutil" + "net" + "net/http" + "net/http/cgi" + "os" + "strings" + "sync" + "time" +) + +// request holds the state for an in-progress request. As soon as it's complete, +// it's converted to an http.Request. +type request struct { + pw *io.PipeWriter + reqId uint16 + params map[string]string + buf [1024]byte + rawParams []byte + keepConn bool +} + +func newRequest(reqId uint16, flags uint8) *request { + r := &request{ + reqId: reqId, + params: map[string]string{}, + keepConn: flags&flagKeepConn != 0, + } + r.rawParams = r.buf[:0] + return r +} + +// parseParams reads an encoded []byte into Params. +func (r *request) parseParams() { + text := r.rawParams + r.rawParams = nil + for len(text) > 0 { + keyLen, n := readSize(text) + if n == 0 { + return + } + text = text[n:] + valLen, n := readSize(text) + if n == 0 { + return + } + text = text[n:] + if int(keyLen)+int(valLen) > len(text) { + return + } + key := readString(text, keyLen) + text = text[keyLen:] + val := readString(text, valLen) + text = text[valLen:] + r.params[key] = val + } +} + +// response implements http.ResponseWriter. 
+type response struct { + req *request + header http.Header + w *bufWriter + wroteHeader bool +} + +func newResponse(c *child, req *request) *response { + return &response{ + req: req, + header: http.Header{}, + w: newWriter(c.conn, typeStdout, req.reqId), + } +} + +func (r *response) Header() http.Header { + return r.header +} + +func (r *response) Write(data []byte) (int, error) { + if !r.wroteHeader { + r.WriteHeader(http.StatusOK) + } + return r.w.Write(data) +} + +func (r *response) WriteHeader(code int) { + if r.wroteHeader { + return + } + r.wroteHeader = true + if code == http.StatusNotModified { + // Must not have body. + r.header.Del("Content-Type") + r.header.Del("Content-Length") + r.header.Del("Transfer-Encoding") + } else if r.header.Get("Content-Type") == "" { + r.header.Set("Content-Type", "text/html; charset=utf-8") + } + + if r.header.Get("Date") == "" { + r.header.Set("Date", time.Now().UTC().Format(http.TimeFormat)) + } + + fmt.Fprintf(r.w, "Status: %d %s\r\n", code, http.StatusText(code)) + r.header.Write(r.w) + r.w.WriteString("\r\n") +} + +func (r *response) Flush() { + if !r.wroteHeader { + r.WriteHeader(http.StatusOK) + } + r.w.Flush() +} + +func (r *response) Close() error { + r.Flush() + return r.w.Close() +} + +type child struct { + conn *conn + handler http.Handler + + mu sync.Mutex // protects requests: + requests map[uint16]*request // keyed by request ID +} + +func newChild(rwc io.ReadWriteCloser, handler http.Handler) *child { + return &child{ + conn: newConn(rwc), + handler: handler, + requests: make(map[uint16]*request), + } +} + +func (c *child) serve() { + defer c.conn.Close() + defer c.cleanUp() + var rec record + for { + if err := rec.read(c.conn.rwc); err != nil { + return + } + if err := c.handleRecord(&rec); err != nil { + return + } + } +} + +var errCloseConn = errors.New("fcgi: connection should be closed") + +var emptyBody = ioutil.NopCloser(strings.NewReader("")) + +// ErrRequestAborted is returned by Read when a 
handler attempts to read the +// body of a request that has been aborted by the web server. +var ErrRequestAborted = errors.New("fcgi: request aborted by web server") + +// ErrConnClosed is returned by Read when a handler attempts to read the body of +// a request after the connection to the web server has been closed. +var ErrConnClosed = errors.New("fcgi: connection to web server closed") + +func (c *child) handleRecord(rec *record) error { + c.mu.Lock() + req, ok := c.requests[rec.h.Id] + c.mu.Unlock() + if !ok && rec.h.Type != typeBeginRequest && rec.h.Type != typeGetValues { + // The spec says to ignore unknown request IDs. + return nil + } + + switch rec.h.Type { + case typeBeginRequest: + if req != nil { + // The server is trying to begin a request with the same ID + // as an in-progress request. This is an error. + return errors.New("fcgi: received ID that is already in-flight") + } + + var br beginRequest + if err := br.read(rec.content()); err != nil { + return err + } + if br.role != roleResponder { + c.conn.writeEndRequest(rec.h.Id, 0, statusUnknownRole) + return nil + } + req = newRequest(rec.h.Id, br.flags) + c.mu.Lock() + c.requests[rec.h.Id] = req + c.mu.Unlock() + return nil + case typeParams: + // NOTE(eds): Technically a key-value pair can straddle the boundary + // between two packets. We buffer until we've received all parameters. + if len(rec.content()) > 0 { + req.rawParams = append(req.rawParams, rec.content()...) + return nil + } + req.parseParams() + return nil + case typeStdin: + content := rec.content() + if req.pw == nil { + var body io.ReadCloser + if len(content) > 0 { + // body could be an io.LimitReader, but it shouldn't matter + // as long as both sides are behaving. + body, req.pw = io.Pipe() + } else { + body = emptyBody + } + go c.serveRequest(req, body) + } + if len(content) > 0 { + // TODO(eds): This blocks until the handler reads from the pipe. + // If the handler takes a long time, it might be a problem. 
+ req.pw.Write(content) + } else if req.pw != nil { + req.pw.Close() + } + return nil + case typeGetValues: + values := map[string]string{"FCGI_MPXS_CONNS": "1"} + c.conn.writePairs(typeGetValuesResult, 0, values) + return nil + case typeData: + // If the filter role is implemented, read the data stream here. + return nil + case typeAbortRequest: + c.mu.Lock() + delete(c.requests, rec.h.Id) + c.mu.Unlock() + c.conn.writeEndRequest(rec.h.Id, 0, statusRequestComplete) + if req.pw != nil { + req.pw.CloseWithError(ErrRequestAborted) + } + if !req.keepConn { + // connection will close upon return + return errCloseConn + } + return nil + default: + b := make([]byte, 8) + b[0] = byte(rec.h.Type) + c.conn.writeRecord(typeUnknownType, 0, b) + return nil + } +} + +func (c *child) serveRequest(req *request, body io.ReadCloser) { + r := newResponse(c, req) + httpReq, err := cgi.RequestFromMap(req.params) + if err != nil { + // there was an error reading the request + r.WriteHeader(http.StatusInternalServerError) + c.conn.writeRecord(typeStderr, req.reqId, []byte(err.Error())) + } else { + httpReq.Body = body + c.handler.ServeHTTP(r, httpReq) + } + r.Close() + c.mu.Lock() + delete(c.requests, req.reqId) + c.mu.Unlock() + c.conn.writeEndRequest(req.reqId, 0, statusRequestComplete) + + // Consume the entire body, so the host isn't still writing to + // us when we close the socket below in the !keepConn case, + // otherwise we'd send a RST. (golang.org/issue/4183) + // TODO(bradfitz): also bound this copy in time. Or send + // some sort of abort request to the host, so the host + // can properly cut off the client sending all the data. 
+ // For now just bound it a little and + io.CopyN(ioutil.Discard, body, 100<<20) + body.Close() + + if !req.keepConn { + c.conn.Close() + } +} + +func (c *child) cleanUp() { + c.mu.Lock() + defer c.mu.Unlock() + for _, req := range c.requests { + if req.pw != nil { + // race with call to Close in c.serveRequest doesn't matter because + // Pipe(Reader|Writer).Close are idempotent + req.pw.CloseWithError(ErrConnClosed) + } + } +} + +// Serve accepts incoming FastCGI connections on the listener l, creating a new +// goroutine for each. The goroutine reads requests and then calls handler +// to reply to them. +// If l is nil, Serve accepts connections from os.Stdin. +// If handler is nil, http.DefaultServeMux is used. +func Serve(l net.Listener, handler http.Handler) error { + if l == nil { + var err error + l, err = net.FileListener(os.Stdin) + if err != nil { + return err + } + defer l.Close() + } + if handler == nil { + handler = http.DefaultServeMux + } + for { + rw, err := l.Accept() + if err != nil { + return err + } + c := newChild(rw, handler) + go c.serve() + } +} diff --git a/plugins/inputs/phpfpm/phpfpm_fcgi.go b/plugins/inputs/phpfpm/fcgi.go similarity index 79% rename from plugins/inputs/phpfpm/phpfpm_fcgi.go rename to plugins/inputs/phpfpm/fcgi.go index 03aac7634..689660ea0 100644 --- a/plugins/inputs/phpfpm/phpfpm_fcgi.go +++ b/plugins/inputs/phpfpm/fcgi.go @@ -17,11 +17,6 @@ import ( "errors" "io" "sync" - - "net" - "strconv" - - "strings" ) // recType is a record type, as defined by @@ -277,74 +272,3 @@ func (w *streamWriter) Close() error { // send empty record to close the stream return w.c.writeRecord(w.recType, w.reqId, nil) } - -func NewClient(h string, args ...interface{}) (fcgi *conn, err error) { - var con net.Conn - if len(args) != 1 { - err = errors.New("fcgi: not enough params") - return - } - switch args[0].(type) { - case int: - addr := h + ":" + strconv.FormatInt(int64(args[0].(int)), 10) - con, err = net.Dial("tcp", addr) - case string: 
- laddr := net.UnixAddr{Name: args[0].(string), Net: h} - con, err = net.DialUnix(h, nil, &laddr) - default: - err = errors.New("fcgi: we only accept int (port) or string (socket) params.") - } - fcgi = &conn{ - rwc: con, - } - return -} - -func (client *conn) Request(env map[string]string, requestData string) (retout []byte, reterr []byte, err error) { - defer client.rwc.Close() - var reqId uint16 = 1 - - err = client.writeBeginRequest(reqId, uint16(roleResponder), 0) - if err != nil { - return - } - - err = client.writePairs(typeParams, reqId, env) - if err != nil { - return - } - - if len(requestData) > 0 { - if err = client.writeRecord(typeStdin, reqId, []byte(requestData)); err != nil { - return - } - } - - rec := &record{} - var err1 error - - // recive untill EOF or FCGI_END_REQUEST -READ_LOOP: - for { - err1 = rec.read(client.rwc) - if err1 != nil && strings.Contains(err1.Error(), "use of closed network connection") { - if err1 != io.EOF { - err = err1 - } - break - } - - switch { - case rec.h.Type == typeStdout: - retout = append(retout, rec.content()...) - case rec.h.Type == typeStderr: - reterr = append(reterr, rec.content()...) 
- case rec.h.Type == typeEndRequest: - fallthrough - default: - break READ_LOOP - } - } - - return -} diff --git a/plugins/inputs/phpfpm/fcgi_client.go b/plugins/inputs/phpfpm/fcgi_client.go new file mode 100644 index 000000000..56978ad3a --- /dev/null +++ b/plugins/inputs/phpfpm/fcgi_client.go @@ -0,0 +1,86 @@ +package phpfpm + +import ( + "errors" + "io" + "net" + "strconv" + "strings" +) + +// Create an fcgi client +func newFcgiClient(h string, args ...interface{}) (*conn, error) { + var con net.Conn + if len(args) != 1 { + return nil, errors.New("fcgi: not enough params") + } + + var err error + switch args[0].(type) { + case int: + addr := h + ":" + strconv.FormatInt(int64(args[0].(int)), 10) + con, err = net.Dial("tcp", addr) + case string: + laddr := net.UnixAddr{Name: args[0].(string), Net: h} + con, err = net.DialUnix(h, nil, &laddr) + default: + err = errors.New("fcgi: we only accept int (port) or string (socket) params.") + } + fcgi := &conn{ + rwc: con, + } + + return fcgi, err +} + +func (client *conn) Request( + env map[string]string, + requestData string, +) (retout []byte, reterr []byte, err error) { + defer client.rwc.Close() + var reqId uint16 = 1 + + err = client.writeBeginRequest(reqId, uint16(roleResponder), 0) + if err != nil { + return + } + + err = client.writePairs(typeParams, reqId, env) + if err != nil { + return + } + + if len(requestData) > 0 { + if err = client.writeRecord(typeStdin, reqId, []byte(requestData)); err != nil { + return + } + } + + rec := &record{} + var err1 error + + // recive untill EOF or FCGI_END_REQUEST +READ_LOOP: + for { + err1 = rec.read(client.rwc) + if err1 != nil && strings.Contains(err1.Error(), "use of closed network connection") { + if err1 != io.EOF { + err = err1 + } + break + } + + switch { + case rec.h.Type == typeStdout: + retout = append(retout, rec.content()...) + case rec.h.Type == typeStderr: + reterr = append(reterr, rec.content()...) 
+ case rec.h.Type == typeEndRequest: + fallthrough + default: + break READ_LOOP + } + } + + return +} diff --git a/plugins/inputs/phpfpm/fcgi_test.go b/plugins/inputs/phpfpm/fcgi_test.go new file mode 100644 index 000000000..15e0030a7 --- /dev/null +++ b/plugins/inputs/phpfpm/fcgi_test.go @@ -0,0 +1,280 @@ +// Copyright 2011 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package phpfpm + +import ( + "bytes" + "errors" + "io" + "io/ioutil" + "net/http" + "testing" +) + +var sizeTests = []struct { + size uint32 + bytes []byte +}{ + {0, []byte{0x00}}, + {127, []byte{0x7F}}, + {128, []byte{0x80, 0x00, 0x00, 0x80}}, + {1000, []byte{0x80, 0x00, 0x03, 0xE8}}, + {33554431, []byte{0x81, 0xFF, 0xFF, 0xFF}}, +} + +func TestSize(t *testing.T) { + b := make([]byte, 4) + for i, test := range sizeTests { + n := encodeSize(b, test.size) + if !bytes.Equal(b[:n], test.bytes) { + t.Errorf("%d expected %x, encoded %x", i, test.bytes, b) + } + size, n := readSize(test.bytes) + if size != test.size { + t.Errorf("%d expected %d, read %d", i, test.size, size) + } + if len(test.bytes) != n { + t.Errorf("%d did not consume all the bytes", i) + } + } +} + +var streamTests = []struct { + desc string + recType recType + reqId uint16 + content []byte + raw []byte +}{ + {"single record", typeStdout, 1, nil, + []byte{1, byte(typeStdout), 0, 1, 0, 0, 0, 0}, + }, + // this data will have to be split into two records + {"two records", typeStdin, 300, make([]byte, 66000), + bytes.Join([][]byte{ + // header for the first record + {1, byte(typeStdin), 0x01, 0x2C, 0xFF, 0xFF, 1, 0}, + make([]byte, 65536), + // header for the second + {1, byte(typeStdin), 0x01, 0x2C, 0x01, 0xD1, 7, 0}, + make([]byte, 472), + // header for the empty record + {1, byte(typeStdin), 0x01, 0x2C, 0, 0, 0, 0}, + }, + nil), + }, +} + +type nilCloser struct { + io.ReadWriter +} + +func (c *nilCloser) Close() error { return nil } + 
+func TestStreams(t *testing.T) { + var rec record +outer: + for _, test := range streamTests { + buf := bytes.NewBuffer(test.raw) + var content []byte + for buf.Len() > 0 { + if err := rec.read(buf); err != nil { + t.Errorf("%s: error reading record: %v", test.desc, err) + continue outer + } + content = append(content, rec.content()...) + } + if rec.h.Type != test.recType { + t.Errorf("%s: got type %d expected %d", test.desc, rec.h.Type, test.recType) + continue + } + if rec.h.Id != test.reqId { + t.Errorf("%s: got request ID %d expected %d", test.desc, rec.h.Id, test.reqId) + continue + } + if !bytes.Equal(content, test.content) { + t.Errorf("%s: read wrong content", test.desc) + continue + } + buf.Reset() + c := newConn(&nilCloser{buf}) + w := newWriter(c, test.recType, test.reqId) + if _, err := w.Write(test.content); err != nil { + t.Errorf("%s: error writing record: %v", test.desc, err) + continue + } + if err := w.Close(); err != nil { + t.Errorf("%s: error closing stream: %v", test.desc, err) + continue + } + if !bytes.Equal(buf.Bytes(), test.raw) { + t.Errorf("%s: wrote wrong content", test.desc) + } + } +} + +type writeOnlyConn struct { + buf []byte +} + +func (c *writeOnlyConn) Write(p []byte) (int, error) { + c.buf = append(c.buf, p...) 
+ return len(p), nil +} + +func (c *writeOnlyConn) Read(p []byte) (int, error) { + return 0, errors.New("conn is write-only") +} + +func (c *writeOnlyConn) Close() error { + return nil +} + +func TestGetValues(t *testing.T) { + var rec record + rec.h.Type = typeGetValues + + wc := new(writeOnlyConn) + c := newChild(wc, nil) + err := c.handleRecord(&rec) + if err != nil { + t.Fatalf("handleRecord: %v", err) + } + + const want = "\x01\n\x00\x00\x00\x12\x06\x00" + + "\x0f\x01FCGI_MPXS_CONNS1" + + "\x00\x00\x00\x00\x00\x00\x01\n\x00\x00\x00\x00\x00\x00" + if got := string(wc.buf); got != want { + t.Errorf(" got: %q\nwant: %q\n", got, want) + } +} + +func nameValuePair11(nameData, valueData string) []byte { + return bytes.Join( + [][]byte{ + {byte(len(nameData)), byte(len(valueData))}, + []byte(nameData), + []byte(valueData), + }, + nil, + ) +} + +func makeRecord( + recordType recType, + requestId uint16, + contentData []byte, +) []byte { + requestIdB1 := byte(requestId >> 8) + requestIdB0 := byte(requestId) + + contentLength := len(contentData) + contentLengthB1 := byte(contentLength >> 8) + contentLengthB0 := byte(contentLength) + return bytes.Join([][]byte{ + {1, byte(recordType), requestIdB1, requestIdB0, contentLengthB1, + contentLengthB0, 0, 0}, + contentData, + }, + nil) +} + +// a series of FastCGI records that start a request and begin sending the +// request body +var streamBeginTypeStdin = bytes.Join([][]byte{ + // set up request 1 + makeRecord(typeBeginRequest, 1, + []byte{0, byte(roleResponder), 0, 0, 0, 0, 0, 0}), + // add required parameters to request 1 + makeRecord(typeParams, 1, nameValuePair11("REQUEST_METHOD", "GET")), + makeRecord(typeParams, 1, nameValuePair11("SERVER_PROTOCOL", "HTTP/1.1")), + makeRecord(typeParams, 1, nil), + // begin sending body of request 1 + makeRecord(typeStdin, 1, []byte("0123456789abcdef")), +}, + nil) + +var cleanUpTests = []struct { + input []byte + err error +}{ + // confirm that child.handleRecord closes req.pw after 
aborting req + { + bytes.Join([][]byte{ + streamBeginTypeStdin, + makeRecord(typeAbortRequest, 1, nil), + }, + nil), + ErrRequestAborted, + }, + // confirm that child.serve closes all pipes after error reading record + { + bytes.Join([][]byte{ + streamBeginTypeStdin, + nil, + }, + nil), + ErrConnClosed, + }, +} + +type nopWriteCloser struct { + io.ReadWriter +} + +func (nopWriteCloser) Close() error { + return nil +} + +// Test that child.serve closes the bodies of aborted requests and closes the +// bodies of all requests before returning. Causes deadlock if either condition +// isn't met. See issue 6934. +func TestChildServeCleansUp(t *testing.T) { + for _, tt := range cleanUpTests { + input := make([]byte, len(tt.input)) + copy(input, tt.input) + rc := nopWriteCloser{bytes.NewBuffer(input)} + done := make(chan bool) + c := newChild(rc, http.HandlerFunc(func( + w http.ResponseWriter, + r *http.Request, + ) { + // block on reading body of request + _, err := io.Copy(ioutil.Discard, r.Body) + if err != tt.err { + t.Errorf("Expected %#v, got %#v", tt.err, err) + } + // not reached if body of request isn't closed + done <- true + })) + go c.serve() + // wait for body of request to be closed or all goroutines to block + <-done + } +} + +type rwNopCloser struct { + io.Reader + io.Writer +} + +func (rwNopCloser) Close() error { + return nil +} + +// Verifies it doesn't crash. Issue 11824. 
+func TestMalformedParams(t *testing.T) { + input := []byte{ + // beginRequest, requestId=1, contentLength=8, role=1, keepConn=1 + 1, 1, 0, 1, 0, 8, 0, 0, 0, 1, 1, 0, 0, 0, 0, 0, + // params, requestId=1, contentLength=10, k1Len=50, v1Len=50 (malformed, wrong length) + 1, 4, 0, 1, 0, 10, 0, 0, 50, 50, 3, 4, 5, 6, 7, 8, 9, 10, + // end of params + 1, 4, 0, 1, 0, 0, 0, 0, + } + rw := rwNopCloser{bytes.NewReader(input), ioutil.Discard} + c := newChild(rw, http.DefaultServeMux) + c.serve() +} diff --git a/plugins/inputs/phpfpm/phpfpm.go b/plugins/inputs/phpfpm/phpfpm.go index c07262342..199b0005b 100644 --- a/plugins/inputs/phpfpm/phpfpm.go +++ b/plugins/inputs/phpfpm/phpfpm.go @@ -112,6 +112,7 @@ func (g *phpfpm) gatherServer(addr string, acc telegraf.Accumulator) error { statusPath string ) + var err error if strings.HasPrefix(addr, "fcgi://") || strings.HasPrefix(addr, "cgi://") { u, err := url.Parse(addr) if err != nil { @@ -120,7 +121,7 @@ func (g *phpfpm) gatherServer(addr string, acc telegraf.Accumulator) error { socketAddr := strings.Split(u.Host, ":") fcgiIp := socketAddr[0] fcgiPort, _ := strconv.Atoi(socketAddr[1]) - fcgi, _ = NewClient(fcgiIp, fcgiPort) + fcgi, err = newFcgiClient(fcgiIp, fcgiPort) } else { socketAddr := strings.Split(addr, ":") if len(socketAddr) >= 2 { @@ -134,8 +135,13 @@ func (g *phpfpm) gatherServer(addr string, acc telegraf.Accumulator) error { if _, err := os.Stat(socketPath); os.IsNotExist(err) { return fmt.Errorf("Socket doesn't exist '%s': %s", socketPath, err) } - fcgi, _ = NewClient("unix", socketPath) + fcgi, err = newFcgiClient("unix", socketPath) } + + if err != nil { + return err + } + return g.gatherFcgi(fcgi, statusPath, acc) } From 845abcdd77c0099e2acaa0f74a67f89b3824480b Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Wed, 9 Mar 2016 14:44:32 +0100 Subject: [PATCH 16/22] Only log the overwritten metric warning on 1st overwrite per buffer see #807 --- internal/models/running_output.go | 8 +++++--- 1 file changed, 5 
insertions(+), 3 deletions(-) diff --git a/internal/models/running_output.go b/internal/models/running_output.go index 9d111c757..33fa4e120 100644 --- a/internal/models/running_output.go +++ b/internal/models/running_output.go @@ -82,9 +82,11 @@ func (ro *RunningOutput) AddMetric(metric telegraf.Metric) { } } } else { - log.Printf("WARNING: overwriting cached metrics, you may want to " + - "increase the metric_buffer_limit setting in your [agent] " + - "config if you do not wish to overwrite metrics.\n") + if ro.overwriteI == 0 { + log.Printf("WARNING: overwriting cached metrics, you may want to " + + "increase the metric_buffer_limit setting in your [agent] " + + "config if you do not wish to overwrite metrics.\n") + } if ro.overwriteI == len(ro.metrics) { ro.overwriteI = 0 } From b102ae141accd23211bc3009e4a4cf6a3e8547be Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Wed, 9 Mar 2016 15:46:37 +0100 Subject: [PATCH 17/22] CONFIGURATION drop->fielddrop --- docs/CONFIGURATION.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/CONFIGURATION.md b/docs/CONFIGURATION.md index 58dbdf261..853dc6d05 100644 --- a/docs/CONFIGURATION.md +++ b/docs/CONFIGURATION.md @@ -97,7 +97,7 @@ fields which begin with `time_`. percpu = true totalcpu = false # filter all fields beginning with 'time_' - drop = ["time_*"] + fielddrop = ["time_*"] ``` #### Input Config: tagpass and tagdrop @@ -106,7 +106,7 @@ fields which begin with `time_`. 
[[inputs.cpu]] percpu = true totalcpu = false - drop = ["cpu_time"] + fielddrop = ["cpu_time"] # Don't collect CPU data for cpu6 & cpu7 [inputs.cpu.tagdrop] cpu = [ "cpu6", "cpu7" ] @@ -199,7 +199,7 @@ to avoid measurement collisions: percpu = true totalcpu = false name_override = "percpu_usage" - drop = ["cpu_time*"] + fielddrop = ["cpu_time*"] ``` ## `[outputs.xxx]` Configuration From 5ffa2a30be196bebdf8038970bbf7ed74895174e Mon Sep 17 00:00:00 2001 From: Thibault Cohen Date: Sun, 6 Mar 2016 01:04:54 -0500 Subject: [PATCH 18/22] Add processes status stats in system input plugin --- plugins/inputs/system/processes.go | 61 +++++++++++++++++++++++++ plugins/inputs/system/processes_test.go | 21 +++++++++ 2 files changed, 82 insertions(+) create mode 100644 plugins/inputs/system/processes.go create mode 100644 plugins/inputs/system/processes_test.go diff --git a/plugins/inputs/system/processes.go b/plugins/inputs/system/processes.go new file mode 100644 index 000000000..c4b791e3c --- /dev/null +++ b/plugins/inputs/system/processes.go @@ -0,0 +1,61 @@ +package system + +import ( + "fmt" + "log" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/inputs" + "github.com/shirou/gopsutil/process" +) + +type Processes struct { +} + +func (_ *Processes) Description() string { + return "Get the number of processes and group them by status (Linux only)" +} + +func (_ *Processes) SampleConfig() string { return "" } + +func (s *Processes) Gather(acc telegraf.Accumulator) error { + pids, err := process.Pids() + if err != nil { + return fmt.Errorf("error getting pids list: %s", err) + } + // TODO handle other OS (Windows/BSD/Solaris/OSX) + fields := map[string]interface{}{ + "paging": uint64(0), + "blocked": uint64(0), + "zombie": uint64(0), + "stopped": uint64(0), + "running": uint64(0), + "sleeping": uint64(0), + } + for _, pid := range pids { + process, err := process.NewProcess(pid) + if err != nil { + log.Printf("Can not get process %d status: %s", 
pid, err) + continue + } + status, err := process.Status() + if err != nil { + log.Printf("Can not get process %d status: %s\n", pid, err) + continue + } + _, exists := fields[status] + if !exists { + log.Printf("Status '%s' for process with pid: %d\n", status, pid) + continue + } + fields[status] = fields[status].(uint64) + uint64(1) + } + + acc.AddFields("processes", fields, nil) + return nil +} +func init() { + inputs.Add("processes", func() telegraf.Input { + return &Processes{} + }) +} diff --git a/plugins/inputs/system/processes_test.go b/plugins/inputs/system/processes_test.go new file mode 100644 index 000000000..246884711 --- /dev/null +++ b/plugins/inputs/system/processes_test.go @@ -0,0 +1,21 @@ +package system + +import ( + "testing" + + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestProcesses(t *testing.T) { + processes := &Processes{} + var acc testutil.Accumulator + + err := processes.Gather(&acc) + require.NoError(t, err) + + assert.True(t, acc.HasUIntField("processes", "running")) + assert.True(t, acc.HasUIntField("processes", "sleeping")) + assert.True(t, acc.HasUIntField("processes", "stopped")) +} From 2f45b8b7f54ce516fc5b4ed2f3191b3f3b726f95 Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Tue, 8 Mar 2016 11:42:31 +0100 Subject: [PATCH 19/22] Cross platform support for the 'processes' plugin closes #798 --- CHANGELOG.md | 2 + README.md | 4 +- etc/telegraf.conf | 4 + plugins/inputs/system/PROCESSES_README.md | 58 ++++++ plugins/inputs/system/processes.go | 219 ++++++++++++++++++---- plugins/inputs/system/processes_test.go | 133 ++++++++++++- scripts/circle-test.sh | 2 +- 7 files changed, 384 insertions(+), 38 deletions(-) create mode 100644 plugins/inputs/system/PROCESSES_README.md diff --git a/CHANGELOG.md b/CHANGELOG.md index 2eb9fa652..5ef68bd45 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,7 @@ - 
[#797](https://github.com/influxdata/telegraf/issues/797): Provide option for persistent MQTT consumer client sessions. - [#799](https://github.com/influxdata/telegraf/pull/799): Add number of threads for procstat input plugin. Thanks @titilambert! - [#776](https://github.com/influxdata/telegraf/pull/776): Add Zookeeper chroot option to kafka_consumer. Thanks @prune998! +- [#811](https://github.com/influxdata/telegraf/pull/811): Add processes plugin for classifying total procs on system. Thanks @titilambert! ### Bugfixes - [#748](https://github.com/influxdata/telegraf/issues/748): Fix sensor plugin split on ":" @@ -24,6 +25,7 @@ - [#773](https://github.com/influxdata/telegraf/issues/773): Fix duplicate measurements in snmp plugin. Thanks @titilambert! - [#708](https://github.com/influxdata/telegraf/issues/708): packaging: build ARM package - [#713](https://github.com/influxdata/telegraf/issues/713): packaging: insecure permissions error on log directory +- [#816](https://github.com/influxdata/telegraf/issues/816): Fix phpfpm panic if fcgi endpoint unreachable. 
## v0.10.4.1 diff --git a/README.md b/README.md index e9c20996a..fb9363100 100644 --- a/README.md +++ b/README.md @@ -214,11 +214,13 @@ Currently implemented sources: * disk * diskio * swap + * processes Telegraf can also collect metrics via the following service plugins: * statsd -* udp listener +* udp_listener +* tcp_listener * mqtt_consumer * kafka_consumer * nats_consumer diff --git a/etc/telegraf.conf b/etc/telegraf.conf index a6057ecd2..3deb7f895 100644 --- a/etc/telegraf.conf +++ b/etc/telegraf.conf @@ -123,6 +123,10 @@ [[inputs.mem]] # no configuration +# Get the number of processes and group them by status +[[inputs.processes]] + # no configuration + # Read metrics about swap memory usage [[inputs.swap]] # no configuration diff --git a/plugins/inputs/system/PROCESSES_README.md b/plugins/inputs/system/PROCESSES_README.md new file mode 100644 index 000000000..006e043fb --- /dev/null +++ b/plugins/inputs/system/PROCESSES_README.md @@ -0,0 +1,58 @@ +# Processes Input Plugin + +This plugin gathers info about the total number of processes and groups +them by status (zombie, sleeping, running, etc.) + +On linux this plugin requires access to procfs (/proc), on other OSes +it requires access to execute `ps`. 
+ +### Configuration: + +```toml +# Get the number of processes and group them by status +[[inputs.processes]] + # no configuration +``` + +### Measurements & Fields: + +- processes + - blocked (aka disk sleep or uninterruptible sleep) + - running + - sleeping + - stopped + - total + - zombie + - wait (freebsd only) + - idle (bsd only) + - paging (linux only) + - total_threads (linux only) + +### Process State Mappings + +Different OSes use slightly different State codes for their processes, these +state codes are documented in `man ps`, and I will give a mapping of what major +OS state codes correspond to in telegraf metrics: + +``` +Linux FreeBSD Darwin meaning + R R R running + S S S sleeping + Z Z Z zombie + T T T stopped + none I I idle (sleeping for longer than about 20 seconds) + D D,L U blocked (waiting in uninterruptible sleep, or locked) + W W none paging (linux kernel < 2.6 only), wait (freebsd) +``` + +### Tags: + +None + +### Example Output: + +``` +$ telegraf -config ~/ws/telegraf.conf -input-filter processes -test +* Plugin: processes, Collection 1 +> processes blocked=8i,running=1i,sleeping=265i,stopped=0i,total=274i,zombie=0i,paging=0i,total_threads=687i 1457478636980905042 +``` diff --git a/plugins/inputs/system/processes.go b/plugins/inputs/system/processes.go index c4b791e3c..b7ee32066 100644 --- a/plugins/inputs/system/processes.go +++ b/plugins/inputs/system/processes.go @@ -1,61 +1,216 @@ +// +build !windows + package system import ( + "bytes" "fmt" + "io/ioutil" "log" + "os" + "os/exec" + "path" + "runtime" + "strconv" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/inputs" - "github.com/shirou/gopsutil/process" ) type Processes struct { + execPS func() ([]byte, error) + readProcFile func(statFile string) ([]byte, error) + + forcePS bool + forceProc bool } -func (_ *Processes) Description() string { - return "Get the number of processes and group them by status (Linux only)" +func (p *Processes) Description() string 
{ + return "Get the number of processes and group them by status" } -func (_ *Processes) SampleConfig() string { return "" } +func (p *Processes) SampleConfig() string { return "" } -func (s *Processes) Gather(acc telegraf.Accumulator) error { - pids, err := process.Pids() - if err != nil { - return fmt.Errorf("error getting pids list: %s", err) +func (p *Processes) Gather(acc telegraf.Accumulator) error { + // Get an empty map of metric fields + fields := getEmptyFields() + + // Decide if we will use 'ps' to get stats (use procfs otherwise) + usePS := true + if runtime.GOOS == "linux" { + usePS = false } - // TODO handle other OS (Windows/BSD/Solaris/OSX) - fields := map[string]interface{}{ - "paging": uint64(0), - "blocked": uint64(0), - "zombie": uint64(0), - "stopped": uint64(0), - "running": uint64(0), - "sleeping": uint64(0), + if p.forcePS { + usePS = true + } else if p.forceProc { + usePS = false } - for _, pid := range pids { - process, err := process.NewProcess(pid) - if err != nil { - log.Printf("Can not get process %d status: %s", pid, err) - continue + + // Gather stats from 'ps' or procfs + if usePS { + if err := p.gatherFromPS(fields); err != nil { + return err } - status, err := process.Status() - if err != nil { - log.Printf("Can not get process %d status: %s\n", pid, err) - continue + } else { + if err := p.gatherFromProc(fields); err != nil { + return err } - _, exists := fields[status] - if !exists { - log.Printf("Status '%s' for process with pid: %d\n", status, pid) - continue - } - fields[status] = fields[status].(uint64) + uint64(1) } acc.AddFields("processes", fields, nil) return nil } + +// Gets empty fields of metrics based on the OS +func getEmptyFields() map[string]interface{} { + fields := map[string]interface{}{ + "blocked": int64(0), + "zombie": int64(0), + "stopped": int64(0), + "running": int64(0), + "sleeping": int64(0), + "total": int64(0), + } + switch runtime.GOOS { + case "freebsd": + fields["idle"] = int64(0) + fields["wait"] 
= int64(0) + case "darwin": + fields["idle"] = int64(0) + case "openbsd": + fields["idle"] = int64(0) + case "linux": + fields["paging"] = int64(0) + fields["total_threads"] = int64(0) + } + return fields +} + +// exec `ps` to get all process states +func (p *Processes) gatherFromPS(fields map[string]interface{}) error { + out, err := p.execPS() + if err != nil { + return err + } + + for i, status := range bytes.Fields(out) { + if i == 0 && string(status) == "STAT" { + // This is a header, skip it + continue + } + switch status[0] { + case 'W': + fields["wait"] = fields["wait"].(int64) + int64(1) + case 'U', 'D', 'L': + // Also known as uninterruptible sleep or disk sleep + fields["blocked"] = fields["blocked"].(int64) + int64(1) + case 'Z': + fields["zombie"] = fields["zombie"].(int64) + int64(1) + case 'T': + fields["stopped"] = fields["stopped"].(int64) + int64(1) + case 'R': + fields["running"] = fields["running"].(int64) + int64(1) + case 'S': + fields["sleeping"] = fields["sleeping"].(int64) + int64(1) + case 'I': + fields["idle"] = fields["idle"].(int64) + int64(1) + default: + log.Printf("processes: Unknown state [ %s ] from ps", + string(status[0])) + } + fields["total"] = fields["total"].(int64) + int64(1) + } + return nil +} + +// get process states from /proc/(pid)/stat files +func (p *Processes) gatherFromProc(fields map[string]interface{}) error { + files, err := ioutil.ReadDir("/proc") + if err != nil { + return err + } + + for _, file := range files { + if !file.IsDir() { + continue + } + + statFile := path.Join("/proc", file.Name(), "stat") + data, err := p.readProcFile(statFile) + if err != nil { + return err + } + if data == nil { + continue + } + + stats := bytes.Fields(data) + if len(stats) < 3 { + return fmt.Errorf("Something is terribly wrong with %s", statFile) + } + switch stats[2][0] { + case 'R': + fields["running"] = fields["running"].(int64) + int64(1) + case 'S': + fields["sleeping"] = fields["sleeping"].(int64) + int64(1) + case 'D': 
+ fields["blocked"] = fields["blocked"].(int64) + int64(1) + case 'Z': + fields["zombies"] = fields["zombies"].(int64) + int64(1) + case 'T', 't': + fields["stopped"] = fields["stopped"].(int64) + int64(1) + case 'W': + fields["paging"] = fields["paging"].(int64) + int64(1) + default: + log.Printf("processes: Unknown state [ %s ] in file %s", + string(stats[2][0]), statFile) + } + fields["total"] = fields["total"].(int64) + int64(1) + + threads, err := strconv.Atoi(string(stats[19])) + if err != nil { + log.Printf("processes: Error parsing thread count: %s", err) + continue + } + fields["total_threads"] = fields["total_threads"].(int64) + int64(threads) + } + return nil +} + +func readProcFile(statFile string) ([]byte, error) { + if _, err := os.Stat(statFile); os.IsNotExist(err) { + return nil, nil + } else if err != nil { + return nil, err + } + + data, err := ioutil.ReadFile(statFile) + if err != nil { + return nil, err + } + + return data, nil +} + +func execPS() ([]byte, error) { + bin, err := exec.LookPath("ps") + if err != nil { + return nil, err + } + + out, err := exec.Command(bin, "axo", "state").Output() + if err != nil { + return nil, err + } + + return out, err +} + func init() { inputs.Add("processes", func() telegraf.Input { - return &Processes{} + return &Processes{ + execPS: execPS, + readProcFile: readProcFile, + } }) } diff --git a/plugins/inputs/system/processes_test.go b/plugins/inputs/system/processes_test.go index 246884711..0e2b5e105 100644 --- a/plugins/inputs/system/processes_test.go +++ b/plugins/inputs/system/processes_test.go @@ -1,6 +1,8 @@ package system import ( + "fmt" + "runtime" "testing" "github.com/influxdata/telegraf/testutil" @@ -9,13 +11,136 @@ import ( ) func TestProcesses(t *testing.T) { - processes := &Processes{} + processes := &Processes{ + execPS: execPS, + readProcFile: readProcFile, + } var acc testutil.Accumulator err := processes.Gather(&acc) require.NoError(t, err) - assert.True(t, acc.HasUIntField("processes", 
"running")) - assert.True(t, acc.HasUIntField("processes", "sleeping")) - assert.True(t, acc.HasUIntField("processes", "stopped")) + assert.True(t, acc.HasIntField("processes", "running")) + assert.True(t, acc.HasIntField("processes", "sleeping")) + assert.True(t, acc.HasIntField("processes", "stopped")) + assert.True(t, acc.HasIntField("processes", "total")) + total, ok := acc.Get("processes") + require.True(t, ok) + assert.True(t, total.Fields["total"].(int64) > 0) } + +func TestFromPS(t *testing.T) { + processes := &Processes{ + execPS: testExecPS, + forcePS: true, + } + + var acc testutil.Accumulator + err := processes.Gather(&acc) + require.NoError(t, err) + + fields := getEmptyFields() + fields["blocked"] = int64(1) + fields["running"] = int64(4) + fields["sleeping"] = int64(34) + fields["total"] = int64(39) + + acc.AssertContainsTaggedFields(t, "processes", fields, map[string]string{}) +} + +func TestFromPSError(t *testing.T) { + processes := &Processes{ + execPS: testExecPSError, + forcePS: true, + } + + var acc testutil.Accumulator + err := processes.Gather(&acc) + require.Error(t, err) +} + +func TestFromProcFiles(t *testing.T) { + if runtime.GOOS != "linux" { + t.Skip("This test only runs on linux") + } + tester := tester{} + processes := &Processes{ + readProcFile: tester.testProcFile, + forceProc: true, + } + + var acc testutil.Accumulator + err := processes.Gather(&acc) + require.NoError(t, err) + + fields := getEmptyFields() + fields["sleeping"] = tester.calls + fields["total_threads"] = tester.calls * 2 + fields["total"] = tester.calls + + acc.AssertContainsTaggedFields(t, "processes", fields, map[string]string{}) +} + +func testExecPS() ([]byte, error) { + return []byte(testPSOut), nil +} + +// struct for counting calls to testProcFile +type tester struct { + calls int64 +} + +func (t *tester) testProcFile(_ string) ([]byte, error) { + t.calls++ + return []byte(fmt.Sprintf(testProcStat, "S", "2")), nil +} + +func testExecPSError() ([]byte, error) { 
+ return []byte(testPSOut), fmt.Errorf("ERROR!") +} + +const testPSOut = ` +STAT +S +S +S +S +R +R +S +S +Ss +Ss +S +SNs +Ss +Ss +S +R+ +S +U +S +S +S +S +Ss +S+ +Ss +S +S+ +S+ +Ss +S+ +Ss +S +R+ +Ss +S +S+ +S+ +Ss +S+ +` + +const testProcStat = `10 (rcuob/0) %s 2 0 0 0 -1 2129984 0 0 0 0 0 0 0 0 20 0 %s 0 11 0 0 18446744073709551615 0 0 0 0 0 0 0 2147483647 0 18446744073709551615 0 0 17 0 0 0 0 0 0 0 0 0 0 0 0 0 0 +` diff --git a/scripts/circle-test.sh b/scripts/circle-test.sh index 9a3e0e678..f0288c73e 100755 --- a/scripts/circle-test.sh +++ b/scripts/circle-test.sh @@ -68,7 +68,7 @@ telegraf -sample-config > $tmpdir/config.toml exit_if_fail telegraf -config $tmpdir/config.toml \ -test -input-filter cpu:mem -mv $GOPATH/bin/telegraf $CIRCLE_ARTIFACTS +cat $GOPATH/bin/telegraf | gzip > $CIRCLE_ARTIFACTS/telegraf.gz eval "git describe --exact-match HEAD" if [ $? -eq 0 ]; then From 7b09623fa8be8e5fd8046ff47da2cc4923f43f10 Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Wed, 9 Mar 2016 17:22:34 +0100 Subject: [PATCH 20/22] Add number of users to 'system' plugin see #235 --- CHANGELOG.md | 1 + plugins/inputs/system/SYSTEM_README.md | 35 ++++++++++++++++++++++++++ plugins/inputs/system/system.go | 6 +++++ 3 files changed, 42 insertions(+) create mode 100644 plugins/inputs/system/SYSTEM_README.md diff --git a/CHANGELOG.md b/CHANGELOG.md index 5ef68bd45..28ce825c5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,7 @@ - [#799](https://github.com/influxdata/telegraf/pull/799): Add number of threads for procstat input plugin. Thanks @titilambert! - [#776](https://github.com/influxdata/telegraf/pull/776): Add Zookeeper chroot option to kafka_consumer. Thanks @prune998! - [#811](https://github.com/influxdata/telegraf/pull/811): Add processes plugin for classifying total procs on system. Thanks @titilambert! +- [#235](https://github.com/influxdata/telegraf/issues/235): Add number of users to the `system` input plugin. 
### Bugfixes - [#748](https://github.com/influxdata/telegraf/issues/748): Fix sensor plugin split on ":" diff --git a/plugins/inputs/system/SYSTEM_README.md b/plugins/inputs/system/SYSTEM_README.md new file mode 100644 index 000000000..fc873c7e8 --- /dev/null +++ b/plugins/inputs/system/SYSTEM_README.md @@ -0,0 +1,35 @@ +# System Input Plugin + +The system plugin gathers general stats on system load, uptime, +and number of users logged in. It is basically equivalent +to the unix `uptime` command. + +### Configuration: + +```toml +# Read metrics about system load & uptime +[[inputs.system]] + # no configuration +``` + +### Measurements & Fields: + +- system + - load1 (float) + - load15 (float) + - load5 (float) + - n_users (integer) + - uptime (integer, seconds) + - uptime_format (string) + +### Tags: + +None + +### Example Output: + +``` +$ telegraf -config ~/ws/telegraf.conf -input-filter system -test +* Plugin: system, Collection 1 +> system load1=2.05,load15=2.38,load5=2.03,n_users=4i,uptime=239043i,uptime_format="2 days, 18:24" 1457546165399253452 +``` diff --git a/plugins/inputs/system/system.go b/plugins/inputs/system/system.go index 9922d5a92..42b0310a4 100644 --- a/plugins/inputs/system/system.go +++ b/plugins/inputs/system/system.go @@ -31,11 +31,17 @@ func (_ *SystemStats) Gather(acc telegraf.Accumulator) error { return err } + users, err := host.Users() + if err != nil { + return err + } + fields := map[string]interface{}{ "load1": loadavg.Load1, "load5": loadavg.Load5, "load15": loadavg.Load15, "uptime": hostinfo.Uptime, + "n_users": len(users), "uptime_format": format_uptime(hostinfo.Uptime), } acc.AddFields("system", fields, nil) From aa15e7916e3c705fbde0d42857d30c04363b1b32 Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Wed, 9 Mar 2016 22:55:26 +0100 Subject: [PATCH 21/22] processes: Fix zombie process procfs panic fixes #822 --- plugins/inputs/system/processes.go | 4 ++-- plugins/inputs/system/processes_test.go | 9 +++++++-- 2 files changed, 9 
insertions(+), 4 deletions(-) diff --git a/plugins/inputs/system/processes.go b/plugins/inputs/system/processes.go index b7ee32066..aae0e6ba4 100644 --- a/plugins/inputs/system/processes.go +++ b/plugins/inputs/system/processes.go @@ -65,7 +65,7 @@ func (p *Processes) Gather(acc telegraf.Accumulator) error { func getEmptyFields() map[string]interface{} { fields := map[string]interface{}{ "blocked": int64(0), - "zombie": int64(0), + "zombies": int64(0), "stopped": int64(0), "running": int64(0), "sleeping": int64(0), @@ -105,7 +105,7 @@ func (p *Processes) gatherFromPS(fields map[string]interface{}) error { // Also known as uninterruptible sleep or disk sleep fields["blocked"] = fields["blocked"].(int64) + int64(1) case 'Z': - fields["zombie"] = fields["zombie"].(int64) + int64(1) + fields["zombies"] = fields["zombies"].(int64) + int64(1) case 'T': fields["stopped"] = fields["stopped"].(int64) + int64(1) case 'R': diff --git a/plugins/inputs/system/processes_test.go b/plugins/inputs/system/processes_test.go index 0e2b5e105..de9b6aa5b 100644 --- a/plugins/inputs/system/processes_test.go +++ b/plugins/inputs/system/processes_test.go @@ -40,10 +40,11 @@ func TestFromPS(t *testing.T) { require.NoError(t, err) fields := getEmptyFields() - fields["blocked"] = int64(1) + fields["blocked"] = int64(4) + fields["zombies"] = int64(1) fields["running"] = int64(4) fields["sleeping"] = int64(34) - fields["total"] = int64(39) + fields["total"] = int64(43) acc.AssertContainsTaggedFields(t, "processes", fields, map[string]string{}) } @@ -139,6 +140,10 @@ S S+ S+ Ss +L +U +Z +D S+ ` From 3f2a04b25b5262fdeb8ff3ae996ead7f43287662 Mon Sep 17 00:00:00 2001 From: Chris Goller Date: Wed, 9 Mar 2016 11:50:06 -0600 Subject: [PATCH 22/22] Fix build-for-docker Makefile target syntax. 
closes #819 --- Makefile | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Makefile b/Makefile index ef316bd03..c87f78b55 100644 --- a/Makefile +++ b/Makefile @@ -22,8 +22,8 @@ build-windows: ./cmd/telegraf/telegraf.go build-for-docker: - CGO_ENABLED=0 GOOS=linux go -o telegraf -ldflags \ - "-X main.Version=$(VERSION)" \ + CGO_ENABLED=0 GOOS=linux go build -installsuffix cgo -o telegraf -ldflags \ + "-s -X main.Version=$(VERSION)" \ ./cmd/telegraf/telegraf.go # Build with race detector