diff --git a/accumulator.go b/accumulator.go index cedbe92b5..02aee761d 100644 --- a/accumulator.go +++ b/accumulator.go @@ -11,6 +11,20 @@ type Accumulator interface { tags map[string]string, t ...time.Time) + // AddGauge is the same as AddFields, but will add the metric as a "Gauge" + // type + AddGauge(measurement string, + fields map[string]interface{}, + tags map[string]string, + t ...time.Time) + + // AddCounter is the same as AddFields, but will add the metric as a "Counter" + // type + AddCounter(measurement string, + fields map[string]interface{}, + tags map[string]string, + t ...time.Time) + AddError(err error) Debug() bool diff --git a/agent/accumulator.go b/agent/accumulator.go index 6cdc9d3e7..ce3e22eeb 100644 --- a/agent/accumulator.go +++ b/agent/accumulator.go @@ -44,16 +44,52 @@ func (ac *accumulator) AddFields( tags map[string]string, t ...time.Time, ) { + if m := ac.makeMetric(measurement, fields, tags, telegraf.Untyped, t...); m != nil { + ac.metrics <- m + } +} + +func (ac *accumulator) AddGauge( + measurement string, + fields map[string]interface{}, + tags map[string]string, + t ...time.Time, +) { + if m := ac.makeMetric(measurement, fields, tags, telegraf.Gauge, t...); m != nil { + ac.metrics <- m + } +} + +func (ac *accumulator) AddCounter( + measurement string, + fields map[string]interface{}, + tags map[string]string, + t ...time.Time, +) { + if m := ac.makeMetric(measurement, fields, tags, telegraf.Counter, t...); m != nil { + ac.metrics <- m + } +} + +// makeMetric either returns a metric, or returns nil if the metric doesn't +// need to be created (because of filtering, an error, etc.) +func (ac *accumulator) makeMetric( + measurement string, + fields map[string]interface{}, + tags map[string]string, + mType telegraf.ValueType, + t ...time.Time, +) telegraf.Metric { if len(fields) == 0 || len(measurement) == 0 { - return + return nil } if !ac.inputConfig.Filter.ShouldNamePass(measurement) { - return + return nil } if !ac.inputConfig.Filter.ShouldTagsPass(tags) { - return + return nil } // Override measurement name if set @@ -120,7 +156,7 @@ func (ac *accumulator) AddFields( } fields = nil if len(result) == 0 { - return + return nil } var timestamp time.Time @@ -131,15 +167,26 @@ func (ac *accumulator) AddFields( } timestamp = timestamp.Round(ac.precision) - m, err := telegraf.NewMetric(measurement, tags, result, timestamp) + var m telegraf.Metric + var err error + switch mType { + case telegraf.Counter: + m, err = telegraf.NewCounterMetric(measurement, tags, result, timestamp) + case telegraf.Gauge: + m, err = telegraf.NewGaugeMetric(measurement, tags, result, timestamp) + default: + m, err = telegraf.NewMetric(measurement, tags, result, timestamp) + } if err != nil { log.Printf("Error adding point [%s]: %s\n", measurement, err.Error()) - return + return nil } + if ac.trace { fmt.Println("> " + m.String()) } - ac.metrics <- m + + return m } // AddError passes a runtime error to the accumulator. diff --git a/agent/accumulator_test.go b/agent/accumulator_test.go index 7ef5b3421..7da8c96e2 100644 --- a/agent/accumulator_test.go +++ b/agent/accumulator_test.go @@ -48,6 +48,76 @@ func TestAdd(t *testing.T) { actual) } +func TestAddGauge(t *testing.T) { + a := accumulator{} + now := time.Now() + a.metrics = make(chan telegraf.Metric, 10) + defer close(a.metrics) + a.inputConfig = &models.InputConfig{} + + a.AddGauge("acctest", + map[string]interface{}{"value": float64(101)}, + map[string]string{}) + a.AddGauge("acctest", + map[string]interface{}{"value": float64(101)}, + map[string]string{"acc": "test"}) + a.AddGauge("acctest", + map[string]interface{}{"value": float64(101)}, + map[string]string{"acc": "test"}, now) + + testm := <-a.metrics + actual := testm.String() + assert.Contains(t, actual, "acctest value=101") + assert.Equal(t, testm.Type(), telegraf.Gauge) + + testm = <-a.metrics + actual = testm.String() + assert.Contains(t, actual, "acctest,acc=test value=101") + assert.Equal(t, testm.Type(), telegraf.Gauge) + + testm = <-a.metrics + actual = testm.String() + assert.Equal(t, + fmt.Sprintf("acctest,acc=test value=101 %d", now.UnixNano()), + actual) + assert.Equal(t, testm.Type(), telegraf.Gauge) +} + +func TestAddCounter(t *testing.T) { + a := accumulator{} + now := time.Now() + a.metrics = make(chan telegraf.Metric, 10) + defer close(a.metrics) + a.inputConfig = &models.InputConfig{} + + a.AddCounter("acctest", + map[string]interface{}{"value": float64(101)}, + map[string]string{}) + a.AddCounter("acctest", + map[string]interface{}{"value": float64(101)}, + map[string]string{"acc": "test"}) + a.AddCounter("acctest", + map[string]interface{}{"value": float64(101)}, + map[string]string{"acc": "test"}, now) + + testm := <-a.metrics + actual := testm.String() + assert.Contains(t, actual, "acctest value=101") + assert.Equal(t, testm.Type(), telegraf.Counter) + + testm = <-a.metrics + actual = testm.String() + assert.Contains(t, actual, "acctest,acc=test value=101") + assert.Equal(t, testm.Type(), telegraf.Counter) + + testm = <-a.metrics + actual = testm.String() + assert.Equal(t, + fmt.Sprintf("acctest,acc=test value=101 %d", now.UnixNano()), + actual) + assert.Equal(t, testm.Type(), telegraf.Counter) +} + func TestAddNoPrecisionWithInterval(t *testing.T) { a := accumulator{} now := time.Date(2006, time.February, 10, 12, 0, 0, 82912748, time.UTC) diff --git a/plugins/inputs/system/cpu.go b/plugins/inputs/system/cpu.go index 8fc9739ba..295c0d347 100644 --- a/plugins/inputs/system/cpu.go +++ b/plugins/inputs/system/cpu.go @@ -13,13 +13,15 @@ type CPUStats struct { ps PS lastStats []cpu.TimesStat - PerCPU bool `toml:"percpu"` - TotalCPU bool `toml:"totalcpu"` + PerCPU bool `toml:"percpu"` + TotalCPU bool `toml:"totalcpu"` + CollectCPUTime bool `toml:"collect_cpu_time"` } func NewCPUStats(ps PS) *CPUStats { return &CPUStats{ - ps: ps, + ps: ps, + CollectCPUTime: true, } } @@ -32,8 +34,8 @@ var sampleConfig = ` percpu = true ## Whether to report total system cpu stats or not totalcpu = true - ## Comment this line if you want the raw CPU time metrics - fielddrop = ["time_*"] + ## If true, collect raw CPU time metrics. + collect_cpu_time = false ` func (_ *CPUStats) SampleConfig() string { @@ -54,23 +56,25 @@ func (s *CPUStats) Gather(acc telegraf.Accumulator) error { total := totalCpuTime(cts) - // Add cpu time metrics - fields := map[string]interface{}{ - "time_user": cts.User, - "time_system": cts.System, - "time_idle": cts.Idle, - "time_nice": cts.Nice, - "time_iowait": cts.Iowait, - "time_irq": cts.Irq, - "time_softirq": cts.Softirq, - "time_steal": cts.Steal, - "time_guest": cts.Guest, - "time_guest_nice": cts.GuestNice, + if s.CollectCPUTime { + // Add cpu time metrics + fieldsC := map[string]interface{}{ + "time_user": cts.User, + "time_system": cts.System, + "time_idle": cts.Idle, + "time_nice": cts.Nice, + "time_iowait": cts.Iowait, + "time_irq": cts.Irq, + "time_softirq": cts.Softirq, + "time_steal": cts.Steal, + "time_guest": cts.Guest, + "time_guest_nice": cts.GuestNice, + } + acc.AddCounter("cpu", fieldsC, tags, now) } // Add in percentage if len(s.lastStats) == 0 { - acc.AddFields("cpu", fields, tags, now) // If it's the 1st gather, can't get CPU Usage stats yet continue } @@ -86,18 +90,19 @@ func (s *CPUStats) Gather(acc telegraf.Accumulator) error { if totalDelta == 0 { continue } - - fields["usage_user"] = 100 * (cts.User - lastCts.User) / totalDelta - fields["usage_system"] = 100 * (cts.System - lastCts.System) / totalDelta - fields["usage_idle"] = 100 * (cts.Idle - lastCts.Idle) / totalDelta - fields["usage_nice"] = 100 * (cts.Nice - lastCts.Nice) / totalDelta - fields["usage_iowait"] = 100 * (cts.Iowait - lastCts.Iowait) / totalDelta - fields["usage_irq"] = 100 * (cts.Irq - lastCts.Irq) / totalDelta - fields["usage_softirq"] = 100 * (cts.Softirq - lastCts.Softirq) / totalDelta - fields["usage_steal"] = 100 * (cts.Steal - lastCts.Steal) / totalDelta - fields["usage_guest"] = 100 * (cts.Guest - lastCts.Guest) / totalDelta - fields["usage_guest_nice"] = 100 * (cts.GuestNice - lastCts.GuestNice) / totalDelta - acc.AddFields("cpu", fields, tags, now) + fieldsG := map[string]interface{}{ + "usage_user": 100 * (cts.User - lastCts.User) / totalDelta, + "usage_system": 100 * (cts.System - lastCts.System) / totalDelta, + "usage_idle": 100 * (cts.Idle - lastCts.Idle) / totalDelta, + "usage_nice": 100 * (cts.Nice - lastCts.Nice) / totalDelta, + "usage_iowait": 100 * (cts.Iowait - lastCts.Iowait) / totalDelta, + "usage_irq": 100 * (cts.Irq - lastCts.Irq) / totalDelta, + "usage_softirq": 100 * (cts.Softirq - lastCts.Softirq) / totalDelta, + "usage_steal": 100 * (cts.Steal - lastCts.Steal) / totalDelta, + "usage_guest": 100 * (cts.Guest - lastCts.Guest) / totalDelta, + "usage_guest_nice": 100 * (cts.GuestNice - lastCts.GuestNice) / totalDelta, + } + acc.AddGauge("cpu", fieldsG, tags, now) } s.lastStats = times diff --git a/plugins/inputs/system/disk.go b/plugins/inputs/system/disk.go index f79295294..e686a442d 100644 --- a/plugins/inputs/system/disk.go +++ b/plugins/inputs/system/disk.go @@ -70,7 +70,7 @@ func (s *DiskStats) Gather(acc telegraf.Accumulator) error { "inodes_free": du.InodesFree, "inodes_used": du.InodesUsed, } - acc.AddFields("disk", fields, tags) + acc.AddGauge("disk", fields, tags) } return nil @@ -139,7 +139,7 @@ func (s *DiskIOStats) Gather(acc telegraf.Accumulator) error { "write_time": io.WriteTime, "io_time": io.IoTime, } - acc.AddFields("diskio", fields, tags) + acc.AddCounter("diskio", fields, tags) } return nil diff --git a/plugins/inputs/system/kernel.go b/plugins/inputs/system/kernel.go index abad47731..66cb0f763 100644 --- a/plugins/inputs/system/kernel.go +++ b/plugins/inputs/system/kernel.go @@ -81,7 +81,7 @@ func (k *Kernel) Gather(acc telegraf.Accumulator) error { } } - acc.AddFields("kernel", fields, map[string]string{}) + acc.AddCounter("kernel", fields, map[string]string{}) return nil } diff --git a/plugins/inputs/system/memory.go b/plugins/inputs/system/memory.go index c6dbff45e..26dc550f8 100644 --- a/plugins/inputs/system/memory.go +++ b/plugins/inputs/system/memory.go @@ -35,7 +35,7 @@ func (s *MemStats) Gather(acc telegraf.Accumulator) error { "used_percent": 100 * float64(vm.Used) / float64(vm.Total), "available_percent": 100 * float64(vm.Available) / float64(vm.Total), } - acc.AddFields("mem", fields, nil) + acc.AddCounter("mem", fields, nil) return nil } @@ -56,15 +56,18 @@ func (s *SwapStats) Gather(acc telegraf.Accumulator) error { return fmt.Errorf("error getting swap memory info: %s", err) } - fields := map[string]interface{}{ + fieldsG := map[string]interface{}{ "total": swap.Total, "used": swap.Used, "free": swap.Free, "used_percent": swap.UsedPercent, - "in": swap.Sin, - "out": swap.Sout, } - acc.AddFields("swap", fields, nil) + fieldsC := map[string]interface{}{ + "in": swap.Sin, + "out": swap.Sout, + } + acc.AddGauge("swap", fieldsG, nil) + acc.AddCounter("swap", fieldsC, nil) return nil } diff --git a/plugins/inputs/system/memory_test.go b/plugins/inputs/system/memory_test.go index 1fced6918..4467c69aa 100644 --- a/plugins/inputs/system/memory_test.go +++ b/plugins/inputs/system/memory_test.go @@ -67,8 +67,6 @@ func TestMemStats(t *testing.T) { "used": uint64(1232), "used_percent": float64(12.2), "free": uint64(6412), - "in": uint64(7), - "out": uint64(830), } acc.AssertContainsTaggedFields(t, "swap", swapfields, make(map[string]string)) } diff --git a/plugins/inputs/system/net.go b/plugins/inputs/system/net.go index f6bc05818..3f89176fb 100644 --- a/plugins/inputs/system/net.go +++ b/plugins/inputs/system/net.go @@ -81,7 +81,7 @@ func (s *NetIOStats) Gather(acc telegraf.Accumulator) error { "drop_in": io.Dropin, "drop_out": io.Dropout, } - acc.AddFields("net", fields, tags) + acc.AddCounter("net", fields, tags) } // Get system wide stats for different network protocols diff --git a/plugins/inputs/system/processes.go b/plugins/inputs/system/processes.go index c16f7a480..1f77ae57d 100644 --- a/plugins/inputs/system/processes.go +++ b/plugins/inputs/system/processes.go @@ -57,7 +57,7 @@ func (p *Processes) Gather(acc telegraf.Accumulator) error { } } - acc.AddFields("processes", fields, nil) + acc.AddGauge("processes", fields, nil) return nil } diff --git a/plugins/inputs/system/system.go b/plugins/inputs/system/system.go index ff64740bf..1a61f11bf 100644 --- a/plugins/inputs/system/system.go +++ b/plugins/inputs/system/system.go @@ -37,16 +37,17 @@ func (_ *SystemStats) Gather(acc telegraf.Accumulator) error { return err } - fields := map[string]interface{}{ - "load1": loadavg.Load1, - "load5": loadavg.Load5, - "load15": loadavg.Load15, + acc.AddGauge("system", map[string]interface{}{ + "load1": loadavg.Load1, + "load5": loadavg.Load5, + "load15": loadavg.Load15, + "n_users": len(users), + "n_cpus": runtime.NumCPU(), + }, nil) + acc.AddCounter("system", map[string]interface{}{ "uptime": hostinfo.Uptime, - "n_users": len(users), "uptime_format": format_uptime(hostinfo.Uptime), - "n_cpus": runtime.NumCPU(), - } - acc.AddFields("system", fields, nil) + }, nil) return nil } diff --git a/testutil/accumulator.go b/testutil/accumulator.go index cf07929f5..fe5727917 100644 --- a/testutil/accumulator.go +++ b/testutil/accumulator.go @@ -85,6 +85,24 @@ func (a *Accumulator) AddFields( a.Metrics = append(a.Metrics, p) } +func (a *Accumulator) AddCounter( + measurement string, + fields map[string]interface{}, + tags map[string]string, + timestamp ...time.Time, +) { + a.AddFields(measurement, fields, tags, timestamp...) +} + +func (a *Accumulator) AddGauge( + measurement string, + fields map[string]interface{}, + tags map[string]string, + timestamp ...time.Time, +) { + a.AddFields(measurement, fields, tags, timestamp...) +} + // AddError appends the given error to Accumulator.Errors. func (a *Accumulator) AddError(err error) { if err == nil {