diff --git a/CHANGELOG.md b/CHANGELOG.md index dda3ba750..4c39f6c53 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,12 +3,15 @@ ### Features - [#1413](https://github.com/influxdata/telegraf/issues/1413): Separate container_version from container_image tag. +- [#1525](https://github.com/influxdata/telegraf/pull/1525): Support setting per-device and total metrics for Docker network and blockio. ### Bugfixes - [#1519](https://github.com/influxdata/telegraf/pull/1519): Fix error race conditions and partial failures. - [#1477](https://github.com/influxdata/telegraf/issues/1477): nstat: fix inaccurate config panic. - [#1481](https://github.com/influxdata/telegraf/issues/1481): jolokia: fix handling multiple multi-dimensional attributes. +- [#1430](https://github.com/influxdata/telegraf/issues/1430): Fix prometheus character sanitizing. Sanitize more win_perf_counters characters. +- [#1534](https://github.com/influxdata/telegraf/pull/1534): Add diskio io_time to FreeBSD & report timing metrics as ms (as linux does). ## v1.0 beta 3 [2016-07-18] @@ -60,6 +63,7 @@ should now look like: - [#1434](https://github.com/influxdata/telegraf/pull/1434): Add measurement name arg to logparser plugin. - [#1479](https://github.com/influxdata/telegraf/pull/1479): logparser: change resp_code from a field to a tag. - [#1466](https://github.com/influxdata/telegraf/pull/1466): MongoDB input plugin: adding per DB stats from db.stats() +- [#1411](https://github.com/influxdata/telegraf/pull/1411): Implement support for fetching hddtemp data ### Bugfixes diff --git a/Godeps b/Godeps index 5caa6a9e2..2b4fce555 100644 --- a/Godeps +++ b/Godeps @@ -44,7 +44,7 @@ github.com/prometheus/client_model fa8ad6fec33561be4280a8f0514318c79d7f6cb6 github.com/prometheus/common e8eabff8812b05acf522b45fdcd725a785188e37 github.com/prometheus/procfs 406e5b7bfd8201a36e2bb5f7bdae0b03380c2ce8 github.com/samuel/go-zookeeper 218e9c81c0dd8b3b18172b2bbfad92cc7d6db55f -github.com/shirou/gopsutil 586bb697f3ec9f8ec08ffefe18f521a64534037c +github.com/shirou/gopsutil ee66bc560c366dd33b9a4046ba0b644caba46bed github.com/soniah/gosnmp b1b4f885b12c5dcbd021c5cee1c904110de6db7d github.com/sparrc/aerospike-client-go d4bb42d2c2d39dae68e054116f4538af189e05d5 github.com/streadway/amqp b4f3ceab0337f013208d31348b578d83c0064744 diff --git a/README.md b/README.md index aa8d9e039..9d2ee3ce1 100644 --- a/README.md +++ b/README.md @@ -156,6 +156,7 @@ Currently implemented sources: * [exec](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/exec) (generic executable plugin, support JSON, influx, graphite and nagios) * [filestat](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/filestat) * [haproxy](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/haproxy) +* [hddtemp](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/hddtemp) * [http_response](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/http_response) * [httpjson](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/httpjson) (generic JSON-emitting http service plugin) * [influxdb](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/influxdb) diff --git a/accumulator.go b/accumulator.go index 15c5485f8..1fdba8f99 100644 --- a/accumulator.go +++ b/accumulator.go @@ -16,6 +16,8 @@ type Accumulator interface { tags map[string]string, t ...time.Time) + AddError(err error) + Debug() bool SetDebug(enabled bool) diff --git a/agent/accumulator.go b/agent/accumulator.go index 504731720..d80affe68 100644 --- a/agent/accumulator.go +++ b/agent/accumulator.go @@ -4,6 +4,7 @@ import ( "fmt" "log" "math" + "sync/atomic" "time" "github.com/influxdata/telegraf" @@ -32,9 +33,9 @@ type accumulator struct { inputConfig *internal_models.InputConfig - prefix string - precision time.Duration + + errCount uint64 } func (ac *accumulator) Add( @@ -146,10 +147,6 @@ func (ac *accumulator) AddFields( } timestamp = timestamp.Round(ac.precision) - if ac.prefix != "" { - measurement = ac.prefix + measurement - } - m, err := telegraf.NewMetric(measurement, tags, result, timestamp) if err != nil { log.Printf("Error adding point [%s]: %s\n", measurement, err.Error()) @@ -161,6 +158,17 @@ func (ac *accumulator) AddFields( ac.metrics <- m } +// AddError passes a runtime error to the accumulator. +// The error will be tagged with the plugin name and written to the log. +func (ac *accumulator) AddError(err error) { + if err == nil { + return + } + atomic.AddUint64(&ac.errCount, 1) + //TODO suppress/throttle consecutive duplicate errors? + log.Printf("ERROR in input [%s]: %s", ac.inputConfig.Name, err) +} + func (ac *accumulator) Debug() bool { return ac.debug } diff --git a/agent/accumulator_test.go b/agent/accumulator_test.go index 9bf681192..8618d327d 100644 --- a/agent/accumulator_test.go +++ b/agent/accumulator_test.go @@ -1,8 +1,11 @@ package agent import ( + "bytes" "fmt" + "log" "math" + "os" "testing" "time" @@ -10,6 +13,7 @@ import ( "github.com/influxdata/telegraf/internal/models" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestAdd(t *testing.T) { @@ -454,3 +458,27 @@ func TestAccFilterTags(t *testing.T) { fmt.Sprintf("acctest value=101 %d", now.UnixNano()), actual) } + +func TestAccAddError(t *testing.T) { + errBuf := bytes.NewBuffer(nil) + log.SetOutput(errBuf) + defer log.SetOutput(os.Stderr) + + a := accumulator{} + a.inputConfig = &internal_models.InputConfig{} + a.inputConfig.Name = "mock_plugin" + + a.AddError(fmt.Errorf("foo")) + a.AddError(fmt.Errorf("bar")) + a.AddError(fmt.Errorf("baz")) + + errs := bytes.Split(errBuf.Bytes(), []byte{'\n'}) + assert.EqualValues(t, 3, a.errCount) + require.Len(t, errs, 4) // 4 because of trailing newline + assert.Contains(t, string(errs[0]), "mock_plugin") + assert.Contains(t, string(errs[0]), "foo") + assert.Contains(t, string(errs[1]), "mock_plugin") + assert.Contains(t, string(errs[1]), "bar") + assert.Contains(t, string(errs[2]), "mock_plugin") + assert.Contains(t, string(errs[2]), "baz") +} diff --git a/agent/agent.go b/agent/agent.go index ae520b89e..5ee73512b 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -215,6 +215,9 @@ func (a *Agent) Test() error { if err := input.Input.Gather(acc); err != nil { return err } + if acc.errCount > 0 { + return fmt.Errorf("Errors encountered during processing") + } // Special instructions for some inputs. cpu, for example, needs to be // run twice in order to return cpu usage percentages. diff --git a/etc/telegraf.conf b/etc/telegraf.conf index 10e949302..5189d2e3f 100644 --- a/etc/telegraf.conf +++ b/etc/telegraf.conf @@ -197,7 +197,7 @@ # # Configuration for Graphite server to send metrics to # [[outputs.graphite]] # ## TCP endpoint for your graphite instance. -# ## If multiple endpoints are configured, the output will be load balanced. +# ## If multiple endpoints are configured, output will be load balanced. # ## Only one of the endpoints will be written to with each iteration. # servers = ["localhost:2003"] # ## Prefix metrics name @@ -436,8 +436,8 @@ ## disk partitions. ## Setting devices will restrict the stats to the specified devices. # devices = ["sda", "sdb"] - ## Uncomment the following line if you do not need disk serial numbers. - # skip_serial_number = true + ## Uncomment the following line if you need disk serial numbers. + # skip_serial_number = false # Get kernel statistics from /proc/stat @@ -465,7 +465,7 @@ # no configuration -# # Read stats from an aerospike server +# # Read stats from aerospike server(s) # [[inputs.aerospike]] # ## Aerospike servers to connect to (with port) # ## This plugin will query all namespaces the aerospike @@ -666,6 +666,13 @@ # container_names = [] # ## Timeout for docker list, info, and stats commands # timeout = "5s" +# +# ## Whether to report for each container per-device blkio (8:0, 8:1...) and +# ## network (eth0, eth1, ...) stats or not +# perdevice = true +# ## Whether to report for each container total blkio and network stats or not +# total = false +# # # Read statistics from one or many dovecot servers @@ -782,9 +789,11 @@ # [[inputs.haproxy]] # ## An array of address to gather stats about. Specify an ip on hostname # ## with optional port. ie localhost, 10.10.3.33:1936, etc. -# -# ## If no servers are specified, then default to 127.0.0.1:1936 -# servers = ["http://myhaproxy.com:1936", "http://anotherhaproxy.com:1936"] +# ## Make sure you specify the complete path to the stats endpoint +# ## ie 10.10.3.33:1936/haproxy?stats +# # +# ## If no servers are specified, then default to 127.0.0.1:1936/haproxy?stats +# servers = ["http://myhaproxy.com:1936/haproxy?stats"] # ## Or you can also use local socket # ## servers = ["socket:/run/haproxy/admin.sock"] @@ -970,21 +979,35 @@ # # Telegraf plugin for gathering metrics from N Mesos masters # [[inputs.mesos]] -# # Timeout, in ms. +# ## Timeout, in ms. # timeout = 100 -# # A list of Mesos masters, default value is localhost:5050. +# ## A list of Mesos masters. # masters = ["localhost:5050"] -# # Metrics groups to be collected, by default, all enabled. +# ## Master metrics groups to be collected, by default, all enabled. # master_collections = [ # "resources", # "master", # "system", -# "slaves", +# "agents", # "frameworks", +# "tasks", # "messages", # "evqueue", # "registrar", # ] +# ## A list of Mesos slaves, default is [] +# # slaves = [] +# ## Slave metrics groups to be collected, by default, all enabled. +# # slave_collections = [ +# # "resources", +# # "agent", +# # "system", +# # "executors", +# # "tasks", +# # "messages", +# # ] +# ## Include mesos tasks statistics, default is false +# # slave_tasks = true # # Read metrics from one or many MongoDB servers @@ -995,6 +1018,7 @@ # ## mongodb://10.10.3.33:18832, # ## 10.0.0.1:10000, etc. # servers = ["127.0.0.1:27017"] +# gather_perdb_stats = false # # Read metrics from one or many mysql servers @@ -1101,9 +1125,9 @@ # ## file paths for proc files. If empty default paths will be used: # ## /proc/net/netstat, /proc/net/snmp, /proc/net/snmp6 # ## These can also be overridden with env variables, see README. -# proc_net_netstat = "" -# proc_net_snmp = "" -# proc_net_snmp6 = "" +# proc_net_netstat = "/proc/net/netstat" +# proc_net_snmp = "/proc/net/snmp" +# proc_net_snmp6 = "/proc/net/snmp6" # ## dump metrics with 0 values too # dump_zeros = true @@ -1305,6 +1329,13 @@ # # username = "guest" # # password = "guest" # +# ## Optional SSL Config +# # ssl_ca = "/etc/telegraf/ca.pem" +# # ssl_cert = "/etc/telegraf/cert.pem" +# # ssl_key = "/etc/telegraf/key.pem" +# ## Use SSL but skip chain & host verification +# # insecure_skip_verify = false +# # ## A list of nodes to pull metrics about. If not specified, metrics for # ## all nodes are gathered. # # nodes = ["rabbit@node1", "rabbit@node2"] @@ -1323,6 +1354,7 @@ # ## e.g. # ## tcp://localhost:6379 # ## tcp://:password@192.168.99.100 +# ## unix:///var/run/redis.sock # ## # ## If no servers are specified, then localhost is used as the host. # ## If no port is specified, 6379 is used @@ -1559,6 +1591,8 @@ # ## %{COMMON_LOG_FORMAT} (plain apache & nginx access logs) # ## %{COMBINED_LOG_FORMAT} (access logs + referrer & agent) # patterns = ["%{INFLUXDB_HTTPD_LOG}"] +# ## Name of the outputted measurement name. +# measurement = "influxdb_log" # ## Full path(s) to custom pattern files. # custom_pattern_files = [] # ## Custom patterns can also be defined here. Put one pattern per line. @@ -1622,6 +1656,21 @@ # data_format = "influx" +# # Read NSQ topic for metrics. +# [[inputs.nsq_consumer]] +# ## An string representing the NSQD TCP Endpoint +# server = "localhost:4150" +# topic = "telegraf" +# channel = "consumer" +# max_in_flight = 100 +# +# ## Data format to consume. +# ## Each data format has it's own unique set of configuration options, read +# ## more about them here: +# ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md +# data_format = "influx" + + # # Statsd Server # [[inputs.statsd]] # ## Address and port to host UDP listener on @@ -1725,6 +1774,9 @@ # [inputs.webhooks.github] # path = "/github" # +# [inputs.webhooks.mandrill] +# path = "/mandrill" +# # [inputs.webhooks.rollbar] # path = "/rollbar" diff --git a/internal/config/config.go b/internal/config/config.go index 8f7821624..9408d9efd 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -139,7 +139,7 @@ func (c *Config) InputNames() []string { return name } -// Outputs returns a list of strings of the configured inputs. +// Outputs returns a list of strings of the configured outputs. func (c *Config) OutputNames() []string { var name []string for _, output := range c.Outputs { diff --git a/plugins/inputs/all/all.go b/plugins/inputs/all/all.go index 529a13bae..ddb7d4039 100644 --- a/plugins/inputs/all/all.go +++ b/plugins/inputs/all/all.go @@ -22,6 +22,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/inputs/filestat" _ "github.com/influxdata/telegraf/plugins/inputs/graylog" _ "github.com/influxdata/telegraf/plugins/inputs/haproxy" + _ "github.com/influxdata/telegraf/plugins/inputs/hddtemp" _ "github.com/influxdata/telegraf/plugins/inputs/http_response" _ "github.com/influxdata/telegraf/plugins/inputs/httpjson" _ "github.com/influxdata/telegraf/plugins/inputs/influxdb" diff --git a/plugins/inputs/docker/docker.go b/plugins/inputs/docker/docker.go index dfd768c1a..e3876bd64 100644 --- a/plugins/inputs/docker/docker.go +++ b/plugins/inputs/docker/docker.go @@ -25,6 +25,8 @@ type Docker struct { Endpoint string ContainerNames []string Timeout internal.Duration + PerDevice bool `toml:"perdevice"` + Total bool `toml:"total"` client DockerClient } @@ -58,6 +60,13 @@ var sampleConfig = ` container_names = [] ## Timeout for docker list, info, and stats commands timeout = "5s" + + ## Whether to report for each container per-device blkio (8:0, 8:1...) and + ## network (eth0, eth1, ...) stats or not + perdevice = true + ## Whether to report for each container total blkio and network stats or not + total = false + ` // Description returns input description @@ -246,7 +255,7 @@ func (d *Docker) gatherContainer( tags[k] = label } - gatherContainerStats(v, acc, tags, container.ID) + gatherContainerStats(v, acc, tags, container.ID, d.PerDevice, d.Total) return nil } @@ -256,6 +265,8 @@ func gatherContainerStats( acc telegraf.Accumulator, tags map[string]string, id string, + perDevice bool, + total bool, ) { now := stat.Read @@ -323,6 +334,7 @@ func gatherContainerStats( acc.AddFields("docker_container_cpu", fields, percputags, now) } + totalNetworkStatMap := make(map[string]interface{}) for network, netstats := range stat.Networks { netfields := map[string]interface{}{ "rx_dropped": netstats.RxDropped, @@ -336,12 +348,35 @@ func gatherContainerStats( "container_id": id, } // Create a new network tag dictionary for the "network" tag - nettags := copyTags(tags) - nettags["network"] = network - acc.AddFields("docker_container_net", netfields, nettags, now) + if perDevice { + nettags := copyTags(tags) + nettags["network"] = network + acc.AddFields("docker_container_net", netfields, nettags, now) + } + if total { + for field, value := range netfields { + if field == "container_id" { + continue + } + _, ok := totalNetworkStatMap[field] + if ok { + totalNetworkStatMap[field] = totalNetworkStatMap[field].(uint64) + value.(uint64) + } else { + totalNetworkStatMap[field] = value + } + } + } } - gatherBlockIOMetrics(stat, acc, tags, now, id) + // totalNetworkStatMap could be empty if container is running with --net=host. + if total && len(totalNetworkStatMap) != 0 { + nettags := copyTags(tags) + nettags["network"] = "total" + totalNetworkStatMap["container_id"] = id + acc.AddFields("docker_container_net", totalNetworkStatMap, nettags, now) + } + + gatherBlockIOMetrics(stat, acc, tags, now, id, perDevice, total) } func calculateMemPercent(stat *types.StatsJSON) float64 { @@ -370,6 +405,8 @@ func gatherBlockIOMetrics( tags map[string]string, now time.Time, id string, + perDevice bool, + total bool, ) { blkioStats := stat.BlkioStats // Make a map of devices to their block io stats @@ -431,11 +468,33 @@ func gatherBlockIOMetrics( deviceStatMap[device]["sectors_recursive"] = metric.Value } + totalStatMap := make(map[string]interface{}) for device, fields := range deviceStatMap { - iotags := copyTags(tags) - iotags["device"] = device fields["container_id"] = id - acc.AddFields("docker_container_blkio", fields, iotags, now) + if perDevice { + iotags := copyTags(tags) + iotags["device"] = device + acc.AddFields("docker_container_blkio", fields, iotags, now) + } + if total { + for field, value := range fields { + if field == "container_id" { + continue + } + _, ok := totalStatMap[field] + if ok { + totalStatMap[field] = totalStatMap[field].(uint64) + value.(uint64) + } else { + totalStatMap[field] = value + } + } + } + } + if total { + totalStatMap["container_id"] = id + iotags := copyTags(tags) + iotags["device"] = "total" + acc.AddFields("docker_container_blkio", totalStatMap, iotags, now) } } @@ -480,7 +539,8 @@ func parseSize(sizeStr string) (int64, error) { func init() { inputs.Add("docker", func() telegraf.Input { return &Docker{ - Timeout: internal.Duration{Duration: time.Second * 5}, + PerDevice: true, + Timeout: internal.Duration{Duration: time.Second * 5}, } }) } diff --git a/plugins/inputs/docker/docker_test.go b/plugins/inputs/docker/docker_test.go index b1c76f5af..9f2e97f73 100644 --- a/plugins/inputs/docker/docker_test.go +++ b/plugins/inputs/docker/docker_test.go @@ -24,7 +24,7 @@ func TestDockerGatherContainerStats(t *testing.T) { "container_name": "redis", "container_image": "redis/image", } - gatherContainerStats(stats, &acc, tags, "123456789") + gatherContainerStats(stats, &acc, tags, "123456789", true, true) // test docker_container_net measurement netfields := map[string]interface{}{ @@ -42,6 +42,21 @@ func TestDockerGatherContainerStats(t *testing.T) { nettags["network"] = "eth0" acc.AssertContainsTaggedFields(t, "docker_container_net", netfields, nettags) + netfields = map[string]interface{}{ + "rx_dropped": uint64(6), + "rx_bytes": uint64(8), + "rx_errors": uint64(10), + "tx_packets": uint64(12), + "tx_dropped": uint64(6), + "rx_packets": uint64(8), + "tx_errors": uint64(10), + "tx_bytes": uint64(12), + "container_id": "123456789", + } + nettags = copyTags(tags) + nettags["network"] = "total" + acc.AssertContainsTaggedFields(t, "docker_container_net", netfields, nettags) + // test docker_blkio measurement blkiotags := copyTags(tags) blkiotags["device"] = "6:0" @@ -52,6 +67,15 @@ func TestDockerGatherContainerStats(t *testing.T) { } acc.AssertContainsTaggedFields(t, "docker_container_blkio", blkiofields, blkiotags) + blkiotags = copyTags(tags) + blkiotags["device"] = "total" + blkiofields = map[string]interface{}{ + "io_service_bytes_recursive_read": uint64(100), + "io_serviced_recursive_write": uint64(302), + "container_id": "123456789", + } + acc.AssertContainsTaggedFields(t, "docker_container_blkio", blkiofields, blkiotags) + // test docker_container_mem measurement memfields := map[string]interface{}{ "max_usage": uint64(1001), @@ -186,6 +210,17 @@ func testStats() *types.StatsJSON { TxBytes: 4, } + stats.Networks["eth1"] = types.NetworkStats{ + RxDropped: 5, + RxBytes: 6, + RxErrors: 7, + TxPackets: 8, + TxDropped: 5, + RxPackets: 6, + TxErrors: 7, + TxBytes: 8, + } + sbr := types.BlkioStatEntry{ Major: 6, Minor: 0, @@ -198,11 +233,19 @@ func testStats() *types.StatsJSON { Op: "write", Value: 101, } + sr2 := types.BlkioStatEntry{ + Major: 6, + Minor: 1, + Op: "write", + Value: 201, + } stats.BlkioStats.IoServiceBytesRecursive = append( stats.BlkioStats.IoServiceBytesRecursive, sbr) stats.BlkioStats.IoServicedRecursive = append( stats.BlkioStats.IoServicedRecursive, sr) + stats.BlkioStats.IoServicedRecursive = append( + stats.BlkioStats.IoServicedRecursive, sr2) return stats } diff --git a/plugins/inputs/hddtemp/README.md b/plugins/inputs/hddtemp/README.md new file mode 100644 index 000000000..d87ae625d --- /dev/null +++ b/plugins/inputs/hddtemp/README.md @@ -0,0 +1,22 @@ +# Hddtemp Input Plugin + +This plugin reads data from hddtemp daemon + +## Requirements + +Hddtemp should be installed and its daemon running + +## Configuration + +``` +[[inputs.hddtemp]] +## By default, telegraf gathers temps data from all disks detected by the +## hddtemp. +## +## Only collect temps from the selected disks. +## +## A * as the device name will return the temperature values of all disks. +## +# address = "127.0.0.1:7634" +# devices = ["sda", "*"] +``` diff --git a/plugins/inputs/hddtemp/go-hddtemp/LICENSE b/plugins/inputs/hddtemp/go-hddtemp/LICENSE new file mode 100644 index 000000000..d5aed19c6 --- /dev/null +++ b/plugins/inputs/hddtemp/go-hddtemp/LICENSE @@ -0,0 +1,21 @@ +The MIT License (MIT) + +Copyright (c) 2016 Mendelson Gusmão + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/plugins/inputs/hddtemp/go-hddtemp/hddtemp.go b/plugins/inputs/hddtemp/go-hddtemp/hddtemp.go new file mode 100644 index 000000000..d7d650b79 --- /dev/null +++ b/plugins/inputs/hddtemp/go-hddtemp/hddtemp.go @@ -0,0 +1,61 @@ +package hddtemp + +import ( + "bytes" + "io" + "net" + "strconv" + "strings" +) + +type disk struct { + DeviceName string + Model string + Temperature int32 + Unit string + Status string +} + +func Fetch(address string) ([]disk, error) { + var ( + err error + conn net.Conn + buffer bytes.Buffer + disks []disk + ) + + if conn, err = net.Dial("tcp", address); err != nil { + return nil, err + } + + if _, err = io.Copy(&buffer, conn); err != nil { + return nil, err + } + + fields := strings.Split(buffer.String(), "|") + + for index := 0; index < len(fields)/5; index++ { + status := "" + offset := index * 5 + device := fields[offset+1] + device = device[strings.LastIndex(device, "/")+1:] + + temperatureField := fields[offset+3] + temperature, err := strconv.ParseInt(temperatureField, 10, 32) + + if err != nil { + temperature = 0 + status = temperatureField + } + + disks = append(disks, disk{ + DeviceName: device, + Model: fields[offset+2], + Temperature: int32(temperature), + Unit: fields[offset+4], + Status: status, + }) + } + + return disks, nil +} diff --git a/plugins/inputs/hddtemp/go-hddtemp/hddtemp_test.go b/plugins/inputs/hddtemp/go-hddtemp/hddtemp_test.go new file mode 100644 index 000000000..858e91a90 --- /dev/null +++ b/plugins/inputs/hddtemp/go-hddtemp/hddtemp_test.go @@ -0,0 +1,116 @@ +package hddtemp + +import ( + "net" + "reflect" + "testing" +) + +func TestFetch(t *testing.T) { + l := serve(t, []byte("|/dev/sda|foobar|36|C|")) + defer l.Close() + + disks, err := Fetch(l.Addr().String()) + + if err != nil { + t.Error("expecting err to be nil") + } + + expected := []disk{ + { + DeviceName: "sda", + Model: "foobar", + Temperature: 36, + Unit: "C", + }, + } + + if !reflect.DeepEqual(expected, disks) { + t.Error("disks' slice is different from expected") + } +} + +func TestFetchWrongAddress(t *testing.T) { + _, err := Fetch("127.0.0.1:1") + + if err == nil { + t.Error("expecting err to be non-nil") + } +} + +func TestFetchStatus(t *testing.T) { + l := serve(t, []byte("|/dev/sda|foobar|SLP|C|")) + defer l.Close() + + disks, err := Fetch(l.Addr().String()) + + if err != nil { + t.Error("expecting err to be nil") + } + + expected := []disk{ + { + DeviceName: "sda", + Model: "foobar", + Temperature: 0, + Unit: "C", + Status: "SLP", + }, + } + + if !reflect.DeepEqual(expected, disks) { + t.Error("disks' slice is different from expected") + } +} + +func TestFetchTwoDisks(t *testing.T) { + l := serve(t, []byte("|/dev/hda|ST380011A|46|C||/dev/hdd|ST340016A|SLP|*|")) + defer l.Close() + + disks, err := Fetch(l.Addr().String()) + + if err != nil { + t.Error("expecting err to be nil") + } + + expected := []disk{ + { + DeviceName: "hda", + Model: "ST380011A", + Temperature: 46, + Unit: "C", + }, + { + DeviceName: "hdd", + Model: "ST340016A", + Temperature: 0, + Unit: "*", + Status: "SLP", + }, + } + + if !reflect.DeepEqual(expected, disks) { + t.Error("disks' slice is different from expected") + } +} + +func serve(t *testing.T, data []byte) net.Listener { + l, err := net.Listen("tcp", "127.0.0.1:0") + + if err != nil { + t.Fatal(err) + } + + go func(t *testing.T) { + conn, err := l.Accept() + + if err != nil { + t.Fatal(err) + } + + conn.Write(data) + conn.Close() + }(t) + + return l +} diff --git a/plugins/inputs/hddtemp/hddtemp.go b/plugins/inputs/hddtemp/hddtemp.go new file mode 100644 index 000000000..c1e01c3c6 --- /dev/null +++ b/plugins/inputs/hddtemp/hddtemp.go @@ -0,0 +1,74 @@ +// +build linux + +package hddtemp + +import ( + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/inputs" + gohddtemp "github.com/influxdata/telegraf/plugins/inputs/hddtemp/go-hddtemp" +) + +const defaultAddress = "127.0.0.1:7634" + +type HDDTemp struct { + Address string + Devices []string +} + +func (_ *HDDTemp) Description() string { + return "Monitor disks' temperatures using hddtemp" +} + +var hddtempSampleConfig = ` + ## By default, telegraf gathers temps data from all disks detected by the + ## hddtemp. + ## + ## Only collect temps from the selected disks. + ## + ## A * as the device name will return the temperature values of all disks. + ## + # address = "127.0.0.1:7634" + # devices = ["sda", "*"] +` + +func (_ *HDDTemp) SampleConfig() string { + return hddtempSampleConfig +} + +func (h *HDDTemp) Gather(acc telegraf.Accumulator) error { + disks, err := gohddtemp.Fetch(h.Address) + + if err != nil { + return err + } + + for _, disk := range disks { + for _, chosenDevice := range h.Devices { + if chosenDevice == "*" || chosenDevice == disk.DeviceName { + tags := map[string]string{ + "device": disk.DeviceName, + "model": disk.Model, + "unit": disk.Unit, + "status": disk.Status, + } + + fields := map[string]interface{}{ + disk.DeviceName: disk.Temperature, + } + + acc.AddFields("hddtemp", fields, tags) + } + } + } + + return nil +} + +func init() { + inputs.Add("hddtemp", func() telegraf.Input { + return &HDDTemp{ + Address: defaultAddress, + Devices: []string{"*"}, + } + }) +} diff --git a/plugins/inputs/hddtemp/hddtemp_nocompile.go b/plugins/inputs/hddtemp/hddtemp_nocompile.go new file mode 100644 index 000000000..0c5801670 --- /dev/null +++ b/plugins/inputs/hddtemp/hddtemp_nocompile.go @@ -0,0 +1,3 @@ +// +build !linux + +package hddtemp diff --git a/plugins/inputs/mesos/README.md b/plugins/inputs/mesos/README.md index 20a6dd244..affb66463 100644 --- a/plugins/inputs/mesos/README.md +++ b/plugins/inputs/mesos/README.md @@ -1,6 +1,6 @@ # Mesos Input Plugin -This input plugin gathers metrics from Mesos (*currently only Mesos masters*). +This input plugin gathers metrics from Mesos. For more information, please check the [Mesos Observability Metrics](http://mesos.apache.org/documentation/latest/monitoring/) page. ### Configuration: @@ -8,14 +8,41 @@ For more information, please check the [Mesos Observability Metrics](http://meso ```toml # Telegraf plugin for gathering metrics from N Mesos masters [[inputs.mesos]] - # Timeout, in ms. + ## Timeout, in ms. timeout = 100 - # A list of Mesos masters, default value is localhost:5050. + ## A list of Mesos masters. masters = ["localhost:5050"] - # Metrics groups to be collected, by default, all enabled. - master_collections = ["resources","master","system","slaves","frameworks","messages","evqueue","registrar"] + ## Master metrics groups to be collected, by default, all enabled. + master_collections = [ + "resources", + "master", + "system", + "agents", + "frameworks", + "tasks", + "messages", + "evqueue", + "registrar", + ] + ## A list of Mesos slaves, default is [] + # slaves = [] + ## Slave metrics groups to be collected, by default, all enabled. + # slave_collections = [ + # "resources", + # "agent", + # "system", + # "executors", + # "tasks", + # "messages", + # ] + ## Include mesos tasks statistics, default is false + # slave_tasks = true ``` +By dafault this plugin is not configured to gather metrics from mesos. Since mesos cluster can be deployed in numerous ways it does not provide ane default +values in that matter. User needs to specify master/slave nodes this plugin will gather metrics from. Additionally by enabling `slave_tasks` will allow +agthering metrics from takss runing on specified slaves (this options is disabled by default). + ### Measurements & Fields: Mesos master metric groups @@ -33,6 +60,12 @@ Mesos master metric groups - master/disk_revocable_percent - master/disk_revocable_total - master/disk_revocable_used + - master/gpus_percent + - master/gpus_used + - master/gpus_total + - master/gpus_revocable_percent + - master/gpus_revocable_total + - master/gpus_revocable_used - master/mem_percent - master/mem_used - master/mem_total @@ -136,17 +169,111 @@ Mesos master metric groups - registrar/state_store_ms/p999 - registrar/state_store_ms/p9999 +Mesos slave metric groups +- resources + - slave/cpus_percent + - slave/cpus_used + - slave/cpus_total + - slave/cpus_revocable_percent + - slave/cpus_revocable_total + - slave/cpus_revocable_used + - slave/disk_percent + - slave/disk_used + - slave/disk_total + - slave/disk_revocable_percent + - slave/disk_revocable_total + - slave/disk_revocable_used + - slave/gpus_percent + - slave/gpus_used + - slave/gpus_total, + - slave/gpus_revocable_percent + - slave/gpus_revocable_total + - slave/gpus_revocable_used + - slave/mem_percent + - slave/mem_used + - slave/mem_total + - slave/mem_revocable_percent + - slave/mem_revocable_total + - slave/mem_revocable_used + +- agent + - slave/registered + - slave/uptime_secs + +- system + - system/cpus_total + - system/load_15min + - system/load_5min + - system/load_1min + - system/mem_free_bytes + - system/mem_total_bytes + +- executors + - containerizer/mesos/container_destroy_errors + - slave/container_launch_errors + - slave/executors_preempted + - slave/frameworks_active + - slave/executor_directory_max_allowed_age_secs + - slave/executors_registering + - slave/executors_running + - slave/executors_terminated + - slave/executors_terminating + - slave/recovery_errors + +- tasks + - slave/tasks_failed + - slave/tasks_finished + - slave/tasks_killed + - slave/tasks_lost + - slave/tasks_running + - slave/tasks_staging + - slave/tasks_starting + +- messages + - slave/invalid_framework_messages + - slave/invalid_status_updates + - slave/valid_framework_messages + - slave/valid_status_updates + +Mesos tasks metric groups + +- executor_id +- executor_name +- framework_id +- source +- statistics (all metrics below will have `statistics_` prefix included in their names + - cpus_limit + - cpus_system_time_secs + - cpus_user_time_secs + - mem_anon_bytes + - mem_cache_bytes + - mem_critical_pressure_counter + - mem_file_bytes + - mem_limit_bytes + - mem_low_pressure_counter + - mem_mapped_file_bytes + - mem_medium_pressure_counter + - mem_rss_bytes + - mem_swap_bytes + - mem_total_bytes + - mem_total_memsw_bytes + - mem_unevictable_bytes + - timestamp + ### Tags: -- All measurements have the following tags: +- All master/slave measurements have the following tags: + - server + - role (master/slave) + +- Tasks measurements have the following tags: - server ### Example Output: - ``` $ telegraf -config ~/mesos.conf -input-filter mesos -test * Plugin: mesos, Collection 1 -mesos,server=172.17.8.101 allocator/event_queue_dispatches=0,master/cpus_percent=0, +mesos,host=172.17.8.102,server=172.17.8.101 allocator/event_queue_dispatches=0,master/cpus_percent=0, master/cpus_revocable_percent=0,master/cpus_revocable_total=0, master/cpus_revocable_used=0,master/cpus_total=2, master/cpus_used=0,master/disk_percent=0,master/disk_revocable_percent=0, @@ -163,3 +290,16 @@ master/mem_revocable_used=0,master/mem_total=1002, master/mem_used=0,master/messages_authenticate=0, master/messages_deactivate_framework=0 ... ``` + +Meoso tasks metrics (if enabled): +``` +mesos-tasks,host=172.17.8.102,server=172.17.8.101,task_id=hello-world.e4b5b497-2ccd-11e6-a659-0242fb222ce2 +statistics_cpus_limit=0.2,statistics_cpus_system_time_secs=142.49,statistics_cpus_user_time_secs=388.14, +statistics_mem_anon_bytes=359129088,statistics_mem_cache_bytes=3964928, +statistics_mem_critical_pressure_counter=0,statistics_mem_file_bytes=3964928, +statistics_mem_limit_bytes=767557632,statistics_mem_low_pressure_counter=0, +statistics_mem_mapped_file_bytes=114688,statistics_mem_medium_pressure_counter=0, +statistics_mem_rss_bytes=359129088,statistics_mem_swap_bytes=0,statistics_mem_total_bytes=363094016, +statistics_mem_total_memsw_bytes=363094016,statistics_mem_unevictable_bytes=0, +statistics_timestamp=1465486052.70525 1465486053052811792... +``` diff --git a/plugins/inputs/mesos/mesos.go b/plugins/inputs/mesos/mesos.go index b096a20d9..a719dc9f4 100644 --- a/plugins/inputs/mesos/mesos.go +++ b/plugins/inputs/mesos/mesos.go @@ -17,33 +17,57 @@ import ( jsonparser "github.com/influxdata/telegraf/plugins/parsers/json" ) +type Role string + +const ( + MASTER Role = "master" + SLAVE = "slave" +) + type Mesos struct { Timeout int Masters []string MasterCols []string `toml:"master_collections"` + Slaves []string + SlaveCols []string `toml:"slave_collections"` + SlaveTasks bool } -var defaultMetrics = []string{ - "resources", "master", "system", "slaves", "frameworks", - "tasks", "messages", "evqueue", "messages", "registrar", +var allMetrics = map[Role][]string{ + MASTER: []string{"resources", "master", "system", "agents", "frameworks", "tasks", "messages", "evqueue", "registrar"}, + SLAVE: []string{"resources", "agent", "system", "executors", "tasks", "messages"}, } var sampleConfig = ` - # Timeout, in ms. + ## Timeout, in ms. timeout = 100 - # A list of Mesos masters, default value is localhost:5050. + ## A list of Mesos masters. masters = ["localhost:5050"] - # Metrics groups to be collected, by default, all enabled. + ## Master metrics groups to be collected, by default, all enabled. master_collections = [ "resources", "master", "system", - "slaves", + "agents", "frameworks", + "tasks", "messages", "evqueue", "registrar", ] + ## A list of Mesos slaves, default is [] + # slaves = [] + ## Slave metrics groups to be collected, by default, all enabled. + # slave_collections = [ + # "resources", + # "agent", + # "system", + # "executors", + # "tasks", + # "messages", + # ] + ## Include mesos tasks statistics, default is false + # slave_tasks = true ` // SampleConfig returns a sample configuration block @@ -56,21 +80,54 @@ func (m *Mesos) Description() string { return "Telegraf plugin for gathering metrics from N Mesos masters" } +func (m *Mesos) SetDefaults() { + if len(m.MasterCols) == 0 { + m.MasterCols = allMetrics[MASTER] + } + + if len(m.SlaveCols) == 0 { + m.SlaveCols = allMetrics[SLAVE] + } + + if m.Timeout == 0 { + log.Println("[mesos] Missing timeout value, setting default value (100ms)") + m.Timeout = 100 + } +} + // Gather() metrics from given list of Mesos Masters func (m *Mesos) Gather(acc telegraf.Accumulator) error { var wg sync.WaitGroup var errorChannel chan error - if len(m.Masters) == 0 { - m.Masters = []string{"localhost:5050"} - } + m.SetDefaults() - errorChannel = make(chan error, len(m.Masters)*2) + errorChannel = make(chan error, len(m.Masters)+2*len(m.Slaves)) for _, v := range m.Masters { wg.Add(1) go func(c string) { - errorChannel <- m.gatherMetrics(c, acc) + errorChannel <- m.gatherMainMetrics(c, ":5050", MASTER, acc) + wg.Done() + return + }(v) + } + + for _, v := range m.Slaves { + wg.Add(1) + go func(c string) { + errorChannel <- m.gatherMainMetrics(c, ":5051", MASTER, acc) + wg.Done() + return + }(v) + + if !m.SlaveTasks { + continue + } + + wg.Add(1) + go func(c string) { + errorChannel <- m.gatherSlaveTaskMetrics(c, ":5051", acc) wg.Done() return }(v) @@ -94,7 +151,7 @@ func (m *Mesos) Gather(acc telegraf.Accumulator) error { } // metricsDiff() returns set names for removal -func metricsDiff(w []string) []string { +func metricsDiff(role Role, w []string) []string { b := []string{} s := make(map[string]bool) @@ -106,7 +163,7 @@ func metricsDiff(w []string) []string { s[v] = true } - for _, d := range defaultMetrics { + for _, d := range allMetrics[role] { if _, ok := s[d]; !ok { b = append(b, d) } @@ -116,156 +173,239 @@ func metricsDiff(w []string) []string { } // masterBlocks serves as kind of metrics registry groupping them in sets -func masterBlocks(g string) []string { +func getMetrics(role Role, group string) []string { var m map[string][]string m = make(map[string][]string) - m["resources"] = []string{ - "master/cpus_percent", - "master/cpus_used", - "master/cpus_total", - "master/cpus_revocable_percent", - "master/cpus_revocable_total", - "master/cpus_revocable_used", - "master/disk_percent", - "master/disk_used", - "master/disk_total", - "master/disk_revocable_percent", - "master/disk_revocable_total", - "master/disk_revocable_used", - "master/mem_percent", - "master/mem_used", - "master/mem_total", - "master/mem_revocable_percent", - "master/mem_revocable_total", - "master/mem_revocable_used", + if role == MASTER { + m["resources"] = []string{ + "master/cpus_percent", + "master/cpus_used", + "master/cpus_total", + "master/cpus_revocable_percent", + "master/cpus_revocable_total", + "master/cpus_revocable_used", + "master/disk_percent", + "master/disk_used", + "master/disk_total", + "master/disk_revocable_percent", + "master/disk_revocable_total", + "master/disk_revocable_used", + "master/gpus_percent", + "master/gpus_used", + "master/gpus_total", + "master/gpus_revocable_percent", + "master/gpus_revocable_total", + "master/gpus_revocable_used", + "master/mem_percent", + "master/mem_used", + "master/mem_total", + "master/mem_revocable_percent", + "master/mem_revocable_total", + "master/mem_revocable_used", + } + + m["master"] = []string{ + "master/elected", + "master/uptime_secs", + } + + m["system"] = []string{ + "system/cpus_total", + "system/load_15min", + "system/load_5min", + "system/load_1min", + "system/mem_free_bytes", + "system/mem_total_bytes", + } + + m["agents"] = []string{ + "master/slave_registrations", + "master/slave_removals", + "master/slave_reregistrations", + "master/slave_shutdowns_scheduled", + "master/slave_shutdowns_canceled", + "master/slave_shutdowns_completed", + "master/slaves_active", + "master/slaves_connected", + "master/slaves_disconnected", + "master/slaves_inactive", + } + + m["frameworks"] = []string{ + "master/frameworks_active", + "master/frameworks_connected", + "master/frameworks_disconnected", + "master/frameworks_inactive", + "master/outstanding_offers", + } + + m["tasks"] = []string{ + "master/tasks_error", + "master/tasks_failed", + "master/tasks_finished", + "master/tasks_killed", + "master/tasks_lost", + "master/tasks_running", + "master/tasks_staging", + "master/tasks_starting", + } + + m["messages"] = []string{ + "master/invalid_executor_to_framework_messages", + "master/invalid_framework_to_executor_messages", + "master/invalid_status_update_acknowledgements", + "master/invalid_status_updates", + "master/dropped_messages", + "master/messages_authenticate", + "master/messages_deactivate_framework", + "master/messages_decline_offers", + "master/messages_executor_to_framework", + "master/messages_exited_executor", + "master/messages_framework_to_executor", + "master/messages_kill_task", + "master/messages_launch_tasks", + "master/messages_reconcile_tasks", + "master/messages_register_framework", + "master/messages_register_slave", + "master/messages_reregister_framework", + "master/messages_reregister_slave", + "master/messages_resource_request", + "master/messages_revive_offers", + "master/messages_status_update", + "master/messages_status_update_acknowledgement", + "master/messages_unregister_framework", + "master/messages_unregister_slave", + "master/messages_update_slave", + "master/recovery_slave_removals", + "master/slave_removals/reason_registered", + "master/slave_removals/reason_unhealthy", + "master/slave_removals/reason_unregistered", + "master/valid_framework_to_executor_messages", + "master/valid_status_update_acknowledgements", + "master/valid_status_updates", + "master/task_lost/source_master/reason_invalid_offers", + "master/task_lost/source_master/reason_slave_removed", + "master/task_lost/source_slave/reason_executor_terminated", + "master/valid_executor_to_framework_messages", + } + + m["evqueue"] = []string{ + "master/event_queue_dispatches", + "master/event_queue_http_requests", + "master/event_queue_messages", + } + + m["registrar"] = []string{ + "registrar/state_fetch_ms", + "registrar/state_store_ms", + "registrar/state_store_ms/max", + "registrar/state_store_ms/min", + "registrar/state_store_ms/p50", + "registrar/state_store_ms/p90", + "registrar/state_store_ms/p95", + "registrar/state_store_ms/p99", + "registrar/state_store_ms/p999", + "registrar/state_store_ms/p9999", + } + } else if role == SLAVE { + m["resources"] = []string{ + "slave/cpus_percent", + "slave/cpus_used", + "slave/cpus_total", + "slave/cpus_revocable_percent", + "slave/cpus_revocable_total", + "slave/cpus_revocable_used", + "slave/disk_percent", + "slave/disk_used", + "slave/disk_total", + "slave/disk_revocable_percent", + "slave/disk_revocable_total", + "slave/disk_revocable_used", + "slave/gpus_percent", + "slave/gpus_used", + "slave/gpus_total", + "slave/gpus_revocable_percent", + "slave/gpus_revocable_total", + "slave/gpus_revocable_used", + "slave/mem_percent", + "slave/mem_used", + "slave/mem_total", + "slave/mem_revocable_percent", + "slave/mem_revocable_total", + "slave/mem_revocable_used", + } + + m["agent"] = []string{ + "slave/registered", + "slave/uptime_secs", + } + + m["system"] = []string{ + "system/cpus_total", + "system/load_15min", + "system/load_5min", + "system/load_1min", + "system/mem_free_bytes", + "system/mem_total_bytes", + } + + m["executors"] = []string{ + "containerizer/mesos/container_destroy_errors", + "slave/container_launch_errors", + "slave/executors_preempted", + "slave/frameworks_active", + "slave/executor_directory_max_allowed_age_secs", + "slave/executors_registering", + "slave/executors_running", + "slave/executors_terminated", + "slave/executors_terminating", + "slave/recovery_errors", + } + + m["tasks"] = []string{ + "slave/tasks_failed", + "slave/tasks_finished", + "slave/tasks_killed", + "slave/tasks_lost", + "slave/tasks_running", + "slave/tasks_staging", + "slave/tasks_starting", + } + + m["messages"] = []string{ + "slave/invalid_framework_messages", + "slave/invalid_status_updates", + "slave/valid_framework_messages", + "slave/valid_status_updates", + } } - m["master"] = []string{ - "master/elected", - "master/uptime_secs", - } - - m["system"] = []string{ - "system/cpus_total", - "system/load_15min", - "system/load_5min", - "system/load_1min", - "system/mem_free_bytes", - "system/mem_total_bytes", - } - - m["slaves"] = []string{ - "master/slave_registrations", - "master/slave_removals", - "master/slave_reregistrations", - "master/slave_shutdowns_scheduled", - "master/slave_shutdowns_canceled", - "master/slave_shutdowns_completed", - "master/slaves_active", - "master/slaves_connected", - "master/slaves_disconnected", - "master/slaves_inactive", - } - - m["frameworks"] = []string{ - "master/frameworks_active", - "master/frameworks_connected", - "master/frameworks_disconnected", - "master/frameworks_inactive", - "master/outstanding_offers", - } - - m["tasks"] = []string{ - "master/tasks_error", - "master/tasks_failed", - "master/tasks_finished", - "master/tasks_killed", - "master/tasks_lost", - "master/tasks_running", - "master/tasks_staging", - "master/tasks_starting", - } - - m["messages"] = []string{ - "master/invalid_executor_to_framework_messages", - "master/invalid_framework_to_executor_messages", - "master/invalid_status_update_acknowledgements", - "master/invalid_status_updates", - "master/dropped_messages", - "master/messages_authenticate", - "master/messages_deactivate_framework", - "master/messages_decline_offers", - "master/messages_executor_to_framework", - "master/messages_exited_executor", - "master/messages_framework_to_executor", - "master/messages_kill_task", - "master/messages_launch_tasks", - "master/messages_reconcile_tasks", - "master/messages_register_framework", - "master/messages_register_slave", - "master/messages_reregister_framework", - "master/messages_reregister_slave", - "master/messages_resource_request", - "master/messages_revive_offers", - "master/messages_status_update", - "master/messages_status_update_acknowledgement", - "master/messages_unregister_framework", - "master/messages_unregister_slave", - "master/messages_update_slave", - "master/recovery_slave_removals", - "master/slave_removals/reason_registered", - "master/slave_removals/reason_unhealthy", - "master/slave_removals/reason_unregistered", - "master/valid_framework_to_executor_messages", - "master/valid_status_update_acknowledgements", - "master/valid_status_updates", - "master/task_lost/source_master/reason_invalid_offers", - "master/task_lost/source_master/reason_slave_removed", - "master/task_lost/source_slave/reason_executor_terminated", - "master/valid_executor_to_framework_messages", - } - - m["evqueue"] = []string{ - "master/event_queue_dispatches", - "master/event_queue_http_requests", - "master/event_queue_messages", - } - - m["registrar"] = []string{ - "registrar/state_fetch_ms", - "registrar/state_store_ms", - "registrar/state_store_ms/max", - "registrar/state_store_ms/min", - "registrar/state_store_ms/p50", - "registrar/state_store_ms/p90", - "registrar/state_store_ms/p95", - "registrar/state_store_ms/p99", - "registrar/state_store_ms/p999", - "registrar/state_store_ms/p9999", - } - - ret, ok := m[g] + ret, ok := m[group] if !ok { - log.Println("[mesos] Unkown metrics group: ", g) + log.Printf("[mesos] Unkown %s metrics group: %s\n", role, group) return []string{} } return ret } -// removeGroup(), remove unwanted sets -func (m *Mesos) removeGroup(j *map[string]interface{}) { +func (m *Mesos) filterMetrics(role Role, metrics *map[string]interface{}) { var ok bool + var selectedMetrics []string - b := metricsDiff(m.MasterCols) + if role == MASTER { + selectedMetrics = m.MasterCols + } else if role == SLAVE { + selectedMetrics = m.SlaveCols + } - for _, k := range b { - for _, v := range masterBlocks(k) { - if _, ok = (*j)[v]; ok { - delete((*j), v) + for _, k := range metricsDiff(role, selectedMetrics) { + for _, v := range getMetrics(role, k) { + if _, ok = (*metrics)[v]; ok { + delete((*metrics), v) } } } @@ -280,23 +420,66 @@ var client = &http.Client{ Timeout: time.Duration(4 * time.Second), } -// This should not belong to the object -func (m *Mesos) gatherMetrics(a string, acc telegraf.Accumulator) error { - var jsonOut map[string]interface{} +func (m *Mesos) gatherSlaveTaskMetrics(address string, defaultPort string, acc telegraf.Accumulator) error { + var metrics []map[string]interface{} - host, _, err := net.SplitHostPort(a) + host, _, err := net.SplitHostPort(address) if err != nil { - host = a - a = a + ":5050" + host = address + address = address + defaultPort } tags := map[string]string{ "server": host, } - if m.Timeout == 0 { - log.Println("[mesos] Missing timeout value, setting default value (100ms)") - m.Timeout = 100 + ts := strconv.Itoa(m.Timeout) + "ms" + + resp, err := client.Get("http://" + address + "/monitor/statistics?timeout=" + ts) + + if err != nil { + return err + } + + data, err := ioutil.ReadAll(resp.Body) + resp.Body.Close() + if err != nil { + return err + } + + if err = json.Unmarshal([]byte(data), &metrics); err != nil { + return errors.New("Error decoding JSON response") + } + + for _, task := range metrics { + tags["task_id"] = task["executor_id"].(string) + + jf := jsonparser.JSONFlattener{} + err = jf.FlattenJSON("", task) + + if err != nil { + return err + } + + acc.AddFields("mesos-tasks", jf.Fields, tags) + } + + return nil +} + +// This should not belong to the object +func (m *Mesos) gatherMainMetrics(a string, defaultPort string, role Role, acc telegraf.Accumulator) error { + var jsonOut map[string]interface{} + + host, _, err := net.SplitHostPort(a) + if err != nil { + host = a + a = a + defaultPort + } + + tags := map[string]string{ + "server": host, + "role": string(role), } ts := strconv.Itoa(m.Timeout) + "ms" @@ -317,7 +500,7 @@ func (m *Mesos) gatherMetrics(a string, acc telegraf.Accumulator) error { return errors.New("Error decoding JSON response") } - m.removeGroup(&jsonOut) + m.filterMetrics(role, &jsonOut) jf := jsonparser.JSONFlattener{} diff --git a/plugins/inputs/mesos/mesos_test.go b/plugins/inputs/mesos/mesos_test.go index c56580649..062e23e4a 100644 --- a/plugins/inputs/mesos/mesos_test.go +++ b/plugins/inputs/mesos/mesos_test.go @@ -2,70 +2,275 @@ package mesos import ( "encoding/json" + "fmt" "math/rand" "net/http" "net/http/httptest" "os" "testing" + jsonparser "github.com/influxdata/telegraf/plugins/parsers/json" "github.com/influxdata/telegraf/testutil" ) -var mesosMetrics map[string]interface{} -var ts *httptest.Server +var masterMetrics map[string]interface{} +var masterTestServer *httptest.Server +var slaveMetrics map[string]interface{} +var slaveTaskMetrics map[string]interface{} +var slaveTestServer *httptest.Server + +func randUUID() string { + b := make([]byte, 16) + rand.Read(b) + return fmt.Sprintf("%x-%x-%x-%x-%x", b[0:4], b[4:6], b[6:8], b[8:10], b[10:]) +} func generateMetrics() { - mesosMetrics = make(map[string]interface{}) + masterMetrics = make(map[string]interface{}) - metricNames := []string{"master/cpus_percent", "master/cpus_used", "master/cpus_total", - "master/cpus_revocable_percent", "master/cpus_revocable_total", "master/cpus_revocable_used", - "master/disk_percent", "master/disk_used", "master/disk_total", "master/disk_revocable_percent", - "master/disk_revocable_total", "master/disk_revocable_used", "master/mem_percent", - "master/mem_used", "master/mem_total", "master/mem_revocable_percent", "master/mem_revocable_total", - "master/mem_revocable_used", "master/elected", "master/uptime_secs", "system/cpus_total", - "system/load_15min", "system/load_5min", "system/load_1min", "system/mem_free_bytes", - "system/mem_total_bytes", "master/slave_registrations", "master/slave_removals", - "master/slave_reregistrations", "master/slave_shutdowns_scheduled", "master/slave_shutdowns_canceled", - "master/slave_shutdowns_completed", "master/slaves_active", "master/slaves_connected", - "master/slaves_disconnected", "master/slaves_inactive", "master/frameworks_active", - "master/frameworks_connected", "master/frameworks_disconnected", "master/frameworks_inactive", - "master/outstanding_offers", "master/tasks_error", "master/tasks_failed", "master/tasks_finished", - "master/tasks_killed", "master/tasks_lost", "master/tasks_running", "master/tasks_staging", - "master/tasks_starting", "master/invalid_executor_to_framework_messages", "master/invalid_framework_to_executor_messages", - "master/invalid_status_update_acknowledgements", "master/invalid_status_updates", - "master/dropped_messages", "master/messages_authenticate", "master/messages_deactivate_framework", - "master/messages_decline_offers", "master/messages_executor_to_framework", "master/messages_exited_executor", - "master/messages_framework_to_executor", "master/messages_kill_task", "master/messages_launch_tasks", - "master/messages_reconcile_tasks", "master/messages_register_framework", "master/messages_register_slave", - "master/messages_reregister_framework", "master/messages_reregister_slave", "master/messages_resource_request", - "master/messages_revive_offers", "master/messages_status_update", "master/messages_status_update_acknowledgement", - "master/messages_unregister_framework", "master/messages_unregister_slave", "master/messages_update_slave", - "master/recovery_slave_removals", "master/slave_removals/reason_registered", "master/slave_removals/reason_unhealthy", - "master/slave_removals/reason_unregistered", "master/valid_framework_to_executor_messages", "master/valid_status_update_acknowledgements", - "master/valid_status_updates", "master/task_lost/source_master/reason_invalid_offers", - "master/task_lost/source_master/reason_slave_removed", "master/task_lost/source_slave/reason_executor_terminated", - "master/valid_executor_to_framework_messages", "master/event_queue_dispatches", - "master/event_queue_http_requests", "master/event_queue_messages", "registrar/state_fetch_ms", - "registrar/state_store_ms", "registrar/state_store_ms/max", "registrar/state_store_ms/min", - "registrar/state_store_ms/p50", "registrar/state_store_ms/p90", "registrar/state_store_ms/p95", - "registrar/state_store_ms/p99", "registrar/state_store_ms/p999", "registrar/state_store_ms/p9999"} + metricNames := []string{ + // resources + "master/cpus_percent", + "master/cpus_used", + "master/cpus_total", + "master/cpus_revocable_percent", + "master/cpus_revocable_total", + "master/cpus_revocable_used", + "master/disk_percent", + "master/disk_used", + "master/disk_total", + "master/disk_revocable_percent", + "master/disk_revocable_total", + "master/disk_revocable_used", + "master/gpus_percent", + "master/gpus_used", + "master/gpus_total", + "master/gpus_revocable_percent", + "master/gpus_revocable_total", + "master/gpus_revocable_used", + "master/mem_percent", + "master/mem_used", + "master/mem_total", + "master/mem_revocable_percent", + "master/mem_revocable_total", + "master/mem_revocable_used", + // master + "master/elected", + "master/uptime_secs", + // system + "system/cpus_total", + "system/load_15min", + "system/load_5min", + "system/load_1min", + "system/mem_free_bytes", + "system/mem_total_bytes", + // agents + "master/slave_registrations", + "master/slave_removals", + "master/slave_reregistrations", + "master/slave_shutdowns_scheduled", + "master/slave_shutdowns_canceled", + "master/slave_shutdowns_completed", + "master/slaves_active", + "master/slaves_connected", + "master/slaves_disconnected", + "master/slaves_inactive", + // frameworks + "master/frameworks_active", + "master/frameworks_connected", + "master/frameworks_disconnected", + "master/frameworks_inactive", + "master/outstanding_offers", + // tasks + "master/tasks_error", + "master/tasks_failed", + "master/tasks_finished", + "master/tasks_killed", + "master/tasks_lost", + "master/tasks_running", + "master/tasks_staging", + "master/tasks_starting", + // messages + "master/invalid_executor_to_framework_messages", + "master/invalid_framework_to_executor_messages", + "master/invalid_status_update_acknowledgements", + "master/invalid_status_updates", + "master/dropped_messages", + "master/messages_authenticate", + "master/messages_deactivate_framework", + "master/messages_decline_offers", + "master/messages_executor_to_framework", + "master/messages_exited_executor", + "master/messages_framework_to_executor", + "master/messages_kill_task", + "master/messages_launch_tasks", + "master/messages_reconcile_tasks", + "master/messages_register_framework", + "master/messages_register_slave", + "master/messages_reregister_framework", + "master/messages_reregister_slave", + "master/messages_resource_request", + "master/messages_revive_offers", + "master/messages_status_update", + "master/messages_status_update_acknowledgement", + "master/messages_unregister_framework", + "master/messages_unregister_slave", + "master/messages_update_slave", + "master/recovery_slave_removals", + "master/slave_removals/reason_registered", + "master/slave_removals/reason_unhealthy", + "master/slave_removals/reason_unregistered", + "master/valid_framework_to_executor_messages", + "master/valid_status_update_acknowledgements", + "master/valid_status_updates", + "master/task_lost/source_master/reason_invalid_offers", + "master/task_lost/source_master/reason_slave_removed", + "master/task_lost/source_slave/reason_executor_terminated", + "master/valid_executor_to_framework_messages", + // evgqueue + "master/event_queue_dispatches", + "master/event_queue_http_requests", + "master/event_queue_messages", + // registrar + "registrar/state_fetch_ms", + "registrar/state_store_ms", + "registrar/state_store_ms/max", + "registrar/state_store_ms/min", + "registrar/state_store_ms/p50", + "registrar/state_store_ms/p90", + "registrar/state_store_ms/p95", + "registrar/state_store_ms/p99", + "registrar/state_store_ms/p999", + "registrar/state_store_ms/p9999", + } for _, k := range metricNames { - mesosMetrics[k] = rand.Float64() + masterMetrics[k] = rand.Float64() + } + + slaveMetrics = make(map[string]interface{}) + + metricNames = []string{ + // resources + "slave/cpus_percent", + "slave/cpus_used", + "slave/cpus_total", + "slave/cpus_revocable_percent", + "slave/cpus_revocable_total", + "slave/cpus_revocable_used", + "slave/disk_percent", + "slave/disk_used", + "slave/disk_total", + "slave/disk_revocable_percent", + "slave/disk_revocable_total", + "slave/disk_revocable_used", + "slave/gpus_percent", + "slave/gpus_used", + "slave/gpus_total", + "slave/gpus_revocable_percent", + "slave/gpus_revocable_total", + "slave/gpus_revocable_used", + "slave/mem_percent", + "slave/mem_used", + "slave/mem_total", + "slave/mem_revocable_percent", + "slave/mem_revocable_total", + "slave/mem_revocable_used", + // agent + "slave/registered", + "slave/uptime_secs", + // system + "system/cpus_total", + "system/load_15min", + "system/load_5min", + "system/load_1min", + "system/mem_free_bytes", + "system/mem_total_bytes", + // executors + "containerizer/mesos/container_destroy_errors", + "slave/container_launch_errors", + "slave/executors_preempted", + "slave/frameworks_active", + "slave/executor_directory_max_allowed_age_secs", + "slave/executors_registering", + "slave/executors_running", + "slave/executors_terminated", + "slave/executors_terminating", + "slave/recovery_errors", + // tasks + "slave/tasks_failed", + "slave/tasks_finished", + "slave/tasks_killed", + "slave/tasks_lost", + "slave/tasks_running", + "slave/tasks_staging", + "slave/tasks_starting", + // messages + "slave/invalid_framework_messages", + "slave/invalid_status_updates", + "slave/valid_framework_messages", + "slave/valid_status_updates", + } + + for _, k := range metricNames { + slaveMetrics[k] = rand.Float64() + } + + slaveTaskMetrics = map[string]interface{}{ + "executor_id": fmt.Sprintf("task_%s", randUUID()), + "executor_name": "Some task description", + "framework_id": randUUID(), + "source": fmt.Sprintf("task_source_%s", randUUID()), + "statistics": map[string]interface{}{ + "cpus_limit": rand.Float64(), + "cpus_system_time_secs": rand.Float64(), + "cpus_user_time_secs": rand.Float64(), + "mem_anon_bytes": float64(rand.Int63()), + "mem_cache_bytes": float64(rand.Int63()), + "mem_critical_pressure_counter": float64(rand.Int63()), + "mem_file_bytes": float64(rand.Int63()), + "mem_limit_bytes": float64(rand.Int63()), + "mem_low_pressure_counter": float64(rand.Int63()), + "mem_mapped_file_bytes": float64(rand.Int63()), + "mem_medium_pressure_counter": float64(rand.Int63()), + "mem_rss_bytes": float64(rand.Int63()), + "mem_swap_bytes": float64(rand.Int63()), + "mem_total_bytes": float64(rand.Int63()), + "mem_total_memsw_bytes": float64(rand.Int63()), + "mem_unevictable_bytes": float64(rand.Int63()), + "timestamp": rand.Float64(), + }, } } func TestMain(m *testing.M) { generateMetrics() - r := http.NewServeMux() - r.HandleFunc("/metrics/snapshot", func(w http.ResponseWriter, r *http.Request) { + + masterRouter := http.NewServeMux() + masterRouter.HandleFunc("/metrics/snapshot", func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) w.Header().Set("Content-Type", "application/json") - json.NewEncoder(w).Encode(mesosMetrics) + json.NewEncoder(w).Encode(masterMetrics) }) - ts = httptest.NewServer(r) + masterTestServer = httptest.NewServer(masterRouter) + + slaveRouter := http.NewServeMux() + slaveRouter.HandleFunc("/metrics/snapshot", func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(slaveMetrics) + }) + slaveRouter.HandleFunc("/monitor/statistics", func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode([]map[string]interface{}{slaveTaskMetrics}) + }) + slaveTestServer = httptest.NewServer(slaveRouter) + rc := m.Run() - ts.Close() + + masterTestServer.Close() + slaveTestServer.Close() os.Exit(rc) } @@ -73,7 +278,7 @@ func TestMesosMaster(t *testing.T) { var acc testutil.Accumulator m := Mesos{ - Masters: []string{ts.Listener.Addr().String()}, + Masters: []string{masterTestServer.Listener.Addr().String()}, Timeout: 10, } @@ -83,34 +288,88 @@ func TestMesosMaster(t *testing.T) { t.Errorf(err.Error()) } - acc.AssertContainsFields(t, "mesos", mesosMetrics) + acc.AssertContainsFields(t, "mesos", masterMetrics) } -func TestRemoveGroup(t *testing.T) { - generateMetrics() - +func TestMasterFilter(t *testing.T) { m := Mesos{ MasterCols: []string{ "resources", "master", "registrar", }, } b := []string{ - "system", "slaves", "frameworks", - "messages", "evqueue", + "system", "agents", "frameworks", + "messages", "evqueue", "tasks", } - m.removeGroup(&mesosMetrics) + m.filterMetrics(MASTER, &masterMetrics) for _, v := range b { - for _, x := range masterBlocks(v) { - if _, ok := mesosMetrics[x]; ok { + for _, x := range getMetrics(MASTER, v) { + if _, ok := masterMetrics[x]; ok { t.Errorf("Found key %s, it should be gone.", x) } } } for _, v := range m.MasterCols { - for _, x := range masterBlocks(v) { - if _, ok := mesosMetrics[x]; !ok { + for _, x := range getMetrics(MASTER, v) { + if _, ok := masterMetrics[x]; !ok { + t.Errorf("Didn't find key %s, it should present.", x) + } + } + } +} + +func TestMesosSlave(t *testing.T) { + var acc testutil.Accumulator + + m := Mesos{ + Masters: []string{}, + Slaves: []string{slaveTestServer.Listener.Addr().String()}, + SlaveTasks: true, + Timeout: 10, + } + + err := m.Gather(&acc) + + if err != nil { + t.Errorf(err.Error()) + } + + acc.AssertContainsFields(t, "mesos", slaveMetrics) + + jf := jsonparser.JSONFlattener{} + err = jf.FlattenJSON("", slaveTaskMetrics) + + if err != nil { + t.Errorf(err.Error()) + } + + acc.AssertContainsFields(t, "mesos-tasks", jf.Fields) +} + +func TestSlaveFilter(t *testing.T) { + m := Mesos{ + SlaveCols: []string{ + "resources", "agent", "tasks", + }, + } + b := []string{ + "system", "executors", "messages", + } + + m.filterMetrics(SLAVE, &slaveMetrics) + + for _, v := range b { + for _, x := range getMetrics(SLAVE, v) { + if _, ok := slaveMetrics[x]; ok { + t.Errorf("Found key %s, it should be gone.", x) + } + } + } + for _, v := range m.MasterCols { + for _, x := range getMetrics(SLAVE, v) { + if _, ok := slaveMetrics[x]; !ok { t.Errorf("Didn't find key %s, it should present.", x) } } diff --git a/plugins/inputs/ping/ping_windows.go b/plugins/inputs/ping/ping_windows.go index b1d3ef06f..d36f44526 100644 --- a/plugins/inputs/ping/ping_windows.go +++ b/plugins/inputs/ping/ping_windows.go @@ -1,3 +1,210 @@ // +build windows - package ping + +import ( + "errors" + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/plugins/inputs" + "os/exec" + "regexp" + "strconv" + "strings" + "sync" + "time" +) + +// HostPinger is a function that runs the "ping" function using a list of +// passed arguments. This can be easily switched with a mocked ping function +// for unit test purposes (see ping_test.go) +type HostPinger func(timeout float64, args ...string) (string, error) + +type Ping struct { + // Number of pings to send (ping -c ) + Count int + + // Ping timeout, in seconds. 0 means no timeout (ping -W ) + Timeout float64 + + // URLs to ping + Urls []string + + // host ping function + pingHost HostPinger +} + +func (s *Ping) Description() string { + return "Ping given url(s) and return statistics" +} + +const sampleConfig = ` + ## urls to ping + urls = ["www.google.com"] # required + + ## number of pings to send per collection (ping -n ) + count = 4 # required + + ## Ping timeout, in seconds. 0 means default timeout (ping -w ) + Timeout = 0 +` + +func (s *Ping) SampleConfig() string { + return sampleConfig +} + +func hostPinger(timeout float64, args ...string) (string, error) { + bin, err := exec.LookPath("ping") + if err != nil { + return "", err + } + c := exec.Command(bin, args...) + out, err := internal.CombinedOutputTimeout(c, + time.Second*time.Duration(timeout+1)) + return string(out), err +} + +// processPingOutput takes in a string output from the ping command +// based on linux implementation but using regex ( multilanguage support ) ( shouldn't affect the performance of the program ) +// It returns (, , , , ) +func processPingOutput(out string) (int, int, int, int, int, error) { + // So find a line contain 3 numbers except reply lines + var stats, aproxs []string = nil, nil + err := errors.New("Fatal error processing ping output") + stat := regexp.MustCompile(`=\W*(\d+)\D*=\W*(\d+)\D*=\W*(\d+)`) + aprox := regexp.MustCompile(`=\W*(\d+)\D*ms\D*=\W*(\d+)\D*ms\D*=\W*(\d+)\D*ms`) + lines := strings.Split(out, "\n") + for _, line := range lines { + if !strings.Contains(line, "TTL") { + if stats == nil { + stats = stat.FindStringSubmatch(line) + } + if stats != nil && aproxs == nil { + aproxs = aprox.FindStringSubmatch(line) + } + } + } + + // stats data should contain 4 members: entireExpression + ( Send, Receive, Lost ) + if len(stats) != 4 { + return 0, 0, 0, 0, 0, err + } + trans, err := strconv.Atoi(stats[1]) + if err != nil { + return 0, 0, 0, 0, 0, err + } + rec, err := strconv.Atoi(stats[2]) + if err != nil { + return 0, 0, 0, 0, 0, err + } + + // aproxs data should contain 4 members: entireExpression + ( min, max, avg ) + if len(aproxs) != 4 { + return trans, rec, 0, 0, 0, err + } + min, err := strconv.Atoi(aproxs[1]) + if err != nil { + return trans, rec, 0, 0, 0, err + } + max, err := strconv.Atoi(aproxs[2]) + if err != nil { + return trans, rec, 0, 0, 0, err + } + avg, err := strconv.Atoi(aproxs[3]) + if err != nil { + return 0, 0, 0, 0, 0, err + } + + return trans, rec, avg, min, max, err +} + +func (p *Ping) timeout() float64 { + // According to MSDN, default ping timeout for windows is 4 second + // Add also one second interval + + if p.Timeout > 0 { + return p.Timeout + 1 + } + return 4 + 1 +} + +// args returns the arguments for the 'ping' executable +func (p *Ping) args(url string) []string { + args := []string{"-n", strconv.Itoa(p.Count)} + + if p.Timeout > 0 { + args = append(args, "-w", strconv.FormatFloat(p.Timeout*1000, 'f', 0, 64)) + } + + args = append(args, url) + + return args +} + +func (p *Ping) Gather(acc telegraf.Accumulator) error { + var wg sync.WaitGroup + errorChannel := make(chan error, len(p.Urls)*2) + var pendingError error = nil + // Spin off a go routine for each url to ping + for _, url := range p.Urls { + wg.Add(1) + go func(u string) { + defer wg.Done() + args := p.args(u) + totalTimeout := p.timeout() * float64(p.Count) + out, err := p.pingHost(totalTimeout, args...) + // ping host return exitcode != 0 also when there was no response from host + // but command was execute succesfully + if err != nil { + // Combine go err + stderr output + pendingError = errors.New(strings.TrimSpace(out) + ", " + err.Error()) + } + tags := map[string]string{"url": u} + trans, rec, avg, min, max, err := processPingOutput(out) + if err != nil { + // fatal error + if pendingError != nil { + errorChannel <- pendingError + } + errorChannel <- err + return + } + // Calculate packet loss percentage + loss := float64(trans-rec) / float64(trans) * 100.0 + fields := map[string]interface{}{ + "packets_transmitted": trans, + "packets_received": rec, + "percent_packet_loss": loss, + } + if avg > 0 { + fields["average_response_ms"] = avg + } + if min > 0 { + fields["minimum_response_ms"] = min + } + if max > 0 { + fields["maximum_response_ms"] = max + } + acc.AddFields("ping", fields, tags) + }(url) + } + + wg.Wait() + close(errorChannel) + + // Get all errors and return them as one giant error + errorStrings := []string{} + for err := range errorChannel { + errorStrings = append(errorStrings, err.Error()) + } + + if len(errorStrings) == 0 { + return nil + } + return errors.New(strings.Join(errorStrings, "\n")) +} + +func init() { + inputs.Add("ping", func() telegraf.Input { + return &Ping{pingHost: hostPinger} + }) +} diff --git a/plugins/inputs/ping/ping_windows_test.go b/plugins/inputs/ping/ping_windows_test.go new file mode 100644 index 000000000..a4d0609e6 --- /dev/null +++ b/plugins/inputs/ping/ping_windows_test.go @@ -0,0 +1,218 @@ +// +build windows +package ping + +import ( + "errors" + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/assert" + "testing" +) + +// Windows ping format ( should support multilanguage ?) +var winPLPingOutput = ` +Badanie 8.8.8.8 z 32 bajtami danych: +Odpowiedz z 8.8.8.8: bajtow=32 czas=49ms TTL=43 +Odpowiedz z 8.8.8.8: bajtow=32 czas=46ms TTL=43 +Odpowiedz z 8.8.8.8: bajtow=32 czas=48ms TTL=43 +Odpowiedz z 8.8.8.8: bajtow=32 czas=57ms TTL=43 + +Statystyka badania ping dla 8.8.8.8: + Pakiety: Wyslane = 4, Odebrane = 4, Utracone = 0 + (0% straty), +Szacunkowy czas bladzenia pakietww w millisekundach: + Minimum = 46 ms, Maksimum = 57 ms, Czas sredni = 50 ms +` + +// Windows ping format ( should support multilanguage ?) +var winENPingOutput = ` +Pinging 8.8.8.8 with 32 bytes of data: +Reply from 8.8.8.8: bytes=32 time=52ms TTL=43 +Reply from 8.8.8.8: bytes=32 time=50ms TTL=43 +Reply from 8.8.8.8: bytes=32 time=50ms TTL=43 +Reply from 8.8.8.8: bytes=32 time=51ms TTL=43 + +Ping statistics for 8.8.8.8: + Packets: Sent = 4, Received = 4, Lost = 0 (0% loss), +Approximate round trip times in milli-seconds: + Minimum = 50ms, Maximum = 52ms, Average = 50ms +` + +func TestHost(t *testing.T) { + trans, rec, avg, min, max, err := processPingOutput(winPLPingOutput) + assert.NoError(t, err) + assert.Equal(t, 4, trans, "4 packets were transmitted") + assert.Equal(t, 4, rec, "4 packets were received") + assert.Equal(t, 50, avg, "Average 50") + assert.Equal(t, 46, min, "Min 46") + assert.Equal(t, 57, max, "max 57") + + trans, rec, avg, min, max, err = processPingOutput(winENPingOutput) + assert.NoError(t, err) + assert.Equal(t, 4, trans, "4 packets were transmitted") + assert.Equal(t, 4, rec, "4 packets were received") + assert.Equal(t, 50, avg, "Average 50") + assert.Equal(t, 50, min, "Min 50") + assert.Equal(t, 52, max, "Max 52") +} + +func mockHostPinger(timeout float64, args ...string) (string, error) { + return winENPingOutput, nil +} + +// Test that Gather function works on a normal ping +func TestPingGather(t *testing.T) { + var acc testutil.Accumulator + p := Ping{ + Urls: []string{"www.google.com", "www.reddit.com"}, + pingHost: mockHostPinger, + } + + p.Gather(&acc) + tags := map[string]string{"url": "www.google.com"} + fields := map[string]interface{}{ + "packets_transmitted": 4, + "packets_received": 4, + "percent_packet_loss": 0.0, + "average_response_ms": 50, + "minimum_response_ms": 50, + "maximum_response_ms": 52, + } + acc.AssertContainsTaggedFields(t, "ping", fields, tags) + + tags = map[string]string{"url": "www.reddit.com"} + acc.AssertContainsTaggedFields(t, "ping", fields, tags) +} + +var errorPingOutput = ` +Badanie nask.pl [195.187.242.157] z 32 bajtami danych: +Upłynął limit czasu żądania. +Upłynął limit czasu żądania. +Upłynął limit czasu żądania. +Upłynął limit czasu żądania. + +Statystyka badania ping dla 195.187.242.157: + Pakiety: Wysłane = 4, Odebrane = 0, Utracone = 4 + (100% straty), +` + +func mockErrorHostPinger(timeout float64, args ...string) (string, error) { + return errorPingOutput, errors.New("No packets received") +} + +// Test that Gather works on a ping with no transmitted packets, even though the +// command returns an error +func TestBadPingGather(t *testing.T) { + var acc testutil.Accumulator + p := Ping{ + Urls: []string{"www.amazon.com"}, + pingHost: mockErrorHostPinger, + } + + p.Gather(&acc) + tags := map[string]string{"url": "www.amazon.com"} + fields := map[string]interface{}{ + "packets_transmitted": 4, + "packets_received": 0, + "percent_packet_loss": 100.0, + } + acc.AssertContainsTaggedFields(t, "ping", fields, tags) +} + +var lossyPingOutput = ` +Badanie thecodinglove.com [66.6.44.4] z 9800 bajtami danych: +Upłynął limit czasu żądania. +Odpowiedź z 66.6.44.4: bajtów=9800 czas=114ms TTL=48 +Odpowiedź z 66.6.44.4: bajtów=9800 czas=114ms TTL=48 +Odpowiedź z 66.6.44.4: bajtów=9800 czas=118ms TTL=48 +Odpowiedź z 66.6.44.4: bajtów=9800 czas=114ms TTL=48 +Odpowiedź z 66.6.44.4: bajtów=9800 czas=114ms TTL=48 +Upłynął limit czasu żądania. +Odpowiedź z 66.6.44.4: bajtów=9800 czas=119ms TTL=48 +Odpowiedź z 66.6.44.4: bajtów=9800 czas=116ms TTL=48 + +Statystyka badania ping dla 66.6.44.4: + Pakiety: Wysłane = 9, Odebrane = 7, Utracone = 2 + (22% straty), +Szacunkowy czas błądzenia pakietów w millisekundach: + Minimum = 114 ms, Maksimum = 119 ms, Czas średni = 115 ms +` + +func mockLossyHostPinger(timeout float64, args ...string) (string, error) { + return lossyPingOutput, nil +} + +// Test that Gather works on a ping with lossy packets +func TestLossyPingGather(t *testing.T) { + var acc testutil.Accumulator + p := Ping{ + Urls: []string{"www.google.com"}, + pingHost: mockLossyHostPinger, + } + + p.Gather(&acc) + tags := map[string]string{"url": "www.google.com"} + fields := map[string]interface{}{ + "packets_transmitted": 9, + "packets_received": 7, + "percent_packet_loss": 22.22222222222222, + "average_response_ms": 115, + "minimum_response_ms": 114, + "maximum_response_ms": 119, + } + acc.AssertContainsTaggedFields(t, "ping", fields, tags) +} + +// Fatal ping output (invalid argument) +var fatalPingOutput = ` +Bad option -d. + + +Usage: ping [-t] [-a] [-n count] [-l size] [-f] [-i TTL] [-v TOS] + [-r count] [-s count] [[-j host-list] | [-k host-list]] + [-w timeout] [-R] [-S srcaddr] [-4] [-6] target_name + +Options: + -t Ping the specified host until stopped. + To see statistics and continue - type Control-Break; + To stop - type Control-C. + -a Resolve addresses to hostnames. + -n count Number of echo requests to send. + -l size Send buffer size. + -f Set Don't Fragment flag in packet (IPv4-only). + -i TTL Time To Live. + -v TOS Type Of Service (IPv4-only. This setting has been deprecated + and has no effect on the type of service field in the IP Header). + -r count Record route for count hops (IPv4-only). + -s count Timestamp for count hops (IPv4-only). + -j host-list Loose source route along host-list (IPv4-only). + -k host-list Strict source route along host-list (IPv4-only). + -w timeout Timeout in milliseconds to wait for each reply. + -R Use routing header to test reverse route also (IPv6-only). + -S srcaddr Source address to use. + -4 Force using IPv4. + -6 Force using IPv6. + +` + +func mockFatalHostPinger(timeout float64, args ...string) (string, error) { + return fatalPingOutput, errors.New("So very bad") +} + +// Test that a fatal ping command does not gather any statistics. +func TestFatalPingGather(t *testing.T) { + var acc testutil.Accumulator + p := Ping{ + Urls: []string{"www.amazon.com"}, + pingHost: mockFatalHostPinger, + } + + p.Gather(&acc) + assert.False(t, acc.HasMeasurement("packets_transmitted"), + "Fatal ping should not have packet measurements") + assert.False(t, acc.HasMeasurement("packets_received"), + "Fatal ping should not have packet measurements") + assert.False(t, acc.HasMeasurement("percent_packet_loss"), + "Fatal ping should not have packet measurements") + assert.False(t, acc.HasMeasurement("average_response_ms"), + "Fatal ping should not have packet measurements") +} diff --git a/plugins/inputs/win_perf_counters/win_perf_counters.go b/plugins/inputs/win_perf_counters/win_perf_counters.go index 4684289ee..fb7b093c0 100644 --- a/plugins/inputs/win_perf_counters/win_perf_counters.go +++ b/plugins/inputs/win_perf_counters/win_perf_counters.go @@ -107,7 +107,8 @@ type item struct { counterHandle win.PDH_HCOUNTER } -var sanitizedChars = strings.NewReplacer("/sec", "_persec", "/Sec", "_persec", " ", "_") +var sanitizedChars = strings.NewReplacer("/sec", "_persec", "/Sec", "_persec", + " ", "_", "%", "Percent", `\`, "") func (m *Win_PerfCounters) AddItem(metrics *itemList, query string, objectName string, counter string, instance string, measurement string, include_total bool) { @@ -299,13 +300,12 @@ func (m *Win_PerfCounters) Gather(acc telegraf.Accumulator) error { tags["instance"] = s } tags["objectname"] = metric.objectName - fields[sanitizedChars.Replace(string(metric.counter))] = float32(c.FmtValue.DoubleValue) + fields[sanitizedChars.Replace(metric.counter)] = + float32(c.FmtValue.DoubleValue) - var measurement string - if metric.measurement == "" { + measurement := sanitizedChars.Replace(metric.measurement) + if measurement == "" { measurement = "win_perf_counters" - } else { - measurement = metric.measurement } acc.AddFields(measurement, fields, tags) } diff --git a/plugins/outputs/prometheus_client/prometheus_client.go b/plugins/outputs/prometheus_client/prometheus_client.go index 4f7ce8053..ce6dc1f57 100644 --- a/plugins/outputs/prometheus_client/prometheus_client.go +++ b/plugins/outputs/prometheus_client/prometheus_client.go @@ -12,17 +12,7 @@ import ( "github.com/prometheus/client_golang/prometheus" ) -var ( - invalidNameCharRE = regexp.MustCompile(`[^a-zA-Z0-9_]`) - - // Prometheus metric names must match this regex - // see https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels - metricName = regexp.MustCompile("^[a-zA-Z_:][a-zA-Z0-9_:]*$") - - // Prometheus labels must match this regex - // see https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels - labelName = regexp.MustCompile("^[a-zA-Z_][a-zA-Z0-9_]*$") -) +var invalidNameCharRE = regexp.MustCompile(`[^a-zA-Z0-9_]`) type PrometheusClient struct { Listen string @@ -119,9 +109,6 @@ func (p *PrometheusClient) Write(metrics []telegraf.Metric) error { if len(k) == 0 { continue } - if !labelName.MatchString(k) { - continue - } labels = append(labels, k) l[k] = v } @@ -144,11 +131,6 @@ func (p *PrometheusClient) Write(metrics []telegraf.Metric) error { mname = fmt.Sprintf("%s_%s", key, n) } - // verify that it is a valid measurement name - if !metricName.MatchString(mname) { - continue - } - desc := prometheus.NewDesc(mname, "Telegraf collected metric", nil, l) var metric prometheus.Metric var err error diff --git a/testutil/accumulator.go b/testutil/accumulator.go index 1058faf83..598aa3155 100644 --- a/testutil/accumulator.go +++ b/testutil/accumulator.go @@ -28,6 +28,7 @@ type Accumulator struct { sync.Mutex Metrics []*Metric + Errors []error debug bool } @@ -84,6 +85,16 @@ func (a *Accumulator) AddFields( a.Metrics = append(a.Metrics, p) } +// AddError appends the given error to Accumulator.Errors. +func (a *Accumulator) AddError(err error) { + if err == nil { + return + } + a.Lock() + a.Errors = append(a.Errors, err) + a.Unlock() +} + func (a *Accumulator) SetPrecision(precision, interval time.Duration) { return }