diff --git a/CHANGELOG.md b/CHANGELOG.md index cf71e79aa..28ce825c5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,14 @@ - [#758](https://github.com/influxdata/telegraf/pull/758): UDP Listener input plugin, thanks @whatyouhide! - [#769](https://github.com/influxdata/telegraf/issues/769): httpjson plugin: allow specifying SSL configuration. - [#735](https://github.com/influxdata/telegraf/pull/735): SNMP Table feature. Thanks @titilambert! +- [#754](https://github.com/influxdata/telegraf/pull/754): docker plugin: adding `docker info` metrics to output. Thanks @titilambert! +- [#788](https://github.com/influxdata/telegraf/pull/788): -input-list and -output-list command-line options. Thanks @ebookbug! +- [#778](https://github.com/influxdata/telegraf/pull/778): Adding a TCP input listener. +- [#797](https://github.com/influxdata/telegraf/issues/797): Provide option for persistent MQTT consumer client sessions. +- [#799](https://github.com/influxdata/telegraf/pull/799): Add number of threads for procstat input plugin. Thanks @titilambert! +- [#776](https://github.com/influxdata/telegraf/pull/776): Add Zookeeper chroot option to kafka_consumer. Thanks @prune998! +- [#811](https://github.com/influxdata/telegraf/pull/811): Add processes plugin for classifying total procs on system. Thanks @titilambert! +- [#235](https://github.com/influxdata/telegraf/issues/235): Add number of users to the `system` input plugin. ### Bugfixes - [#748](https://github.com/influxdata/telegraf/issues/748): Fix sensor plugin split on ":" @@ -18,6 +26,7 @@ - [#773](https://github.com/influxdata/telegraf/issues/773): Fix duplicate measurements in snmp plugin. Thanks @titilambert! - [#708](https://github.com/influxdata/telegraf/issues/708): packaging: build ARM package - [#713](https://github.com/influxdata/telegraf/issues/713): packaging: insecure permissions error on log directory +- [#816](https://github.com/influxdata/telegraf/issues/816): Fix phpfpm panic if fcgi endpoint unreachable. ## v0.10.4.1 diff --git a/Makefile b/Makefile index ef316bd03..c87f78b55 100644 --- a/Makefile +++ b/Makefile @@ -22,8 +22,8 @@ build-windows: ./cmd/telegraf/telegraf.go build-for-docker: - CGO_ENABLED=0 GOOS=linux go -o telegraf -ldflags \ - "-X main.Version=$(VERSION)" \ + CGO_ENABLED=0 GOOS=linux go build -installsuffix cgo -o telegraf -ldflags \ + "-s -X main.Version=$(VERSION)" \ ./cmd/telegraf/telegraf.go # Build with race detector diff --git a/README.md b/README.md index e9c20996a..fb9363100 100644 --- a/README.md +++ b/README.md @@ -214,11 +214,13 @@ Currently implemented sources: * disk * diskio * swap + * processes Telegraf can also collect metrics via the following service plugins: * statsd -* udp listener +* udp_listener +* tcp_listener * mqtt_consumer * kafka_consumer * nats_consumer diff --git a/agent/accumulator.go b/agent/accumulator.go index b04ff2b53..7ec22cd7f 100644 --- a/agent/accumulator.go +++ b/agent/accumulator.go @@ -105,7 +105,6 @@ func (ac *accumulator) AddFields( continue } } - result[k] = v // Validate uint64 and float64 fields switch val := v.(type) { @@ -116,6 +115,7 @@ func (ac *accumulator) AddFields( } else { result[k] = int64(9223372036854775807) } + continue case float64: // NaNs are invalid values in influxdb, skip measurement if math.IsNaN(val) || math.IsInf(val, 0) { @@ -127,6 +127,8 @@ func (ac *accumulator) AddFields( continue } } + + result[k] = v } fields = nil if len(result) == 0 { @@ -168,5 +170,8 @@ func (ac *accumulator) setDefaultTags(tags map[string]string) { } func (ac *accumulator) addDefaultTag(key, value string) { + if ac.defaultTags == nil { + ac.defaultTags = make(map[string]string) + } ac.defaultTags[key] = value } diff --git a/agent/accumulator_test.go b/agent/accumulator_test.go new file mode 100644 index 000000000..05f9b02aa --- /dev/null +++ b/agent/accumulator_test.go @@ -0,0 +1,302 @@ +package agent + +import ( + "fmt" + "math" + "testing" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal/models" + + "github.com/stretchr/testify/assert" +) + +func TestAdd(t *testing.T) { + a := accumulator{} + now := time.Now() + a.metrics = make(chan telegraf.Metric, 10) + defer close(a.metrics) + a.inputConfig = &internal_models.InputConfig{} + + a.Add("acctest", float64(101), map[string]string{}) + a.Add("acctest", float64(101), map[string]string{"acc": "test"}) + a.Add("acctest", float64(101), map[string]string{"acc": "test"}, now) + + testm := <-a.metrics + actual := testm.String() + assert.Contains(t, actual, "acctest value=101") + + testm = <-a.metrics + actual = testm.String() + assert.Contains(t, actual, "acctest,acc=test value=101") + + testm = <-a.metrics + actual = testm.String() + assert.Equal(t, + fmt.Sprintf("acctest,acc=test value=101 %d", now.UnixNano()), + actual) +} + +func TestAddDefaultTags(t *testing.T) { + a := accumulator{} + a.addDefaultTag("default", "tag") + now := time.Now() + a.metrics = make(chan telegraf.Metric, 10) + defer close(a.metrics) + a.inputConfig = &internal_models.InputConfig{} + + a.Add("acctest", float64(101), map[string]string{}) + a.Add("acctest", float64(101), map[string]string{"acc": "test"}) + a.Add("acctest", float64(101), map[string]string{"acc": "test"}, now) + + testm := <-a.metrics + actual := testm.String() + assert.Contains(t, actual, "acctest,default=tag value=101") + + testm = <-a.metrics + actual = testm.String() + assert.Contains(t, actual, "acctest,acc=test,default=tag value=101") + + testm = <-a.metrics + actual = testm.String() + assert.Equal(t, + fmt.Sprintf("acctest,acc=test,default=tag value=101 %d", now.UnixNano()), + actual) +} + +func TestAddFields(t *testing.T) { + a := accumulator{} + now := time.Now() + a.metrics = make(chan telegraf.Metric, 10) + defer close(a.metrics) + a.inputConfig = &internal_models.InputConfig{} + + fields := map[string]interface{}{ + "usage": float64(99), + } + a.AddFields("acctest", fields, map[string]string{}) + a.AddFields("acctest", fields, map[string]string{"acc": "test"}) + a.AddFields("acctest", fields, map[string]string{"acc": "test"}, now) + + testm := <-a.metrics + actual := testm.String() + assert.Contains(t, actual, "acctest usage=99") + + testm = <-a.metrics + actual = testm.String() + assert.Contains(t, actual, "acctest,acc=test usage=99") + + testm = <-a.metrics + actual = testm.String() + assert.Equal(t, + fmt.Sprintf("acctest,acc=test usage=99 %d", now.UnixNano()), + actual) +} + +// Test that all Inf fields get dropped, and not added to metrics channel +func TestAddInfFields(t *testing.T) { + inf := math.Inf(1) + ninf := math.Inf(-1) + + a := accumulator{} + now := time.Now() + a.metrics = make(chan telegraf.Metric, 10) + defer close(a.metrics) + a.inputConfig = &internal_models.InputConfig{} + + fields := map[string]interface{}{ + "usage": inf, + "nusage": ninf, + } + a.AddFields("acctest", fields, map[string]string{}) + a.AddFields("acctest", fields, map[string]string{"acc": "test"}) + a.AddFields("acctest", fields, map[string]string{"acc": "test"}, now) + + assert.Len(t, a.metrics, 0) + + // test that non-inf fields are kept and not dropped + fields["notinf"] = float64(100) + a.AddFields("acctest", fields, map[string]string{}) + testm := <-a.metrics + actual := testm.String() + assert.Contains(t, actual, "acctest notinf=100") +} + +// Test that nan fields are dropped and not added +func TestAddNaNFields(t *testing.T) { + nan := math.NaN() + + a := accumulator{} + now := time.Now() + a.metrics = make(chan telegraf.Metric, 10) + defer close(a.metrics) + a.inputConfig = &internal_models.InputConfig{} + + fields := map[string]interface{}{ + "usage": nan, + } + a.AddFields("acctest", fields, map[string]string{}) + a.AddFields("acctest", fields, map[string]string{"acc": "test"}) + a.AddFields("acctest", fields, map[string]string{"acc": "test"}, now) + + assert.Len(t, a.metrics, 0) + + // test that non-nan fields are kept and not dropped + fields["notnan"] = float64(100) + a.AddFields("acctest", fields, map[string]string{}) + testm := <-a.metrics + actual := testm.String() + assert.Contains(t, actual, "acctest notnan=100") +} + +func TestAddUint64Fields(t *testing.T) { + a := accumulator{} + now := time.Now() + a.metrics = make(chan telegraf.Metric, 10) + defer close(a.metrics) + a.inputConfig = &internal_models.InputConfig{} + + fields := map[string]interface{}{ + "usage": uint64(99), + } + a.AddFields("acctest", fields, map[string]string{}) + a.AddFields("acctest", fields, map[string]string{"acc": "test"}) + a.AddFields("acctest", fields, map[string]string{"acc": "test"}, now) + + testm := <-a.metrics + actual := testm.String() + assert.Contains(t, actual, "acctest usage=99i") + + testm = <-a.metrics + actual = testm.String() + assert.Contains(t, actual, "acctest,acc=test usage=99i") + + testm = <-a.metrics + actual = testm.String() + assert.Equal(t, + fmt.Sprintf("acctest,acc=test usage=99i %d", now.UnixNano()), + actual) +} + +func TestAddUint64Overflow(t *testing.T) { + a := accumulator{} + now := time.Now() + a.metrics = make(chan telegraf.Metric, 10) + defer close(a.metrics) + a.inputConfig = &internal_models.InputConfig{} + + fields := map[string]interface{}{ + "usage": uint64(9223372036854775808), + } + a.AddFields("acctest", fields, map[string]string{}) + a.AddFields("acctest", fields, map[string]string{"acc": "test"}) + a.AddFields("acctest", fields, map[string]string{"acc": "test"}, now) + + testm := <-a.metrics + actual := testm.String() + assert.Contains(t, actual, "acctest usage=9223372036854775807i") + + testm = <-a.metrics + actual = testm.String() + assert.Contains(t, actual, "acctest,acc=test usage=9223372036854775807i") + + testm = <-a.metrics + actual = testm.String() + assert.Equal(t, + fmt.Sprintf("acctest,acc=test usage=9223372036854775807i %d", now.UnixNano()), + actual) +} + +func TestAddInts(t *testing.T) { + a := accumulator{} + a.addDefaultTag("default", "tag") + now := time.Now() + a.metrics = make(chan telegraf.Metric, 10) + defer close(a.metrics) + a.inputConfig = &internal_models.InputConfig{} + + a.Add("acctest", int(101), map[string]string{}) + a.Add("acctest", int32(101), map[string]string{"acc": "test"}) + a.Add("acctest", int64(101), map[string]string{"acc": "test"}, now) + + testm := <-a.metrics + actual := testm.String() + assert.Contains(t, actual, "acctest,default=tag value=101i") + + testm = <-a.metrics + actual = testm.String() + assert.Contains(t, actual, "acctest,acc=test,default=tag value=101i") + + testm = <-a.metrics + actual = testm.String() + assert.Equal(t, + fmt.Sprintf("acctest,acc=test,default=tag value=101i %d", now.UnixNano()), + actual) +} + +func TestAddFloats(t *testing.T) { + a := accumulator{} + a.addDefaultTag("default", "tag") + now := time.Now() + a.metrics = make(chan telegraf.Metric, 10) + defer close(a.metrics) + a.inputConfig = &internal_models.InputConfig{} + + a.Add("acctest", float32(101), map[string]string{"acc": "test"}) + a.Add("acctest", float64(101), map[string]string{"acc": "test"}, now) + + testm := <-a.metrics + actual := testm.String() + assert.Contains(t, actual, "acctest,acc=test,default=tag value=101") + + testm = <-a.metrics + actual = testm.String() + assert.Equal(t, + fmt.Sprintf("acctest,acc=test,default=tag value=101 %d", now.UnixNano()), + actual) +} + +func TestAddStrings(t *testing.T) { + a := accumulator{} + a.addDefaultTag("default", "tag") + now := time.Now() + a.metrics = make(chan telegraf.Metric, 10) + defer close(a.metrics) + a.inputConfig = &internal_models.InputConfig{} + + a.Add("acctest", "test", map[string]string{"acc": "test"}) + a.Add("acctest", "foo", map[string]string{"acc": "test"}, now) + + testm := <-a.metrics + actual := testm.String() + assert.Contains(t, actual, "acctest,acc=test,default=tag value=\"test\"") + + testm = <-a.metrics + actual = testm.String() + assert.Equal(t, + fmt.Sprintf("acctest,acc=test,default=tag value=\"foo\" %d", now.UnixNano()), + actual) +} + +func TestAddBools(t *testing.T) { + a := accumulator{} + a.addDefaultTag("default", "tag") + now := time.Now() + a.metrics = make(chan telegraf.Metric, 10) + defer close(a.metrics) + a.inputConfig = &internal_models.InputConfig{} + + a.Add("acctest", true, map[string]string{"acc": "test"}) + a.Add("acctest", false, map[string]string{"acc": "test"}, now) + + testm := <-a.metrics + actual := testm.String() + assert.Contains(t, actual, "acctest,acc=test,default=tag value=true") + + testm = <-a.metrics + actual = testm.String() + assert.Equal(t, + fmt.Sprintf("acctest,acc=test,default=tag value=false %d", now.UnixNano()), + actual) +} diff --git a/cmd/telegraf/telegraf.go b/cmd/telegraf/telegraf.go index a65c5607c..d54aaa4e3 100644 --- a/cmd/telegraf/telegraf.go +++ b/cmd/telegraf/telegraf.go @@ -11,8 +11,9 @@ import ( "github.com/influxdata/telegraf/agent" "github.com/influxdata/telegraf/internal/config" - + "github.com/influxdata/telegraf/plugins/inputs" _ "github.com/influxdata/telegraf/plugins/inputs/all" + "github.com/influxdata/telegraf/plugins/outputs" _ "github.com/influxdata/telegraf/plugins/outputs/all" ) @@ -30,11 +31,14 @@ var fSampleConfig = flag.Bool("sample-config", false, var fPidfile = flag.String("pidfile", "", "file to write our pid to") var fInputFilters = flag.String("input-filter", "", "filter the inputs to enable, separator is :") +var fInputList = flag.Bool("input-list", false, + "print available output plugins.") var fOutputFilters = flag.String("output-filter", "", "filter the outputs to enable, separator is :") +var fOutputList = flag.Bool("output-list", false, + "print available output plugins.") var fUsage = flag.String("usage", "", "print usage for a plugin, ie, 'telegraf -usage mysql'") - var fInputFiltersLegacy = flag.String("filter", "", "filter the inputs to enable, separator is :") var fOutputFiltersLegacy = flag.String("outputfilter", "", @@ -59,7 +63,9 @@ The flags are: -sample-config print out full sample configuration to stdout -config-directory directory containing additional *.conf files -input-filter filter the input plugins to enable, separator is : + -input-list print all the plugins inputs -output-filter filter the output plugins to enable, separator is : + -output-list print all the available outputs -usage print usage for a plugin, ie, 'telegraf -usage mysql' -debug print metrics as they're generated to stdout -quiet run in quiet mode @@ -115,6 +121,22 @@ func main() { outputFilters = strings.Split(":"+outputFilter+":", ":") } + if *fOutputList { + fmt.Println("Available Output Plugins:") + for k, _ := range outputs.Outputs { + fmt.Printf(" %s\n", k) + } + return + } + + if *fInputList { + fmt.Println("Available Input Plugins:") + for k, _ := range inputs.Inputs { + fmt.Printf(" %s\n", k) + } + return + } + if *fVersion { v := fmt.Sprintf("Telegraf - Version %s", Version) fmt.Println(v) diff --git a/docs/CONFIGURATION.md b/docs/CONFIGURATION.md index 58dbdf261..853dc6d05 100644 --- a/docs/CONFIGURATION.md +++ b/docs/CONFIGURATION.md @@ -97,7 +97,7 @@ fields which begin with `time_`. percpu = true totalcpu = false # filter all fields beginning with 'time_' - drop = ["time_*"] + fielddrop = ["time_*"] ``` #### Input Config: tagpass and tagdrop @@ -106,7 +106,7 @@ fields which begin with `time_`. [[inputs.cpu]] percpu = true totalcpu = false - drop = ["cpu_time"] + fielddrop = ["cpu_time"] # Don't collect CPU data for cpu6 & cpu7 [inputs.cpu.tagdrop] cpu = [ "cpu6", "cpu7" ] @@ -199,7 +199,7 @@ to avoid measurement collisions: percpu = true totalcpu = false name_override = "percpu_usage" - drop = ["cpu_time*"] + fielddrop = ["cpu_time*"] ``` ## `[outputs.xxx]` Configuration diff --git a/etc/telegraf.conf b/etc/telegraf.conf index a6057ecd2..3deb7f895 100644 --- a/etc/telegraf.conf +++ b/etc/telegraf.conf @@ -123,6 +123,10 @@ [[inputs.mem]] # no configuration +# Get the number of processes and group them by status +[[inputs.processes]] + # no configuration + # Read metrics about swap memory usage [[inputs.swap]] # no configuration diff --git a/internal/models/running_output.go b/internal/models/running_output.go index 9d111c757..33fa4e120 100644 --- a/internal/models/running_output.go +++ b/internal/models/running_output.go @@ -82,9 +82,11 @@ func (ro *RunningOutput) AddMetric(metric telegraf.Metric) { } } } else { - log.Printf("WARNING: overwriting cached metrics, you may want to " + - "increase the metric_buffer_limit setting in your [agent] " + - "config if you do not wish to overwrite metrics.\n") + if ro.overwriteI == 0 { + log.Printf("WARNING: overwriting cached metrics, you may want to " + + "increase the metric_buffer_limit setting in your [agent] " + + "config if you do not wish to overwrite metrics.\n") + } if ro.overwriteI == len(ro.metrics) { ro.overwriteI = 0 } diff --git a/plugins/inputs/all/all.go b/plugins/inputs/all/all.go index d647872e6..243d815d1 100644 --- a/plugins/inputs/all/all.go +++ b/plugins/inputs/all/all.go @@ -48,6 +48,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/inputs/sqlserver" _ "github.com/influxdata/telegraf/plugins/inputs/statsd" _ "github.com/influxdata/telegraf/plugins/inputs/system" + _ "github.com/influxdata/telegraf/plugins/inputs/tcp_listener" _ "github.com/influxdata/telegraf/plugins/inputs/trig" _ "github.com/influxdata/telegraf/plugins/inputs/twemproxy" _ "github.com/influxdata/telegraf/plugins/inputs/udp_listener" diff --git a/plugins/inputs/docker/README.md b/plugins/inputs/docker/README.md index 6086c89e8..97450e2aa 100644 --- a/plugins/inputs/docker/README.md +++ b/plugins/inputs/docker/README.md @@ -95,18 +95,50 @@ on the availability of per-cpu stats on your system. - io_serviced_recursive_sync - io_serviced_recursive_total - io_serviced_recursive_write +- docker_ + - n_used_file_descriptors + - n_cpus + - n_containers + - n_images + - n_goroutines + - n_listener_events + - memory_total + - pool_blocksize +- docker_data + - available + - total + - used +- docker_metadata + - available + - total + - used + ### Tags: -- All stats have the following tags: +- docker (memory_total) + - unit=bytes +- docker (pool_blocksize) + - unit=bytes +- docker_data + - unit=bytes +- docker_metadata + - unit=bytes + +- docker_cpu specific: - cont_id (container ID) - cont_image (container image) - cont_name (container name) -- docker_cpu specific: - cpu - docker_net specific: + - cont_id (container ID) + - cont_image (container image) + - cont_name (container name) - network - docker_blkio specific: + - cont_id (container ID) + - cont_image (container image) + - cont_name (container name) - device ### Example Output: @@ -114,6 +146,16 @@ on the availability of per-cpu stats on your system. ``` % ./telegraf -config ~/ws/telegraf.conf -input-filter docker -test * Plugin: docker, Collection 1 +> docker n_cpus=8i 1456926671065383978 +> docker n_used_file_descriptors=15i 1456926671065383978 +> docker n_containers=7i 1456926671065383978 +> docker n_images=152i 1456926671065383978 +> docker n_goroutines=36i 1456926671065383978 +> docker n_listener_events=0i 1456926671065383978 +> docker,unit=bytes memory_total=18935443456i 1456926671065383978 +> docker,unit=bytes pool_blocksize=65540i 1456926671065383978 +> docker_data,unit=bytes available=24340000000i,total=107400000000i,used=14820000000i 1456926671065383978 +> docker_metadata,unit=bytes available=2126999999i,total=2146999999i,used=20420000i 145692667106538 > docker_mem,cont_id=5705ba8ed8fb47527410653d60a8bb2f3af5e62372297c419022a3cc6d45d848,\ cont_image=spotify/kafka,cont_name=kafka \ active_anon=52568064i,active_file=6926336i,cache=12038144i,fail_count=0i,\ diff --git a/plugins/inputs/docker/docker.go b/plugins/inputs/docker/docker.go index 0d89979c1..cdc8ec1e5 100644 --- a/plugins/inputs/docker/docker.go +++ b/plugins/inputs/docker/docker.go @@ -1,8 +1,11 @@ package system import ( + "encoding/json" "fmt" "log" + "regexp" + "strconv" "strings" "sync" "time" @@ -17,9 +20,29 @@ type Docker struct { Endpoint string ContainerNames []string - client *docker.Client + client DockerClient } +type DockerClient interface { + // Docker Client wrapper + // Useful for test + Info() (*docker.Env, error) + ListContainers(opts docker.ListContainersOptions) ([]docker.APIContainers, error) + Stats(opts docker.StatsOptions) error +} + +const ( + KB = 1000 + MB = 1000 * KB + GB = 1000 * MB + TB = 1000 * GB + PB = 1000 * TB +) + +var ( + sizeRegex = regexp.MustCompile(`^(\d+(\.\d+)*) ?([kKmMgGtTpP])?[bB]?$`) +) + var sampleConfig = ` ## Docker Endpoint ## To use TCP, set endpoint = "tcp://[ip]:[port]" @@ -58,12 +81,20 @@ func (d *Docker) Gather(acc telegraf.Accumulator) error { d.client = c } + // Get daemon info + err := d.gatherInfo(acc) + if err != nil { + fmt.Println(err.Error()) + } + + // List containers opts := docker.ListContainersOptions{} containers, err := d.client.ListContainers(opts) if err != nil { return err } + // Get container data var wg sync.WaitGroup wg.Add(len(containers)) for _, container := range containers { @@ -81,6 +112,76 @@ func (d *Docker) Gather(acc telegraf.Accumulator) error { return nil } +func (d *Docker) gatherInfo(acc telegraf.Accumulator) error { + // Init vars + var driverStatus [][]string + dataFields := make(map[string]interface{}) + metadataFields := make(map[string]interface{}) + now := time.Now() + // Get info from docker daemon + info, err := d.client.Info() + if err != nil { + return err + } + + fields := map[string]interface{}{ + "n_cpus": info.GetInt64("NCPU"), + "n_used_file_descriptors": info.GetInt64("NFd"), + "n_containers": info.GetInt64("Containers"), + "n_images": info.GetInt64("Images"), + "n_goroutines": info.GetInt64("NGoroutines"), + "n_listener_events": info.GetInt64("NEventsListener"), + } + // Add metrics + acc.AddFields("docker", + fields, + nil, + now) + acc.AddFields("docker", + map[string]interface{}{"memory_total": info.GetInt64("MemTotal")}, + map[string]string{"unit": "bytes"}, + now) + // Get storage metrics + driverStatusRaw := []byte(info.Get("DriverStatus")) + json.Unmarshal(driverStatusRaw, &driverStatus) + for _, rawData := range driverStatus { + // Try to convert string to int (bytes) + value, err := parseSize(rawData[1]) + if err != nil { + continue + } + name := strings.ToLower(strings.Replace(rawData[0], " ", "_", -1)) + if name == "pool_blocksize" { + // pool blocksize + acc.AddFields("docker", + map[string]interface{}{"pool_blocksize": value}, + map[string]string{"unit": "bytes"}, + now) + } else if strings.HasPrefix(name, "data_space_") { + // data space + field_name := strings.TrimPrefix(name, "data_space_") + dataFields[field_name] = value + } else if strings.HasPrefix(name, "metadata_space_") { + // metadata space + field_name := strings.TrimPrefix(name, "metadata_space_") + metadataFields[field_name] = value + } + } + if len(dataFields) > 0 { + acc.AddFields("docker_data", + dataFields, + map[string]string{"unit": "bytes"}, + now) + } + if len(metadataFields) > 0 { + acc.AddFields("docker_metadata", + metadataFields, + map[string]string{"unit": "bytes"}, + now) + } + return nil +} + func (d *Docker) gatherContainer( container docker.APIContainers, acc telegraf.Accumulator, @@ -334,6 +435,27 @@ func sliceContains(in string, sl []string) bool { return false } +// Parses the human-readable size string into the amount it represents. +func parseSize(sizeStr string) (int64, error) { + matches := sizeRegex.FindStringSubmatch(sizeStr) + if len(matches) != 4 { + return -1, fmt.Errorf("invalid size: '%s'", sizeStr) + } + + size, err := strconv.ParseFloat(matches[1], 64) + if err != nil { + return -1, err + } + + uMap := map[string]int64{"k": KB, "m": MB, "g": GB, "t": TB, "p": PB} + unitPrefix := strings.ToLower(matches[3]) + if mul, ok := uMap[unitPrefix]; ok { + size *= float64(mul) + } + + return int64(size), nil +} + func init() { inputs.Add("docker", func() telegraf.Input { return &Docker{} diff --git a/plugins/inputs/docker/docker_test.go b/plugins/inputs/docker/docker_test.go index aebe8102e..23fd0bb34 100644 --- a/plugins/inputs/docker/docker_test.go +++ b/plugins/inputs/docker/docker_test.go @@ -1,12 +1,14 @@ package system import ( + "encoding/json" "testing" "time" "github.com/influxdata/telegraf/testutil" "github.com/fsouza/go-dockerclient" + "github.com/stretchr/testify/require" ) func TestDockerGatherContainerStats(t *testing.T) { @@ -194,3 +196,186 @@ func testStats() *docker.Stats { return stats } + +type FakeDockerClient struct { +} + +func (d FakeDockerClient) Info() (*docker.Env, error) { + env := docker.Env{"Containers=108", "OomKillDisable=false", "SystemTime=2016-02-24T00:55:09.15073105-05:00", "NEventsListener=0", "ID=5WQQ:TFWR:FDNG:OKQ3:37Y4:FJWG:QIKK:623T:R3ME:QTKB:A7F7:OLHD", "Debug=false", "LoggingDriver=json-file", "KernelVersion=4.3.0-1-amd64", "IndexServerAddress=https://index.docker.io/v1/", "MemTotal=3840757760", "Images=199", "CpuCfsQuota=true", "Name=absol", "SwapLimit=false", "IPv4Forwarding=true", "ExecutionDriver=native-0.2", "InitSha1=23a51f3c916d2b5a3bbb31caf301fd2d14edd518", "ExperimentalBuild=false", "CpuCfsPeriod=true", "RegistryConfig={\"IndexConfigs\":{\"docker.io\":{\"Mirrors\":null,\"Name\":\"docker.io\",\"Official\":true,\"Secure\":true}},\"InsecureRegistryCIDRs\":[\"127.0.0.0/8\"],\"Mirrors\":null}", "OperatingSystem=Linux Mint LMDE (containerized)", "BridgeNfIptables=true", "HttpsProxy=", "Labels=null", "MemoryLimit=false", "DriverStatus=[[\"Pool Name\",\"docker-8:1-1182287-pool\"],[\"Pool Blocksize\",\"65.54 kB\"],[\"Backing Filesystem\",\"extfs\"],[\"Data file\",\"/dev/loop0\"],[\"Metadata file\",\"/dev/loop1\"],[\"Data Space Used\",\"17.3 GB\"],[\"Data Space Total\",\"107.4 GB\"],[\"Data Space Available\",\"36.53 GB\"],[\"Metadata Space Used\",\"20.97 MB\"],[\"Metadata Space Total\",\"2.147 GB\"],[\"Metadata Space Available\",\"2.127 GB\"],[\"Udev Sync Supported\",\"true\"],[\"Deferred Removal Enabled\",\"false\"],[\"Data loop file\",\"/var/lib/docker/devicemapper/devicemapper/data\"],[\"Metadata loop file\",\"/var/lib/docker/devicemapper/devicemapper/metadata\"],[\"Library Version\",\"1.02.115 (2016-01-25)\"]]", "NFd=19", "HttpProxy=", "Driver=devicemapper", "NGoroutines=39", "InitPath=/usr/lib/docker.io/dockerinit", "NCPU=4", "DockerRootDir=/var/lib/docker", "NoProxy=", "BridgeNfIp6tables=true"} + return &env, nil +} + +func (d FakeDockerClient) ListContainers(opts docker.ListContainersOptions) ([]docker.APIContainers, error) { + container1 := docker.APIContainers{ + ID: "e2173b9478a6ae55e237d4d74f8bbb753f0817192b5081334dc78476296b7dfb", + Image: "quay.io/coreos/etcd:v2.2.2", + Command: "/etcd -name etcd0 -advertise-client-urls http://localhost:2379 -listen-client-urls http://0.0.0.0:2379", + Created: 1455941930, + Status: "Up 4 hours", + Ports: []docker.APIPort{ + docker.APIPort{ + PrivatePort: 7001, + PublicPort: 0, + Type: "tcp", + }, + docker.APIPort{ + PrivatePort: 4001, + PublicPort: 0, + Type: "tcp", + }, + docker.APIPort{ + PrivatePort: 2380, + PublicPort: 0, + Type: "tcp", + }, + docker.APIPort{ + PrivatePort: 2379, + PublicPort: 2379, + Type: "tcp", + IP: "0.0.0.0", + }, + }, + SizeRw: 0, + SizeRootFs: 0, + Names: []string{"/etcd"}, + } + container2 := docker.APIContainers{ + ID: "b7dfbb9478a6ae55e237d4d74f8bbb753f0817192b5081334dc78476296e2173", + Image: "quay.io/coreos/etcd:v2.2.2", + Command: "/etcd -name etcd2 -advertise-client-urls http://localhost:2379 -listen-client-urls http://0.0.0.0:2379", + Created: 1455941933, + Status: "Up 4 hours", + Ports: []docker.APIPort{ + docker.APIPort{ + PrivatePort: 7002, + PublicPort: 0, + Type: "tcp", + }, + docker.APIPort{ + PrivatePort: 4002, + PublicPort: 0, + Type: "tcp", + }, + docker.APIPort{ + PrivatePort: 2381, + PublicPort: 0, + Type: "tcp", + }, + docker.APIPort{ + PrivatePort: 2382, + PublicPort: 2382, + Type: "tcp", + IP: "0.0.0.0", + }, + }, + SizeRw: 0, + SizeRootFs: 0, + Names: []string{"/etcd2"}, + } + + containers := []docker.APIContainers{container1, container2} + return containers, nil + + //#{e6a96c84ca91a5258b7cb752579fb68826b68b49ff957487695cd4d13c343b44 titilambert/snmpsim /bin/sh -c 'snmpsimd --agent-udpv4-endpoint=0.0.0.0:31161 --process-user=root --process-group=user' 1455724831 Up 4 hours [{31161 31161 udp 0.0.0.0}] 0 0 [/snmp] map[]}]2016/02/24 01:05:01 Gathered metrics, (3s interval), from 1 inputs in 1.233836656s +} + +func (d FakeDockerClient) Stats(opts docker.StatsOptions) error { + jsonStat := `{"read":"2016-02-24T11:42:27.472459608-05:00","memory_stats":{"stats":{},"limit":18935443456},"blkio_stats":{"io_service_bytes_recursive":[{"major":252,"minor":1,"op":"Read","value":753664},{"major":252,"minor":1,"op":"Write"},{"major":252,"minor":1,"op":"Sync"},{"major":252,"minor":1,"op":"Async","value":753664},{"major":252,"minor":1,"op":"Total","value":753664}],"io_serviced_recursive":[{"major":252,"minor":1,"op":"Read","value":26},{"major":252,"minor":1,"op":"Write"},{"major":252,"minor":1,"op":"Sync"},{"major":252,"minor":1,"op":"Async","value":26},{"major":252,"minor":1,"op":"Total","value":26}]},"cpu_stats":{"cpu_usage":{"percpu_usage":[17871,4959158,1646137,1231652,11829401,244656,369972,0],"usage_in_usermode":10000000,"total_usage":20298847},"system_cpu_usage":24052607520000000,"throttling_data":{}},"precpu_stats":{"cpu_usage":{"percpu_usage":[17871,4959158,1646137,1231652,11829401,244656,369972,0],"usage_in_usermode":10000000,"total_usage":20298847},"system_cpu_usage":24052599550000000,"throttling_data":{}}}` + var stat docker.Stats + json.Unmarshal([]byte(jsonStat), &stat) + opts.Stats <- &stat + return nil +} + +func TestDockerGatherInfo(t *testing.T) { + var acc testutil.Accumulator + client := FakeDockerClient{} + d := Docker{client: client} + + err := d.Gather(&acc) + + require.NoError(t, err) + + acc.AssertContainsTaggedFields(t, + "docker", + map[string]interface{}{ + "n_listener_events": int64(0), + "n_cpus": int64(4), + "n_used_file_descriptors": int64(19), + "n_containers": int64(108), + "n_images": int64(199), + "n_goroutines": int64(39), + }, + map[string]string{}, + ) + + acc.AssertContainsTaggedFields(t, + "docker_data", + map[string]interface{}{ + "used": int64(17300000000), + "total": int64(107400000000), + "available": int64(36530000000), + }, + map[string]string{ + "unit": "bytes", + }, + ) + acc.AssertContainsTaggedFields(t, + "docker_cpu", + map[string]interface{}{ + "usage_total": uint64(1231652), + }, + map[string]string{ + "cont_id": "b7dfbb9478a6ae55e237d4d74f8bbb753f0817192b5081334dc78476296e2173", + "cont_name": "etcd2", + "cont_image": "quay.io/coreos/etcd:v2.2.2", + "cpu": "cpu3", + }, + ) + acc.AssertContainsTaggedFields(t, + "docker_mem", + map[string]interface{}{ + "total_pgpgout": uint64(0), + "usage_percent": float64(0), + "rss": uint64(0), + "total_writeback": uint64(0), + "active_anon": uint64(0), + "total_pgmafault": uint64(0), + "total_rss": uint64(0), + "total_unevictable": uint64(0), + "active_file": uint64(0), + "total_mapped_file": uint64(0), + "pgpgin": uint64(0), + "total_active_file": uint64(0), + "total_active_anon": uint64(0), + "total_cache": uint64(0), + "inactive_anon": uint64(0), + "pgmajfault": uint64(0), + "total_inactive_anon": uint64(0), + "total_rss_huge": uint64(0), + "rss_huge": uint64(0), + "hierarchical_memory_limit": uint64(0), + "pgpgout": uint64(0), + "unevictable": uint64(0), + "total_inactive_file": uint64(0), + "writeback": uint64(0), + "total_pgfault": uint64(0), + "total_pgpgin": uint64(0), + "cache": uint64(0), + "mapped_file": uint64(0), + "inactive_file": uint64(0), + "max_usage": uint64(0), + "fail_count": uint64(0), + "pgfault": uint64(0), + "usage": uint64(0), + "limit": uint64(18935443456), + }, + map[string]string{ + "cont_id": "b7dfbb9478a6ae55e237d4d74f8bbb753f0817192b5081334dc78476296e2173", + "cont_name": "etcd2", + "cont_image": "quay.io/coreos/etcd:v2.2.2", + }, + ) + + //fmt.Print(info) +} diff --git a/plugins/inputs/kafka_consumer/kafka_consumer.go b/plugins/inputs/kafka_consumer/kafka_consumer.go index bc0d225c6..07c87199f 100644 --- a/plugins/inputs/kafka_consumer/kafka_consumer.go +++ b/plugins/inputs/kafka_consumer/kafka_consumer.go @@ -14,10 +14,11 @@ import ( ) type Kafka struct { - ConsumerGroup string - Topics []string - ZookeeperPeers []string - Consumer *consumergroup.ConsumerGroup + ConsumerGroup string + Topics []string + ZookeeperPeers []string + ZookeeperChroot string + Consumer *consumergroup.ConsumerGroup // Legacy metric buffer support MetricBuffer int @@ -48,6 +49,8 @@ var sampleConfig = ` topics = ["telegraf"] ## an array of Zookeeper connection strings zookeeper_peers = ["localhost:2181"] + ## Zookeeper Chroot + zookeeper_chroot = "/" ## the name of the consumer group consumer_group = "telegraf_metrics_consumers" ## Offset (must be either "oldest" or "newest") @@ -80,6 +83,7 @@ func (k *Kafka) Start(acc telegraf.Accumulator) error { k.acc = acc config := consumergroup.NewConfig() + config.Zookeeper.Chroot = k.ZookeeperChroot switch strings.ToLower(k.Offset) { case "oldest", "": config.Offsets.Initial = sarama.OffsetOldest diff --git a/plugins/inputs/mqtt_consumer/mqtt_consumer.go b/plugins/inputs/mqtt_consumer/mqtt_consumer.go index 42cadfd60..e36889703 100644 --- a/plugins/inputs/mqtt_consumer/mqtt_consumer.go +++ b/plugins/inputs/mqtt_consumer/mqtt_consumer.go @@ -26,6 +26,9 @@ type MQTTConsumer struct { // Legacy metric buffer support MetricBuffer int + PersistentSession bool + ClientID string `toml:"client_id"` + // Path to CA file SSLCA string `toml:"ssl_ca"` // Path to host cert file @@ -57,6 +60,13 @@ var sampleConfig = ` "sensors/#", ] + # if true, messages that can't be delivered while the subscriber is offline + # will be delivered when it comes back (such as on service restart). + # NOTE: if true, client_id MUST be set + persistent_session = false + # If empty, a random client ID will be generated. + client_id = "" + ## username and password to connect MQTT server. # username = "telegraf" # password = "metricsmetricsmetricsmetrics" @@ -91,6 +101,11 @@ func (m *MQTTConsumer) Start(acc telegraf.Accumulator) error { m.Lock() defer m.Unlock() + if m.PersistentSession && m.ClientID == "" { + return fmt.Errorf("ERROR MQTT Consumer: When using persistent_session" + + " = true, you MUST also set client_id") + } + m.acc = acc if m.QoS > 2 || m.QoS < 0 { return fmt.Errorf("MQTT Consumer, invalid QoS value: %d", m.QoS) @@ -166,7 +181,11 @@ func (m *MQTTConsumer) Gather(acc telegraf.Accumulator) error { func (m *MQTTConsumer) createOpts() (*mqtt.ClientOptions, error) { opts := mqtt.NewClientOptions() - opts.SetClientID("Telegraf-Consumer-" + internal.RandomString(5)) + if m.ClientID == "" { + opts.SetClientID("Telegraf-Consumer-" + internal.RandomString(5)) + } else { + opts.SetClientID(m.ClientID) + } tlsCfg, err := internal.GetTLSConfig( m.SSLCert, m.SSLKey, m.SSLCA, m.InsecureSkipVerify) @@ -199,6 +218,7 @@ func (m *MQTTConsumer) createOpts() (*mqtt.ClientOptions, error) { } opts.SetAutoReconnect(true) opts.SetKeepAlive(time.Second * 60) + opts.SetCleanSession(!m.PersistentSession) return opts, nil } diff --git a/plugins/inputs/mqtt_consumer/mqtt_consumer_test.go b/plugins/inputs/mqtt_consumer/mqtt_consumer_test.go index b1dd59bcf..e926ebbb2 100644 --- a/plugins/inputs/mqtt_consumer/mqtt_consumer_test.go +++ b/plugins/inputs/mqtt_consumer/mqtt_consumer_test.go @@ -7,6 +7,8 @@ import ( "github.com/influxdata/telegraf/plugins/parsers" "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/assert" + "git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git" ) @@ -28,6 +30,52 @@ func newTestMQTTConsumer() (*MQTTConsumer, chan mqtt.Message) { return n, in } +// Test that default client has random ID +func TestRandomClientID(t *testing.T) { + m1 := &MQTTConsumer{ + Servers: []string{"localhost:1883"}} + opts, err := m1.createOpts() + assert.NoError(t, err) + + m2 := &MQTTConsumer{ + Servers: []string{"localhost:1883"}} + opts2, err2 := m2.createOpts() + assert.NoError(t, err2) + + assert.NotEqual(t, opts.ClientID, opts2.ClientID) +} + +// Test that default client has random ID +func TestClientID(t *testing.T) { + m1 := &MQTTConsumer{ + Servers: []string{"localhost:1883"}, + ClientID: "telegraf-test", + } + opts, err := m1.createOpts() + assert.NoError(t, err) + + m2 := &MQTTConsumer{ + Servers: []string{"localhost:1883"}, + ClientID: "telegraf-test", + } + opts2, err2 := m2.createOpts() + assert.NoError(t, err2) + + assert.Equal(t, "telegraf-test", opts2.ClientID) + assert.Equal(t, "telegraf-test", opts.ClientID) +} + +// Test that Start() fails if client ID is not set but persistent is +func TestPersistentClientIDFail(t *testing.T) { + m1 := &MQTTConsumer{ + Servers: []string{"localhost:1883"}, + PersistentSession: true, + } + acc := testutil.Accumulator{} + err := m1.Start(&acc) + assert.Error(t, err) +} + // Test that the parser parses NATS messages into metrics func TestRunParser(t *testing.T) { n, in := newTestMQTTConsumer() diff --git a/plugins/inputs/phpfpm/child.go b/plugins/inputs/phpfpm/child.go new file mode 100644 index 000000000..2ebdf2ffb --- /dev/null +++ b/plugins/inputs/phpfpm/child.go @@ -0,0 +1,331 @@ +// Copyright 2011 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package phpfpm + +// This file implements FastCGI from the perspective of a child process. + +import ( + "errors" + "fmt" + "io" + "io/ioutil" + "net" + "net/http" + "net/http/cgi" + "os" + "strings" + "sync" + "time" +) + +// request holds the state for an in-progress request. As soon as it's complete, +// it's converted to an http.Request. +type request struct { + pw *io.PipeWriter + reqId uint16 + params map[string]string + buf [1024]byte + rawParams []byte + keepConn bool +} + +func newRequest(reqId uint16, flags uint8) *request { + r := &request{ + reqId: reqId, + params: map[string]string{}, + keepConn: flags&flagKeepConn != 0, + } + r.rawParams = r.buf[:0] + return r +} + +// parseParams reads an encoded []byte into Params. +func (r *request) parseParams() { + text := r.rawParams + r.rawParams = nil + for len(text) > 0 { + keyLen, n := readSize(text) + if n == 0 { + return + } + text = text[n:] + valLen, n := readSize(text) + if n == 0 { + return + } + text = text[n:] + if int(keyLen)+int(valLen) > len(text) { + return + } + key := readString(text, keyLen) + text = text[keyLen:] + val := readString(text, valLen) + text = text[valLen:] + r.params[key] = val + } +} + +// response implements http.ResponseWriter. +type response struct { + req *request + header http.Header + w *bufWriter + wroteHeader bool +} + +func newResponse(c *child, req *request) *response { + return &response{ + req: req, + header: http.Header{}, + w: newWriter(c.conn, typeStdout, req.reqId), + } +} + +func (r *response) Header() http.Header { + return r.header +} + +func (r *response) Write(data []byte) (int, error) { + if !r.wroteHeader { + r.WriteHeader(http.StatusOK) + } + return r.w.Write(data) +} + +func (r *response) WriteHeader(code int) { + if r.wroteHeader { + return + } + r.wroteHeader = true + if code == http.StatusNotModified { + // Must not have body. + r.header.Del("Content-Type") + r.header.Del("Content-Length") + r.header.Del("Transfer-Encoding") + } else if r.header.Get("Content-Type") == "" { + r.header.Set("Content-Type", "text/html; charset=utf-8") + } + + if r.header.Get("Date") == "" { + r.header.Set("Date", time.Now().UTC().Format(http.TimeFormat)) + } + + fmt.Fprintf(r.w, "Status: %d %s\r\n", code, http.StatusText(code)) + r.header.Write(r.w) + r.w.WriteString("\r\n") +} + +func (r *response) Flush() { + if !r.wroteHeader { + r.WriteHeader(http.StatusOK) + } + r.w.Flush() +} + +func (r *response) Close() error { + r.Flush() + return r.w.Close() +} + +type child struct { + conn *conn + handler http.Handler + + mu sync.Mutex // protects requests: + requests map[uint16]*request // keyed by request ID +} + +func newChild(rwc io.ReadWriteCloser, handler http.Handler) *child { + return &child{ + conn: newConn(rwc), + handler: handler, + requests: make(map[uint16]*request), + } +} + +func (c *child) serve() { + defer c.conn.Close() + defer c.cleanUp() + var rec record + for { + if err := rec.read(c.conn.rwc); err != nil { + return + } + if err := c.handleRecord(&rec); err != nil { + return + } + } +} + +var errCloseConn = errors.New("fcgi: connection should be closed") + +var emptyBody = ioutil.NopCloser(strings.NewReader("")) + +// ErrRequestAborted is returned by Read when a handler attempts to read the +// body of a request that has been aborted by the web server. +var ErrRequestAborted = errors.New("fcgi: request aborted by web server") + +// ErrConnClosed is returned by Read when a handler attempts to read the body of +// a request after the connection to the web server has been closed. +var ErrConnClosed = errors.New("fcgi: connection to web server closed") + +func (c *child) handleRecord(rec *record) error { + c.mu.Lock() + req, ok := c.requests[rec.h.Id] + c.mu.Unlock() + if !ok && rec.h.Type != typeBeginRequest && rec.h.Type != typeGetValues { + // The spec says to ignore unknown request IDs. + return nil + } + + switch rec.h.Type { + case typeBeginRequest: + if req != nil { + // The server is trying to begin a request with the same ID + // as an in-progress request. This is an error. + return errors.New("fcgi: received ID that is already in-flight") + } + + var br beginRequest + if err := br.read(rec.content()); err != nil { + return err + } + if br.role != roleResponder { + c.conn.writeEndRequest(rec.h.Id, 0, statusUnknownRole) + return nil + } + req = newRequest(rec.h.Id, br.flags) + c.mu.Lock() + c.requests[rec.h.Id] = req + c.mu.Unlock() + return nil + case typeParams: + // NOTE(eds): Technically a key-value pair can straddle the boundary + // between two packets. We buffer until we've received all parameters. + if len(rec.content()) > 0 { + req.rawParams = append(req.rawParams, rec.content()...) + return nil + } + req.parseParams() + return nil + case typeStdin: + content := rec.content() + if req.pw == nil { + var body io.ReadCloser + if len(content) > 0 { + // body could be an io.LimitReader, but it shouldn't matter + // as long as both sides are behaving. + body, req.pw = io.Pipe() + } else { + body = emptyBody + } + go c.serveRequest(req, body) + } + if len(content) > 0 { + // TODO(eds): This blocks until the handler reads from the pipe. + // If the handler takes a long time, it might be a problem. + req.pw.Write(content) + } else if req.pw != nil { + req.pw.Close() + } + return nil + case typeGetValues: + values := map[string]string{"FCGI_MPXS_CONNS": "1"} + c.conn.writePairs(typeGetValuesResult, 0, values) + return nil + case typeData: + // If the filter role is implemented, read the data stream here. + return nil + case typeAbortRequest: + c.mu.Lock() + delete(c.requests, rec.h.Id) + c.mu.Unlock() + c.conn.writeEndRequest(rec.h.Id, 0, statusRequestComplete) + if req.pw != nil { + req.pw.CloseWithError(ErrRequestAborted) + } + if !req.keepConn { + // connection will close upon return + return errCloseConn + } + return nil + default: + b := make([]byte, 8) + b[0] = byte(rec.h.Type) + c.conn.writeRecord(typeUnknownType, 0, b) + return nil + } +} + +func (c *child) serveRequest(req *request, body io.ReadCloser) { + r := newResponse(c, req) + httpReq, err := cgi.RequestFromMap(req.params) + if err != nil { + // there was an error reading the request + r.WriteHeader(http.StatusInternalServerError) + c.conn.writeRecord(typeStderr, req.reqId, []byte(err.Error())) + } else { + httpReq.Body = body + c.handler.ServeHTTP(r, httpReq) + } + r.Close() + c.mu.Lock() + delete(c.requests, req.reqId) + c.mu.Unlock() + c.conn.writeEndRequest(req.reqId, 0, statusRequestComplete) + + // Consume the entire body, so the host isn't still writing to + // us when we close the socket below in the !keepConn case, + // otherwise we'd send a RST. (golang.org/issue/4183) + // TODO(bradfitz): also bound this copy in time. Or send + // some sort of abort request to the host, so the host + // can properly cut off the client sending all the data. + // For now just bound it a little and + io.CopyN(ioutil.Discard, body, 100<<20) + body.Close() + + if !req.keepConn { + c.conn.Close() + } +} + +func (c *child) cleanUp() { + c.mu.Lock() + defer c.mu.Unlock() + for _, req := range c.requests { + if req.pw != nil { + // race with call to Close in c.serveRequest doesn't matter because + // Pipe(Reader|Writer).Close are idempotent + req.pw.CloseWithError(ErrConnClosed) + } + } +} + +// Serve accepts incoming FastCGI connections on the listener l, creating a new +// goroutine for each. The goroutine reads requests and then calls handler +// to reply to them. +// If l is nil, Serve accepts connections from os.Stdin. +// If handler is nil, http.DefaultServeMux is used. +func Serve(l net.Listener, handler http.Handler) error { + if l == nil { + var err error + l, err = net.FileListener(os.Stdin) + if err != nil { + return err + } + defer l.Close() + } + if handler == nil { + handler = http.DefaultServeMux + } + for { + rw, err := l.Accept() + if err != nil { + return err + } + c := newChild(rw, handler) + go c.serve() + } +} diff --git a/plugins/inputs/phpfpm/phpfpm_fcgi.go b/plugins/inputs/phpfpm/fcgi.go similarity index 79% rename from plugins/inputs/phpfpm/phpfpm_fcgi.go rename to plugins/inputs/phpfpm/fcgi.go index 03aac7634..689660ea0 100644 --- a/plugins/inputs/phpfpm/phpfpm_fcgi.go +++ b/plugins/inputs/phpfpm/fcgi.go @@ -17,11 +17,6 @@ import ( "errors" "io" "sync" - - "net" - "strconv" - - "strings" ) // recType is a record type, as defined by @@ -277,74 +272,3 @@ func (w *streamWriter) Close() error { // send empty record to close the stream return w.c.writeRecord(w.recType, w.reqId, nil) } - -func NewClient(h string, args ...interface{}) (fcgi *conn, err error) { - var con net.Conn - if len(args) != 1 { - err = errors.New("fcgi: not enough params") - return - } - switch args[0].(type) { - case int: - addr := h + ":" + strconv.FormatInt(int64(args[0].(int)), 10) - con, err = net.Dial("tcp", addr) - case string: - laddr := net.UnixAddr{Name: args[0].(string), Net: h} - con, err = net.DialUnix(h, nil, &laddr) - default: - err = errors.New("fcgi: we only accept int (port) or string (socket) params.") - } - fcgi = &conn{ - rwc: con, - } - return -} - -func (client *conn) Request(env map[string]string, requestData string) (retout []byte, reterr []byte, err error) { - defer client.rwc.Close() - var reqId uint16 = 1 - - err = client.writeBeginRequest(reqId, uint16(roleResponder), 0) - if err != nil { - return - } - - err = client.writePairs(typeParams, reqId, env) - if err != nil { - return - } - - if len(requestData) > 0 { - if err = client.writeRecord(typeStdin, reqId, []byte(requestData)); err != nil { - return - } - } - - rec := &record{} - var err1 error - - // recive untill EOF or FCGI_END_REQUEST -READ_LOOP: - for { - err1 = rec.read(client.rwc) - if err1 != nil && strings.Contains(err1.Error(), "use of closed network connection") { - if err1 != io.EOF { - err = err1 - } - break - } - - switch { - case rec.h.Type == typeStdout: - retout = append(retout, rec.content()...) - case rec.h.Type == typeStderr: - reterr = append(reterr, rec.content()...) - case rec.h.Type == typeEndRequest: - fallthrough - default: - break READ_LOOP - } - } - - return -} diff --git a/plugins/inputs/phpfpm/fcgi_client.go b/plugins/inputs/phpfpm/fcgi_client.go new file mode 100644 index 000000000..56978ad3a --- /dev/null +++ b/plugins/inputs/phpfpm/fcgi_client.go @@ -0,0 +1,86 @@ +package phpfpm + +import ( + "errors" + "io" + "net" + "strconv" + "strings" +) + +// Create an fcgi client +func newFcgiClient(h string, args ...interface{}) (*conn, error) { + var con net.Conn + if len(args) != 1 { + return nil, errors.New("fcgi: not enough params") + } + + var err error + switch args[0].(type) { + case int: + addr := h + ":" + strconv.FormatInt(int64(args[0].(int)), 10) + con, err = net.Dial("tcp", addr) + case string: + laddr := net.UnixAddr{Name: args[0].(string), Net: h} + con, err = net.DialUnix(h, nil, &laddr) + default: + err = errors.New("fcgi: we only accept int (port) or string (socket) params.") + } + fcgi := &conn{ + rwc: con, + } + + return fcgi, err +} + +func (client *conn) Request( + env map[string]string, + requestData string, +) (retout []byte, reterr []byte, err error) { + defer client.rwc.Close() + var reqId uint16 = 1 + + err = client.writeBeginRequest(reqId, uint16(roleResponder), 0) + if err != nil { + return + } + + err = client.writePairs(typeParams, reqId, env) + if err != nil { + return + } + + if len(requestData) > 0 { + if err = client.writeRecord(typeStdin, reqId, []byte(requestData)); err != nil { + return + } + } + + rec := &record{} + var err1 error + + // recive untill EOF or FCGI_END_REQUEST +READ_LOOP: + for { + err1 = rec.read(client.rwc) + if err1 != nil && strings.Contains(err1.Error(), "use of closed network connection") { + if err1 != io.EOF { + err = err1 + } + break + } + + switch { + case rec.h.Type == typeStdout: + retout = append(retout, rec.content()...) + case rec.h.Type == typeStderr: + reterr = append(reterr, rec.content()...) + case rec.h.Type == typeEndRequest: + fallthrough + default: + break READ_LOOP + } + } + + return +} diff --git a/plugins/inputs/phpfpm/fcgi_test.go b/plugins/inputs/phpfpm/fcgi_test.go new file mode 100644 index 000000000..15e0030a7 --- /dev/null +++ b/plugins/inputs/phpfpm/fcgi_test.go @@ -0,0 +1,280 @@ +// Copyright 2011 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package phpfpm + +import ( + "bytes" + "errors" + "io" + "io/ioutil" + "net/http" + "testing" +) + +var sizeTests = []struct { + size uint32 + bytes []byte +}{ + {0, []byte{0x00}}, + {127, []byte{0x7F}}, + {128, []byte{0x80, 0x00, 0x00, 0x80}}, + {1000, []byte{0x80, 0x00, 0x03, 0xE8}}, + {33554431, []byte{0x81, 0xFF, 0xFF, 0xFF}}, +} + +func TestSize(t *testing.T) { + b := make([]byte, 4) + for i, test := range sizeTests { + n := encodeSize(b, test.size) + if !bytes.Equal(b[:n], test.bytes) { + t.Errorf("%d expected %x, encoded %x", i, test.bytes, b) + } + size, n := readSize(test.bytes) + if size != test.size { + t.Errorf("%d expected %d, read %d", i, test.size, size) + } + if len(test.bytes) != n { + t.Errorf("%d did not consume all the bytes", i) + } + } +} + +var streamTests = []struct { + desc string + recType recType + reqId uint16 + content []byte + raw []byte +}{ + {"single record", typeStdout, 1, nil, + []byte{1, byte(typeStdout), 0, 1, 0, 0, 0, 0}, + }, + // this data will have to be split into two records + {"two records", typeStdin, 300, make([]byte, 66000), + bytes.Join([][]byte{ + // header for the first record + {1, byte(typeStdin), 0x01, 0x2C, 0xFF, 0xFF, 1, 0}, + make([]byte, 65536), + // header for the second + {1, byte(typeStdin), 0x01, 0x2C, 0x01, 0xD1, 7, 0}, + make([]byte, 472), + // header for the empty record + {1, byte(typeStdin), 0x01, 0x2C, 0, 0, 0, 0}, + }, + nil), + }, +} + +type nilCloser struct { + io.ReadWriter +} + +func (c *nilCloser) Close() error { return nil } + +func TestStreams(t *testing.T) { + var rec record +outer: + for _, test := range streamTests { + buf := bytes.NewBuffer(test.raw) + var content []byte + for buf.Len() > 0 { + if err := rec.read(buf); err != nil { + t.Errorf("%s: error reading record: %v", test.desc, err) + continue outer + } + content = append(content, rec.content()...) + } + if rec.h.Type != test.recType { + t.Errorf("%s: got type %d expected %d", test.desc, rec.h.Type, test.recType) + continue + } + if rec.h.Id != test.reqId { + t.Errorf("%s: got request ID %d expected %d", test.desc, rec.h.Id, test.reqId) + continue + } + if !bytes.Equal(content, test.content) { + t.Errorf("%s: read wrong content", test.desc) + continue + } + buf.Reset() + c := newConn(&nilCloser{buf}) + w := newWriter(c, test.recType, test.reqId) + if _, err := w.Write(test.content); err != nil { + t.Errorf("%s: error writing record: %v", test.desc, err) + continue + } + if err := w.Close(); err != nil { + t.Errorf("%s: error closing stream: %v", test.desc, err) + continue + } + if !bytes.Equal(buf.Bytes(), test.raw) { + t.Errorf("%s: wrote wrong content", test.desc) + } + } +} + +type writeOnlyConn struct { + buf []byte +} + +func (c *writeOnlyConn) Write(p []byte) (int, error) { + c.buf = append(c.buf, p...) + return len(p), nil +} + +func (c *writeOnlyConn) Read(p []byte) (int, error) { + return 0, errors.New("conn is write-only") +} + +func (c *writeOnlyConn) Close() error { + return nil +} + +func TestGetValues(t *testing.T) { + var rec record + rec.h.Type = typeGetValues + + wc := new(writeOnlyConn) + c := newChild(wc, nil) + err := c.handleRecord(&rec) + if err != nil { + t.Fatalf("handleRecord: %v", err) + } + + const want = "\x01\n\x00\x00\x00\x12\x06\x00" + + "\x0f\x01FCGI_MPXS_CONNS1" + + "\x00\x00\x00\x00\x00\x00\x01\n\x00\x00\x00\x00\x00\x00" + if got := string(wc.buf); got != want { + t.Errorf(" got: %q\nwant: %q\n", got, want) + } +} + +func nameValuePair11(nameData, valueData string) []byte { + return bytes.Join( + [][]byte{ + {byte(len(nameData)), byte(len(valueData))}, + []byte(nameData), + []byte(valueData), + }, + nil, + ) +} + +func makeRecord( + recordType recType, + requestId uint16, + contentData []byte, +) []byte { + requestIdB1 := byte(requestId >> 8) + requestIdB0 := byte(requestId) + + contentLength := len(contentData) + contentLengthB1 := byte(contentLength >> 8) + contentLengthB0 := byte(contentLength) + return bytes.Join([][]byte{ + {1, byte(recordType), requestIdB1, requestIdB0, contentLengthB1, + contentLengthB0, 0, 0}, + contentData, + }, + nil) +} + +// a series of FastCGI records that start a request and begin sending the +// request body +var streamBeginTypeStdin = bytes.Join([][]byte{ + // set up request 1 + makeRecord(typeBeginRequest, 1, + []byte{0, byte(roleResponder), 0, 0, 0, 0, 0, 0}), + // add required parameters to request 1 + makeRecord(typeParams, 1, nameValuePair11("REQUEST_METHOD", "GET")), + makeRecord(typeParams, 1, nameValuePair11("SERVER_PROTOCOL", "HTTP/1.1")), + makeRecord(typeParams, 1, nil), + // begin sending body of request 1 + makeRecord(typeStdin, 1, []byte("0123456789abcdef")), +}, + nil) + +var cleanUpTests = []struct { + input []byte + err error +}{ + // confirm that child.handleRecord closes req.pw after aborting req + { + bytes.Join([][]byte{ + streamBeginTypeStdin, + makeRecord(typeAbortRequest, 1, nil), + }, + nil), + ErrRequestAborted, + }, + // confirm that child.serve closes all pipes after error reading record + { + bytes.Join([][]byte{ + streamBeginTypeStdin, + nil, + }, + nil), + ErrConnClosed, + }, +} + +type nopWriteCloser struct { + io.ReadWriter +} + +func (nopWriteCloser) Close() error { + return nil +} + +// Test that child.serve closes the bodies of aborted requests and closes the +// bodies of all requests before returning. Causes deadlock if either condition +// isn't met. See issue 6934. +func TestChildServeCleansUp(t *testing.T) { + for _, tt := range cleanUpTests { + input := make([]byte, len(tt.input)) + copy(input, tt.input) + rc := nopWriteCloser{bytes.NewBuffer(input)} + done := make(chan bool) + c := newChild(rc, http.HandlerFunc(func( + w http.ResponseWriter, + r *http.Request, + ) { + // block on reading body of request + _, err := io.Copy(ioutil.Discard, r.Body) + if err != tt.err { + t.Errorf("Expected %#v, got %#v", tt.err, err) + } + // not reached if body of request isn't closed + done <- true + })) + go c.serve() + // wait for body of request to be closed or all goroutines to block + <-done + } +} + +type rwNopCloser struct { + io.Reader + io.Writer +} + +func (rwNopCloser) Close() error { + return nil +} + +// Verifies it doesn't crash. Issue 11824. +func TestMalformedParams(t *testing.T) { + input := []byte{ + // beginRequest, requestId=1, contentLength=8, role=1, keepConn=1 + 1, 1, 0, 1, 0, 8, 0, 0, 0, 1, 1, 0, 0, 0, 0, 0, + // params, requestId=1, contentLength=10, k1Len=50, v1Len=50 (malformed, wrong length) + 1, 4, 0, 1, 0, 10, 0, 0, 50, 50, 3, 4, 5, 6, 7, 8, 9, 10, + // end of params + 1, 4, 0, 1, 0, 0, 0, 0, + } + rw := rwNopCloser{bytes.NewReader(input), ioutil.Discard} + c := newChild(rw, http.DefaultServeMux) + c.serve() +} diff --git a/plugins/inputs/phpfpm/phpfpm.go b/plugins/inputs/phpfpm/phpfpm.go index c07262342..199b0005b 100644 --- a/plugins/inputs/phpfpm/phpfpm.go +++ b/plugins/inputs/phpfpm/phpfpm.go @@ -112,6 +112,7 @@ func (g *phpfpm) gatherServer(addr string, acc telegraf.Accumulator) error { statusPath string ) + var err error if strings.HasPrefix(addr, "fcgi://") || strings.HasPrefix(addr, "cgi://") { u, err := url.Parse(addr) if err != nil { @@ -120,7 +121,7 @@ func (g *phpfpm) gatherServer(addr string, acc telegraf.Accumulator) error { socketAddr := strings.Split(u.Host, ":") fcgiIp := socketAddr[0] fcgiPort, _ := strconv.Atoi(socketAddr[1]) - fcgi, _ = NewClient(fcgiIp, fcgiPort) + fcgi, err = newFcgiClient(fcgiIp, fcgiPort) } else { socketAddr := strings.Split(addr, ":") if len(socketAddr) >= 2 { @@ -134,8 +135,13 @@ func (g *phpfpm) gatherServer(addr string, acc telegraf.Accumulator) error { if _, err := os.Stat(socketPath); os.IsNotExist(err) { return fmt.Errorf("Socket doesn't exist '%s': %s", socketPath, err) } - fcgi, _ = NewClient("unix", socketPath) + fcgi, err = newFcgiClient("unix", socketPath) } + + if err != nil { + return err + } + return g.gatherFcgi(fcgi, statusPath, acc) } diff --git a/plugins/inputs/procstat/README.md b/plugins/inputs/procstat/README.md index 90552c2a6..ef96500a3 100644 --- a/plugins/inputs/procstat/README.md +++ b/plugins/inputs/procstat/README.md @@ -35,6 +35,10 @@ The above configuration would result in output like: # Measurements Note: prefix can be set by the user, per process. + +Threads related measurement names: +- procstat_[prefix_]num_threads value=5 + File descriptor related measurement names: - procstat_[prefix_]num_fds value=4 diff --git a/plugins/inputs/procstat/spec_processor.go b/plugins/inputs/procstat/spec_processor.go index b09ed4f21..bb248f003 100644 --- a/plugins/inputs/procstat/spec_processor.go +++ b/plugins/inputs/procstat/spec_processor.go @@ -52,6 +52,7 @@ func NewSpecProcessor( } func (p *SpecProcessor) pushMetrics() { + p.pushNThreadsStats() p.pushFDStats() p.pushCtxStats() p.pushIOStats() @@ -60,6 +61,15 @@ func (p *SpecProcessor) pushMetrics() { p.flush() } +func (p *SpecProcessor) pushNThreadsStats() error { + numThreads, err := p.proc.NumThreads() + if err != nil { + return fmt.Errorf("NumThreads error: %s\n", err) + } + p.add("num_threads", numThreads) + return nil +} + func (p *SpecProcessor) pushFDStats() error { fds, err := p.proc.NumFDs() if err != nil { diff --git a/plugins/inputs/redis/README.md b/plugins/inputs/redis/README.md new file mode 100644 index 000000000..d7d98ccc9 --- /dev/null +++ b/plugins/inputs/redis/README.md @@ -0,0 +1,86 @@ +# Telegraf Plugin: Redis + +### Configuration: + +``` +# Read Redis's basic status information +[[inputs.redis]] + ## specify servers via a url matching: + ## [protocol://][:password]@address[:port] + ## e.g. + ## tcp://localhost:6379 + ## tcp://:password@192.168.99.100 + ## + ## If no servers are specified, then localhost is used as the host. + ## If no port is specified, 6379 is used + servers = ["tcp://localhost:6379"] +``` + +### Measurements & Fields: + +- Measurement + - uptime_in_seconds + - connected_clients + - used_memory + - used_memory_rss + - used_memory_peak + - used_memory_lua + - rdb_changes_since_last_save + - total_connections_received + - total_commands_processed + - instantaneous_ops_per_sec + - instantaneous_input_kbps + - instantaneous_output_kbps + - sync_full + - sync_partial_ok + - sync_partial_err + - expired_keys + - evicted_keys + - keyspace_hits + - keyspace_misses + - pubsub_channels + - pubsub_patterns + - latest_fork_usec + - connected_slaves + - master_repl_offset + - repl_backlog_active + - repl_backlog_size + - repl_backlog_histlen + - mem_fragmentation_ratio + - used_cpu_sys + - used_cpu_user + - used_cpu_sys_children + - used_cpu_user_children + +### Tags: + +- All measurements have the following tags: + - port + - server + +### Example Output: + +Using this configuration: +``` +[[inputs.nginx]] + ## specify servers via a url matching: + ## [protocol://][:password]@address[:port] + ## e.g. + ## tcp://localhost:6379 + ## tcp://:password@192.168.99.100 + ## + ## If no servers are specified, then localhost is used as the host. + ## If no port is specified, 6379 is used + servers = ["tcp://localhost:6379"] +``` + +When run with: +``` +./telegraf -config telegraf.conf -input-filter redis -test +``` + +It produces: +``` +* Plugin: redis, Collection 1 +> redis,port=6379,server=localhost clients=1i,connected_slaves=0i,evicted_keys=0i,expired_keys=0i,instantaneous_ops_per_sec=0i,keyspace_hitrate=0,keyspace_hits=0i,keyspace_misses=2i,latest_fork_usec=0i,master_repl_offset=0i,mem_fragmentation_ratio=3.58,pubsub_channels=0i,pubsub_patterns=0i,rdb_changes_since_last_save=0i,repl_backlog_active=0i,repl_backlog_histlen=0i,repl_backlog_size=1048576i,sync_full=0i,sync_partial_err=0i,sync_partial_ok=0i,total_commands_processed=4i,total_connections_received=2i,uptime=869i,used_cpu_sys=0.07,used_cpu_sys_children=0,used_cpu_user=0.1,used_cpu_user_children=0,used_memory=502048i,used_memory_lua=33792i,used_memory_peak=501128i,used_memory_rss=1798144i 1457052084987848383 +``` diff --git a/plugins/inputs/snmp/snmp.go b/plugins/inputs/snmp/snmp.go index 3d4827fc1..2af293d57 100644 --- a/plugins/inputs/snmp/snmp.go +++ b/plugins/inputs/snmp/snmp.go @@ -464,13 +464,14 @@ func (h *Host) SNMPMap(acc telegraf.Accumulator) error { // To get mapping between instance id // and instance name oid_asked := table.mappingTable + oid_next := oid_asked need_more_requests := true // Set max repetition maxRepetition := uint8(32) // Launch requests for need_more_requests { // Launch request - result, err3 := snmpClient.GetBulk([]string{oid_asked}, 0, maxRepetition) + result, err3 := snmpClient.GetBulk([]string{oid_next}, 0, maxRepetition) if err3 != nil { return err3 } @@ -572,6 +573,7 @@ func (h *Host) SNMPMap(acc telegraf.Accumulator) error { // Determine if we need more requests if strings.HasPrefix(lastOid, oid_asked) { need_more_requests = true + oid_next = lastOid } else { need_more_requests = false } diff --git a/plugins/inputs/system/PROCESSES_README.md b/plugins/inputs/system/PROCESSES_README.md new file mode 100644 index 000000000..006e043fb --- /dev/null +++ b/plugins/inputs/system/PROCESSES_README.md @@ -0,0 +1,58 @@ +# Processes Input Plugin + +This plugin gathers info about the total number of processes and groups +them by status (zombie, sleeping, running, etc.) + +On linux this plugin requires access to procfs (/proc), on other OSes +it requires access to execute `ps`. + +### Configuration: + +```toml +# Get the number of processes and group them by status +[[inputs.processes]] + # no configuration +``` + +### Measurements & Fields: + +- processes + - blocked (aka disk sleep or uninterruptible sleep) + - running + - sleeping + - stopped + - total + - zombie + - wait (freebsd only) + - idle (bsd only) + - paging (linux only) + - total_threads (linux only) + +### Process State Mappings + +Different OSes use slightly different State codes for their processes, these +state codes are documented in `man ps`, and I will give a mapping of what major +OS state codes correspond to in telegraf metrics: + +``` +Linux FreeBSD Darwin meaning + R R R running + S S S sleeping + Z Z Z zombie + T T T stopped + none I I idle (sleeping for longer than about 20 seconds) + D D,L U blocked (waiting in uninterruptible sleep, or locked) + W W none paging (linux kernel < 2.6 only), wait (freebsd) +``` + +### Tags: + +None + +### Example Output: + +``` +$ telegraf -config ~/ws/telegraf.conf -input-filter processes -test +* Plugin: processes, Collection 1 +> processes blocked=8i,running=1i,sleeping=265i,stopped=0i,total=274i,zombie=0i,paging=0i,total_threads=687i 1457478636980905042 +``` diff --git a/plugins/inputs/system/SYSTEM_README.md b/plugins/inputs/system/SYSTEM_README.md new file mode 100644 index 000000000..fc873c7e8 --- /dev/null +++ b/plugins/inputs/system/SYSTEM_README.md @@ -0,0 +1,35 @@ +# System Input Plugin + +The system plugin gathers general stats on system load, uptime, +and number of users logged in. It is basically equivalent +to the unix `uptime` command. + +### Configuration: + +```toml +# Read metrics about system load & uptime +[[inputs.system]] + # no configuration +``` + +### Measurements & Fields: + +- system + - load1 (float) + - load15 (float) + - load5 (float) + - n_users (integer) + - uptime (integer, seconds) + - uptime_format (string) + +### Tags: + +None + +### Example Output: + +``` +$ telegraf -config ~/ws/telegraf.conf -input-filter system -test +* Plugin: system, Collection 1 +> system load1=2.05,load15=2.38,load5=2.03,n_users=4i,uptime=239043i,uptime_format="2 days, 18:24" 1457546165399253452 +``` diff --git a/plugins/inputs/system/processes.go b/plugins/inputs/system/processes.go new file mode 100644 index 000000000..aae0e6ba4 --- /dev/null +++ b/plugins/inputs/system/processes.go @@ -0,0 +1,216 @@ +// +build !windows + +package system + +import ( + "bytes" + "fmt" + "io/ioutil" + "log" + "os" + "os/exec" + "path" + "runtime" + "strconv" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/inputs" +) + +type Processes struct { + execPS func() ([]byte, error) + readProcFile func(statFile string) ([]byte, error) + + forcePS bool + forceProc bool +} + +func (p *Processes) Description() string { + return "Get the number of processes and group them by status" +} + +func (p *Processes) SampleConfig() string { return "" } + +func (p *Processes) Gather(acc telegraf.Accumulator) error { + // Get an empty map of metric fields + fields := getEmptyFields() + + // Decide if we will use 'ps' to get stats (use procfs otherwise) + usePS := true + if runtime.GOOS == "linux" { + usePS = false + } + if p.forcePS { + usePS = true + } else if p.forceProc { + usePS = false + } + + // Gather stats from 'ps' or procfs + if usePS { + if err := p.gatherFromPS(fields); err != nil { + return err + } + } else { + if err := p.gatherFromProc(fields); err != nil { + return err + } + } + + acc.AddFields("processes", fields, nil) + return nil +} + +// Gets empty fields of metrics based on the OS +func getEmptyFields() map[string]interface{} { + fields := map[string]interface{}{ + "blocked": int64(0), + "zombies": int64(0), + "stopped": int64(0), + "running": int64(0), + "sleeping": int64(0), + "total": int64(0), + } + switch runtime.GOOS { + case "freebsd": + fields["idle"] = int64(0) + fields["wait"] = int64(0) + case "darwin": + fields["idle"] = int64(0) + case "openbsd": + fields["idle"] = int64(0) + case "linux": + fields["paging"] = int64(0) + fields["total_threads"] = int64(0) + } + return fields +} + +// exec `ps` to get all process states +func (p *Processes) gatherFromPS(fields map[string]interface{}) error { + out, err := p.execPS() + if err != nil { + return err + } + + for i, status := range bytes.Fields(out) { + if i == 0 && string(status) == "STAT" { + // This is a header, skip it + continue + } + switch status[0] { + case 'W': + fields["wait"] = fields["wait"].(int64) + int64(1) + case 'U', 'D', 'L': + // Also known as uninterruptible sleep or disk sleep + fields["blocked"] = fields["blocked"].(int64) + int64(1) + case 'Z': + fields["zombies"] = fields["zombies"].(int64) + int64(1) + case 'T': + fields["stopped"] = fields["stopped"].(int64) + int64(1) + case 'R': + fields["running"] = fields["running"].(int64) + int64(1) + case 'S': + fields["sleeping"] = fields["sleeping"].(int64) + int64(1) + case 'I': + fields["idle"] = fields["idle"].(int64) + int64(1) + default: + log.Printf("processes: Unknown state [ %s ] from ps", + string(status[0])) + } + fields["total"] = fields["total"].(int64) + int64(1) + } + return nil +} + +// get process states from /proc/(pid)/stat files +func (p *Processes) gatherFromProc(fields map[string]interface{}) error { + files, err := ioutil.ReadDir("/proc") + if err != nil { + return err + } + + for _, file := range files { + if !file.IsDir() { + continue + } + + statFile := path.Join("/proc", file.Name(), "stat") + data, err := p.readProcFile(statFile) + if err != nil { + return err + } + if data == nil { + continue + } + + stats := bytes.Fields(data) + if len(stats) < 3 { + return fmt.Errorf("Something is terribly wrong with %s", statFile) + } + switch stats[2][0] { + case 'R': + fields["running"] = fields["running"].(int64) + int64(1) + case 'S': + fields["sleeping"] = fields["sleeping"].(int64) + int64(1) + case 'D': + fields["blocked"] = fields["blocked"].(int64) + int64(1) + case 'Z': + fields["zombies"] = fields["zombies"].(int64) + int64(1) + case 'T', 't': + fields["stopped"] = fields["stopped"].(int64) + int64(1) + case 'W': + fields["paging"] = fields["paging"].(int64) + int64(1) + default: + log.Printf("processes: Unknown state [ %s ] in file %s", + string(stats[2][0]), statFile) + } + fields["total"] = fields["total"].(int64) + int64(1) + + threads, err := strconv.Atoi(string(stats[19])) + if err != nil { + log.Printf("processes: Error parsing thread count: %s", err) + continue + } + fields["total_threads"] = fields["total_threads"].(int64) + int64(threads) + } + return nil +} + +func readProcFile(statFile string) ([]byte, error) { + if _, err := os.Stat(statFile); os.IsNotExist(err) { + return nil, nil + } else if err != nil { + return nil, err + } + + data, err := ioutil.ReadFile(statFile) + if err != nil { + return nil, err + } + + return data, nil +} + +func execPS() ([]byte, error) { + bin, err := exec.LookPath("ps") + if err != nil { + return nil, err + } + + out, err := exec.Command(bin, "axo", "state").Output() + if err != nil { + return nil, err + } + + return out, err +} + +func init() { + inputs.Add("processes", func() telegraf.Input { + return &Processes{ + execPS: execPS, + readProcFile: readProcFile, + } + }) +} diff --git a/plugins/inputs/system/processes_test.go b/plugins/inputs/system/processes_test.go new file mode 100644 index 000000000..de9b6aa5b --- /dev/null +++ b/plugins/inputs/system/processes_test.go @@ -0,0 +1,151 @@ +package system + +import ( + "fmt" + "runtime" + "testing" + + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestProcesses(t *testing.T) { + processes := &Processes{ + execPS: execPS, + readProcFile: readProcFile, + } + var acc testutil.Accumulator + + err := processes.Gather(&acc) + require.NoError(t, err) + + assert.True(t, acc.HasIntField("processes", "running")) + assert.True(t, acc.HasIntField("processes", "sleeping")) + assert.True(t, acc.HasIntField("processes", "stopped")) + assert.True(t, acc.HasIntField("processes", "total")) + total, ok := acc.Get("processes") + require.True(t, ok) + assert.True(t, total.Fields["total"].(int64) > 0) +} + +func TestFromPS(t *testing.T) { + processes := &Processes{ + execPS: testExecPS, + forcePS: true, + } + + var acc testutil.Accumulator + err := processes.Gather(&acc) + require.NoError(t, err) + + fields := getEmptyFields() + fields["blocked"] = int64(4) + fields["zombies"] = int64(1) + fields["running"] = int64(4) + fields["sleeping"] = int64(34) + fields["total"] = int64(43) + + acc.AssertContainsTaggedFields(t, "processes", fields, map[string]string{}) +} + +func TestFromPSError(t *testing.T) { + processes := &Processes{ + execPS: testExecPSError, + forcePS: true, + } + + var acc testutil.Accumulator + err := processes.Gather(&acc) + require.Error(t, err) +} + +func TestFromProcFiles(t *testing.T) { + if runtime.GOOS != "linux" { + t.Skip("This test only runs on linux") + } + tester := tester{} + processes := &Processes{ + readProcFile: tester.testProcFile, + forceProc: true, + } + + var acc testutil.Accumulator + err := processes.Gather(&acc) + require.NoError(t, err) + + fields := getEmptyFields() + fields["sleeping"] = tester.calls + fields["total_threads"] = tester.calls * 2 + fields["total"] = tester.calls + + acc.AssertContainsTaggedFields(t, "processes", fields, map[string]string{}) +} + +func testExecPS() ([]byte, error) { + return []byte(testPSOut), nil +} + +// struct for counting calls to testProcFile +type tester struct { + calls int64 +} + +func (t *tester) testProcFile(_ string) ([]byte, error) { + t.calls++ + return []byte(fmt.Sprintf(testProcStat, "S", "2")), nil +} + +func testExecPSError() ([]byte, error) { + return []byte(testPSOut), fmt.Errorf("ERROR!") +} + +const testPSOut = ` +STAT +S +S +S +S +R +R +S +S +Ss +Ss +S +SNs +Ss +Ss +S +R+ +S +U +S +S +S +S +Ss +S+ +Ss +S +S+ +S+ +Ss +S+ +Ss +S +R+ +Ss +S +S+ +S+ +Ss +L +U +Z +D +S+ +` + +const testProcStat = `10 (rcuob/0) %s 2 0 0 0 -1 2129984 0 0 0 0 0 0 0 0 20 0 %s 0 11 0 0 18446744073709551615 0 0 0 0 0 0 0 2147483647 0 18446744073709551615 0 0 17 0 0 0 0 0 0 0 0 0 0 0 0 0 0 +` diff --git a/plugins/inputs/system/system.go b/plugins/inputs/system/system.go index 9922d5a92..42b0310a4 100644 --- a/plugins/inputs/system/system.go +++ b/plugins/inputs/system/system.go @@ -31,11 +31,17 @@ func (_ *SystemStats) Gather(acc telegraf.Accumulator) error { return err } + users, err := host.Users() + if err != nil { + return err + } + fields := map[string]interface{}{ "load1": loadavg.Load1, "load5": loadavg.Load5, "load15": loadavg.Load15, "uptime": hostinfo.Uptime, + "n_users": len(users), "uptime_format": format_uptime(hostinfo.Uptime), } acc.AddFields("system", fields, nil) diff --git a/plugins/inputs/tcp_listener/README.md b/plugins/inputs/tcp_listener/README.md new file mode 100644 index 000000000..63a7dea3c --- /dev/null +++ b/plugins/inputs/tcp_listener/README.md @@ -0,0 +1,30 @@ +# TCP listener service input plugin + +The TCP listener is a service input plugin that listens for messages on a TCP +socket and adds those messages to InfluxDB. +The plugin expects messages in the +[Telegraf Input Data Formats](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md). + +### Configuration: + +This is a sample configuration for the plugin. + +```toml +# Generic TCP listener +[[inputs.tcp_listener]] + ## Address and port to host TCP listener on + service_address = ":8094" + + ## Number of TCP messages allowed to queue up. Once filled, the + ## TCP listener will start dropping packets. + allowed_pending_messages = 10000 + + ## Maximum number of concurrent TCP connections to allow + max_tcp_connections = 250 + + ## Data format to consume. This can be "json", "influx" or "graphite" + ## Each data format has it's own unique set of configuration options, read + ## more about them here: + ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md + data_format = "influx" +``` diff --git a/plugins/inputs/tcp_listener/tcp_listener.go b/plugins/inputs/tcp_listener/tcp_listener.go new file mode 100644 index 000000000..dd239fedf --- /dev/null +++ b/plugins/inputs/tcp_listener/tcp_listener.go @@ -0,0 +1,264 @@ +package tcp_listener + +import ( + "bufio" + "fmt" + "log" + "net" + "sync" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/plugins/inputs" + "github.com/influxdata/telegraf/plugins/parsers" +) + +type TcpListener struct { + ServiceAddress string + AllowedPendingMessages int + MaxTCPConnections int `toml:"max_tcp_connections"` + + sync.Mutex + // Lock for preventing a data race during resource cleanup + cleanup sync.Mutex + wg sync.WaitGroup + + in chan []byte + done chan struct{} + // accept channel tracks how many active connections there are, if there + // is an available bool in accept, then we are below the maximum and can + // accept the connection + accept chan bool + + // track the listener here so we can close it in Stop() + listener *net.TCPListener + // track current connections so we can close them in Stop() + conns map[string]*net.TCPConn + + parser parsers.Parser + acc telegraf.Accumulator +} + +var dropwarn = "ERROR: Message queue full. Discarding line [%s] " + + "You may want to increase allowed_pending_messages in the config\n" + +const sampleConfig = ` + ## Address and port to host TCP listener on + service_address = ":8094" + + ## Number of TCP messages allowed to queue up. Once filled, the + ## TCP listener will start dropping packets. + allowed_pending_messages = 10000 + + ## Maximum number of concurrent TCP connections to allow + max_tcp_connections = 250 + + ## Data format to consume. This can be "json", "influx" or "graphite" + ## Each data format has it's own unique set of configuration options, read + ## more about them here: + ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md + data_format = "influx" +` + +func (t *TcpListener) SampleConfig() string { + return sampleConfig +} + +func (t *TcpListener) Description() string { + return "Generic TCP listener" +} + +// All the work is done in the Start() function, so this is just a dummy +// function. +func (t *TcpListener) Gather(_ telegraf.Accumulator) error { + return nil +} + +func (t *TcpListener) SetParser(parser parsers.Parser) { + t.parser = parser +} + +// Start starts the tcp listener service. +func (t *TcpListener) Start(acc telegraf.Accumulator) error { + t.Lock() + defer t.Unlock() + + t.acc = acc + t.in = make(chan []byte, t.AllowedPendingMessages) + t.done = make(chan struct{}) + t.accept = make(chan bool, t.MaxTCPConnections) + t.conns = make(map[string]*net.TCPConn) + for i := 0; i < t.MaxTCPConnections; i++ { + t.accept <- true + } + + // Start listener + var err error + address, _ := net.ResolveTCPAddr("tcp", t.ServiceAddress) + t.listener, err = net.ListenTCP("tcp", address) + if err != nil { + log.Fatalf("ERROR: ListenUDP - %s", err) + return err + } + log.Println("TCP server listening on: ", t.listener.Addr().String()) + + t.wg.Add(2) + go t.tcpListen() + go t.tcpParser() + + log.Printf("Started TCP listener service on %s\n", t.ServiceAddress) + return nil +} + +// Stop cleans up all resources +func (t *TcpListener) Stop() { + t.Lock() + defer t.Unlock() + close(t.done) + t.listener.Close() + + // Close all open TCP connections + // - get all conns from the t.conns map and put into slice + // - this is so the forget() function doesnt conflict with looping + // over the t.conns map + var conns []*net.TCPConn + t.cleanup.Lock() + for _, conn := range t.conns { + conns = append(conns, conn) + } + t.cleanup.Unlock() + for _, conn := range conns { + conn.Close() + } + + t.wg.Wait() + close(t.in) + log.Println("Stopped TCP listener service on ", t.ServiceAddress) +} + +// tcpListen listens for incoming TCP connections. +func (t *TcpListener) tcpListen() error { + defer t.wg.Done() + + for { + select { + case <-t.done: + return nil + default: + // Accept connection: + conn, err := t.listener.AcceptTCP() + if err != nil { + return err + } + + log.Printf("Received TCP Connection from %s", conn.RemoteAddr()) + + select { + case <-t.accept: + // not over connection limit, handle the connection properly. + t.wg.Add(1) + // generate a random id for this TCPConn + id := internal.RandomString(6) + t.remember(id, conn) + go t.handler(conn, id) + default: + // We are over the connection limit, refuse & close. + t.refuser(conn) + } + } + } +} + +// refuser refuses a TCP connection +func (t *TcpListener) refuser(conn *net.TCPConn) { + // Tell the connection why we are closing. + fmt.Fprintf(conn, "Telegraf maximum concurrent TCP connections (%d)"+ + " reached, closing.\nYou may want to increase max_tcp_connections in"+ + " the Telegraf tcp listener configuration.\n", t.MaxTCPConnections) + conn.Close() + log.Printf("Refused TCP Connection from %s", conn.RemoteAddr()) + log.Printf("WARNING: Maximum TCP Connections reached, you may want to" + + " adjust max_tcp_connections") +} + +// handler handles a single TCP Connection +func (t *TcpListener) handler(conn *net.TCPConn, id string) { + // connection cleanup function + defer func() { + t.wg.Done() + conn.Close() + log.Printf("Closed TCP Connection from %s", conn.RemoteAddr()) + // Add one connection potential back to channel when this one closes + t.accept <- true + t.forget(id) + }() + + scanner := bufio.NewScanner(conn) + for { + select { + case <-t.done: + return + default: + if !scanner.Scan() { + return + } + buf := scanner.Bytes() + select { + case t.in <- buf: + default: + log.Printf(dropwarn, string(buf)) + } + } + } +} + +// tcpParser parses the incoming tcp byte packets +func (t *TcpListener) tcpParser() error { + defer t.wg.Done() + for { + select { + case <-t.done: + return nil + case packet := <-t.in: + if len(packet) == 0 { + continue + } + metrics, err := t.parser.Parse(packet) + if err == nil { + t.storeMetrics(metrics) + } else { + log.Printf("Malformed packet: [%s], Error: %s\n", + string(packet), err) + } + } + } +} + +func (t *TcpListener) storeMetrics(metrics []telegraf.Metric) error { + t.Lock() + defer t.Unlock() + for _, m := range metrics { + t.acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time()) + } + return nil +} + +// forget a TCP connection +func (t *TcpListener) forget(id string) { + t.cleanup.Lock() + defer t.cleanup.Unlock() + delete(t.conns, id) +} + +// remember a TCP connection +func (t *TcpListener) remember(id string, conn *net.TCPConn) { + t.cleanup.Lock() + defer t.cleanup.Unlock() + t.conns[id] = conn +} + +func init() { + inputs.Add("tcp_listener", func() telegraf.Input { + return &TcpListener{} + }) +} diff --git a/plugins/inputs/tcp_listener/tcp_listener_test.go b/plugins/inputs/tcp_listener/tcp_listener_test.go new file mode 100644 index 000000000..b4aec9dd2 --- /dev/null +++ b/plugins/inputs/tcp_listener/tcp_listener_test.go @@ -0,0 +1,259 @@ +package tcp_listener + +import ( + "fmt" + "net" + "testing" + "time" + + "github.com/influxdata/telegraf/plugins/parsers" + "github.com/influxdata/telegraf/testutil" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +const ( + testMsg = "cpu_load_short,host=server01 value=12.0 1422568543702900257\n" + + testMsgs = ` +cpu_load_short,host=server02 value=12.0 1422568543702900257 +cpu_load_short,host=server03 value=12.0 1422568543702900257 +cpu_load_short,host=server04 value=12.0 1422568543702900257 +cpu_load_short,host=server05 value=12.0 1422568543702900257 +cpu_load_short,host=server06 value=12.0 1422568543702900257 +` +) + +func newTestTcpListener() (*TcpListener, chan []byte) { + in := make(chan []byte, 1500) + listener := &TcpListener{ + ServiceAddress: ":8194", + AllowedPendingMessages: 10000, + MaxTCPConnections: 250, + in: in, + done: make(chan struct{}), + } + return listener, in +} + +func TestConnectTCP(t *testing.T) { + listener := TcpListener{ + ServiceAddress: ":8194", + AllowedPendingMessages: 10000, + MaxTCPConnections: 250, + } + listener.parser, _ = parsers.NewInfluxParser() + + acc := &testutil.Accumulator{} + require.NoError(t, listener.Start(acc)) + defer listener.Stop() + + time.Sleep(time.Millisecond * 25) + conn, err := net.Dial("tcp", "127.0.0.1:8194") + require.NoError(t, err) + + // send single message to socket + fmt.Fprintf(conn, testMsg) + time.Sleep(time.Millisecond * 15) + acc.AssertContainsTaggedFields(t, "cpu_load_short", + map[string]interface{}{"value": float64(12)}, + map[string]string{"host": "server01"}, + ) + + // send multiple messages to socket + fmt.Fprintf(conn, testMsgs) + time.Sleep(time.Millisecond * 15) + hostTags := []string{"server02", "server03", + "server04", "server05", "server06"} + for _, hostTag := range hostTags { + acc.AssertContainsTaggedFields(t, "cpu_load_short", + map[string]interface{}{"value": float64(12)}, + map[string]string{"host": hostTag}, + ) + } +} + +// Test that MaxTCPConections is respected +func TestConcurrentConns(t *testing.T) { + listener := TcpListener{ + ServiceAddress: ":8195", + AllowedPendingMessages: 10000, + MaxTCPConnections: 2, + } + listener.parser, _ = parsers.NewInfluxParser() + + acc := &testutil.Accumulator{} + require.NoError(t, listener.Start(acc)) + defer listener.Stop() + + time.Sleep(time.Millisecond * 25) + _, err := net.Dial("tcp", "127.0.0.1:8195") + assert.NoError(t, err) + _, err = net.Dial("tcp", "127.0.0.1:8195") + assert.NoError(t, err) + + // Connection over the limit: + conn, err := net.Dial("tcp", "127.0.0.1:8195") + assert.NoError(t, err) + net.Dial("tcp", "127.0.0.1:8195") + buf := make([]byte, 1500) + n, err := conn.Read(buf) + assert.NoError(t, err) + assert.Equal(t, + "Telegraf maximum concurrent TCP connections (2) reached, closing.\n"+ + "You may want to increase max_tcp_connections in"+ + " the Telegraf tcp listener configuration.\n", + string(buf[:n])) + + _, err = conn.Write([]byte(testMsg)) + assert.NoError(t, err) + time.Sleep(time.Millisecond * 10) + assert.Zero(t, acc.NFields()) +} + +// Test that MaxTCPConections is respected when max==1 +func TestConcurrentConns1(t *testing.T) { + listener := TcpListener{ + ServiceAddress: ":8196", + AllowedPendingMessages: 10000, + MaxTCPConnections: 1, + } + listener.parser, _ = parsers.NewInfluxParser() + + acc := &testutil.Accumulator{} + require.NoError(t, listener.Start(acc)) + defer listener.Stop() + + time.Sleep(time.Millisecond * 25) + _, err := net.Dial("tcp", "127.0.0.1:8196") + assert.NoError(t, err) + + // Connection over the limit: + conn, err := net.Dial("tcp", "127.0.0.1:8196") + assert.NoError(t, err) + net.Dial("tcp", "127.0.0.1:8196") + buf := make([]byte, 1500) + n, err := conn.Read(buf) + assert.NoError(t, err) + assert.Equal(t, + "Telegraf maximum concurrent TCP connections (1) reached, closing.\n"+ + "You may want to increase max_tcp_connections in"+ + " the Telegraf tcp listener configuration.\n", + string(buf[:n])) + + _, err = conn.Write([]byte(testMsg)) + assert.NoError(t, err) + time.Sleep(time.Millisecond * 10) + assert.Zero(t, acc.NFields()) +} + +// Test that MaxTCPConections is respected +func TestCloseConcurrentConns(t *testing.T) { + listener := TcpListener{ + ServiceAddress: ":8195", + AllowedPendingMessages: 10000, + MaxTCPConnections: 2, + } + listener.parser, _ = parsers.NewInfluxParser() + + acc := &testutil.Accumulator{} + require.NoError(t, listener.Start(acc)) + + time.Sleep(time.Millisecond * 25) + _, err := net.Dial("tcp", "127.0.0.1:8195") + assert.NoError(t, err) + _, err = net.Dial("tcp", "127.0.0.1:8195") + assert.NoError(t, err) + + listener.Stop() +} + +func TestRunParser(t *testing.T) { + var testmsg = []byte(testMsg) + + listener, in := newTestTcpListener() + acc := testutil.Accumulator{} + listener.acc = &acc + defer close(listener.done) + + listener.parser, _ = parsers.NewInfluxParser() + listener.wg.Add(1) + go listener.tcpParser() + + in <- testmsg + time.Sleep(time.Millisecond * 25) + listener.Gather(&acc) + + if a := acc.NFields(); a != 1 { + t.Errorf("got %v, expected %v", a, 1) + } + + acc.AssertContainsTaggedFields(t, "cpu_load_short", + map[string]interface{}{"value": float64(12)}, + map[string]string{"host": "server01"}, + ) +} + +func TestRunParserInvalidMsg(t *testing.T) { + var testmsg = []byte("cpu_load_short") + + listener, in := newTestTcpListener() + acc := testutil.Accumulator{} + listener.acc = &acc + defer close(listener.done) + + listener.parser, _ = parsers.NewInfluxParser() + listener.wg.Add(1) + go listener.tcpParser() + + in <- testmsg + time.Sleep(time.Millisecond * 25) + + if a := acc.NFields(); a != 0 { + t.Errorf("got %v, expected %v", a, 0) + } +} + +func TestRunParserGraphiteMsg(t *testing.T) { + var testmsg = []byte("cpu.load.graphite 12 1454780029") + + listener, in := newTestTcpListener() + acc := testutil.Accumulator{} + listener.acc = &acc + defer close(listener.done) + + listener.parser, _ = parsers.NewGraphiteParser("_", []string{}, nil) + listener.wg.Add(1) + go listener.tcpParser() + + in <- testmsg + time.Sleep(time.Millisecond * 25) + listener.Gather(&acc) + + acc.AssertContainsFields(t, "cpu_load_graphite", + map[string]interface{}{"value": float64(12)}) +} + +func TestRunParserJSONMsg(t *testing.T) { + var testmsg = []byte("{\"a\": 5, \"b\": {\"c\": 6}}\n") + + listener, in := newTestTcpListener() + acc := testutil.Accumulator{} + listener.acc = &acc + defer close(listener.done) + + listener.parser, _ = parsers.NewJSONParser("udp_json_test", []string{}, nil) + listener.wg.Add(1) + go listener.tcpParser() + + in <- testmsg + time.Sleep(time.Millisecond * 25) + listener.Gather(&acc) + + acc.AssertContainsFields(t, "udp_json_test", + map[string]interface{}{ + "a": float64(5), + "b_c": float64(6), + }) +} diff --git a/scripts/circle-test.sh b/scripts/circle-test.sh index 9a3e0e678..f0288c73e 100755 --- a/scripts/circle-test.sh +++ b/scripts/circle-test.sh @@ -68,7 +68,7 @@ telegraf -sample-config > $tmpdir/config.toml exit_if_fail telegraf -config $tmpdir/config.toml \ -test -input-filter cpu:mem -mv $GOPATH/bin/telegraf $CIRCLE_ARTIFACTS +cat $GOPATH/bin/telegraf | gzip > $CIRCLE_ARTIFACTS/telegraf.gz eval "git describe --exact-match HEAD" if [ $? -eq 0 ]; then