Compare commits

..

4 Commits

Author SHA1 Message Date
Greg Linton
f88f665cd9 Add missing bracket 2018-06-28 11:49:07 -06:00
Ayrdrie
a2df042d92 Merge branch 'master' into logparser-divide-by-zero 2018-06-22 11:49:51 -06:00
Ayrdrie Palmer
c7a72b9a9d Update test for divide by zero error 2018-06-21 22:41:04 -06:00
Ayrdrie Palmer
09f884b4f0 Fix for divide by zero error by defaulting to current year if no year is given in timestamp 2018-06-21 22:15:33 -06:00
52 changed files with 447 additions and 1361 deletions

View File

@@ -16,11 +16,11 @@ jobs:
steps:
- checkout
- restore_cache:
key: vendor-{{ checksum "Gopkg.lock" }}
key: vendor-{{ .Branch }}-{{ checksum "Gopkg.lock" }}
- run: 'make deps'
- save_cache:
name: 'vendored deps'
key: vendor-{{ checksum "Gopkg.lock" }}
key: vendor-{{ .Branch }}-{{ checksum "Gopkg.lock" }}
paths:
- './vendor'
- persist_to_workspace:

View File

@@ -6,10 +6,6 @@
- [tengine](./plugins/inputs/tengine/README.md) - Contributed by @ertaoxu
### New Processors
- [enum](./plugins/processors/enum/README.md) - Contributed by @KarstenSchnitter
### New Aggregators
- [valuecounter](./plugins/aggregators/valuecounter/README.md) - Contributed by @piotr1212
@@ -26,26 +22,14 @@
- [#4307](https://github.com/influxdata/telegraf/pull/4307): Add new measurement with results of pgrep lookup to procstat input.
- [#4311](https://github.com/influxdata/telegraf/pull/4311): Add support for comma in logparser timestamp format.
- [#4292](https://github.com/influxdata/telegraf/pull/4292): Add path tag to tail input plugin.
- [#4322](https://github.com/influxdata/telegraf/pull/4322): Add log message when tail is added or removed from a file.
- [#4267](https://github.com/influxdata/telegraf/pull/4267): Add option to use of counter time in win perf counters.
- [#4343](https://github.com/influxdata/telegraf/pull/4343): Add energy and power field and device id tag to fibaro input.
- [#4347](https://github.com/influxdata/telegraf/pull/4347): Add http path configuration for OpenTSDB output.
- [#4352](https://github.com/influxdata/telegraf/pull/4352): Gather IPMI metrics concurrently.
- [#4362](https://github.com/influxdata/telegraf/pull/4362): Add mongo document and connection metrics.
- [#3772](https://github.com/influxdata/telegraf/pull/3772): Add Enum Processor.
## v1.7.1 [2018-07-03]
## v1.7.1 [unreleased]
### Bugfixes
- [#4277](https://github.com/influxdata/telegraf/pull/4277): Treat sigterm as a clean shutdown signal.
- [#4284](https://github.com/influxdata/telegraf/pull/4284): Fix selection of tags under nested objects in the JSON parser.
- [#4135](https://github.com/influxdata/telegraf/issues/4135): Fix postfix input handling multi-level queues.
- [#4334](https://github.com/influxdata/telegraf/pull/4334): Fix syslog timestamp parsing with single digit day of month.
- [#2910](https://github.com/influxdata/telegraf/issues/2910): Handle mysql input variations in the user_statistics collecting.
- [#4293](https://github.com/influxdata/telegraf/issues/4293): Fix minmax and basicstats aggregators to use uint64.
- [#4290](https://github.com/influxdata/telegraf/issues/4290): Document swap input plugin.
- [#4316](https://github.com/influxdata/telegraf/issues/4316): Fix incorrect precision being applied to metric in http_listener.
## v1.7 [2018-06-12]

View File

@@ -100,13 +100,6 @@ func init() {
}
```
### Input Plugin Development
* Run `make static` followed by `make plugin-[pluginName]` to spin up a docker dev environment
using docker-compose.
* ***[Optional]*** When developing a plugin, add a `dev` directory with a `docker-compose.yml` and `telegraf.conf`
as well as any other supporting files, where sensible.
## Adding Typed Metrics
In addition the the `AddFields` function, the accumulator also supports an

View File

@@ -92,15 +92,4 @@ docker-image:
plugins/parsers/influx/machine.go: plugins/parsers/influx/machine.go.rl
ragel -Z -G2 $^ -o $@
static:
@echo "Building static linux binary..."
@CGO_ENABLED=0 \
GOOS=linux \
GOARCH=amd64 \
go build -ldflags "$(LDFLAGS)" ./cmd/telegraf
plugin-%:
@echo "Starting dev environment for $${$(@)} input plugin..."
@docker-compose -f plugins/inputs/$${$(@)}/dev/docker-compose.yml up
.PHONY: deps telegraf install test test-windows lint vet test-all package clean docker-image fmtcheck uint64 static
.PHONY: deps telegraf install test test-windows lint vet test-all package clean docker-image fmtcheck uint64

View File

@@ -723,10 +723,6 @@
# ## Not used with telnet API.
# httpBatchSize = 50
#
# ## URI Path for Http requests to OpenTSDB.
# ## Used in cases where OpenTSDB is located behind a reverse proxy.
# httpPath = "/api/put"
#
# ## Debug true - Prints OpenTSDB communication
# debug = false
#

View File

@@ -246,8 +246,6 @@ func convert(in interface{}) (float64, bool) {
return v, true
case int64:
return float64(v), true
case uint64:
return float64(v), true
default:
return 0, false
}

View File

@@ -28,7 +28,6 @@ var m2, _ = metric.New("m1",
"c": float64(4),
"d": float64(6),
"e": float64(200),
"f": uint64(200),
"ignoreme": "string",
"andme": true,
},
@@ -82,10 +81,6 @@ func TestBasicStatsWithPeriod(t *testing.T) {
"e_max": float64(200),
"e_min": float64(200),
"e_mean": float64(200),
"f_count": float64(1), //f
"f_max": float64(200),
"f_min": float64(200),
"f_mean": float64(200),
}
expectedTags := map[string]string{
"foo": "bar",
@@ -149,10 +144,6 @@ func TestBasicStatsDifferentPeriods(t *testing.T) {
"e_max": float64(200),
"e_min": float64(200),
"e_mean": float64(200),
"f_count": float64(1), //f
"f_max": float64(200),
"f_min": float64(200),
"f_mean": float64(200),
}
expectedTags = map[string]string{
"foo": "bar",
@@ -178,7 +169,6 @@ func TestBasicStatsWithOnlyCount(t *testing.T) {
"c_count": float64(2),
"d_count": float64(2),
"e_count": float64(1),
"f_count": float64(1),
}
expectedTags := map[string]string{
"foo": "bar",
@@ -204,7 +194,6 @@ func TestBasicStatsWithOnlyMin(t *testing.T) {
"c_min": float64(2),
"d_min": float64(2),
"e_min": float64(200),
"f_min": float64(200),
}
expectedTags := map[string]string{
"foo": "bar",
@@ -230,7 +219,6 @@ func TestBasicStatsWithOnlyMax(t *testing.T) {
"c_max": float64(4),
"d_max": float64(6),
"e_max": float64(200),
"f_max": float64(200),
}
expectedTags := map[string]string{
"foo": "bar",
@@ -256,7 +244,6 @@ func TestBasicStatsWithOnlyMean(t *testing.T) {
"c_mean": float64(3),
"d_mean": float64(4),
"e_mean": float64(200),
"f_mean": float64(200),
}
expectedTags := map[string]string{
"foo": "bar",
@@ -282,7 +269,6 @@ func TestBasicStatsWithOnlySum(t *testing.T) {
"c_sum": float64(6),
"d_sum": float64(8),
"e_sum": float64(200),
"f_sum": float64(200),
}
expectedTags := map[string]string{
"foo": "bar",
@@ -413,8 +399,6 @@ func TestBasicStatsWithMinAndMax(t *testing.T) {
"d_min": float64(2),
"e_max": float64(200), //e
"e_min": float64(200),
"f_max": float64(200), //f
"f_min": float64(200),
}
expectedTags := map[string]string{
"foo": "bar",
@@ -466,11 +450,6 @@ func TestBasicStatsWithAllStats(t *testing.T) {
"e_min": float64(200),
"e_mean": float64(200),
"e_sum": float64(200),
"f_count": float64(1), //f
"f_max": float64(200),
"f_min": float64(200),
"f_mean": float64(200),
"f_sum": float64(200),
}
expectedTags := map[string]string{
"foo": "bar",

View File

@@ -107,8 +107,6 @@ func convert(in interface{}) (float64, bool) {
return v, true
case int64:
return float64(v), true
case uint64:
return float64(v), true
default:
return 0, false
}

View File

@@ -38,7 +38,6 @@ var m2, _ = metric.New("m1",
"i": float64(1),
"j": float64(1),
"k": float64(200),
"l": uint64(200),
"ignoreme": "string",
"andme": true,
},
@@ -86,8 +85,6 @@ func TestMinMaxWithPeriod(t *testing.T) {
"j_min": float64(1),
"k_max": float64(200),
"k_min": float64(200),
"l_max": float64(200),
"l_min": float64(200),
}
expectedTags := map[string]string{
"foo": "bar",
@@ -157,8 +154,6 @@ func TestMinMaxDifferentPeriods(t *testing.T) {
"j_min": float64(1),
"k_max": float64(200),
"k_min": float64(200),
"l_max": float64(200),
"l_min": float64(200),
}
expectedTags = map[string]string{
"foo": "bar",

View File

@@ -366,22 +366,9 @@ func (d *Docker) gatherContainer(
var v *types.StatsJSON
// Parse container name
cname := "unknown"
match := false
if len(container.Names) == 0 { // for tests
match = true
}
for i := range container.Names {
if !match {
match = d.containerFilter.Match(strings.TrimPrefix(container.Names[i], "/"))
if match {
cname = strings.TrimPrefix(container.Names[i], "/")
}
}
}
if !match {
return nil
if len(container.Names) > 0 {
// Not sure what to do with other names, just take the first.
cname = strings.TrimPrefix(container.Names[0], "/")
}
// the image name sometimes has a version part, or a private repo
@@ -404,6 +391,10 @@ func (d *Docker) gatherContainer(
"container_version": imageVersion,
}
if !d.containerFilter.Match(cname) {
return nil
}
ctx, cancel := context.WithTimeout(context.Background(), d.Timeout.Duration)
defer cancel()
r, err := d.client.ContainerStats(ctx, container.ID, false)
@@ -420,11 +411,6 @@ func (d *Docker) gatherContainer(
}
daemonOSType := r.OSType
// use common (printed at `docker ps`) name for container
if cname != strings.TrimPrefix(v.Name, "/") && v.Name != "" {
tags["container_name"] = strings.TrimPrefix(v.Name, "/")
}
// Add labels to tags
for k, label := range container.Labels {
if d.labelFilter.Match(k) {
@@ -475,12 +461,12 @@ func (d *Docker) gatherContainer(
acc.AddFields("docker_container_health", healthfields, tags, time.Now())
}
parseContainerStats(v, acc, tags, container.ID, d.PerDevice, d.Total, daemonOSType)
gatherContainerStats(v, acc, tags, container.ID, d.PerDevice, d.Total, daemonOSType)
return nil
}
func parseContainerStats(
func gatherContainerStats(
stat *types.StatsJSON,
acc telegraf.Accumulator,
tags map[string]string,

View File

@@ -107,7 +107,7 @@ func TestDockerGatherContainerStats(t *testing.T) {
"container_image": "redis/image",
}
parseContainerStats(stats, &acc, tags, "123456789", true, true, "linux")
gatherContainerStats(stats, &acc, tags, "123456789", true, true, "linux")
// test docker_container_net measurement
netfields := map[string]interface{}{

View File

@@ -24,14 +24,11 @@ Those values could be true (1) or false (0) for switches, percentage for dimmers
- fibaro
- tags:
- deviceId (device id)
- section (section name)
- room (room name)
- name (device name)
- type (device type)
- fields:
- energy (float, when available from device)
- power (float, when available from device)
- value (float)
- value2 (float, when available from device)
@@ -39,17 +36,16 @@ Those values could be true (1) or false (0) for switches, percentage for dimmers
### Example Output:
```
fibaro,deviceId=9,host=vm1,name=Fenêtre\ haute,room=Cuisine,section=Cuisine,type=com.fibaro.FGRM222 energy=2.04,power=0.7,value=99,value2=99 1529996807000000000
fibaro,deviceId=10,host=vm1,name=Escaliers,room=Dégagement,section=Pièces\ communes,type=com.fibaro.binarySwitch value=0 1529996807000000000
fibaro,deviceId=13,host=vm1,name=Porte\ fenêtre,room=Salon,section=Pièces\ communes,type=com.fibaro.FGRM222 energy=4.33,power=0.7,value=99,value2=99 1529996807000000000
fibaro,deviceId=21,host=vm1,name=LED\ îlot\ central,room=Cuisine,section=Cuisine,type=com.fibaro.binarySwitch value=0 1529996807000000000
fibaro,deviceId=90,host=vm1,name=Détérioration,room=Entrée,section=Pièces\ communes,type=com.fibaro.heatDetector value=0 1529996807000000000
fibaro,deviceId=163,host=vm1,name=Température,room=Cave,section=Cave,type=com.fibaro.temperatureSensor value=21.62 1529996807000000000
fibaro,deviceId=191,host=vm1,name=Présence,room=Garde-manger,section=Cuisine,type=com.fibaro.FGMS001 value=1 1529996807000000000
fibaro,deviceId=193,host=vm1,name=Luminosité,room=Garde-manger,section=Cuisine,type=com.fibaro.lightSensor value=195 1529996807000000000
fibaro,deviceId=200,host=vm1,name=Etat,room=Garage,section=Extérieur,type=com.fibaro.doorSensor value=0 1529996807000000000
fibaro,deviceId=220,host=vm1,name=CO2\ (ppm),room=Salon,section=Pièces\ communes,type=com.fibaro.multilevelSensor value=536 1529996807000000000
fibaro,deviceId=221,host=vm1,name=Humidité\ (%),room=Salon,section=Pièces\ communes,type=com.fibaro.humiditySensor value=61 1529996807000000000
fibaro,deviceId=222,host=vm1,name=Pression\ (mb),room=Salon,section=Pièces\ communes,type=com.fibaro.multilevelSensor value=1013.7 1529996807000000000
fibaro,deviceId=223,host=vm1,name=Bruit\ (db),room=Salon,section=Pièces\ communes,type=com.fibaro.multilevelSensor value=44 1529996807000000000
fibaro,host=vm1,name=Escaliers,room=Dégagement,section=Pièces\ communes,type=com.fibaro.binarySwitch value=0 1523351010000000000
fibaro,host=vm1,name=Porte\ fenêtre,room=Salon,section=Pièces\ communes,type=com.fibaro.FGRM222 value=99,value2=99 1523351010000000000
fibaro,host=vm1,name=LED\ îlot\ central,room=Cuisine,section=Cuisine,type=com.fibaro.binarySwitch value=0 1523351010000000000
fibaro,host=vm1,name=Détérioration,room=Entrée,section=Pièces\ communes,type=com.fibaro.heatDetector value=0 1523351010000000000
fibaro,host=vm1,name=Température,room=Cave,section=Cave,type=com.fibaro.temperatureSensor value=17.87 1523351010000000000
fibaro,host=vm1,name=Présence,room=Garde-manger,section=Cuisine,type=com.fibaro.FGMS001 value=1 1523351010000000000
fibaro,host=vm1,name=Luminosité,room=Garde-manger,section=Cuisine,type=com.fibaro.lightSensor value=92 1523351010000000000
fibaro,host=vm1,name=Etat,room=Garage,section=Extérieur,type=com.fibaro.doorSensor value=0 1523351010000000000
fibaro,host=vm1,name=CO2\ (ppm),room=Salon,section=Pièces\ communes,type=com.fibaro.multilevelSensor value=880 1523351010000000000
fibaro,host=vm1,name=Humidité\ (%),room=Salon,section=Pièces\ communes,type=com.fibaro.humiditySensor value=53 1523351010000000000
fibaro,host=vm1,name=Pression\ (mb),room=Salon,section=Pièces\ communes,type=com.fibaro.multilevelSensor value=1006.9 1523351010000000000
fibaro,host=vm1,name=Bruit\ (db),room=Salon,section=Pièces\ communes,type=com.fibaro.multilevelSensor value=58 1523351010000000000
```

View File

@@ -67,8 +67,6 @@ type Devices struct {
Enabled bool `json:"enabled"`
Properties struct {
Dead interface{} `json:"dead"`
Energy interface{} `json:"energy"`
Power interface{} `json:"power"`
Value interface{} `json:"value"`
Value2 interface{} `json:"value2"`
} `json:"properties"`
@@ -164,26 +162,13 @@ func (f *Fibaro) Gather(acc telegraf.Accumulator) error {
}
tags := map[string]string{
"deviceId": strconv.FormatUint(uint64(device.ID), 10),
"section": sections[rooms[device.RoomID].SectionID],
"room": rooms[device.RoomID].Name,
"name": device.Name,
"type": device.Type,
"section": sections[rooms[device.RoomID].SectionID],
"room": rooms[device.RoomID].Name,
"name": device.Name,
"type": device.Type,
}
fields := make(map[string]interface{})
if device.Properties.Energy != nil {
if fValue, err := strconv.ParseFloat(device.Properties.Energy.(string), 64); err == nil {
fields["energy"] = fValue
}
}
if device.Properties.Power != nil {
if fValue, err := strconv.ParseFloat(device.Properties.Power.(string), 64); err == nil {
fields["power"] = fValue
}
}
if device.Properties.Value != nil {
value := device.Properties.Value
switch value {

View File

@@ -119,8 +119,6 @@ const devicesJSON = `
"type": "com.fibaro.FGRM222",
"enabled": true,
"properties": {
"energy": "4.33",
"power": "0.7",
"dead": "false",
"value": "50",
"value2": "75"
@@ -180,27 +178,27 @@ func TestJSONSuccess(t *testing.T) {
assert.Equal(t, uint64(5), acc.NMetrics())
// Ensure fields / values are correct - Device 1
tags := map[string]string{"deviceId": "1", "section": "Section 1", "room": "Room 1", "name": "Device 1", "type": "com.fibaro.binarySwitch"}
tags := map[string]string{"section": "Section 1", "room": "Room 1", "name": "Device 1", "type": "com.fibaro.binarySwitch"}
fields := map[string]interface{}{"value": float64(0)}
acc.AssertContainsTaggedFields(t, "fibaro", fields, tags)
// Ensure fields / values are correct - Device 2
tags = map[string]string{"deviceId": "2", "section": "Section 2", "room": "Room 2", "name": "Device 2", "type": "com.fibaro.binarySwitch"}
tags = map[string]string{"section": "Section 2", "room": "Room 2", "name": "Device 2", "type": "com.fibaro.binarySwitch"}
fields = map[string]interface{}{"value": float64(1)}
acc.AssertContainsTaggedFields(t, "fibaro", fields, tags)
// Ensure fields / values are correct - Device 3
tags = map[string]string{"deviceId": "3", "section": "Section 3", "room": "Room 3", "name": "Device 3", "type": "com.fibaro.multilevelSwitch"}
tags = map[string]string{"section": "Section 3", "room": "Room 3", "name": "Device 3", "type": "com.fibaro.multilevelSwitch"}
fields = map[string]interface{}{"value": float64(67)}
acc.AssertContainsTaggedFields(t, "fibaro", fields, tags)
// Ensure fields / values are correct - Device 4
tags = map[string]string{"deviceId": "4", "section": "Section 3", "room": "Room 4", "name": "Device 4", "type": "com.fibaro.temperatureSensor"}
tags = map[string]string{"section": "Section 3", "room": "Room 4", "name": "Device 4", "type": "com.fibaro.temperatureSensor"}
fields = map[string]interface{}{"value": float64(22.8)}
acc.AssertContainsTaggedFields(t, "fibaro", fields, tags)
// Ensure fields / values are correct - Device 5
tags = map[string]string{"deviceId": "5", "section": "Section 3", "room": "Room 4", "name": "Device 5", "type": "com.fibaro.FGRM222"}
fields = map[string]interface{}{"energy": float64(4.33), "power": float64(0.7), "value": float64(50), "value2": float64(75)}
tags = map[string]string{"section": "Section 3", "room": "Room 4", "name": "Device 5", "type": "com.fibaro.FGRM222"}
fields = map[string]interface{}{"value": float64(50), "value2": float64(75)}
acc.AssertContainsTaggedFields(t, "fibaro", fields, tags)
}

View File

@@ -343,9 +343,6 @@ func (h *HTTPListener) serveWrite(res http.ResponseWriter, req *http.Request) {
}
func (h *HTTPListener) parse(b []byte, t time.Time, precision string) error {
h.mu.Lock()
defer h.mu.Unlock()
h.handler.SetTimePrecision(getPrecisionMultiplier(precision))
h.handler.SetTimeFunc(func() time.Time { return t })
metrics, err := h.parser.Parse(b)

View File

@@ -5,7 +5,6 @@ import (
"os/exec"
"strconv"
"strings"
"sync"
"time"
"github.com/influxdata/telegraf"
@@ -62,18 +61,13 @@ func (m *Ipmi) Gather(acc telegraf.Accumulator) error {
}
if len(m.Servers) > 0 {
wg := sync.WaitGroup{}
for _, server := range m.Servers {
wg.Add(1)
go func(a telegraf.Accumulator, s string) {
defer wg.Done()
err := m.parse(a, s)
if err != nil {
a.AddError(err)
}
}(acc, server)
err := m.parse(acc, server)
if err != nil {
acc.AddError(err)
continue
}
}
wg.Wait()
} else {
err := m.parse(acc, "")
if err != nil {

View File

@@ -1,13 +0,0 @@
version: '3'
services:
telegraf:
image: glinton/scratch
volumes:
- ./telegraf.conf:/telegraf.conf
- ../../../../telegraf:/telegraf
- ./test.log:/var/log/test.log
entrypoint:
- /telegraf
- --config
- /telegraf.conf

View File

@@ -1,12 +0,0 @@
[agent]
interval="1s"
flush_interval="1s"
[[inputs.logparser]]
files = ["/var/log/test.log"]
from_beginning = true
[inputs.logparser.grok]
patterns = [ "%{COMBINED_LOG_FORMAT}", "%{CLIENT:client_ip} %{NOTSPACE:ident} %{NOTSPACE:auth} \\[%{TIMESTAMP_ISO8601:timestamp}\\] \"(?:%{WORD:verb:tag} %{NOTSPACE:request}(?: HTTP/%{NUMBER:http_version:float})?|%{DATA})\" %{NUMBER:resp_code:tag} (?:%{NUMBER:resp_bytes:int}|-) %{QS:referrer} %{QS:agent}"]
[[outputs.file]]
files = ["stdout"]

View File

@@ -1,2 +0,0 @@
127.0.0.1 ident auth [10/Oct/2000:13:55:36 -0700] "GET /anything HTTP/1.0" 200 2326 "http://localhost:8083/" "Chrome/51.0.2704.84"
127.0.0.1 ident auth [2018-02-21 13:10:34,555] "GET /peter HTTP/1.0" 200 2326 "http://localhost:8083/" "Chrome/51.0.2704.84"

View File

@@ -293,7 +293,7 @@ func (p *Parser) ParseLine(line string) (telegraf.Metric, error) {
timestamp = time.Unix(0, iv)
}
case SYSLOG_TIMESTAMP:
ts, err := time.ParseInLocation(time.Stamp, v, p.loc)
ts, err := time.ParseInLocation("Jan 02 15:04:05", v, p.loc)
if err == nil {
if ts.Year() == 0 {
ts = ts.AddDate(timestamp.Year(), 0, 0)
@@ -340,6 +340,9 @@ func (p *Parser) ParseLine(line string) (telegraf.Metric, error) {
ts, err := time.ParseInLocation(t, v, p.loc)
if err == nil {
if ts.Year() == 0 {
ts = ts.AddDate(timestamp.Year(), 0, 0)
}
timestamp = ts
} else {
log.Printf("E! Error parsing %s to time layout [%s]: %s", v, t, err)
@@ -469,7 +472,6 @@ func (t *tsModder) tsMod(ts time.Time) time.Time {
t.rollover = 0
return ts
}
if ts.Equal(t.last) {
t.dupe = ts
}

View File

@@ -971,41 +971,16 @@ func TestNewlineInPatterns(t *testing.T) {
require.NotNil(t, m)
}
func TestSyslogTimestamp(t *testing.T) {
tests := []struct {
name string
line string
expected time.Time
}{
{
name: "two digit day of month",
line: "Sep 25 09:01:55 value=42",
expected: time.Date(2018, time.September, 25, 9, 1, 55, 0, time.UTC),
},
{
name: "one digit day of month single space",
line: "Sep 2 09:01:55 value=42",
expected: time.Date(2018, time.September, 2, 9, 1, 55, 0, time.UTC),
},
{
name: "one digit day of month double space",
line: "Sep 2 09:01:55 value=42",
expected: time.Date(2018, time.September, 2, 9, 1, 55, 0, time.UTC),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
p := &Parser{
Patterns: []string{`%{SYSLOGTIMESTAMP:timestamp:ts-syslog} value=%{NUMBER:value:int}`},
timeFunc: func() time.Time { return time.Date(2017, time.April, 1, 0, 0, 0, 0, time.UTC) },
}
require.NoError(t, p.Compile())
m, err := p.ParseLine(tt.line)
require.NoError(t, err)
require.NotNil(t, m)
require.Equal(t, tt.expected, m.Time())
})
func TestSyslogTimestampParser(t *testing.T) {
p := &Parser{
Patterns: []string{`%{SYSLOGTIMESTAMP:timestamp:ts-syslog} value=%{NUMBER:value:int}`},
timeFunc: func() time.Time { return time.Date(2018, time.April, 1, 0, 0, 0, 0, nil) },
}
require.NoError(t, p.Compile())
m, err := p.ParseLine("Sep 25 09:01:55 value=42")
require.NoError(t, err)
require.NotNil(t, m)
require.Equal(t, 2018, m.Time().Year())
}
func TestReplaceTimestampComma(t *testing.T) {
@@ -1025,3 +1000,20 @@ func TestReplaceTimestampComma(t *testing.T) {
//Convert Nanosecond to milisecond for compare
require.Equal(t, 555, m.Time().Nanosecond()/1000000)
}
func TestEmptyYearInTimestamp(t *testing.T) {
p := &Parser{
Patterns: []string{`%{APPLE_SYSLOG_TIME_SHORT:timestamp:ts-"Jan 2 15:04:05"} %{HOSTNAME} %{APP_NAME:app_name}\[%{NUMBER:pid:int}\]%{GREEDYDATA:message}`},
CustomPatterns: `
APPLE_SYSLOG_TIME_SHORT %{MONTH} +%{MONTHDAY} %{TIME}
APP_NAME [a-zA-Z0-9\.]+
`,
}
require.NoError(t, p.Compile())
p.ParseLine("Nov 6 13:57:03 generic iTunes[6504]: info> Scale factor of main display = 2.0")
m, err := p.ParseLine("Nov 6 13:57:03 generic iTunes[6504]: objc[6504]: Object descriptor was null.")
require.NoError(t, err)
require.NotNil(t, m)
require.Equal(t, 2018, m.Time().Year())
}

View File

@@ -203,10 +203,6 @@ func (l *LogParserPlugin) tailNewfiles(fromBeginning bool) error {
Poll: poll,
Logger: tail.DiscardingLogger,
})
//add message saying a new tailer was added for the file
log.Printf("D! tail added for file: %v", file)
if err != nil {
l.acc.AddError(err)
continue
@@ -291,10 +287,6 @@ func (l *LogParserPlugin) Stop() {
for _, t := range l.tailers {
err := t.Stop()
//message for a stopped tailer
log.Printf("D! tail dropped for file: %v", t.Filename)
if err != nil {
log.Printf("E! Error stopping tail on file %s\n", t.Filename)
}

View File

@@ -1,16 +0,0 @@
version: '3'
services:
mongodb:
image: mongo
telegraf:
image: glinton/scratch
volumes:
- ./telegraf.conf:/telegraf.conf
- ../../../../telegraf:/telegraf
depends_on:
- mongodb
entrypoint:
- /telegraf
- --config
- /telegraf.conf

View File

@@ -1,9 +0,0 @@
[agent]
interval="1s"
flush_interval="3s"
[[inputs.mongodb]]
servers = ["mongodb://mongodb:27017"]
[[outputs.file]]
files = ["stdout"]

View File

@@ -31,35 +31,28 @@ func NewMongodbData(statLine *StatLine, tags map[string]string) *MongodbData {
}
var DefaultStats = map[string]string{
"inserts_per_sec": "Insert",
"queries_per_sec": "Query",
"updates_per_sec": "Update",
"deletes_per_sec": "Delete",
"getmores_per_sec": "GetMore",
"commands_per_sec": "Command",
"flushes_per_sec": "Flushes",
"vsize_megabytes": "Virtual",
"resident_megabytes": "Resident",
"queued_reads": "QueuedReaders",
"queued_writes": "QueuedWriters",
"active_reads": "ActiveReaders",
"active_writes": "ActiveWriters",
"net_in_bytes": "NetIn",
"net_out_bytes": "NetOut",
"open_connections": "NumConnections",
"ttl_deletes_per_sec": "DeletedDocuments",
"ttl_passes_per_sec": "Passes",
"cursor_timed_out": "TimedOutC",
"cursor_no_timeout": "NoTimeoutC",
"cursor_pinned": "PinnedC",
"cursor_total": "TotalC",
"document_deleted": "DeletedD",
"document_inserted": "InsertedD",
"document_returned": "ReturnedD",
"document_updated": "UpdatedD",
"connections_current": "CurrentC",
"connections_available": "AvailableC",
"connections_total_created": "TotalCreatedC",
"inserts_per_sec": "Insert",
"queries_per_sec": "Query",
"updates_per_sec": "Update",
"deletes_per_sec": "Delete",
"getmores_per_sec": "GetMore",
"commands_per_sec": "Command",
"flushes_per_sec": "Flushes",
"vsize_megabytes": "Virtual",
"resident_megabytes": "Resident",
"queued_reads": "QueuedReaders",
"queued_writes": "QueuedWriters",
"active_reads": "ActiveReaders",
"active_writes": "ActiveWriters",
"net_in_bytes": "NetIn",
"net_out_bytes": "NetOut",
"open_connections": "NumConnections",
"ttl_deletes_per_sec": "DeletedDocuments",
"ttl_passes_per_sec": "Passes",
"cursor_timed_out": "TimedOutC",
"cursor_no_timeout": "NoTimeoutC",
"cursor_pinned": "PinnedC",
"cursor_total": "TotalC",
}
var DefaultReplStats = map[string]string{

View File

@@ -38,13 +38,6 @@ func TestAddNonReplStats(t *testing.T) {
NoTimeoutC: 0,
PinnedC: 0,
TotalC: 0,
DeletedD: 0,
InsertedD: 0,
ReturnedD: 0,
UpdatedD: 0,
CurrentC: 0,
AvailableC: 0,
TotalCreatedC: 0,
},
tags,
)
@@ -189,50 +182,43 @@ func TestStateTag(t *testing.T) {
d.AddDefaultStats()
d.flush(&acc)
fields := map[string]interface{}{
"active_reads": int64(0),
"active_writes": int64(0),
"commands_per_sec": int64(0),
"deletes_per_sec": int64(0),
"flushes_per_sec": int64(0),
"getmores_per_sec": int64(0),
"inserts_per_sec": int64(0),
"member_status": "PRI",
"state": "PRIMARY",
"net_in_bytes": int64(0),
"net_out_bytes": int64(0),
"open_connections": int64(0),
"queries_per_sec": int64(0),
"queued_reads": int64(0),
"queued_writes": int64(0),
"repl_commands_per_sec": int64(0),
"repl_deletes_per_sec": int64(0),
"repl_getmores_per_sec": int64(0),
"repl_inserts_per_sec": int64(0),
"repl_queries_per_sec": int64(0),
"repl_updates_per_sec": int64(0),
"repl_lag": int64(0),
"repl_oplog_window_sec": int64(0),
"resident_megabytes": int64(0),
"updates_per_sec": int64(0),
"vsize_megabytes": int64(0),
"ttl_deletes_per_sec": int64(0),
"ttl_passes_per_sec": int64(0),
"jumbo_chunks": int64(0),
"total_in_use": int64(0),
"total_available": int64(0),
"total_created": int64(0),
"total_refreshing": int64(0),
"cursor_timed_out": int64(0),
"cursor_no_timeout": int64(0),
"cursor_pinned": int64(0),
"cursor_total": int64(0),
"document_deleted": int64(0),
"document_inserted": int64(0),
"document_returned": int64(0),
"document_updated": int64(0),
"connections_current": int64(0),
"connections_available": int64(0),
"connections_total_created": int64(0),
"active_reads": int64(0),
"active_writes": int64(0),
"commands_per_sec": int64(0),
"deletes_per_sec": int64(0),
"flushes_per_sec": int64(0),
"getmores_per_sec": int64(0),
"inserts_per_sec": int64(0),
"member_status": "PRI",
"state": "PRIMARY",
"net_in_bytes": int64(0),
"net_out_bytes": int64(0),
"open_connections": int64(0),
"queries_per_sec": int64(0),
"queued_reads": int64(0),
"queued_writes": int64(0),
"repl_commands_per_sec": int64(0),
"repl_deletes_per_sec": int64(0),
"repl_getmores_per_sec": int64(0),
"repl_inserts_per_sec": int64(0),
"repl_queries_per_sec": int64(0),
"repl_updates_per_sec": int64(0),
"repl_lag": int64(0),
"repl_oplog_window_sec": int64(0),
"resident_megabytes": int64(0),
"updates_per_sec": int64(0),
"vsize_megabytes": int64(0),
"ttl_deletes_per_sec": int64(0),
"ttl_passes_per_sec": int64(0),
"jumbo_chunks": int64(0),
"total_in_use": int64(0),
"total_available": int64(0),
"total_created": int64(0),
"total_refreshing": int64(0),
"cursor_timed_out": int64(0),
"cursor_no_timeout": int64(0),
"cursor_pinned": int64(0),
"cursor_total": int64(0),
}
acc.AssertContainsTaggedFields(t, "mongodb", fields, stateTags)
}

View File

@@ -225,7 +225,7 @@ type FlushStats struct {
type ConnectionStats struct {
Current int64 `bson:"current"`
Available int64 `bson:"available"`
TotalCreated int64 `bson:"total_created"`
TotalCreated int64 `bson:"totalCreated"`
}
// DurTiming stores information related to journaling.
@@ -289,9 +289,8 @@ type OpcountStats struct {
// MetricsStats stores information related to metrics
type MetricsStats struct {
TTL *TTLStats `bson:"ttl"`
Cursor *CursorStats `bson:"cursor"`
Document *DocumentStats `bson:"document"`
TTL *TTLStats `bson:"ttl"`
Cursor *CursorStats `bson:"cursor"`
}
// TTLStats stores information related to documents with a ttl index.
@@ -306,14 +305,6 @@ type CursorStats struct {
Open *OpenCursorStats `bson:"open"`
}
// DocumentStats stores information related to document metrics.
type DocumentStats struct {
Deleted int64 `bson:"deleted"`
Inserted int64 `bson:"inserted"`
Returned int64 `bson:"returned"`
Updated int64 `bson:"updated"`
}
// OpenCursorStats stores information related to open cursor metrics
type OpenCursorStats struct {
NoTimeout int64 `bson:"noTimeout"`
@@ -466,12 +457,6 @@ type StatLine struct {
TimedOutC int64
NoTimeoutC, PinnedC, TotalC int64
// Document fields
DeletedD, InsertedD, ReturnedD, UpdatedD int64
// Connection fields
CurrentC, AvailableC, TotalCreatedC int64
// Collection locks (3.0 mmap only)
CollectionLocks *CollectionLockStatus

View File

@@ -1,42 +0,0 @@
version: '3'
services:
mysql:
image: mysql:5.7
restart: always
environment:
MYSQL_ROOT_PASSWORD: telegraf
MYSQL_DATABASE: telegraf
MYSQL_USER: telegraf
MYSQL_PASSWORD: telegraf
maria:
image: mariadb
restart: always
environment:
MYSQL_ROOT_PASSWORD: telegraf
MYSQL_DATABASE: telegraf
MYSQL_USER: telegraf
MYSQL_PASSWORD: telegraf
command: mysqld --userstat=1
percona:
image: percona
restart: always
environment:
MYSQL_ROOT_PASSWORD: telegraf
MYSQL_DATABASE: telegraf
MYSQL_USER: telegraf
MYSQL_PASSWORD: telegraf
telegraf:
image: glinton/scratch
depends_on:
- mysql
- maria
- percona
volumes:
- ./telegraf.conf:/telegraf.conf
- ../../../../telegraf:/telegraf
entrypoint:
- /telegraf
- --config
- /telegraf.conf

View File

@@ -1,61 +0,0 @@
# Uncomment each input as needed to test plugin
## mysql
#[[inputs.mysql]]
# servers = ["root:telegraf@tcp(mysql:3306)/"]
# gather_table_schema = true
# gather_process_list = true
# gather_user_statistics = true
# gather_info_schema_auto_inc = true
# gather_innodb_metrics = true
# gather_slave_status = true
# gather_binary_logs = false
# gather_table_io_waits = true
# gather_table_lock_waits = true
# gather_index_io_waits = true
# gather_event_waits = true
# gather_file_events_stats = true
# gather_perf_events_statements = true
# interval_slow = "30m"
# table_schema_databases = []
#
## mariadb
#[[inputs.mysql]]
# servers = ["root:telegraf@tcp(maria:3306)/"]
# gather_table_schema = true
# gather_process_list = true
# gather_user_statistics = true
# gather_info_schema_auto_inc = true
# gather_innodb_metrics = true
# gather_slave_status = true
# gather_binary_logs = false
# gather_table_io_waits = true
# gather_table_lock_waits = true
# gather_index_io_waits = true
# gather_event_waits = true
# gather_file_events_stats = true
# gather_perf_events_statements = true
# interval_slow = "30m"
# table_schema_databases = []
# percona
[[inputs.mysql]]
servers = ["root:telegraf@tcp(percona:3306)/"]
gather_table_schema = true
gather_process_list = true
gather_user_statistics = true
gather_info_schema_auto_inc = true
gather_innodb_metrics = true
gather_slave_status = true
gather_binary_logs = false
gather_table_io_waits = true
gather_table_lock_waits = true
gather_index_io_waits = true
gather_event_waits = true
gather_file_events_stats = true
gather_perf_events_statements = true
interval_slow = "30m"
table_schema_databases = []
[[outputs.file]]
files = ["stdout"]

View File

@@ -4,6 +4,7 @@ import (
"bytes"
"database/sql"
"fmt"
"log"
"strconv"
"strings"
"sync"
@@ -79,7 +80,7 @@ var sampleConfig = `
## gather thread state counts from INFORMATION_SCHEMA.PROCESSLIST
gather_process_list = true
#
## gather user statistics from INFORMATION_SCHEMA.USER_STATISTICS
## gather thread state counts from INFORMATION_SCHEMA.USER_STATISTICS
gather_user_statistics = true
#
## gather auto_increment columns and max values from information schema
@@ -281,8 +282,9 @@ const (
GROUP BY command,state
ORDER BY null`
infoSchemaUserStatisticsQuery = `
SELECT *
FROM information_schema.user_statistics`
SELECT *,count(*)
FROM information_schema.user_statistics
GROUP BY user`
infoSchemaAutoIncQuery = `
SELECT table_schema, table_name, column_name, auto_increment,
CAST(pow(2, case data_type
@@ -759,6 +761,103 @@ func (m *Mysql) gatherGlobalStatuses(db *sql.DB, serv string, acc telegraf.Accum
if len(fields) > 0 {
acc.AddFields("mysql", fields, tags)
}
// gather connection metrics from processlist for each user
if m.GatherProcessList {
conn_rows, err := db.Query("SELECT user, sum(1) FROM INFORMATION_SCHEMA.PROCESSLIST GROUP BY user")
if err != nil {
log.Printf("E! MySQL Error gathering process list: %s", err)
} else {
for conn_rows.Next() {
var user string
var connections int64
err = conn_rows.Scan(&user, &connections)
if err != nil {
return err
}
tags := map[string]string{"server": servtag, "user": user}
fields := make(map[string]interface{})
if err != nil {
return err
}
fields["connections"] = connections
acc.AddFields("mysql_users", fields, tags)
}
}
}
// gather connection metrics from user_statistics for each user
if m.GatherUserStatistics {
conn_rows, err := db.Query("select user, total_connections, concurrent_connections, connected_time, busy_time, cpu_time, bytes_received, bytes_sent, binlog_bytes_written, rows_fetched, rows_updated, table_rows_read, select_commands, update_commands, other_commands, commit_transactions, rollback_transactions, denied_connections, lost_connections, access_denied, empty_queries, total_ssl_connections FROM INFORMATION_SCHEMA.USER_STATISTICS GROUP BY user")
if err != nil {
log.Printf("E! MySQL Error gathering user stats: %s", err)
} else {
for conn_rows.Next() {
var user string
var total_connections int64
var concurrent_connections int64
var connected_time int64
var busy_time int64
var cpu_time int64
var bytes_received int64
var bytes_sent int64
var binlog_bytes_written int64
var rows_fetched int64
var rows_updated int64
var table_rows_read int64
var select_commands int64
var update_commands int64
var other_commands int64
var commit_transactions int64
var rollback_transactions int64
var denied_connections int64
var lost_connections int64
var access_denied int64
var empty_queries int64
var total_ssl_connections int64
err = conn_rows.Scan(&user, &total_connections, &concurrent_connections,
&connected_time, &busy_time, &cpu_time, &bytes_received, &bytes_sent, &binlog_bytes_written,
&rows_fetched, &rows_updated, &table_rows_read, &select_commands, &update_commands, &other_commands,
&commit_transactions, &rollback_transactions, &denied_connections, &lost_connections, &access_denied,
&empty_queries, &total_ssl_connections,
)
if err != nil {
return err
}
tags := map[string]string{"server": servtag, "user": user}
fields := map[string]interface{}{
"total_connections": total_connections,
"concurrent_connections": concurrent_connections,
"connected_time": connected_time,
"busy_time": busy_time,
"cpu_time": cpu_time,
"bytes_received": bytes_received,
"bytes_sent": bytes_sent,
"binlog_bytes_written": binlog_bytes_written,
"rows_fetched": rows_fetched,
"rows_updated": rows_updated,
"table_rows_read": table_rows_read,
"select_commands": select_commands,
"update_commands": update_commands,
"other_commands": other_commands,
"commit_transactions": commit_transactions,
"rollback_transactions": rollback_transactions,
"denied_connections": denied_connections,
"lost_connections": lost_connections,
"access_denied": access_denied,
"empty_queries": empty_queries,
"total_ssl_connections": total_ssl_connections,
}
acc.AddFields("mysql_user_stats", fields, tags)
}
}
}
return nil
}
@@ -809,29 +908,6 @@ func (m *Mysql) GatherProcessListStatuses(db *sql.DB, serv string, acc telegraf.
} else {
acc.AddFields("mysql_process_list", fields, tags)
}
// get count of connections from each user
conn_rows, err := db.Query("SELECT user, sum(1) AS connections FROM INFORMATION_SCHEMA.PROCESSLIST GROUP BY user")
if err != nil {
return err
}
for conn_rows.Next() {
var user string
var connections int64
err = conn_rows.Scan(&user, &connections)
if err != nil {
return err
}
tags := map[string]string{"server": servtag, "user": user}
fields := make(map[string]interface{})
fields["connections"] = connections
acc.AddFields("mysql_users", fields, tags)
}
return nil
}
@@ -841,190 +917,77 @@ func (m *Mysql) GatherUserStatisticsStatuses(db *sql.DB, serv string, acc telegr
// run query
rows, err := db.Query(infoSchemaUserStatisticsQuery)
if err != nil {
// disable collecting if table is not found (mysql specific error)
// (suppresses repeat errors)
if strings.Contains(err.Error(), "nknown table 'user_statistics'") {
m.GatherUserStatistics = false
}
return err
}
defer rows.Close()
cols, err := columnsToLower(rows.Columns())
if err != nil {
return err
}
read, err := getColSlice(len(cols))
if err != nil {
return err
}
var (
user string
total_connections int64
concurrent_connections int64
connected_time int64
busy_time int64
cpu_time int64
bytes_received int64
bytes_sent int64
binlog_bytes_written int64
rows_fetched int64
rows_updated int64
table_rows_read int64
select_commands int64
update_commands int64
other_commands int64
commit_transactions int64
rollback_transactions int64
denied_connections int64
lost_connections int64
access_denied int64
empty_queries int64
total_ssl_connections int64
count uint32
)
servtag := getDSNTag(serv)
for rows.Next() {
err = rows.Scan(read...)
err = rows.Scan(&user, &total_connections, &concurrent_connections,
&connected_time, &busy_time, &cpu_time, &bytes_received, &bytes_sent, &binlog_bytes_written,
&rows_fetched, &rows_updated, &table_rows_read, &select_commands, &update_commands, &other_commands,
&commit_transactions, &rollback_transactions, &denied_connections, &lost_connections, &access_denied,
&empty_queries, &total_ssl_connections, &count,
)
if err != nil {
return err
}
tags := map[string]string{"server": servtag, "user": *read[0].(*string)}
fields := map[string]interface{}{}
tags := map[string]string{"server": servtag, "user": user}
fields := map[string]interface{}{
for i := range cols {
if i == 0 {
continue // skip "user"
}
switch v := read[i].(type) {
case *int64:
fields[cols[i]] = *v
case *float64:
fields[cols[i]] = *v
case *string:
fields[cols[i]] = *v
default:
return fmt.Errorf("Unknown column type - %T", v)
}
"total_connections": total_connections,
"concurrent_connections": concurrent_connections,
"connected_time": connected_time,
"busy_time": busy_time,
"cpu_time": cpu_time,
"bytes_received": bytes_received,
"bytes_sent": bytes_sent,
"binlog_bytes_written": binlog_bytes_written,
"rows_fetched": rows_fetched,
"rows_updated": rows_updated,
"table_rows_read": table_rows_read,
"select_commands": select_commands,
"update_commands": update_commands,
"other_commands": other_commands,
"commit_transactions": commit_transactions,
"rollback_transactions": rollback_transactions,
"denied_connections": denied_connections,
"lost_connections": lost_connections,
"access_denied": access_denied,
"empty_queries": empty_queries,
"total_ssl_connections": total_ssl_connections,
}
acc.AddFields("mysql_user_stats", fields, tags)
}
return nil
}
// columnsToLower converts selected column names to lowercase.
func columnsToLower(s []string, e error) ([]string, error) {
if e != nil {
return nil, e
}
d := make([]string, len(s))
for i := range s {
d[i] = strings.ToLower(s[i])
}
return d, nil
}
// getColSlice returns an in interface slice that can be used in the row.Scan().
func getColSlice(l int) ([]interface{}, error) {
// list of all possible column names
var (
user string
total_connections int64
concurrent_connections int64
connected_time int64
busy_time int64
cpu_time int64
bytes_received int64
bytes_sent int64
binlog_bytes_written int64
rows_read int64
rows_sent int64
rows_deleted int64
rows_inserted int64
rows_updated int64
select_commands int64
update_commands int64
other_commands int64
commit_transactions int64
rollback_transactions int64
denied_connections int64
lost_connections int64
access_denied int64
empty_queries int64
total_ssl_connections int64
max_statement_time_exceeded int64
// maria specific
fbusy_time float64
fcpu_time float64
// percona specific
rows_fetched int64
table_rows_read int64
)
switch l {
case 23: // maria5
return []interface{}{
&user,
&total_connections,
&concurrent_connections,
&connected_time,
&fbusy_time,
&fcpu_time,
&bytes_received,
&bytes_sent,
&binlog_bytes_written,
&rows_read,
&rows_sent,
&rows_deleted,
&rows_inserted,
&rows_updated,
&select_commands,
&update_commands,
&other_commands,
&commit_transactions,
&rollback_transactions,
&denied_connections,
&lost_connections,
&access_denied,
&empty_queries,
}, nil
case 25: // maria10
return []interface{}{
&user,
&total_connections,
&concurrent_connections,
&connected_time,
&fbusy_time,
&fcpu_time,
&bytes_received,
&bytes_sent,
&binlog_bytes_written,
&rows_read,
&rows_sent,
&rows_deleted,
&rows_inserted,
&rows_updated,
&select_commands,
&update_commands,
&other_commands,
&commit_transactions,
&rollback_transactions,
&denied_connections,
&lost_connections,
&access_denied,
&empty_queries,
&total_ssl_connections,
&max_statement_time_exceeded,
}, nil
case 22: // percona
return []interface{}{
&user,
&total_connections,
&concurrent_connections,
&connected_time,
&busy_time,
&cpu_time,
&bytes_received,
&bytes_sent,
&binlog_bytes_written,
&rows_fetched,
&rows_updated,
&table_rows_read,
&select_commands,
&update_commands,
&other_commands,
&commit_transactions,
&rollback_transactions,
&denied_connections,
&lost_connections,
&access_denied,
&empty_queries,
&total_ssl_connections,
}, nil
}
return nil, fmt.Errorf("Not Supported - %d columns", l)
}
// gatherPerfTableIOWaits can be used to get total count and time
// of I/O wait event for each table and process
func (m *Mysql) gatherPerfTableIOWaits(db *sql.DB, serv string, acc telegraf.Accumulator) error {

View File

@@ -1,8 +1,6 @@
# Procstat Input Plugin
The procstat plugin can be used to monitor the system resource usage of one or more processes.
The procstat_lookup metric displays the query information,
specifically the number of PIDs returned on a search
Processes can be selected for monitoring using one of several methods:
- pidfile
@@ -129,18 +127,7 @@ implemented as a WMI query. The pattern allows fuzzy matching using only
- voluntary_context_switches (int)
- write_bytes (int, *telegraf* may need to be ran as **root**)
- write_count (int, *telegraf* may need to be ran as **root**)
- procstat_lookup
- tags:
- exe (string)
- pid_finder (string)
- pid_file (string)
- pattern (string)
- prefix (string)
- user (string)
- systemd_unit (string)
- cgroup (string)
- fields:
- pid_count (int)
*NOTE: Resource limit > 2147483647 will be reported as 2147483647.*
### Example Output:

View File

@@ -107,13 +107,13 @@ requests that are in the queue but not yet issued to the device driver.
#### Calculate percent IO utilization per disk and host:
```
SELECT non_negative_derivative(last("io_time"),1ms) FROM "diskio" WHERE time > now() - 30m GROUP BY "host","name",time(60s)
SELECT derivative(last("io_time"),1ms) FROM "diskio" WHERE time > now() - 30m GROUP BY "host","name",time(60s)
```
#### Calculate average queue depth:
`iops_in_progress` will give you an instantaneous value. This will give you the average between polling intervals.
```
SELECT non_negative_derivative(last("weighted_io_time",1ms)) from "diskio" WHERE time > now() - 30m GROUP BY "host","name",time(60s)
SELECT derivative(last("weighted_io_time",1ms)) from "diskio" WHERE time > now() - 30m GROUP BY "host","name",time(60s)
```
### Example Output:

View File

@@ -1,30 +0,0 @@
# Swap Input Plugin
The swap plugin collects system swap metrics.
For more information on what swap memory is, read [All about Linux swap space](https://www.linux.com/news/all-about-linux-swap-space).
### Configuration:
```toml
# Read metrics about swap memory usage
[[inputs.swap]]
# no configuration
```
### Metrics:
- swap
- fields:
- free (int)
- total (int)
- used (int)
- used_percent (float)
- in (int)
- out (int)
### Example Output:
```
swap total=20855394304i,used_percent=45.43883523785713,used=9476448256i,free=1715331072i 1511894782000000000
```

View File

@@ -42,9 +42,45 @@ func (s *MemStats) Gather(acc telegraf.Accumulator) error {
return nil
}
type SwapStats struct {
ps PS
}
func (_ *SwapStats) Description() string {
return "Read metrics about swap memory usage"
}
func (_ *SwapStats) SampleConfig() string { return "" }
func (s *SwapStats) Gather(acc telegraf.Accumulator) error {
swap, err := s.ps.SwapStat()
if err != nil {
return fmt.Errorf("error getting swap memory info: %s", err)
}
fieldsG := map[string]interface{}{
"total": swap.Total,
"used": swap.Used,
"free": swap.Free,
"used_percent": swap.UsedPercent,
}
fieldsC := map[string]interface{}{
"in": swap.Sin,
"out": swap.Sout,
}
acc.AddGauge("swap", fieldsG, nil)
acc.AddCounter("swap", fieldsC, nil)
return nil
}
func init() {
ps := newSystemPS()
inputs.Add("mem", func() telegraf.Input {
return &MemStats{ps: ps}
})
inputs.Add("swap", func() telegraf.Input {
return &SwapStats{ps: ps}
})
}

View File

@@ -30,6 +30,17 @@ func TestMemStats(t *testing.T) {
mps.On("VMStat").Return(vms, nil)
sms := &mem.SwapMemoryStat{
Total: 8123,
Used: 1232,
Free: 6412,
UsedPercent: 12.2,
Sin: 7,
Sout: 830,
}
mps.On("SwapStat").Return(sms, nil)
err = (&MemStats{&mps}).Gather(&acc)
require.NoError(t, err)
@@ -50,4 +61,15 @@ func TestMemStats(t *testing.T) {
acc.AssertContainsTaggedFields(t, "mem", memfields, make(map[string]string))
acc.Metrics = nil
err = (&SwapStats{&mps}).Gather(&acc)
require.NoError(t, err)
swapfields := map[string]interface{}{
"total": uint64(8123),
"used": uint64(1232),
"used_percent": float64(12.2),
"free": uint64(6412),
}
acc.AssertContainsTaggedFields(t, "swap", swapfields, make(map[string]string))
}

View File

@@ -1,47 +0,0 @@
package system
import (
"fmt"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
)
type SwapStats struct {
ps PS
}
func (_ *SwapStats) Description() string {
return "Read metrics about swap memory usage"
}
func (_ *SwapStats) SampleConfig() string { return "" }
func (s *SwapStats) Gather(acc telegraf.Accumulator) error {
swap, err := s.ps.SwapStat()
if err != nil {
return fmt.Errorf("error getting swap memory info: %s", err)
}
fieldsG := map[string]interface{}{
"total": swap.Total,
"used": swap.Used,
"free": swap.Free,
"used_percent": swap.UsedPercent,
}
fieldsC := map[string]interface{}{
"in": swap.Sin,
"out": swap.Sout,
}
acc.AddGauge("swap", fieldsG, nil)
acc.AddCounter("swap", fieldsC, nil)
return nil
}
func init() {
ps := newSystemPS()
inputs.Add("swap", func() telegraf.Input {
return &SwapStats{ps: ps}
})
}

View File

@@ -1,38 +0,0 @@
package system
import (
"testing"
"github.com/influxdata/telegraf/testutil"
"github.com/shirou/gopsutil/mem"
"github.com/stretchr/testify/require"
)
func TestSwapStats(t *testing.T) {
var mps MockPS
var err error
defer mps.AssertExpectations(t)
var acc testutil.Accumulator
sms := &mem.SwapMemoryStat{
Total: 8123,
Used: 1232,
Free: 6412,
UsedPercent: 12.2,
Sin: 7,
Sout: 830,
}
mps.On("SwapStat").Return(sms, nil)
err = (&SwapStats{&mps}).Gather(&acc)
require.NoError(t, err)
swapfields := map[string]interface{}{
"total": uint64(8123),
"used": uint64(1232),
"used_percent": float64(12.2),
"free": uint64(6412),
}
acc.AssertContainsTaggedFields(t, "swap", swapfields, make(map[string]string))
}

View File

@@ -72,15 +72,6 @@ It is recommended NOT to use this on OSes starting with Vista and newer because
Example for Windows Server 2003, this would be set to true:
`PreVistaSupport=true`
#### UsePerfCounterTime
Bool, if set to `true` will request a timestamp along with the PerfCounter data.
If se to `false`, current time will be used.
Supported on Windows Vista/Windows Server 2008 and newer
Example:
`UsePerfCounterTime=true`
### Object
See Entry below.

View File

@@ -1,73 +0,0 @@
// Copyright (c) 2010 The win Authors. All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions
// are met:
// 1. Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// 2. Redistributions in binary form must reproduce the above copyright
// notice, this list of conditions and the following disclaimer in the
// documentation and/or other materials provided with the distribution.
// 3. The names of the authors may not be used to endorse or promote products
// derived from this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE AUTHORS ``AS IS'' AND ANY EXPRESS OR
// IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
// OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
// IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY DIRECT, INDIRECT,
// INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
// NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
// THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
//
// This is the official list of 'win' authors for copyright purposes.
//
// Alexander Neumann <an2048@googlemail.com>
// Joseph Watson <jtwatson@linux-consulting.us>
// Kevin Pors <krpors@gmail.com>
// +build windows
package win_perf_counters
import (
"syscall"
)
type SYSTEMTIME struct {
wYear uint16
wMonth uint16
wDayOfWeek uint16
wDay uint16
wHour uint16
wMinute uint16
wSecond uint16
wMilliseconds uint16
}
type FILETIME struct {
dwLowDateTime uint32
dwHighDateTime uint32
}
var (
// Library
libkrnDll *syscall.DLL
// Functions
krn_FileTimeToSystemTime *syscall.Proc
krn_FileTimeToLocalFileTime *syscall.Proc
krn_LocalFileTimeToFileTime *syscall.Proc
krn_WideCharToMultiByte *syscall.Proc
)
func init() {
libkrnDll = syscall.MustLoadDLL("Kernel32.dll")
krn_FileTimeToSystemTime = libkrnDll.MustFindProc("FileTimeToSystemTime")
krn_FileTimeToLocalFileTime = libkrnDll.MustFindProc("FileTimeToLocalFileTime")
krn_LocalFileTimeToFileTime = libkrnDll.MustFindProc("LocalFileTimeToFileTime")
krn_WideCharToMultiByte = libkrnDll.MustFindProc("WideCharToMultiByte")
}

View File

@@ -38,15 +38,12 @@ import (
"unsafe"
"golang.org/x/sys/windows"
"time"
)
// Error codes
const (
ERROR_SUCCESS = 0
ERROR_FAILURE = 1
ERROR_INVALID_FUNCTION = 1
EPOCH_DIFFERENCE_MICROS int64 = 11644473600000000
ERROR_SUCCESS = 0
ERROR_INVALID_FUNCTION = 1
)
type (
@@ -173,7 +170,6 @@ var (
pdh_AddEnglishCounterW *syscall.Proc
pdh_CloseQuery *syscall.Proc
pdh_CollectQueryData *syscall.Proc
pdh_CollectQueryDataWithTime *syscall.Proc
pdh_GetFormattedCounterValue *syscall.Proc
pdh_GetFormattedCounterArrayW *syscall.Proc
pdh_OpenQuery *syscall.Proc
@@ -191,7 +187,6 @@ func init() {
pdh_AddEnglishCounterW, _ = libpdhDll.FindProc("PdhAddEnglishCounterW") // XXX: only supported on versions > Vista.
pdh_CloseQuery = libpdhDll.MustFindProc("PdhCloseQuery")
pdh_CollectQueryData = libpdhDll.MustFindProc("PdhCollectQueryData")
pdh_CollectQueryDataWithTime, _ = libpdhDll.FindProc("PdhCollectQueryDataWithTime")
pdh_GetFormattedCounterValue = libpdhDll.MustFindProc("PdhGetFormattedCounterValue")
pdh_GetFormattedCounterArrayW = libpdhDll.MustFindProc("PdhGetFormattedCounterArrayW")
pdh_OpenQuery = libpdhDll.MustFindProc("PdhOpenQuery")
@@ -308,37 +303,6 @@ func PdhCollectQueryData(hQuery PDH_HQUERY) uint32 {
return uint32(ret)
}
// PdhCollectQueryDataWithTime queries data from perfmon, retrieving the device/windows timestamp from the node it was collected on.
// Converts the filetime structure to a GO time class and returns the native time.
//
func PdhCollectQueryDataWithTime(hQuery PDH_HQUERY) (uint32, time.Time) {
var localFileTime FILETIME
ret, _, _ := pdh_CollectQueryDataWithTime.Call(uintptr(hQuery), uintptr(unsafe.Pointer(&localFileTime)))
if ret == ERROR_SUCCESS {
var utcFileTime FILETIME
ret, _, _ := krn_LocalFileTimeToFileTime.Call(
uintptr(unsafe.Pointer(&localFileTime)),
uintptr(unsafe.Pointer(&utcFileTime)))
if ret == 0 {
return uint32(ERROR_FAILURE), time.Now()
}
// First convert 100-ns intervals to microseconds, then adjust for the
// epoch difference
var totalMicroSeconds int64
totalMicroSeconds = ((int64(utcFileTime.dwHighDateTime) << 32) | int64(utcFileTime.dwLowDateTime)) / 10
totalMicroSeconds -= EPOCH_DIFFERENCE_MICROS
retTime := time.Unix(0, totalMicroSeconds*1000)
return uint32(ERROR_SUCCESS), retTime
}
return uint32(ret), time.Now()
}
// PdhGetFormattedCounterValueDouble formats the given hCounter using a 'double'. The result is set into the specialized union struct pValue.
// This function does not directly translate to a Windows counterpart due to union specialization tricks.
func PdhGetFormattedCounterValueDouble(hCounter PDH_HCOUNTER, lpdwType *uint32, pValue *PDH_FMT_COUNTERVALUE_DOUBLE) uint32 {

View File

@@ -6,7 +6,6 @@ package win_perf_counters
import (
"errors"
"syscall"
"time"
"unsafe"
)
@@ -27,8 +26,7 @@ type PerformanceQuery interface {
GetFormattedCounterValueDouble(hCounter PDH_HCOUNTER) (float64, error)
GetFormattedCounterArrayDouble(hCounter PDH_HCOUNTER) ([]CounterValue, error)
CollectData() error
CollectDataWithTime() (time.Time, error)
IsVistaOrNewer() bool
AddEnglishCounterSupported() bool
}
//PdhError represents error returned from Performance Counters API
@@ -63,8 +61,8 @@ func (m *PerformanceQueryImpl) Open() error {
}
}
var handle PDH_HQUERY
if ret := PdhOpenQuery(0, 0, &handle); ret != ERROR_SUCCESS {
ret := PdhOpenQuery(0, 0, &handle)
if ret != ERROR_SUCCESS {
return NewPdhError(ret)
}
m.query = handle
@@ -76,8 +74,8 @@ func (m *PerformanceQueryImpl) Close() error {
if m.query == 0 {
return errors.New("uninitialised query")
}
if ret := PdhCloseQuery(m.query); ret != ERROR_SUCCESS {
ret := PdhCloseQuery(m.query)
if ret != ERROR_SUCCESS {
return NewPdhError(ret)
}
m.query = 0
@@ -89,8 +87,8 @@ func (m *PerformanceQueryImpl) AddCounterToQuery(counterPath string) (PDH_HCOUNT
if m.query == 0 {
return 0, errors.New("uninitialised query")
}
if ret := PdhAddCounter(m.query, counterPath, 0, &counterHandle); ret != ERROR_SUCCESS {
ret := PdhAddCounter(m.query, counterPath, 0, &counterHandle)
if ret != ERROR_SUCCESS {
return 0, NewPdhError(ret)
}
return counterHandle, nil
@@ -101,7 +99,8 @@ func (m *PerformanceQueryImpl) AddEnglishCounterToQuery(counterPath string) (PDH
if m.query == 0 {
return 0, errors.New("uninitialised query")
}
if ret := PdhAddEnglishCounter(m.query, counterPath, 0, &counterHandle); ret != ERROR_SUCCESS {
ret := PdhAddEnglishCounter(m.query, counterPath, 0, &counterHandle)
if ret != ERROR_SUCCESS {
return 0, NewPdhError(ret)
}
return counterHandle, nil
@@ -111,11 +110,13 @@ func (m *PerformanceQueryImpl) AddEnglishCounterToQuery(counterPath string) (PDH
func (m *PerformanceQueryImpl) GetCounterPath(counterHandle PDH_HCOUNTER) (string, error) {
var bufSize uint32
var buff []byte
var ret uint32
if ret = PdhGetCounterInfo(counterHandle, 0, &bufSize, nil); ret == PDH_MORE_DATA {
ret := PdhGetCounterInfo(counterHandle, 0, &bufSize, nil)
if ret == PDH_MORE_DATA {
buff = make([]byte, bufSize)
bufSize = uint32(len(buff))
if ret = PdhGetCounterInfo(counterHandle, 0, &bufSize, &buff[0]); ret == ERROR_SUCCESS {
ret = PdhGetCounterInfo(counterHandle, 0, &bufSize, &buff[0])
if ret == ERROR_SUCCESS {
ci := (*PDH_COUNTER_INFO)(unsafe.Pointer(&buff[0]))
return UTF16PtrToString(ci.SzFullPath), nil
}
@@ -127,9 +128,9 @@ func (m *PerformanceQueryImpl) GetCounterPath(counterHandle PDH_HCOUNTER) (strin
func (m *PerformanceQueryImpl) ExpandWildCardPath(counterPath string) ([]string, error) {
var bufSize uint32
var buff []uint16
var ret uint32
if ret = PdhExpandWildCardPath(counterPath, nil, &bufSize); ret == PDH_MORE_DATA {
ret := PdhExpandWildCardPath(counterPath, nil, &bufSize)
if ret == PDH_MORE_DATA {
buff = make([]uint16, bufSize)
bufSize = uint32(len(buff))
ret = PdhExpandWildCardPath(counterPath, &buff[0], &bufSize)
@@ -145,9 +146,8 @@ func (m *PerformanceQueryImpl) ExpandWildCardPath(counterPath string) ([]string,
func (m *PerformanceQueryImpl) GetFormattedCounterValueDouble(hCounter PDH_HCOUNTER) (float64, error) {
var counterType uint32
var value PDH_FMT_COUNTERVALUE_DOUBLE
var ret uint32
if ret = PdhGetFormattedCounterValueDouble(hCounter, &counterType, &value); ret == ERROR_SUCCESS {
ret := PdhGetFormattedCounterValueDouble(hCounter, &counterType, &value)
if ret == ERROR_SUCCESS {
if value.CStatus == PDH_CSTATUS_VALID_DATA || value.CStatus == PDH_CSTATUS_NEW_DATA {
return value.DoubleValue, nil
} else {
@@ -161,12 +161,11 @@ func (m *PerformanceQueryImpl) GetFormattedCounterValueDouble(hCounter PDH_HCOUN
func (m *PerformanceQueryImpl) GetFormattedCounterArrayDouble(hCounter PDH_HCOUNTER) ([]CounterValue, error) {
var buffSize uint32
var itemCount uint32
var ret uint32
if ret = PdhGetFormattedCounterArrayDouble(hCounter, &buffSize, &itemCount, nil); ret == PDH_MORE_DATA {
ret := PdhGetFormattedCounterArrayDouble(hCounter, &buffSize, &itemCount, nil)
if ret == PDH_MORE_DATA {
buff := make([]byte, buffSize)
if ret = PdhGetFormattedCounterArrayDouble(hCounter, &buffSize, &itemCount, &buff[0]); ret == ERROR_SUCCESS {
ret = PdhGetFormattedCounterArrayDouble(hCounter, &buffSize, &itemCount, &buff[0])
if ret == ERROR_SUCCESS {
items := (*[1 << 20]PDH_FMT_COUNTERVALUE_ITEM_DOUBLE)(unsafe.Pointer(&buff[0]))[:itemCount]
values := make([]CounterValue, 0, itemCount)
for _, item := range items {
@@ -182,29 +181,17 @@ func (m *PerformanceQueryImpl) GetFormattedCounterArrayDouble(hCounter PDH_HCOUN
}
func (m *PerformanceQueryImpl) CollectData() error {
var ret uint32
if m.query == 0 {
return errors.New("uninitialised query")
}
if ret = PdhCollectQueryData(m.query); ret != ERROR_SUCCESS {
ret := PdhCollectQueryData(m.query)
if ret != ERROR_SUCCESS {
return NewPdhError(ret)
}
return nil
}
func (m *PerformanceQueryImpl) CollectDataWithTime() (time.Time, error) {
if m.query == 0 {
return time.Now(), errors.New("uninitialised query")
}
ret, mtime := PdhCollectQueryDataWithTime(m.query)
if ret != ERROR_SUCCESS {
return time.Now(), NewPdhError(ret)
}
return mtime, nil
}
func (m *PerformanceQueryImpl) IsVistaOrNewer() bool {
func (m *PerformanceQueryImpl) AddEnglishCounterSupported() bool {
return PdhAddEnglishCounterSupported()
}

View File

@@ -23,8 +23,6 @@ var sampleConfig = `
## agent, it will not be gathered.
## Settings:
# PrintValid = false # Print All matching performance counters
# Whether request a timestamp along with the PerfCounter data or just use current time
# UsePerfCounterTime=true
# If UseWildcardsExpansion params is set to true, wildcards (partial wildcards in instance names and wildcards in counters names) in configured counter paths will be expanded
# and in case of localized Windows, counter paths will be also localized. It also returns instance indexes in instance names.
# If false, wildcards (not partial) in instance names will still be expanded, but instance indexes will not be returned in instance names.
@@ -80,7 +78,6 @@ type Win_PerfCounters struct {
PrintValid bool
//deprecated: determined dynamically
PreVistaSupport bool
UsePerfCounterTime bool
Object []perfobject
CountersRefreshInterval internal.Duration
UseWildcardsExpansion bool
@@ -110,12 +107,6 @@ type counter struct {
counterHandle PDH_HCOUNTER
}
type instanceGrouping struct {
name string
instance string
objectname string
}
var sanitizedChars = strings.NewReplacer("/sec", "_persec", "/Sec", "_persec",
" ", "_", "%", "Percent", `\`, "")
@@ -156,7 +147,7 @@ func (m *Win_PerfCounters) SampleConfig() string {
func (m *Win_PerfCounters) AddItem(counterPath string, objectName string, instance string, counterName string, measurement string, includeTotal bool) error {
var err error
var counterHandle PDH_HCOUNTER
if !m.query.IsVistaOrNewer() {
if !m.query.AddEnglishCounterSupported() {
counterHandle, err = m.query.AddCounterToQuery(counterPath)
if err != nil {
return err
@@ -258,15 +249,18 @@ func (m *Win_PerfCounters) Gather(acc telegraf.Accumulator) error {
m.counters = m.counters[:0]
}
if err = m.query.Open(); err != nil {
err = m.query.Open()
if err != nil {
return err
}
if err = m.ParseConfig(); err != nil {
err = m.ParseConfig()
if err != nil {
return err
}
//some counters need two data samples before computing a value
if err = m.query.CollectData(); err != nil {
err = m.query.CollectData()
if err != nil {
return err
}
m.lastRefreshed = time.Now()
@@ -274,31 +268,37 @@ func (m *Win_PerfCounters) Gather(acc telegraf.Accumulator) error {
time.Sleep(time.Second)
}
var collectFields = make(map[instanceGrouping]map[string]interface{})
var timestamp time.Time
if m.UsePerfCounterTime && m.query.IsVistaOrNewer() {
timestamp, err = m.query.CollectDataWithTime()
if err != nil {
return err
}
} else {
timestamp = time.Now()
if err = m.query.CollectData(); err != nil {
return err
}
type InstanceGrouping struct {
name string
instance string
objectname string
}
var collectFields = make(map[InstanceGrouping]map[string]interface{})
err = m.query.CollectData()
if err != nil {
return err
}
// For iterate over the known metrics and get the samples.
for _, metric := range m.counters {
// collect
if m.UseWildcardsExpansion {
value, err := m.query.GetFormattedCounterValueDouble(metric.counterHandle)
if err == nil {
addCounterMeasurement(metric, metric.instance, value, collectFields)
measurement := sanitizedChars.Replace(metric.measurement)
if measurement == "" {
measurement = "win_perf_counters"
}
var instance = InstanceGrouping{measurement, metric.instance, metric.objectName}
if collectFields[instance] == nil {
collectFields[instance] = make(map[string]interface{})
}
collectFields[instance][sanitizedChars.Replace(metric.counter)] = float32(value)
} else {
//ignore invalid data as some counters from process instances returns this sometimes
if !isKnownCounterDataError(err) {
//ignore invalid data from as some counters from process instances returns this sometimes
if phderr, ok := err.(*PdhError); ok && phderr.ErrorCode != PDH_INVALID_DATA && phderr.ErrorCode != PDH_CALC_NEGATIVE_VALUE {
return fmt.Errorf("error while getting value for counter %s: %v", metric.counterPath, err)
}
}
@@ -326,14 +326,18 @@ func (m *Win_PerfCounters) Gather(acc telegraf.Accumulator) error {
}
if add {
addCounterMeasurement(metric, cValue.InstanceName, cValue.Value, collectFields)
measurement := sanitizedChars.Replace(metric.measurement)
if measurement == "" {
measurement = "win_perf_counters"
}
var instance = InstanceGrouping{measurement, cValue.InstanceName, metric.objectName}
if collectFields[instance] == nil {
collectFields[instance] = make(map[string]interface{})
}
collectFields[instance][sanitizedChars.Replace(metric.counter)] = float32(cValue.Value)
}
}
} else {
//ignore invalid data as some counters from process instances returns this sometimes
if !isKnownCounterDataError(err) {
return fmt.Errorf("error while getting value for counter %s: %v", metric.counterPath, err)
}
}
}
}
@@ -345,33 +349,12 @@ func (m *Win_PerfCounters) Gather(acc telegraf.Accumulator) error {
if len(instance.instance) > 0 {
tags["instance"] = instance.instance
}
acc.AddFields(instance.name, fields, tags, timestamp)
acc.AddFields(instance.name, fields, tags)
}
return nil
}
func addCounterMeasurement(metric *counter, instanceName string, value float64, collectFields map[instanceGrouping]map[string]interface{}) {
measurement := sanitizedChars.Replace(metric.measurement)
if measurement == "" {
measurement = "win_perf_counters"
}
var instance = instanceGrouping{measurement, instanceName, metric.objectName}
if collectFields[instance] == nil {
collectFields[instance] = make(map[string]interface{})
}
collectFields[instance][sanitizedChars.Replace(metric.counter)] = float32(value)
}
func isKnownCounterDataError(err error) bool {
if pdhErr, ok := err.(*PdhError); ok && (pdhErr.ErrorCode == PDH_INVALID_DATA ||
pdhErr.ErrorCode == PDH_CALC_NEGATIVE_VALUE ||
pdhErr.ErrorCode == PDH_CSTATUS_INVALID_DATA) {
return true
}
return false
}
func init() {
inputs.Add("win_perf_counters", func() telegraf.Input {
return &Win_PerfCounters{query: &PerformanceQueryImpl{}, CountersRefreshInterval: internal.Duration{Duration: time.Second * 60}}

View File

@@ -70,11 +70,6 @@ func TestWinPerformanceQueryImpl(t *testing.T) {
_, err = query.GetFormattedCounterValueDouble(hCounter)
require.NoError(t, err)
now := time.Now()
mtime, err := query.CollectDataWithTime()
require.NoError(t, err)
assert.True(t, mtime.Sub(now) < time.Second)
counterPath = "\\Process(*)\\% Processor Time"
paths, err := query.ExpandWildCardPath(counterPath)
require.NoError(t, err)
@@ -103,10 +98,6 @@ func TestWinPerformanceQueryImpl(t *testing.T) {
require.NoError(t, err)
arr, err := query.GetFormattedCounterArrayDouble(hCounter)
if phderr, ok := err.(*PdhError); ok && phderr.ErrorCode != PDH_INVALID_DATA && phderr.ErrorCode != PDH_CALC_NEGATIVE_VALUE {
time.Sleep(time.Second)
arr, err = query.GetFormattedCounterArrayDouble(hCounter)
}
require.NoError(t, err)
assert.True(t, len(arr) > 0, "Too")
@@ -605,7 +596,7 @@ func TestWinPerfcountersCollect2(t *testing.T) {
perfobjects[0] = PerfObject
m := Win_PerfCounters{PrintValid: false, UsePerfCounterTime: true, Object: perfobjects, query: &PerformanceQueryImpl{}, UseWildcardsExpansion: true}
m := Win_PerfCounters{PrintValid: false, Object: perfobjects, query: &PerformanceQueryImpl{}, UseWildcardsExpansion: true}
var acc testutil.Accumulator
err := m.Gather(&acc)
require.NoError(t, err)

View File

@@ -19,14 +19,12 @@ type testCounter struct {
value float64
}
type FakePerformanceQuery struct {
counters map[string]testCounter
vistaAndNewer bool
expandPaths map[string][]string
openCalled bool
counters map[string]testCounter
addEnglishSupported bool
expandPaths map[string][]string
openCalled bool
}
var MetricTime = time.Date(2018, 5, 28, 12, 0, 0, 0, time.UTC)
func (m *testCounter) ToCounterValue() *CounterValue {
_, inst, _, _ := extractObjectInstanceCounterFromQuery(m.path)
if inst == "" {
@@ -104,10 +102,8 @@ func (m *FakePerformanceQuery) GetFormattedCounterValueDouble(counterHandle PDH_
} else {
if counter.value == 0 {
return 0, NewPdhError(PDH_INVALID_DATA)
} else if counter.value == -1 {
return 0, NewPdhError(PDH_CALC_NEGATIVE_VALUE)
} else {
return 0, NewPdhError(PDH_ACCESS_DENIED)
return 0, NewPdhError(PDH_CALC_NEGATIVE_VALUE)
}
}
}
@@ -142,18 +138,8 @@ func (m *FakePerformanceQuery) GetFormattedCounterArrayDouble(hCounter PDH_HCOUN
counters := make([]CounterValue, 0, len(e))
for _, p := range e {
counter := m.findCounterByPath(p)
if counter != nil {
if counter.value > 0 {
counters = append(counters, *counter.ToCounterValue())
} else {
if counter.value == 0 {
return nil, NewPdhError(PDH_INVALID_DATA)
} else if counter.value == -1 {
return nil, NewPdhError(PDH_CALC_NEGATIVE_VALUE)
} else {
return nil, NewPdhError(PDH_ACCESS_DENIED)
}
}
if counter != nil && counter.value > 0 {
counters = append(counters, *counter.ToCounterValue())
} else {
return nil, fmt.Errorf("GetFormattedCounterArrayDouble: invalid counter : %s", p)
}
@@ -174,15 +160,8 @@ func (m *FakePerformanceQuery) CollectData() error {
return nil
}
func (m *FakePerformanceQuery) CollectDataWithTime() (time.Time, error) {
if !m.openCalled {
return time.Now(), errors.New("CollectData: uninitialised query")
}
return MetricTime, nil
}
func (m *FakePerformanceQuery) IsVistaOrNewer() bool {
return m.vistaAndNewer
func (m *FakePerformanceQuery) AddEnglishCounterSupported() bool {
return m.addEnglishSupported
}
func createPerfObject(measurement string, object string, instances []string, counters []string, failOnMissing bool, includeTotal bool) []perfobject {
@@ -219,7 +198,7 @@ func TestAddItemSimple(t *testing.T) {
expandPaths: map[string][]string{
cps1[0]: cps1,
},
vistaAndNewer: true,
addEnglishSupported: true,
}}
err = m.query.Open()
require.NoError(t, err)
@@ -237,7 +216,7 @@ func TestAddItemInvalidCountPath(t *testing.T) {
expandPaths: map[string][]string{
cps1[0]: {"\\O/C"},
},
vistaAndNewer: true,
addEnglishSupported: true,
}}
err = m.query.Open()
require.NoError(t, err)
@@ -259,7 +238,7 @@ func TestParseConfigBasic(t *testing.T) {
cps1[2]: {cps1[2]},
cps1[3]: {cps1[3]},
},
vistaAndNewer: true,
addEnglishSupported: true,
}}
err = m.query.Open()
require.NoError(t, err)
@@ -291,7 +270,7 @@ func TestParseConfigNoInstance(t *testing.T) {
cps1[0]: {cps1[0]},
cps1[1]: {cps1[1]},
},
vistaAndNewer: true,
addEnglishSupported: true,
}}
err = m.query.Open()
require.NoError(t, err)
@@ -324,7 +303,7 @@ func TestParseConfigInvalidCounterError(t *testing.T) {
cps1[1]: {cps1[1]},
cps1[2]: {cps1[2]},
},
vistaAndNewer: true,
addEnglishSupported: true,
}}
err = m.query.Open()
require.NoError(t, err)
@@ -355,7 +334,7 @@ func TestParseConfigInvalidCounterNoError(t *testing.T) {
cps1[1]: {cps1[1]},
cps1[2]: {cps1[2]},
},
vistaAndNewer: true,
addEnglishSupported: true,
}}
err = m.query.Open()
require.NoError(t, err)
@@ -385,7 +364,7 @@ func TestParseConfigTotalExpansion(t *testing.T) {
expandPaths: map[string][]string{
"\\O(*)\\*": cps1,
},
vistaAndNewer: true,
addEnglishSupported: true,
}}
err = m.query.Open()
require.NoError(t, err)
@@ -402,7 +381,7 @@ func TestParseConfigTotalExpansion(t *testing.T) {
expandPaths: map[string][]string{
"\\O(*)\\*": cps1,
},
vistaAndNewer: true,
addEnglishSupported: true,
}}
err = m.query.Open()
require.NoError(t, err)
@@ -422,7 +401,7 @@ func TestParseConfigExpand(t *testing.T) {
expandPaths: map[string][]string{
"\\O(*)\\*": cps1,
},
vistaAndNewer: true,
addEnglishSupported: true,
}}
err = m.query.Open()
require.NoError(t, err)
@@ -446,7 +425,7 @@ func TestSimpleGather(t *testing.T) {
expandPaths: map[string][]string{
cp1: {cp1},
},
vistaAndNewer: false,
addEnglishSupported: false,
}}
var acc1 testutil.Accumulator
err = m.Gather(&acc1)
@@ -470,65 +449,7 @@ func TestSimpleGather(t *testing.T) {
err = m.Gather(&acc2)
require.NoError(t, err)
acc1.AssertContainsTaggedFields(t, measurement, fields1, tags1)
}
func TestSimpleGatherWithTimestamp(t *testing.T) {
var err error
if testing.Short() {
t.Skip("Skipping long taking test in short mode")
}
measurement := "test"
perfObjects := createPerfObject(measurement, "O", []string{"I"}, []string{"C"}, false, false)
cp1 := "\\O(I)\\C"
m := Win_PerfCounters{PrintValid: false, UsePerfCounterTime: true, Object: perfObjects, query: &FakePerformanceQuery{
counters: createCounterMap([]string{cp1}, []float64{1.2}),
expandPaths: map[string][]string{
cp1: {cp1},
},
vistaAndNewer: true,
}}
var acc1 testutil.Accumulator
err = m.Gather(&acc1)
require.NoError(t, err)
fields1 := map[string]interface{}{
"C": float32(1.2),
}
tags1 := map[string]string{
"instance": "I",
"objectname": "O",
}
acc1.AssertContainsTaggedFields(t, measurement, fields1, tags1)
assert.True(t, acc1.HasTimestamp(measurement, MetricTime))
}
func TestGatherError(t *testing.T) {
var err error
if testing.Short() {
t.Skip("Skipping long taking test in short mode")
}
measurement := "test"
perfObjects := createPerfObject(measurement, "O", []string{"I"}, []string{"C"}, false, false)
cp1 := "\\O(I)\\C"
m := Win_PerfCounters{PrintValid: false, Object: perfObjects, query: &FakePerformanceQuery{
counters: createCounterMap([]string{cp1}, []float64{-2}),
expandPaths: map[string][]string{
cp1: {cp1},
},
vistaAndNewer: false,
}}
var acc1 testutil.Accumulator
err = m.Gather(&acc1)
require.Error(t, err)
m.UseWildcardsExpansion = true
m.counters = nil
m.lastRefreshed = time.Time{}
var acc2 testutil.Accumulator
err = m.Gather(&acc2)
require.Error(t, err)
}
func TestGatherInvalidDataIgnore(t *testing.T) {
@@ -546,7 +467,7 @@ func TestGatherInvalidDataIgnore(t *testing.T) {
cps1[1]: {cps1[1]},
cps1[2]: {cps1[2]},
},
vistaAndNewer: false,
addEnglishSupported: false,
}}
var acc1 testutil.Accumulator
err = m.Gather(&acc1)
@@ -585,7 +506,7 @@ func TestGatherRefreshingWithExpansion(t *testing.T) {
expandPaths: map[string][]string{
"\\O(*)\\*": cps1,
},
vistaAndNewer: true,
addEnglishSupported: true,
}
m := Win_PerfCounters{PrintValid: false, Object: perfObjects, UseWildcardsExpansion: true, query: fpm, CountersRefreshInterval: internal.Duration{Duration: time.Second * 10}}
var acc1 testutil.Accumulator
@@ -619,7 +540,7 @@ func TestGatherRefreshingWithExpansion(t *testing.T) {
expandPaths: map[string][]string{
"\\O(*)\\*": cps2,
},
vistaAndNewer: true,
addEnglishSupported: true,
}
m.query = fpm
fpm.Open()
@@ -671,7 +592,7 @@ func TestGatherRefreshingWithoutExpansion(t *testing.T) {
"\\O(*)\\C1": {cps1[0], cps1[2]},
"\\O(*)\\C2": {cps1[1], cps1[3]},
},
vistaAndNewer: true,
addEnglishSupported: true,
}
m := Win_PerfCounters{PrintValid: false, Object: perfObjects, UseWildcardsExpansion: false, query: fpm, CountersRefreshInterval: internal.Duration{Duration: time.Second * 10}}
var acc1 testutil.Accumulator
@@ -707,7 +628,7 @@ func TestGatherRefreshingWithoutExpansion(t *testing.T) {
"\\O(*)\\C1": {cps2[0], cps2[2], cps2[4]},
"\\O(*)\\C2": {cps2[1], cps2[3], cps2[5]},
},
vistaAndNewer: true,
addEnglishSupported: true,
}
m.query = fpm
fpm.Open()
@@ -741,7 +662,7 @@ func TestGatherRefreshingWithoutExpansion(t *testing.T) {
"\\O(*)\\C2": {cps3[1], cps3[4]},
"\\O(*)\\C3": {cps3[2], cps3[5]},
},
vistaAndNewer: true,
addEnglishSupported: true,
}
m.query = fpm
m.Object = perfObjects
@@ -789,7 +710,7 @@ func TestGatherTotalNoExpansion(t *testing.T) {
"\\O(*)\\C1": {cps1[0], cps1[2]},
"\\O(*)\\C2": {cps1[1], cps1[3]},
},
vistaAndNewer: true,
addEnglishSupported: true,
}}
var acc1 testutil.Accumulator
err = m.Gather(&acc1)

View File

@@ -22,7 +22,6 @@ var (
`%`, "-",
"#", "-",
"$", "-")
defaultHttpPath = "/api/put"
defaultSeperator = "_"
)
@@ -32,8 +31,7 @@ type OpenTSDB struct {
Host string
Port int
HttpBatchSize int // deprecated httpBatchSize form in 1.8
HttpPath string
HttpBatchSize int
Debug bool
@@ -54,11 +52,7 @@ var sampleConfig = `
## Number of data points to send to OpenTSDB in Http requests.
## Not used with telnet API.
http_batch_size = 50
## URI Path for Http requests to OpenTSDB.
## Used in cases where OpenTSDB is located behind a reverse proxy.
http_path = "/api/put"
httpBatchSize = 50
## Debug true - Prints OpenTSDB communication
debug = false
@@ -127,7 +121,6 @@ func (o *OpenTSDB) WriteHttp(metrics []telegraf.Metric, u *url.URL) error {
Scheme: u.Scheme,
User: u.User,
BatchSize: o.HttpBatchSize,
Path: o.HttpPath,
Debug: o.Debug,
}
@@ -267,7 +260,6 @@ func sanitize(value string) string {
func init() {
outputs.Add("opentsdb", func() telegraf.Output {
return &OpenTSDB{
HttpPath: defaultHttpPath,
Separator: defaultSeperator,
}
})

View File

@@ -26,7 +26,6 @@ type openTSDBHttp struct {
Scheme string
User *url.Userinfo
BatchSize int
Path string
Debug bool
metricCounter int
@@ -124,7 +123,7 @@ func (o *openTSDBHttp) flush() error {
Scheme: o.Scheme,
User: o.User,
Host: fmt.Sprintf("%s:%d", o.Host, o.Port),
Path: o.Path,
Path: "/api/put",
}
if o.Debug {

View File

@@ -156,7 +156,6 @@ func BenchmarkHttpSend(b *testing.B) {
Port: port,
Prefix: "",
HttpBatchSize: BatchSize,
HttpPath: "/api/put",
}
b.ResetTimer()

View File

@@ -2,7 +2,6 @@ package all
import (
_ "github.com/influxdata/telegraf/plugins/processors/converter"
_ "github.com/influxdata/telegraf/plugins/processors/enum"
_ "github.com/influxdata/telegraf/plugins/processors/override"
_ "github.com/influxdata/telegraf/plugins/processors/printer"
_ "github.com/influxdata/telegraf/plugins/processors/regex"

View File

@@ -1,33 +0,0 @@
# Enum Processor Plugin
The Enum Processor allows the configuration of value mappings for metric fields.
The main use-case for this is to rewrite status codes such as _red_, _amber_ and
_green_ by numeric values such as 0, 1, 2. The plugin supports string and bool
types for the field values. Multiple Fields can be configured with separate
value mappings for each field. Default mapping values can be configured to be
used for all values, which are not contained in the value_mappings. The
processor supports explicit configuration of a destination field. By default the
source field is overwritten.
### Configuration:
```toml
[[processors.enum]]
[[processors.enum.fields]]
## Name of the field to map
source = "name"
## Destination field to be used for the mapped value. By default the source
## field is used, overwriting the original value.
# destination = "mapped"
## Default value to be used for all values not contained in the mapping
## table. When unset, the unmodified value for the field will be used if no
## match is found.
# default = 0
## Table of mappings
[processors.enum.fields.value_mappings]
value1 = 1
value2 = 2
```

View File

@@ -1,104 +0,0 @@
package enum
import (
"strconv"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/processors"
)
var sampleConfig = `
[[processors.enum.fields]]
## Name of the field to map
source = "name"
## Destination field to be used for the mapped value. By default the source
## field is used, overwriting the original value.
# destination = "mapped"
## Default value to be used for all values not contained in the mapping
## table. When unset, the unmodified value for the field will be used if no
## match is found.
# default = 0
## Table of mappings
[processors.enum.fields.value_mappings]
value1 = 1
value2 = 2
`
type EnumMapper struct {
Fields []Mapping
}
type Mapping struct {
Source string
Destination string
Default interface{}
ValueMappings map[string]interface{}
}
func (mapper *EnumMapper) SampleConfig() string {
return sampleConfig
}
func (mapper *EnumMapper) Description() string {
return "Map enum values according to given table."
}
func (mapper *EnumMapper) Apply(in ...telegraf.Metric) []telegraf.Metric {
for i := 0; i < len(in); i++ {
in[i] = mapper.applyMappings(in[i])
}
return in
}
func (mapper *EnumMapper) applyMappings(metric telegraf.Metric) telegraf.Metric {
for _, mapping := range mapper.Fields {
if originalValue, isPresent := metric.GetField(mapping.Source); isPresent == true {
if adjustedValue, isString := adjustBoolValue(originalValue).(string); isString == true {
if mappedValue, isMappedValuePresent := mapping.mapValue(adjustedValue); isMappedValuePresent == true {
writeField(metric, mapping.getDestination(), mappedValue)
}
}
}
}
return metric
}
func adjustBoolValue(in interface{}) interface{} {
if mappedBool, isBool := in.(bool); isBool == true {
return strconv.FormatBool(mappedBool)
}
return in
}
func (mapping *Mapping) mapValue(original string) (interface{}, bool) {
if mapped, found := mapping.ValueMappings[original]; found == true {
return mapped, true
}
if mapping.Default != nil {
return mapping.Default, true
}
return original, false
}
func (mapping *Mapping) getDestination() string {
if mapping.Destination != "" {
return mapping.Destination
}
return mapping.Source
}
func writeField(metric telegraf.Metric, name string, value interface{}) {
if metric.HasField(name) {
metric.RemoveField(name)
}
metric.AddField(name, value)
}
func init() {
processors.Add("enum", func() telegraf.Processor {
return &EnumMapper{}
})
}

View File

@@ -1,106 +0,0 @@
package enum
import (
"testing"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
"github.com/stretchr/testify/assert"
)
func createTestMetric() telegraf.Metric {
metric, _ := metric.New("m1",
map[string]string{"tag": "tag_value"},
map[string]interface{}{
"string_value": "test",
"int_value": int(13),
"true_value": true,
},
time.Now(),
)
return metric
}
func calculateProcessedValues(mapper EnumMapper, metric telegraf.Metric) map[string]interface{} {
processed := mapper.Apply(metric)
return processed[0].Fields()
}
func assertFieldValue(t *testing.T, expected interface{}, field string, fields map[string]interface{}) {
value, present := fields[field]
assert.True(t, present, "value of field '"+field+"' was not present")
assert.EqualValues(t, expected, value)
}
func TestRetainsMetric(t *testing.T) {
mapper := EnumMapper{}
source := createTestMetric()
target := mapper.Apply(source)[0]
fields := target.Fields()
assertFieldValue(t, "test", "string_value", fields)
assertFieldValue(t, 13, "int_value", fields)
assertFieldValue(t, true, "true_value", fields)
assert.Equal(t, "m1", target.Name())
assert.Equal(t, source.Tags(), target.Tags())
assert.Equal(t, source.Time(), target.Time())
}
func TestMapsSingleStringValue(t *testing.T) {
mapper := EnumMapper{Fields: []Mapping{{Source: "string_value", ValueMappings: map[string]interface{}{"test": int64(1)}}}}
fields := calculateProcessedValues(mapper, createTestMetric())
assertFieldValue(t, 1, "string_value", fields)
}
func TestNoFailureOnMappingsOnNonStringValuedFields(t *testing.T) {
mapper := EnumMapper{Fields: []Mapping{{Source: "int_value", ValueMappings: map[string]interface{}{"13i": int64(7)}}}}
fields := calculateProcessedValues(mapper, createTestMetric())
assertFieldValue(t, 13, "int_value", fields)
}
func TestMapSingleBoolValue(t *testing.T) {
mapper := EnumMapper{Fields: []Mapping{{Source: "true_value", ValueMappings: map[string]interface{}{"true": int64(1)}}}}
fields := calculateProcessedValues(mapper, createTestMetric())
assertFieldValue(t, 1, "true_value", fields)
}
func TestMapsToDefaultValueOnUnknownSourceValue(t *testing.T) {
mapper := EnumMapper{Fields: []Mapping{{Source: "string_value", Default: int64(42), ValueMappings: map[string]interface{}{"other": int64(1)}}}}
fields := calculateProcessedValues(mapper, createTestMetric())
assertFieldValue(t, 42, "string_value", fields)
}
func TestDoNotMapToDefaultValueKnownSourceValue(t *testing.T) {
mapper := EnumMapper{Fields: []Mapping{{Source: "string_value", Default: int64(42), ValueMappings: map[string]interface{}{"test": int64(1)}}}}
fields := calculateProcessedValues(mapper, createTestMetric())
assertFieldValue(t, 1, "string_value", fields)
}
func TestNoMappingWithoutDefaultOrDefinedMappingValue(t *testing.T) {
mapper := EnumMapper{Fields: []Mapping{{Source: "string_value", ValueMappings: map[string]interface{}{"other": int64(1)}}}}
fields := calculateProcessedValues(mapper, createTestMetric())
assertFieldValue(t, "test", "string_value", fields)
}
func TestWritesToDestination(t *testing.T) {
mapper := EnumMapper{Fields: []Mapping{{Source: "string_value", Destination: "string_code", ValueMappings: map[string]interface{}{"test": int64(1)}}}}
fields := calculateProcessedValues(mapper, createTestMetric())
assertFieldValue(t, "test", "string_value", fields)
assertFieldValue(t, 1, "string_code", fields)
}