Compare commits

...

27 Commits

Author SHA1 Message Date
Greg Linton
6f4bd9ad82 Don't read/check msg length 2018-07-03 15:58:03 -06:00
Greg Linton
4ea618ea26 Enhance syslog reading and ensure proper length is read 2018-07-03 15:05:59 -06:00
Greg Linton
a7545e6cac Don't close connection if readTimeout exceeded 2018-07-02 18:56:58 -06:00
Greg Linton
58e815fdd1 Merge branch 'master' into bugfix/4335 2018-07-02 18:13:44 -06:00
Greg Linton
06682c6350 Cleanup syslog messages
Remove leading and trailing spaces and/or newlines from syslog message
field.
2018-07-02 18:11:58 -06:00
Greg Linton
839ca60b0e Improve syslog connection handling
Resolves #4335
2018-07-02 17:49:13 -06:00
Ayrdrie
9fe90d71f4 Add plugin development framework (#4324) 2018-07-02 16:34:40 -07:00
Daniel Nelson
2ee374cf50 Deprecate camelCase config options in opentsdb output 2018-07-02 15:10:10 -07:00
Daniel Nelson
220e6c5361 Update changelog 2018-07-02 15:06:35 -07:00
Jacob Lisi
c7cfc2ec39 Add http path configuration for OpenTSDB output (#4347) 2018-07-02 15:04:01 -07:00
Greg Linton
9bc63c2f7a Improve CircleCI dependency caching 2018-07-02 14:23:29 -06:00
Daniel Nelson
56ea2eb57b Update changelog 2018-07-02 13:10:39 -07:00
Daniel Nelson
210dfcee83 Fix field name typo in swap documentation 2018-07-02 13:08:43 -07:00
Steve Domino
b7a02c73b3 Document swap input plugin and move to separate file (#4342) 2018-07-02 13:07:57 -07:00
Daniel Nelson
73e2e6afc5 Update changelog 2018-07-02 11:58:14 -07:00
Pierrick Brossin
b2586a7eaf Add energy and power field and device id tag to fibaro input (#4343) 2018-07-02 11:57:05 -07:00
Daniel Nelson
abfbf4f4f2 Update changelog 2018-06-29 19:08:09 -07:00
Adrián López
85eacf268b Fix minmax and basicstats aggregators to use uint64 (#4294) 2018-06-29 19:07:08 -07:00
Daniel Nelson
1a781a5851 Update changelog 2018-06-29 19:02:53 -07:00
Vlasta Hajek
ed2bc1151b Allow use of counter time in win perf counters (#4267) 2018-06-29 19:01:28 -07:00
Daniel Nelson
b2e972cd81 Update changelog 2018-06-29 18:18:58 -07:00
Greg
54056f3808 Handle mysql input variations in the user_statistics collecting (#4306) 2018-06-29 18:16:52 -07:00
Daniel Nelson
5aa199e2b3 Update changelog 2018-06-29 18:06:45 -07:00
Daniel Nelson
9bd5e10133 Fix syslog timestamp parsing with single digit day of month (#4334) 2018-06-29 18:05:46 -07:00
Daniel Nelson
beaef8e3da Update changelog 2018-06-29 16:20:03 -07:00
maxunt
a10262c5d6 Add log message when tail is added or removed from a file (#4322) 2018-06-29 16:15:33 -07:00
Daniel Nelson
8bf18d6ac7 Fix name of hadoop example config 2018-06-29 16:07:30 -07:00
40 changed files with 1000 additions and 359 deletions

View File

@@ -16,11 +16,11 @@ jobs:
steps: steps:
- checkout - checkout
- restore_cache: - restore_cache:
key: vendor-{{ .Branch }}-{{ checksum "Gopkg.lock" }} key: vendor-{{ checksum "Gopkg.lock" }}
- run: 'make deps' - run: 'make deps'
- save_cache: - save_cache:
name: 'vendored deps' name: 'vendored deps'
key: vendor-{{ .Branch }}-{{ checksum "Gopkg.lock" }} key: vendor-{{ checksum "Gopkg.lock" }}
paths: paths:
- './vendor' - './vendor'
- persist_to_workspace: - persist_to_workspace:

View File

@@ -22,6 +22,10 @@
- [#4307](https://github.com/influxdata/telegraf/pull/4307): Add new measurement with results of pgrep lookup to procstat input. - [#4307](https://github.com/influxdata/telegraf/pull/4307): Add new measurement with results of pgrep lookup to procstat input.
- [#4311](https://github.com/influxdata/telegraf/pull/4311): Add support for comma in logparser timestamp format. - [#4311](https://github.com/influxdata/telegraf/pull/4311): Add support for comma in logparser timestamp format.
- [#4292](https://github.com/influxdata/telegraf/pull/4292): Add path tag to tail input plugin. - [#4292](https://github.com/influxdata/telegraf/pull/4292): Add path tag to tail input plugin.
- [#4322](https://github.com/influxdata/telegraf/pull/4322): Add log message when tail is added or removed from a file.
- [#4267](https://github.com/influxdata/telegraf/pull/4267): Add option to use counter time in win perf counters.
- [#4343](https://github.com/influxdata/telegraf/pull/4343): Add energy and power field and device id tag to fibaro input.
- [#4347](https://github.com/influxdata/telegraf/pull/4347): Add http path configuration for OpenTSDB output.
## v1.7.1 [unreleased] ## v1.7.1 [unreleased]
@@ -30,6 +34,10 @@
- [#4277](https://github.com/influxdata/telegraf/pull/4277): Treat sigterm as a clean shutdown signal. - [#4277](https://github.com/influxdata/telegraf/pull/4277): Treat sigterm as a clean shutdown signal.
- [#4284](https://github.com/influxdata/telegraf/pull/4284): Fix selection of tags under nested objects in the JSON parser. - [#4284](https://github.com/influxdata/telegraf/pull/4284): Fix selection of tags under nested objects in the JSON parser.
- [#4135](https://github.com/influxdata/telegraf/issues/4135): Fix postfix input handling multi-level queues. - [#4135](https://github.com/influxdata/telegraf/issues/4135): Fix postfix input handling multi-level queues.
- [#4334](https://github.com/influxdata/telegraf/pull/4334): Fix syslog timestamp parsing with single digit day of month.
- [#2910](https://github.com/influxdata/telegraf/issues/2910): Handle mysql input variations in the user_statistics collecting.
- [#4293](https://github.com/influxdata/telegraf/issues/4293): Fix minmax and basicstats aggregators to use uint64.
- [#4290](https://github.com/influxdata/telegraf/issues/4290): Document swap input plugin.
## v1.7 [2018-06-12] ## v1.7 [2018-06-12]

View File

@@ -100,6 +100,13 @@ func init() {
} }
``` ```
### Input Plugin Development
* Run `make static` followed by `make plugin-[pluginName]` to spin up a docker dev environment
using docker-compose.
* ***[Optional]*** When developing a plugin, add a `dev` directory with a `docker-compose.yml` and `telegraf.conf`
as well as any other supporting files, where sensible.
## Adding Typed Metrics ## Adding Typed Metrics
In addition to the `AddFields` function, the accumulator also supports an In addition to the `AddFields` function, the accumulator also supports an

View File

@@ -92,4 +92,15 @@ docker-image:
plugins/parsers/influx/machine.go: plugins/parsers/influx/machine.go.rl plugins/parsers/influx/machine.go: plugins/parsers/influx/machine.go.rl
ragel -Z -G2 $^ -o $@ ragel -Z -G2 $^ -o $@
.PHONY: deps telegraf install test test-windows lint vet test-all package clean docker-image fmtcheck uint64 static:
@echo "Building static linux binary..."
@CGO_ENABLED=0 \
GOOS=linux \
GOARCH=amd64 \
go build -ldflags "$(LDFLAGS)" ./cmd/telegraf
plugin-%:
@echo "Starting dev environment for $${$(@)} input plugin..."
@docker-compose -f plugins/inputs/$${$(@)}/dev/docker-compose.yml up
.PHONY: deps telegraf install test test-windows lint vet test-all package clean docker-image fmtcheck uint64 static

View File

@@ -723,6 +723,10 @@
# ## Not used with telnet API. # ## Not used with telnet API.
# httpBatchSize = 50 # httpBatchSize = 50
# #
# ## URI Path for Http requests to OpenTSDB.
# ## Used in cases where OpenTSDB is located behind a reverse proxy.
# httpPath = "/api/put"
#
# ## Debug true - Prints OpenTSDB communication # ## Debug true - Prints OpenTSDB communication
# debug = false # debug = false
# #

View File

@@ -246,6 +246,8 @@ func convert(in interface{}) (float64, bool) {
return v, true return v, true
case int64: case int64:
return float64(v), true return float64(v), true
case uint64:
return float64(v), true
default: default:
return 0, false return 0, false
} }

View File

@@ -28,6 +28,7 @@ var m2, _ = metric.New("m1",
"c": float64(4), "c": float64(4),
"d": float64(6), "d": float64(6),
"e": float64(200), "e": float64(200),
"f": uint64(200),
"ignoreme": "string", "ignoreme": "string",
"andme": true, "andme": true,
}, },
@@ -81,6 +82,10 @@ func TestBasicStatsWithPeriod(t *testing.T) {
"e_max": float64(200), "e_max": float64(200),
"e_min": float64(200), "e_min": float64(200),
"e_mean": float64(200), "e_mean": float64(200),
"f_count": float64(1), //f
"f_max": float64(200),
"f_min": float64(200),
"f_mean": float64(200),
} }
expectedTags := map[string]string{ expectedTags := map[string]string{
"foo": "bar", "foo": "bar",
@@ -144,6 +149,10 @@ func TestBasicStatsDifferentPeriods(t *testing.T) {
"e_max": float64(200), "e_max": float64(200),
"e_min": float64(200), "e_min": float64(200),
"e_mean": float64(200), "e_mean": float64(200),
"f_count": float64(1), //f
"f_max": float64(200),
"f_min": float64(200),
"f_mean": float64(200),
} }
expectedTags = map[string]string{ expectedTags = map[string]string{
"foo": "bar", "foo": "bar",
@@ -169,6 +178,7 @@ func TestBasicStatsWithOnlyCount(t *testing.T) {
"c_count": float64(2), "c_count": float64(2),
"d_count": float64(2), "d_count": float64(2),
"e_count": float64(1), "e_count": float64(1),
"f_count": float64(1),
} }
expectedTags := map[string]string{ expectedTags := map[string]string{
"foo": "bar", "foo": "bar",
@@ -194,6 +204,7 @@ func TestBasicStatsWithOnlyMin(t *testing.T) {
"c_min": float64(2), "c_min": float64(2),
"d_min": float64(2), "d_min": float64(2),
"e_min": float64(200), "e_min": float64(200),
"f_min": float64(200),
} }
expectedTags := map[string]string{ expectedTags := map[string]string{
"foo": "bar", "foo": "bar",
@@ -219,6 +230,7 @@ func TestBasicStatsWithOnlyMax(t *testing.T) {
"c_max": float64(4), "c_max": float64(4),
"d_max": float64(6), "d_max": float64(6),
"e_max": float64(200), "e_max": float64(200),
"f_max": float64(200),
} }
expectedTags := map[string]string{ expectedTags := map[string]string{
"foo": "bar", "foo": "bar",
@@ -244,6 +256,7 @@ func TestBasicStatsWithOnlyMean(t *testing.T) {
"c_mean": float64(3), "c_mean": float64(3),
"d_mean": float64(4), "d_mean": float64(4),
"e_mean": float64(200), "e_mean": float64(200),
"f_mean": float64(200),
} }
expectedTags := map[string]string{ expectedTags := map[string]string{
"foo": "bar", "foo": "bar",
@@ -269,6 +282,7 @@ func TestBasicStatsWithOnlySum(t *testing.T) {
"c_sum": float64(6), "c_sum": float64(6),
"d_sum": float64(8), "d_sum": float64(8),
"e_sum": float64(200), "e_sum": float64(200),
"f_sum": float64(200),
} }
expectedTags := map[string]string{ expectedTags := map[string]string{
"foo": "bar", "foo": "bar",
@@ -399,6 +413,8 @@ func TestBasicStatsWithMinAndMax(t *testing.T) {
"d_min": float64(2), "d_min": float64(2),
"e_max": float64(200), //e "e_max": float64(200), //e
"e_min": float64(200), "e_min": float64(200),
"f_max": float64(200), //f
"f_min": float64(200),
} }
expectedTags := map[string]string{ expectedTags := map[string]string{
"foo": "bar", "foo": "bar",
@@ -450,6 +466,11 @@ func TestBasicStatsWithAllStats(t *testing.T) {
"e_min": float64(200), "e_min": float64(200),
"e_mean": float64(200), "e_mean": float64(200),
"e_sum": float64(200), "e_sum": float64(200),
"f_count": float64(1), //f
"f_max": float64(200),
"f_min": float64(200),
"f_mean": float64(200),
"f_sum": float64(200),
} }
expectedTags := map[string]string{ expectedTags := map[string]string{
"foo": "bar", "foo": "bar",

View File

@@ -107,6 +107,8 @@ func convert(in interface{}) (float64, bool) {
return v, true return v, true
case int64: case int64:
return float64(v), true return float64(v), true
case uint64:
return float64(v), true
default: default:
return 0, false return 0, false
} }

View File

@@ -38,6 +38,7 @@ var m2, _ = metric.New("m1",
"i": float64(1), "i": float64(1),
"j": float64(1), "j": float64(1),
"k": float64(200), "k": float64(200),
"l": uint64(200),
"ignoreme": "string", "ignoreme": "string",
"andme": true, "andme": true,
}, },
@@ -85,6 +86,8 @@ func TestMinMaxWithPeriod(t *testing.T) {
"j_min": float64(1), "j_min": float64(1),
"k_max": float64(200), "k_max": float64(200),
"k_min": float64(200), "k_min": float64(200),
"l_max": float64(200),
"l_min": float64(200),
} }
expectedTags := map[string]string{ expectedTags := map[string]string{
"foo": "bar", "foo": "bar",
@@ -154,6 +157,8 @@ func TestMinMaxDifferentPeriods(t *testing.T) {
"j_min": float64(1), "j_min": float64(1),
"k_max": float64(200), "k_max": float64(200),
"k_min": float64(200), "k_min": float64(200),
"l_max": float64(200),
"l_min": float64(200),
} }
expectedTags = map[string]string{ expectedTags = map[string]string{
"foo": "bar", "foo": "bar",

View File

@@ -24,11 +24,14 @@ Those values could be true (1) or false (0) for switches, percentage for dimmers
- fibaro - fibaro
- tags: - tags:
- deviceId (device id)
- section (section name) - section (section name)
- room (room name) - room (room name)
- name (device name) - name (device name)
- type (device type) - type (device type)
- fields: - fields:
- energy (float, when available from device)
- power (float, when available from device)
- value (float) - value (float)
- value2 (float, when available from device) - value2 (float, when available from device)
@@ -36,16 +39,17 @@ Those values could be true (1) or false (0) for switches, percentage for dimmers
### Example Output: ### Example Output:
``` ```
fibaro,host=vm1,name=Escaliers,room=Dégagement,section=Pièces\ communes,type=com.fibaro.binarySwitch value=0 1523351010000000000 fibaro,deviceId=9,host=vm1,name=Fenêtre\ haute,room=Cuisine,section=Cuisine,type=com.fibaro.FGRM222 energy=2.04,power=0.7,value=99,value2=99 1529996807000000000
fibaro,host=vm1,name=Porte\ fenêtre,room=Salon,section=Pièces\ communes,type=com.fibaro.FGRM222 value=99,value2=99 1523351010000000000 fibaro,deviceId=10,host=vm1,name=Escaliers,room=Dégagement,section=Pièces\ communes,type=com.fibaro.binarySwitch value=0 1529996807000000000
fibaro,host=vm1,name=LED\ îlot\ central,room=Cuisine,section=Cuisine,type=com.fibaro.binarySwitch value=0 1523351010000000000 fibaro,deviceId=13,host=vm1,name=Porte\ fenêtre,room=Salon,section=Pièces\ communes,type=com.fibaro.FGRM222 energy=4.33,power=0.7,value=99,value2=99 1529996807000000000
fibaro,host=vm1,name=Détérioration,room=Entrée,section=Pièces\ communes,type=com.fibaro.heatDetector value=0 1523351010000000000 fibaro,deviceId=21,host=vm1,name=LED\ îlot\ central,room=Cuisine,section=Cuisine,type=com.fibaro.binarySwitch value=0 1529996807000000000
fibaro,host=vm1,name=Température,room=Cave,section=Cave,type=com.fibaro.temperatureSensor value=17.87 1523351010000000000 fibaro,deviceId=90,host=vm1,name=Détérioration,room=Entrée,section=Pièces\ communes,type=com.fibaro.heatDetector value=0 1529996807000000000
fibaro,host=vm1,name=Présence,room=Garde-manger,section=Cuisine,type=com.fibaro.FGMS001 value=1 1523351010000000000 fibaro,deviceId=163,host=vm1,name=Température,room=Cave,section=Cave,type=com.fibaro.temperatureSensor value=21.62 1529996807000000000
fibaro,host=vm1,name=Luminosité,room=Garde-manger,section=Cuisine,type=com.fibaro.lightSensor value=92 1523351010000000000 fibaro,deviceId=191,host=vm1,name=Présence,room=Garde-manger,section=Cuisine,type=com.fibaro.FGMS001 value=1 1529996807000000000
fibaro,host=vm1,name=Etat,room=Garage,section=Extérieur,type=com.fibaro.doorSensor value=0 1523351010000000000 fibaro,deviceId=193,host=vm1,name=Luminosité,room=Garde-manger,section=Cuisine,type=com.fibaro.lightSensor value=195 1529996807000000000
fibaro,host=vm1,name=CO2\ (ppm),room=Salon,section=Pièces\ communes,type=com.fibaro.multilevelSensor value=880 1523351010000000000 fibaro,deviceId=200,host=vm1,name=Etat,room=Garage,section=Extérieur,type=com.fibaro.doorSensor value=0 1529996807000000000
fibaro,host=vm1,name=Humidité\ (%),room=Salon,section=Pièces\ communes,type=com.fibaro.humiditySensor value=53 1523351010000000000 fibaro,deviceId=220,host=vm1,name=CO2\ (ppm),room=Salon,section=Pièces\ communes,type=com.fibaro.multilevelSensor value=536 1529996807000000000
fibaro,host=vm1,name=Pression\ (mb),room=Salon,section=Pièces\ communes,type=com.fibaro.multilevelSensor value=1006.9 1523351010000000000 fibaro,deviceId=221,host=vm1,name=Humidité\ (%),room=Salon,section=Pièces\ communes,type=com.fibaro.humiditySensor value=61 1529996807000000000
fibaro,host=vm1,name=Bruit\ (db),room=Salon,section=Pièces\ communes,type=com.fibaro.multilevelSensor value=58 1523351010000000000 fibaro,deviceId=222,host=vm1,name=Pression\ (mb),room=Salon,section=Pièces\ communes,type=com.fibaro.multilevelSensor value=1013.7 1529996807000000000
fibaro,deviceId=223,host=vm1,name=Bruit\ (db),room=Salon,section=Pièces\ communes,type=com.fibaro.multilevelSensor value=44 1529996807000000000
``` ```

View File

@@ -67,6 +67,8 @@ type Devices struct {
Enabled bool `json:"enabled"` Enabled bool `json:"enabled"`
Properties struct { Properties struct {
Dead interface{} `json:"dead"` Dead interface{} `json:"dead"`
Energy interface{} `json:"energy"`
Power interface{} `json:"power"`
Value interface{} `json:"value"` Value interface{} `json:"value"`
Value2 interface{} `json:"value2"` Value2 interface{} `json:"value2"`
} `json:"properties"` } `json:"properties"`
@@ -162,13 +164,26 @@ func (f *Fibaro) Gather(acc telegraf.Accumulator) error {
} }
tags := map[string]string{ tags := map[string]string{
"section": sections[rooms[device.RoomID].SectionID], "deviceId": strconv.FormatUint(uint64(device.ID), 10),
"room": rooms[device.RoomID].Name, "section": sections[rooms[device.RoomID].SectionID],
"name": device.Name, "room": rooms[device.RoomID].Name,
"type": device.Type, "name": device.Name,
"type": device.Type,
} }
fields := make(map[string]interface{}) fields := make(map[string]interface{})
if device.Properties.Energy != nil {
if fValue, err := strconv.ParseFloat(device.Properties.Energy.(string), 64); err == nil {
fields["energy"] = fValue
}
}
if device.Properties.Power != nil {
if fValue, err := strconv.ParseFloat(device.Properties.Power.(string), 64); err == nil {
fields["power"] = fValue
}
}
if device.Properties.Value != nil { if device.Properties.Value != nil {
value := device.Properties.Value value := device.Properties.Value
switch value { switch value {

View File

@@ -119,6 +119,8 @@ const devicesJSON = `
"type": "com.fibaro.FGRM222", "type": "com.fibaro.FGRM222",
"enabled": true, "enabled": true,
"properties": { "properties": {
"energy": "4.33",
"power": "0.7",
"dead": "false", "dead": "false",
"value": "50", "value": "50",
"value2": "75" "value2": "75"
@@ -178,27 +180,27 @@ func TestJSONSuccess(t *testing.T) {
assert.Equal(t, uint64(5), acc.NMetrics()) assert.Equal(t, uint64(5), acc.NMetrics())
// Ensure fields / values are correct - Device 1 // Ensure fields / values are correct - Device 1
tags := map[string]string{"section": "Section 1", "room": "Room 1", "name": "Device 1", "type": "com.fibaro.binarySwitch"} tags := map[string]string{"deviceId": "1", "section": "Section 1", "room": "Room 1", "name": "Device 1", "type": "com.fibaro.binarySwitch"}
fields := map[string]interface{}{"value": float64(0)} fields := map[string]interface{}{"value": float64(0)}
acc.AssertContainsTaggedFields(t, "fibaro", fields, tags) acc.AssertContainsTaggedFields(t, "fibaro", fields, tags)
// Ensure fields / values are correct - Device 2 // Ensure fields / values are correct - Device 2
tags = map[string]string{"section": "Section 2", "room": "Room 2", "name": "Device 2", "type": "com.fibaro.binarySwitch"} tags = map[string]string{"deviceId": "2", "section": "Section 2", "room": "Room 2", "name": "Device 2", "type": "com.fibaro.binarySwitch"}
fields = map[string]interface{}{"value": float64(1)} fields = map[string]interface{}{"value": float64(1)}
acc.AssertContainsTaggedFields(t, "fibaro", fields, tags) acc.AssertContainsTaggedFields(t, "fibaro", fields, tags)
// Ensure fields / values are correct - Device 3 // Ensure fields / values are correct - Device 3
tags = map[string]string{"section": "Section 3", "room": "Room 3", "name": "Device 3", "type": "com.fibaro.multilevelSwitch"} tags = map[string]string{"deviceId": "3", "section": "Section 3", "room": "Room 3", "name": "Device 3", "type": "com.fibaro.multilevelSwitch"}
fields = map[string]interface{}{"value": float64(67)} fields = map[string]interface{}{"value": float64(67)}
acc.AssertContainsTaggedFields(t, "fibaro", fields, tags) acc.AssertContainsTaggedFields(t, "fibaro", fields, tags)
// Ensure fields / values are correct - Device 4 // Ensure fields / values are correct - Device 4
tags = map[string]string{"section": "Section 3", "room": "Room 4", "name": "Device 4", "type": "com.fibaro.temperatureSensor"} tags = map[string]string{"deviceId": "4", "section": "Section 3", "room": "Room 4", "name": "Device 4", "type": "com.fibaro.temperatureSensor"}
fields = map[string]interface{}{"value": float64(22.8)} fields = map[string]interface{}{"value": float64(22.8)}
acc.AssertContainsTaggedFields(t, "fibaro", fields, tags) acc.AssertContainsTaggedFields(t, "fibaro", fields, tags)
// Ensure fields / values are correct - Device 5 // Ensure fields / values are correct - Device 5
tags = map[string]string{"section": "Section 3", "room": "Room 4", "name": "Device 5", "type": "com.fibaro.FGRM222"} tags = map[string]string{"deviceId": "5", "section": "Section 3", "room": "Room 4", "name": "Device 5", "type": "com.fibaro.FGRM222"}
fields = map[string]interface{}{"value": float64(50), "value2": float64(75)} fields = map[string]interface{}{"energy": float64(4.33), "power": float64(0.7), "value": float64(50), "value2": float64(75)}
acc.AssertContainsTaggedFields(t, "fibaro", fields, tags) acc.AssertContainsTaggedFields(t, "fibaro", fields, tags)
} }

View File

@@ -0,0 +1,13 @@
version: '3'
services:
telegraf:
image: glinton/scratch
volumes:
- ./telegraf.conf:/telegraf.conf
- ../../../../telegraf:/telegraf
- ./test.log:/var/log/test.log
entrypoint:
- /telegraf
- --config
- /telegraf.conf

View File

@@ -0,0 +1,12 @@
[agent]
interval="1s"
flush_interval="1s"
[[inputs.logparser]]
files = ["/var/log/test.log"]
from_beginning = true
[inputs.logparser.grok]
patterns = [ "%{COMBINED_LOG_FORMAT}", "%{CLIENT:client_ip} %{NOTSPACE:ident} %{NOTSPACE:auth} \\[%{TIMESTAMP_ISO8601:timestamp}\\] \"(?:%{WORD:verb:tag} %{NOTSPACE:request}(?: HTTP/%{NUMBER:http_version:float})?|%{DATA})\" %{NUMBER:resp_code:tag} (?:%{NUMBER:resp_bytes:int}|-) %{QS:referrer} %{QS:agent}"]
[[outputs.file]]
files = ["stdout"]

View File

@@ -0,0 +1,2 @@
127.0.0.1 ident auth [10/Oct/2000:13:55:36 -0700] "GET /anything HTTP/1.0" 200 2326 "http://localhost:8083/" "Chrome/51.0.2704.84"
127.0.0.1 ident auth [2018-02-21 13:10:34,555] "GET /peter HTTP/1.0" 200 2326 "http://localhost:8083/" "Chrome/51.0.2704.84"

View File

@@ -293,7 +293,7 @@ func (p *Parser) ParseLine(line string) (telegraf.Metric, error) {
timestamp = time.Unix(0, iv) timestamp = time.Unix(0, iv)
} }
case SYSLOG_TIMESTAMP: case SYSLOG_TIMESTAMP:
ts, err := time.ParseInLocation("Jan 02 15:04:05", v, p.loc) ts, err := time.ParseInLocation(time.Stamp, v, p.loc)
if err == nil { if err == nil {
if ts.Year() == 0 { if ts.Year() == 0 {
ts = ts.AddDate(timestamp.Year(), 0, 0) ts = ts.AddDate(timestamp.Year(), 0, 0)

View File

@@ -971,16 +971,41 @@ func TestNewlineInPatterns(t *testing.T) {
require.NotNil(t, m) require.NotNil(t, m)
} }
func TestSyslogTimestampParser(t *testing.T) { func TestSyslogTimestamp(t *testing.T) {
p := &Parser{ tests := []struct {
Patterns: []string{`%{SYSLOGTIMESTAMP:timestamp:ts-syslog} value=%{NUMBER:value:int}`}, name string
timeFunc: func() time.Time { return time.Date(2018, time.April, 1, 0, 0, 0, 0, nil) }, line string
expected time.Time
}{
{
name: "two digit day of month",
line: "Sep 25 09:01:55 value=42",
expected: time.Date(2018, time.September, 25, 9, 1, 55, 0, time.UTC),
},
{
name: "one digit day of month single space",
line: "Sep 2 09:01:55 value=42",
expected: time.Date(2018, time.September, 2, 9, 1, 55, 0, time.UTC),
},
{
name: "one digit day of month double space",
line: "Sep 2 09:01:55 value=42",
expected: time.Date(2018, time.September, 2, 9, 1, 55, 0, time.UTC),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
p := &Parser{
Patterns: []string{`%{SYSLOGTIMESTAMP:timestamp:ts-syslog} value=%{NUMBER:value:int}`},
timeFunc: func() time.Time { return time.Date(2017, time.April, 1, 0, 0, 0, 0, time.UTC) },
}
require.NoError(t, p.Compile())
m, err := p.ParseLine(tt.line)
require.NoError(t, err)
require.NotNil(t, m)
require.Equal(t, tt.expected, m.Time())
})
} }
require.NoError(t, p.Compile())
m, err := p.ParseLine("Sep 25 09:01:55 value=42")
require.NoError(t, err)
require.NotNil(t, m)
require.Equal(t, 2018, m.Time().Year())
} }
func TestReplaceTimestampComma(t *testing.T) { func TestReplaceTimestampComma(t *testing.T) {

View File

@@ -203,6 +203,10 @@ func (l *LogParserPlugin) tailNewfiles(fromBeginning bool) error {
Poll: poll, Poll: poll,
Logger: tail.DiscardingLogger, Logger: tail.DiscardingLogger,
}) })
//add message saying a new tailer was added for the file
log.Printf("D! tail added for file: %v", file)
if err != nil { if err != nil {
l.acc.AddError(err) l.acc.AddError(err)
continue continue
@@ -287,6 +291,10 @@ func (l *LogParserPlugin) Stop() {
for _, t := range l.tailers { for _, t := range l.tailers {
err := t.Stop() err := t.Stop()
//message for a stopped tailer
log.Printf("D! tail dropped for file: %v", t.Filename)
if err != nil { if err != nil {
log.Printf("E! Error stopping tail on file %s\n", t.Filename) log.Printf("E! Error stopping tail on file %s\n", t.Filename)
} }

View File

@@ -0,0 +1,16 @@
version: '3'
services:
mongodb:
image: mongo
telegraf:
image: glinton/scratch
volumes:
- ./telegraf.conf:/telegraf.conf
- ../../../../telegraf:/telegraf
depends_on:
- mongodb
entrypoint:
- /telegraf
- --config
- /telegraf.conf

View File

@@ -0,0 +1,9 @@
[agent]
interval="1s"
flush_interval="3s"
[[inputs.mongodb]]
servers = ["mongodb://mongodb:27017"]
[[outputs.file]]
files = ["stdout"]

View File

@@ -0,0 +1,42 @@
version: '3'
services:
mysql:
image: mysql:5.7
restart: always
environment:
MYSQL_ROOT_PASSWORD: telegraf
MYSQL_DATABASE: telegraf
MYSQL_USER: telegraf
MYSQL_PASSWORD: telegraf
maria:
image: mariadb
restart: always
environment:
MYSQL_ROOT_PASSWORD: telegraf
MYSQL_DATABASE: telegraf
MYSQL_USER: telegraf
MYSQL_PASSWORD: telegraf
command: mysqld --userstat=1
percona:
image: percona
restart: always
environment:
MYSQL_ROOT_PASSWORD: telegraf
MYSQL_DATABASE: telegraf
MYSQL_USER: telegraf
MYSQL_PASSWORD: telegraf
telegraf:
image: glinton/scratch
depends_on:
- mysql
- maria
- percona
volumes:
- ./telegraf.conf:/telegraf.conf
- ../../../../telegraf:/telegraf
entrypoint:
- /telegraf
- --config
- /telegraf.conf

View File

@@ -0,0 +1,61 @@
# Uncomment each input as needed to test plugin
## mysql
#[[inputs.mysql]]
# servers = ["root:telegraf@tcp(mysql:3306)/"]
# gather_table_schema = true
# gather_process_list = true
# gather_user_statistics = true
# gather_info_schema_auto_inc = true
# gather_innodb_metrics = true
# gather_slave_status = true
# gather_binary_logs = false
# gather_table_io_waits = true
# gather_table_lock_waits = true
# gather_index_io_waits = true
# gather_event_waits = true
# gather_file_events_stats = true
# gather_perf_events_statements = true
# interval_slow = "30m"
# table_schema_databases = []
#
## mariadb
#[[inputs.mysql]]
# servers = ["root:telegraf@tcp(maria:3306)/"]
# gather_table_schema = true
# gather_process_list = true
# gather_user_statistics = true
# gather_info_schema_auto_inc = true
# gather_innodb_metrics = true
# gather_slave_status = true
# gather_binary_logs = false
# gather_table_io_waits = true
# gather_table_lock_waits = true
# gather_index_io_waits = true
# gather_event_waits = true
# gather_file_events_stats = true
# gather_perf_events_statements = true
# interval_slow = "30m"
# table_schema_databases = []
# percona
[[inputs.mysql]]
servers = ["root:telegraf@tcp(percona:3306)/"]
gather_table_schema = true
gather_process_list = true
gather_user_statistics = true
gather_info_schema_auto_inc = true
gather_innodb_metrics = true
gather_slave_status = true
gather_binary_logs = false
gather_table_io_waits = true
gather_table_lock_waits = true
gather_index_io_waits = true
gather_event_waits = true
gather_file_events_stats = true
gather_perf_events_statements = true
interval_slow = "30m"
table_schema_databases = []
[[outputs.file]]
files = ["stdout"]

View File

@@ -4,7 +4,6 @@ import (
"bytes" "bytes"
"database/sql" "database/sql"
"fmt" "fmt"
"log"
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
@@ -80,7 +79,7 @@ var sampleConfig = `
## gather thread state counts from INFORMATION_SCHEMA.PROCESSLIST ## gather thread state counts from INFORMATION_SCHEMA.PROCESSLIST
gather_process_list = true gather_process_list = true
# #
## gather thread state counts from INFORMATION_SCHEMA.USER_STATISTICS ## gather user statistics from INFORMATION_SCHEMA.USER_STATISTICS
gather_user_statistics = true gather_user_statistics = true
# #
## gather auto_increment columns and max values from information schema ## gather auto_increment columns and max values from information schema
@@ -282,9 +281,8 @@ const (
GROUP BY command,state GROUP BY command,state
ORDER BY null` ORDER BY null`
infoSchemaUserStatisticsQuery = ` infoSchemaUserStatisticsQuery = `
SELECT *,count(*) SELECT *
FROM information_schema.user_statistics FROM information_schema.user_statistics`
GROUP BY user`
infoSchemaAutoIncQuery = ` infoSchemaAutoIncQuery = `
SELECT table_schema, table_name, column_name, auto_increment, SELECT table_schema, table_name, column_name, auto_increment,
CAST(pow(2, case data_type CAST(pow(2, case data_type
@@ -761,103 +759,6 @@ func (m *Mysql) gatherGlobalStatuses(db *sql.DB, serv string, acc telegraf.Accum
if len(fields) > 0 { if len(fields) > 0 {
acc.AddFields("mysql", fields, tags) acc.AddFields("mysql", fields, tags)
} }
// gather connection metrics from processlist for each user
if m.GatherProcessList {
conn_rows, err := db.Query("SELECT user, sum(1) FROM INFORMATION_SCHEMA.PROCESSLIST GROUP BY user")
if err != nil {
log.Printf("E! MySQL Error gathering process list: %s", err)
} else {
for conn_rows.Next() {
var user string
var connections int64
err = conn_rows.Scan(&user, &connections)
if err != nil {
return err
}
tags := map[string]string{"server": servtag, "user": user}
fields := make(map[string]interface{})
if err != nil {
return err
}
fields["connections"] = connections
acc.AddFields("mysql_users", fields, tags)
}
}
}
// gather connection metrics from user_statistics for each user
if m.GatherUserStatistics {
conn_rows, err := db.Query("select user, total_connections, concurrent_connections, connected_time, busy_time, cpu_time, bytes_received, bytes_sent, binlog_bytes_written, rows_fetched, rows_updated, table_rows_read, select_commands, update_commands, other_commands, commit_transactions, rollback_transactions, denied_connections, lost_connections, access_denied, empty_queries, total_ssl_connections FROM INFORMATION_SCHEMA.USER_STATISTICS GROUP BY user")
if err != nil {
log.Printf("E! MySQL Error gathering user stats: %s", err)
} else {
for conn_rows.Next() {
var user string
var total_connections int64
var concurrent_connections int64
var connected_time int64
var busy_time int64
var cpu_time int64
var bytes_received int64
var bytes_sent int64
var binlog_bytes_written int64
var rows_fetched int64
var rows_updated int64
var table_rows_read int64
var select_commands int64
var update_commands int64
var other_commands int64
var commit_transactions int64
var rollback_transactions int64
var denied_connections int64
var lost_connections int64
var access_denied int64
var empty_queries int64
var total_ssl_connections int64
err = conn_rows.Scan(&user, &total_connections, &concurrent_connections,
&connected_time, &busy_time, &cpu_time, &bytes_received, &bytes_sent, &binlog_bytes_written,
&rows_fetched, &rows_updated, &table_rows_read, &select_commands, &update_commands, &other_commands,
&commit_transactions, &rollback_transactions, &denied_connections, &lost_connections, &access_denied,
&empty_queries, &total_ssl_connections,
)
if err != nil {
return err
}
tags := map[string]string{"server": servtag, "user": user}
fields := map[string]interface{}{
"total_connections": total_connections,
"concurrent_connections": concurrent_connections,
"connected_time": connected_time,
"busy_time": busy_time,
"cpu_time": cpu_time,
"bytes_received": bytes_received,
"bytes_sent": bytes_sent,
"binlog_bytes_written": binlog_bytes_written,
"rows_fetched": rows_fetched,
"rows_updated": rows_updated,
"table_rows_read": table_rows_read,
"select_commands": select_commands,
"update_commands": update_commands,
"other_commands": other_commands,
"commit_transactions": commit_transactions,
"rollback_transactions": rollback_transactions,
"denied_connections": denied_connections,
"lost_connections": lost_connections,
"access_denied": access_denied,
"empty_queries": empty_queries,
"total_ssl_connections": total_ssl_connections,
}
acc.AddFields("mysql_user_stats", fields, tags)
}
}
}
return nil return nil
} }
@@ -908,6 +809,29 @@ func (m *Mysql) GatherProcessListStatuses(db *sql.DB, serv string, acc telegraf.
} else { } else {
acc.AddFields("mysql_process_list", fields, tags) acc.AddFields("mysql_process_list", fields, tags)
} }
// get count of connections from each user
conn_rows, err := db.Query("SELECT user, sum(1) AS connections FROM INFORMATION_SCHEMA.PROCESSLIST GROUP BY user")
if err != nil {
return err
}
for conn_rows.Next() {
var user string
var connections int64
err = conn_rows.Scan(&user, &connections)
if err != nil {
return err
}
tags := map[string]string{"server": servtag, "user": user}
fields := make(map[string]interface{})
fields["connections"] = connections
acc.AddFields("mysql_users", fields, tags)
}
return nil return nil
} }
@@ -917,77 +841,190 @@ func (m *Mysql) GatherUserStatisticsStatuses(db *sql.DB, serv string, acc telegr
// run query // run query
rows, err := db.Query(infoSchemaUserStatisticsQuery) rows, err := db.Query(infoSchemaUserStatisticsQuery)
if err != nil { if err != nil {
// disable collecting if table is not found (mysql specific error)
// (suppresses repeat errors)
if strings.Contains(err.Error(), "nknown table 'user_statistics'") {
m.GatherUserStatistics = false
}
return err return err
} }
defer rows.Close() defer rows.Close()
var (
user string cols, err := columnsToLower(rows.Columns())
total_connections int64 if err != nil {
concurrent_connections int64 return err
connected_time int64 }
busy_time int64
cpu_time int64 read, err := getColSlice(len(cols))
bytes_received int64 if err != nil {
bytes_sent int64 return err
binlog_bytes_written int64 }
rows_fetched int64
rows_updated int64
table_rows_read int64
select_commands int64
update_commands int64
other_commands int64
commit_transactions int64
rollback_transactions int64
denied_connections int64
lost_connections int64
access_denied int64
empty_queries int64
total_ssl_connections int64
count uint32
)
servtag := getDSNTag(serv) servtag := getDSNTag(serv)
for rows.Next() { for rows.Next() {
err = rows.Scan(&user, &total_connections, &concurrent_connections, err = rows.Scan(read...)
&connected_time, &busy_time, &cpu_time, &bytes_received, &bytes_sent, &binlog_bytes_written,
&rows_fetched, &rows_updated, &table_rows_read, &select_commands, &update_commands, &other_commands,
&commit_transactions, &rollback_transactions, &denied_connections, &lost_connections, &access_denied,
&empty_queries, &total_ssl_connections, &count,
)
if err != nil { if err != nil {
return err return err
} }
tags := map[string]string{"server": servtag, "user": user} tags := map[string]string{"server": servtag, "user": *read[0].(*string)}
fields := map[string]interface{}{ fields := map[string]interface{}{}
"total_connections": total_connections, for i := range cols {
"concurrent_connections": concurrent_connections, if i == 0 {
"connected_time": connected_time, continue // skip "user"
"busy_time": busy_time, }
"cpu_time": cpu_time, switch v := read[i].(type) {
"bytes_received": bytes_received, case *int64:
"bytes_sent": bytes_sent, fields[cols[i]] = *v
"binlog_bytes_written": binlog_bytes_written, case *float64:
"rows_fetched": rows_fetched, fields[cols[i]] = *v
"rows_updated": rows_updated, case *string:
"table_rows_read": table_rows_read, fields[cols[i]] = *v
"select_commands": select_commands, default:
"update_commands": update_commands, return fmt.Errorf("Unknown column type - %T", v)
"other_commands": other_commands, }
"commit_transactions": commit_transactions,
"rollback_transactions": rollback_transactions,
"denied_connections": denied_connections,
"lost_connections": lost_connections,
"access_denied": access_denied,
"empty_queries": empty_queries,
"total_ssl_connections": total_ssl_connections,
} }
acc.AddFields("mysql_user_stats", fields, tags) acc.AddFields("mysql_user_stats", fields, tags)
} }
return nil return nil
} }
// columnsToLower lower-cases every column name in s. The error argument is
// passed through untouched so the function can wrap rows.Columns() directly.
func columnsToLower(s []string, e error) ([]string, error) {
	if e != nil {
		return nil, e
	}
	lowered := make([]string, 0, len(s))
	for _, name := range s {
		lowered = append(lowered, strings.ToLower(name))
	}
	return lowered, nil
}
// getColSlice returns an interface slice that can be used in rows.Scan().
// The column count l identifies the flavor of the USER_STATISTICS table:
// MariaDB 5 (23 columns), MariaDB 10 (25 columns) or Percona (22 columns).
// Any other width yields an error.
func getColSlice(l int) ([]interface{}, error) {
	// list of all possible column names
	var (
		user                        string
		total_connections           int64
		concurrent_connections      int64
		connected_time              int64
		busy_time                   int64
		cpu_time                    int64
		bytes_received              int64
		bytes_sent                  int64
		binlog_bytes_written        int64
		rows_read                   int64
		rows_sent                   int64
		rows_deleted                int64
		rows_inserted               int64
		rows_updated                int64
		select_commands             int64
		update_commands             int64
		other_commands              int64
		commit_transactions         int64
		rollback_transactions       int64
		denied_connections          int64
		lost_connections            int64
		access_denied               int64
		empty_queries               int64
		total_ssl_connections       int64
		max_statement_time_exceeded int64
		// maria specific
		fbusy_time float64
		fcpu_time  float64
		// percona specific
		rows_fetched    int64
		table_rows_read int64
	)
	// The MariaDB 5 columns are a strict prefix of the MariaDB 10 columns,
	// so build the shared part only once. Note that busy_time/cpu_time are
	// floats on MariaDB but integers on Percona.
	maria := []interface{}{
		&user,
		&total_connections,
		&concurrent_connections,
		&connected_time,
		&fbusy_time,
		&fcpu_time,
		&bytes_received,
		&bytes_sent,
		&binlog_bytes_written,
		&rows_read,
		&rows_sent,
		&rows_deleted,
		&rows_inserted,
		&rows_updated,
		&select_commands,
		&update_commands,
		&other_commands,
		&commit_transactions,
		&rollback_transactions,
		&denied_connections,
		&lost_connections,
		&access_denied,
		&empty_queries,
	}
	switch l {
	case 23: // maria5
		return maria, nil
	case 25: // maria10
		return append(maria, &total_ssl_connections, &max_statement_time_exceeded), nil
	case 22: // percona
		return []interface{}{
			&user,
			&total_connections,
			&concurrent_connections,
			&connected_time,
			&busy_time,
			&cpu_time,
			&bytes_received,
			&bytes_sent,
			&binlog_bytes_written,
			&rows_fetched,
			&rows_updated,
			&table_rows_read,
			&select_commands,
			&update_commands,
			&other_commands,
			&commit_transactions,
			&rollback_transactions,
			&denied_connections,
			&lost_connections,
			&access_denied,
			&empty_queries,
			&total_ssl_connections,
		}, nil
	}
	return nil, fmt.Errorf("Not Supported - %d columns", l)
}
// gatherPerfTableIOWaits can be used to get total count and time // gatherPerfTableIOWaits can be used to get total count and time
// of I/O wait event for each table and process // of I/O wait event for each table and process
func (m *Mysql) gatherPerfTableIOWaits(db *sql.DB, serv string, acc telegraf.Accumulator) error { func (m *Mysql) gatherPerfTableIOWaits(db *sql.DB, serv string, acc telegraf.Accumulator) error {

View File

@@ -1,6 +1,7 @@
package syslog package syslog
import ( import (
"bytes"
"crypto/tls" "crypto/tls"
"fmt" "fmt"
"io" "io"
@@ -279,20 +280,51 @@ func (s *Syslog) handle(conn net.Conn, acc telegraf.Accumulator) {
conn.Close() conn.Close()
}() }()
if s.ReadTimeout != nil && s.ReadTimeout.Duration > 0 { for {
conn.SetReadDeadline(time.Now().Add(s.ReadTimeout.Duration)) data := &bytes.Buffer{}
}
var p *rfc5425.Parser // read the data
if s.BestEffort { if s.ReadTimeout != nil && s.ReadTimeout.Duration > 0 {
p = rfc5425.NewParser(conn, rfc5425.WithBestEffort()) conn.SetReadDeadline(time.Now().Add(s.ReadTimeout.Duration))
} else { }
p = rfc5425.NewParser(conn)
}
p.ParseExecuting(func(r *rfc5425.Result) { n, err := io.Copy(data, conn)
s.store(*r, acc) if err != nil {
}) // read timeout reached, parse what we have
if er, ok := err.(net.Error); ok && er.Timeout() {
if n == 0 {
continue
}
goto parseMsg
}
// client has closed connection, return
if err == io.EOF {
if n > 0 {
goto parseMsg
}
return
}
// other error, log and return
s.store(rfc5425.Result{Error: fmt.Errorf("failed reading from syslog client - %s", err.Error())}, acc)
return
}
// handle client disconnect
if n == 0 {
return
}
parseMsg:
var p *rfc5425.Parser
if s.BestEffort {
p = rfc5425.NewParser(data, rfc5425.WithBestEffort())
} else {
p = rfc5425.NewParser(data)
}
p.ParseExecuting(func(r *rfc5425.Result) {
s.store(*r, acc)
})
}
} }
func (s *Syslog) setKeepAlive(c *net.TCPConn) error { func (s *Syslog) setKeepAlive(c *net.TCPConn) error {
@@ -361,7 +393,7 @@ func fields(msg rfc5424.SyslogMessage, s *Syslog) map[string]interface{} {
} }
if msg.Message() != nil { if msg.Message() != nil {
flds["message"] = *msg.Message() flds["message"] = strings.TrimSpace(*msg.Message())
} }
if msg.StructuredData() != nil { if msg.StructuredData() != nil {

View File

@@ -0,0 +1,30 @@
# Swap Input Plugin
The swap plugin collects system swap metrics.
For more information about what swap memory is, read [All about Linux swap space](https://www.linux.com/news/all-about-linux-swap-space).
### Configuration:
```toml
# Read metrics about swap memory usage
[[inputs.swap]]
# no configuration
```
### Metrics:
- swap
- fields:
- free (int)
- total (int)
- used (int)
- used_percent (float)
- in (int)
- out (int)
### Example Output:
```
swap total=20855394304i,used_percent=45.43883523785713,used=9476448256i,free=1715331072i 1511894782000000000
```

View File

@@ -42,45 +42,9 @@ func (s *MemStats) Gather(acc telegraf.Accumulator) error {
return nil return nil
} }
type SwapStats struct {
ps PS
}
func (_ *SwapStats) Description() string {
return "Read metrics about swap memory usage"
}
func (_ *SwapStats) SampleConfig() string { return "" }
func (s *SwapStats) Gather(acc telegraf.Accumulator) error {
swap, err := s.ps.SwapStat()
if err != nil {
return fmt.Errorf("error getting swap memory info: %s", err)
}
fieldsG := map[string]interface{}{
"total": swap.Total,
"used": swap.Used,
"free": swap.Free,
"used_percent": swap.UsedPercent,
}
fieldsC := map[string]interface{}{
"in": swap.Sin,
"out": swap.Sout,
}
acc.AddGauge("swap", fieldsG, nil)
acc.AddCounter("swap", fieldsC, nil)
return nil
}
func init() { func init() {
ps := newSystemPS() ps := newSystemPS()
inputs.Add("mem", func() telegraf.Input { inputs.Add("mem", func() telegraf.Input {
return &MemStats{ps: ps} return &MemStats{ps: ps}
}) })
inputs.Add("swap", func() telegraf.Input {
return &SwapStats{ps: ps}
})
} }

View File

@@ -30,17 +30,6 @@ func TestMemStats(t *testing.T) {
mps.On("VMStat").Return(vms, nil) mps.On("VMStat").Return(vms, nil)
sms := &mem.SwapMemoryStat{
Total: 8123,
Used: 1232,
Free: 6412,
UsedPercent: 12.2,
Sin: 7,
Sout: 830,
}
mps.On("SwapStat").Return(sms, nil)
err = (&MemStats{&mps}).Gather(&acc) err = (&MemStats{&mps}).Gather(&acc)
require.NoError(t, err) require.NoError(t, err)
@@ -61,15 +50,4 @@ func TestMemStats(t *testing.T) {
acc.AssertContainsTaggedFields(t, "mem", memfields, make(map[string]string)) acc.AssertContainsTaggedFields(t, "mem", memfields, make(map[string]string))
acc.Metrics = nil acc.Metrics = nil
err = (&SwapStats{&mps}).Gather(&acc)
require.NoError(t, err)
swapfields := map[string]interface{}{
"total": uint64(8123),
"used": uint64(1232),
"used_percent": float64(12.2),
"free": uint64(6412),
}
acc.AssertContainsTaggedFields(t, "swap", swapfields, make(map[string]string))
} }

View File

@@ -0,0 +1,47 @@
package system
import (
"fmt"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
)
// SwapStats is the swap input plugin; it collects system swap-memory
// metrics through the PS abstraction.
type SwapStats struct {
	ps PS
}
// Description returns a one-line summary of the swap input plugin.
func (*SwapStats) Description() string {
	return "Read metrics about swap memory usage"
}
// SampleConfig returns the sample configuration; the plugin takes no options.
func (_ *SwapStats) SampleConfig() string { return "" }
// Gather reads swap statistics from the underlying PS implementation and
// publishes them to the accumulator: size/usage values as gauges, and the
// swap-in/swap-out page counts as counters.
func (s *SwapStats) Gather(acc telegraf.Accumulator) error {
	swap, err := s.ps.SwapStat()
	if err != nil {
		return fmt.Errorf("error getting swap memory info: %s", err)
	}

	acc.AddGauge("swap", map[string]interface{}{
		"total":        swap.Total,
		"used":         swap.Used,
		"free":         swap.Free,
		"used_percent": swap.UsedPercent,
	}, nil)
	acc.AddCounter("swap", map[string]interface{}{
		"in":  swap.Sin,
		"out": swap.Sout,
	}, nil)

	return nil
}
// init registers the swap plugin with the telegraf input registry, sharing
// one system PS implementation across all created instances.
func init() {
	ps := newSystemPS()
	inputs.Add("swap", func() telegraf.Input {
		return &SwapStats{ps: ps}
	})
}

View File

@@ -0,0 +1,38 @@
package system
import (
"testing"
"github.com/influxdata/telegraf/testutil"
"github.com/shirou/gopsutil/mem"
"github.com/stretchr/testify/require"
)
// TestSwapStats verifies that SwapStats.Gather reports the values returned
// by PS.SwapStat as fields of the "swap" measurement.
func TestSwapStats(t *testing.T) {
	var mps MockPS
	var err error
	defer mps.AssertExpectations(t)
	var acc testutil.Accumulator
	// canned swap statistics returned by the mocked PS implementation
	sms := &mem.SwapMemoryStat{
		Total:       8123,
		Used:        1232,
		Free:        6412,
		UsedPercent: 12.2,
		Sin:         7,
		Sout:        830,
	}
	mps.On("SwapStat").Return(sms, nil)
	err = (&SwapStats{&mps}).Gather(&acc)
	require.NoError(t, err)
	// gauge fields expected on "swap"; the "in"/"out" counter fields are
	// added via AddCounter and checked by the tagged-fields assertion only
	// for the gauge set here
	swapfields := map[string]interface{}{
		"total":        uint64(8123),
		"used":         uint64(1232),
		"used_percent": float64(12.2),
		"free":         uint64(6412),
	}
	acc.AssertContainsTaggedFields(t, "swap", swapfields, make(map[string]string))
}

View File

@@ -72,6 +72,15 @@ It is recommended NOT to use this on OSes starting with Vista and newer because
Example for Windows Server 2003, this would be set to true: Example for Windows Server 2003, this would be set to true:
`PreVistaSupport=true` `PreVistaSupport=true`
#### UsePerfCounterTime
Bool, if set to `true` will request a timestamp along with the PerfCounter data.
If set to `false`, the current time will be used.
Supported on Windows Vista/Windows Server 2008 and newer
Example:
`UsePerfCounterTime=true`
### Object ### Object
See Entry below. See Entry below.

View File

@@ -0,0 +1,73 @@
// Copyright (c) 2010 The win Authors. All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions
// are met:
// 1. Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// 2. Redistributions in binary form must reproduce the above copyright
// notice, this list of conditions and the following disclaimer in the
// documentation and/or other materials provided with the distribution.
// 3. The names of the authors may not be used to endorse or promote products
// derived from this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE AUTHORS ``AS IS'' AND ANY EXPRESS OR
// IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
// OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
// IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY DIRECT, INDIRECT,
// INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
// NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
// THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
//
// This is the official list of 'win' authors for copyright purposes.
//
// Alexander Neumann <an2048@googlemail.com>
// Joseph Watson <jtwatson@linux-consulting.us>
// Kevin Pors <krpors@gmail.com>
// +build windows
package win_perf_counters
import (
"syscall"
)
// SYSTEMTIME mirrors the Win32 SYSTEMTIME structure (a date/time broken
// into its calendar components); field order must match the Windows ABI.
type SYSTEMTIME struct {
	wYear         uint16
	wMonth        uint16
	wDayOfWeek    uint16
	wDay          uint16
	wHour         uint16
	wMinute       uint16
	wSecond       uint16
	wMilliseconds uint16
}
// FILETIME mirrors the Win32 FILETIME structure: a 64-bit count of
// 100-nanosecond intervals split into low/high 32-bit halves.
type FILETIME struct {
	dwLowDateTime  uint32
	dwHighDateTime uint32
}
var (
	// Library
	libkrnDll *syscall.DLL // handle to Kernel32.dll, loaded in init

	// Functions resolved from Kernel32.dll in init
	krn_FileTimeToSystemTime    *syscall.Proc
	krn_FileTimeToLocalFileTime *syscall.Proc
	krn_LocalFileTimeToFileTime *syscall.Proc
	krn_WideCharToMultiByte     *syscall.Proc
)
// init loads Kernel32.dll and resolves the time/string conversion
// procedures used by this package. MustLoadDLL/MustFindProc panic if the
// DLL or any procedure is missing, which aborts startup early on
// unsupported systems.
func init() {
	libkrnDll = syscall.MustLoadDLL("Kernel32.dll")
	krn_FileTimeToSystemTime = libkrnDll.MustFindProc("FileTimeToSystemTime")
	krn_FileTimeToLocalFileTime = libkrnDll.MustFindProc("FileTimeToLocalFileTime")
	krn_LocalFileTimeToFileTime = libkrnDll.MustFindProc("LocalFileTimeToFileTime")
	krn_WideCharToMultiByte = libkrnDll.MustFindProc("WideCharToMultiByte")
}

View File

@@ -38,12 +38,15 @@ import (
"unsafe" "unsafe"
"golang.org/x/sys/windows" "golang.org/x/sys/windows"
"time"
) )
// Error codes // Error codes
const ( const (
ERROR_SUCCESS = 0 ERROR_SUCCESS = 0
ERROR_INVALID_FUNCTION = 1 ERROR_FAILURE = 1
ERROR_INVALID_FUNCTION = 1
EPOCH_DIFFERENCE_MICROS int64 = 11644473600000000
) )
type ( type (
@@ -170,6 +173,7 @@ var (
pdh_AddEnglishCounterW *syscall.Proc pdh_AddEnglishCounterW *syscall.Proc
pdh_CloseQuery *syscall.Proc pdh_CloseQuery *syscall.Proc
pdh_CollectQueryData *syscall.Proc pdh_CollectQueryData *syscall.Proc
pdh_CollectQueryDataWithTime *syscall.Proc
pdh_GetFormattedCounterValue *syscall.Proc pdh_GetFormattedCounterValue *syscall.Proc
pdh_GetFormattedCounterArrayW *syscall.Proc pdh_GetFormattedCounterArrayW *syscall.Proc
pdh_OpenQuery *syscall.Proc pdh_OpenQuery *syscall.Proc
@@ -187,6 +191,7 @@ func init() {
pdh_AddEnglishCounterW, _ = libpdhDll.FindProc("PdhAddEnglishCounterW") // XXX: only supported on versions > Vista. pdh_AddEnglishCounterW, _ = libpdhDll.FindProc("PdhAddEnglishCounterW") // XXX: only supported on versions > Vista.
pdh_CloseQuery = libpdhDll.MustFindProc("PdhCloseQuery") pdh_CloseQuery = libpdhDll.MustFindProc("PdhCloseQuery")
pdh_CollectQueryData = libpdhDll.MustFindProc("PdhCollectQueryData") pdh_CollectQueryData = libpdhDll.MustFindProc("PdhCollectQueryData")
pdh_CollectQueryDataWithTime, _ = libpdhDll.FindProc("PdhCollectQueryDataWithTime")
pdh_GetFormattedCounterValue = libpdhDll.MustFindProc("PdhGetFormattedCounterValue") pdh_GetFormattedCounterValue = libpdhDll.MustFindProc("PdhGetFormattedCounterValue")
pdh_GetFormattedCounterArrayW = libpdhDll.MustFindProc("PdhGetFormattedCounterArrayW") pdh_GetFormattedCounterArrayW = libpdhDll.MustFindProc("PdhGetFormattedCounterArrayW")
pdh_OpenQuery = libpdhDll.MustFindProc("PdhOpenQuery") pdh_OpenQuery = libpdhDll.MustFindProc("PdhOpenQuery")
@@ -303,6 +308,37 @@ func PdhCollectQueryData(hQuery PDH_HQUERY) uint32 {
return uint32(ret) return uint32(ret)
} }
// PdhCollectQueryDataWithTime queries data from perfmon, retrieving the device/windows timestamp from the node it was collected on.
// Converts the filetime structure to a GO time class and returns the native time.
//
// On any failure the returned time falls back to time.Now() alongside a
// non-success status code.
func PdhCollectQueryDataWithTime(hQuery PDH_HQUERY) (uint32, time.Time) {
	var localFileTime FILETIME
	ret, _, _ := pdh_CollectQueryDataWithTime.Call(uintptr(hQuery), uintptr(unsafe.Pointer(&localFileTime)))
	if ret == ERROR_SUCCESS {
		// The collected timestamp is local time; convert it to UTC via
		// Kernel32's LocalFileTimeToFileTime (note: inner ret shadows the
		// outer one deliberately).
		var utcFileTime FILETIME
		ret, _, _ := krn_LocalFileTimeToFileTime.Call(
			uintptr(unsafe.Pointer(&localFileTime)),
			uintptr(unsafe.Pointer(&utcFileTime)))
		if ret == 0 {
			return uint32(ERROR_FAILURE), time.Now()
		}
		// First convert 100-ns intervals to microseconds, then adjust for the
		// epoch difference (Windows epoch 1601 vs Unix epoch 1970; see
		// EPOCH_DIFFERENCE_MICROS)
		var totalMicroSeconds int64
		totalMicroSeconds = ((int64(utcFileTime.dwHighDateTime) << 32) | int64(utcFileTime.dwLowDateTime)) / 10
		totalMicroSeconds -= EPOCH_DIFFERENCE_MICROS
		retTime := time.Unix(0, totalMicroSeconds*1000)
		return uint32(ERROR_SUCCESS), retTime
	}
	return uint32(ret), time.Now()
}
// PdhGetFormattedCounterValueDouble formats the given hCounter using a 'double'. The result is set into the specialized union struct pValue. // PdhGetFormattedCounterValueDouble formats the given hCounter using a 'double'. The result is set into the specialized union struct pValue.
// This function does not directly translate to a Windows counterpart due to union specialization tricks. // This function does not directly translate to a Windows counterpart due to union specialization tricks.
func PdhGetFormattedCounterValueDouble(hCounter PDH_HCOUNTER, lpdwType *uint32, pValue *PDH_FMT_COUNTERVALUE_DOUBLE) uint32 { func PdhGetFormattedCounterValueDouble(hCounter PDH_HCOUNTER, lpdwType *uint32, pValue *PDH_FMT_COUNTERVALUE_DOUBLE) uint32 {

View File

@@ -6,6 +6,7 @@ package win_perf_counters
import ( import (
"errors" "errors"
"syscall" "syscall"
"time"
"unsafe" "unsafe"
) )
@@ -26,7 +27,8 @@ type PerformanceQuery interface {
GetFormattedCounterValueDouble(hCounter PDH_HCOUNTER) (float64, error) GetFormattedCounterValueDouble(hCounter PDH_HCOUNTER) (float64, error)
GetFormattedCounterArrayDouble(hCounter PDH_HCOUNTER) ([]CounterValue, error) GetFormattedCounterArrayDouble(hCounter PDH_HCOUNTER) ([]CounterValue, error)
CollectData() error CollectData() error
AddEnglishCounterSupported() bool CollectDataWithTime() (time.Time, error)
IsVistaOrNewer() bool
} }
//PdhError represents error returned from Performance Counters API //PdhError represents error returned from Performance Counters API
@@ -61,8 +63,8 @@ func (m *PerformanceQueryImpl) Open() error {
} }
} }
var handle PDH_HQUERY var handle PDH_HQUERY
ret := PdhOpenQuery(0, 0, &handle)
if ret != ERROR_SUCCESS { if ret := PdhOpenQuery(0, 0, &handle); ret != ERROR_SUCCESS {
return NewPdhError(ret) return NewPdhError(ret)
} }
m.query = handle m.query = handle
@@ -74,8 +76,8 @@ func (m *PerformanceQueryImpl) Close() error {
if m.query == 0 { if m.query == 0 {
return errors.New("uninitialised query") return errors.New("uninitialised query")
} }
ret := PdhCloseQuery(m.query)
if ret != ERROR_SUCCESS { if ret := PdhCloseQuery(m.query); ret != ERROR_SUCCESS {
return NewPdhError(ret) return NewPdhError(ret)
} }
m.query = 0 m.query = 0
@@ -87,8 +89,8 @@ func (m *PerformanceQueryImpl) AddCounterToQuery(counterPath string) (PDH_HCOUNT
if m.query == 0 { if m.query == 0 {
return 0, errors.New("uninitialised query") return 0, errors.New("uninitialised query")
} }
ret := PdhAddCounter(m.query, counterPath, 0, &counterHandle)
if ret != ERROR_SUCCESS { if ret := PdhAddCounter(m.query, counterPath, 0, &counterHandle); ret != ERROR_SUCCESS {
return 0, NewPdhError(ret) return 0, NewPdhError(ret)
} }
return counterHandle, nil return counterHandle, nil
@@ -99,8 +101,7 @@ func (m *PerformanceQueryImpl) AddEnglishCounterToQuery(counterPath string) (PDH
if m.query == 0 { if m.query == 0 {
return 0, errors.New("uninitialised query") return 0, errors.New("uninitialised query")
} }
ret := PdhAddEnglishCounter(m.query, counterPath, 0, &counterHandle) if ret := PdhAddEnglishCounter(m.query, counterPath, 0, &counterHandle); ret != ERROR_SUCCESS {
if ret != ERROR_SUCCESS {
return 0, NewPdhError(ret) return 0, NewPdhError(ret)
} }
return counterHandle, nil return counterHandle, nil
@@ -110,13 +111,11 @@ func (m *PerformanceQueryImpl) AddEnglishCounterToQuery(counterPath string) (PDH
func (m *PerformanceQueryImpl) GetCounterPath(counterHandle PDH_HCOUNTER) (string, error) { func (m *PerformanceQueryImpl) GetCounterPath(counterHandle PDH_HCOUNTER) (string, error) {
var bufSize uint32 var bufSize uint32
var buff []byte var buff []byte
var ret uint32
ret := PdhGetCounterInfo(counterHandle, 0, &bufSize, nil) if ret = PdhGetCounterInfo(counterHandle, 0, &bufSize, nil); ret == PDH_MORE_DATA {
if ret == PDH_MORE_DATA {
buff = make([]byte, bufSize) buff = make([]byte, bufSize)
bufSize = uint32(len(buff)) bufSize = uint32(len(buff))
ret = PdhGetCounterInfo(counterHandle, 0, &bufSize, &buff[0]) if ret = PdhGetCounterInfo(counterHandle, 0, &bufSize, &buff[0]); ret == ERROR_SUCCESS {
if ret == ERROR_SUCCESS {
ci := (*PDH_COUNTER_INFO)(unsafe.Pointer(&buff[0])) ci := (*PDH_COUNTER_INFO)(unsafe.Pointer(&buff[0]))
return UTF16PtrToString(ci.SzFullPath), nil return UTF16PtrToString(ci.SzFullPath), nil
} }
@@ -128,9 +127,9 @@ func (m *PerformanceQueryImpl) GetCounterPath(counterHandle PDH_HCOUNTER) (strin
func (m *PerformanceQueryImpl) ExpandWildCardPath(counterPath string) ([]string, error) { func (m *PerformanceQueryImpl) ExpandWildCardPath(counterPath string) ([]string, error) {
var bufSize uint32 var bufSize uint32
var buff []uint16 var buff []uint16
var ret uint32
ret := PdhExpandWildCardPath(counterPath, nil, &bufSize) if ret = PdhExpandWildCardPath(counterPath, nil, &bufSize); ret == PDH_MORE_DATA {
if ret == PDH_MORE_DATA {
buff = make([]uint16, bufSize) buff = make([]uint16, bufSize)
bufSize = uint32(len(buff)) bufSize = uint32(len(buff))
ret = PdhExpandWildCardPath(counterPath, &buff[0], &bufSize) ret = PdhExpandWildCardPath(counterPath, &buff[0], &bufSize)
@@ -146,8 +145,9 @@ func (m *PerformanceQueryImpl) ExpandWildCardPath(counterPath string) ([]string,
func (m *PerformanceQueryImpl) GetFormattedCounterValueDouble(hCounter PDH_HCOUNTER) (float64, error) { func (m *PerformanceQueryImpl) GetFormattedCounterValueDouble(hCounter PDH_HCOUNTER) (float64, error) {
var counterType uint32 var counterType uint32
var value PDH_FMT_COUNTERVALUE_DOUBLE var value PDH_FMT_COUNTERVALUE_DOUBLE
ret := PdhGetFormattedCounterValueDouble(hCounter, &counterType, &value) var ret uint32
if ret == ERROR_SUCCESS {
if ret = PdhGetFormattedCounterValueDouble(hCounter, &counterType, &value); ret == ERROR_SUCCESS {
if value.CStatus == PDH_CSTATUS_VALID_DATA || value.CStatus == PDH_CSTATUS_NEW_DATA { if value.CStatus == PDH_CSTATUS_VALID_DATA || value.CStatus == PDH_CSTATUS_NEW_DATA {
return value.DoubleValue, nil return value.DoubleValue, nil
} else { } else {
@@ -161,11 +161,12 @@ func (m *PerformanceQueryImpl) GetFormattedCounterValueDouble(hCounter PDH_HCOUN
func (m *PerformanceQueryImpl) GetFormattedCounterArrayDouble(hCounter PDH_HCOUNTER) ([]CounterValue, error) { func (m *PerformanceQueryImpl) GetFormattedCounterArrayDouble(hCounter PDH_HCOUNTER) ([]CounterValue, error) {
var buffSize uint32 var buffSize uint32
var itemCount uint32 var itemCount uint32
ret := PdhGetFormattedCounterArrayDouble(hCounter, &buffSize, &itemCount, nil) var ret uint32
if ret == PDH_MORE_DATA {
if ret = PdhGetFormattedCounterArrayDouble(hCounter, &buffSize, &itemCount, nil); ret == PDH_MORE_DATA {
buff := make([]byte, buffSize) buff := make([]byte, buffSize)
ret = PdhGetFormattedCounterArrayDouble(hCounter, &buffSize, &itemCount, &buff[0])
if ret == ERROR_SUCCESS { if ret = PdhGetFormattedCounterArrayDouble(hCounter, &buffSize, &itemCount, &buff[0]); ret == ERROR_SUCCESS {
items := (*[1 << 20]PDH_FMT_COUNTERVALUE_ITEM_DOUBLE)(unsafe.Pointer(&buff[0]))[:itemCount] items := (*[1 << 20]PDH_FMT_COUNTERVALUE_ITEM_DOUBLE)(unsafe.Pointer(&buff[0]))[:itemCount]
values := make([]CounterValue, 0, itemCount) values := make([]CounterValue, 0, itemCount)
for _, item := range items { for _, item := range items {
@@ -181,17 +182,29 @@ func (m *PerformanceQueryImpl) GetFormattedCounterArrayDouble(hCounter PDH_HCOUN
} }
func (m *PerformanceQueryImpl) CollectData() error { func (m *PerformanceQueryImpl) CollectData() error {
var ret uint32
if m.query == 0 { if m.query == 0 {
return errors.New("uninitialised query") return errors.New("uninitialised query")
} }
ret := PdhCollectQueryData(m.query)
if ret != ERROR_SUCCESS { if ret = PdhCollectQueryData(m.query); ret != ERROR_SUCCESS {
return NewPdhError(ret) return NewPdhError(ret)
} }
return nil return nil
} }
func (m *PerformanceQueryImpl) AddEnglishCounterSupported() bool { func (m *PerformanceQueryImpl) CollectDataWithTime() (time.Time, error) {
if m.query == 0 {
return time.Now(), errors.New("uninitialised query")
}
ret, mtime := PdhCollectQueryDataWithTime(m.query)
if ret != ERROR_SUCCESS {
return time.Now(), NewPdhError(ret)
}
return mtime, nil
}
func (m *PerformanceQueryImpl) IsVistaOrNewer() bool {
return PdhAddEnglishCounterSupported() return PdhAddEnglishCounterSupported()
} }

View File

@@ -23,6 +23,8 @@ var sampleConfig = `
## agent, it will not be gathered. ## agent, it will not be gathered.
## Settings: ## Settings:
# PrintValid = false # Print All matching performance counters # PrintValid = false # Print All matching performance counters
# Whether request a timestamp along with the PerfCounter data or just use current time
# UsePerfCounterTime=true
# If UseWildcardsExpansion params is set to true, wildcards (partial wildcards in instance names and wildcards in counters names) in configured counter paths will be expanded # If UseWildcardsExpansion params is set to true, wildcards (partial wildcards in instance names and wildcards in counters names) in configured counter paths will be expanded
# and in case of localized Windows, counter paths will be also localized. It also returns instance indexes in instance names. # and in case of localized Windows, counter paths will be also localized. It also returns instance indexes in instance names.
# If false, wildcards (not partial) in instance names will still be expanded, but instance indexes will not be returned in instance names. # If false, wildcards (not partial) in instance names will still be expanded, but instance indexes will not be returned in instance names.
@@ -78,6 +80,7 @@ type Win_PerfCounters struct {
PrintValid bool PrintValid bool
//deprecated: determined dynamically //deprecated: determined dynamically
PreVistaSupport bool PreVistaSupport bool
UsePerfCounterTime bool
Object []perfobject Object []perfobject
CountersRefreshInterval internal.Duration CountersRefreshInterval internal.Duration
UseWildcardsExpansion bool UseWildcardsExpansion bool
@@ -107,6 +110,12 @@ type counter struct {
counterHandle PDH_HCOUNTER counterHandle PDH_HCOUNTER
} }
type instanceGrouping struct {
name string
instance string
objectname string
}
var sanitizedChars = strings.NewReplacer("/sec", "_persec", "/Sec", "_persec", var sanitizedChars = strings.NewReplacer("/sec", "_persec", "/Sec", "_persec",
" ", "_", "%", "Percent", `\`, "") " ", "_", "%", "Percent", `\`, "")
@@ -147,7 +156,7 @@ func (m *Win_PerfCounters) SampleConfig() string {
func (m *Win_PerfCounters) AddItem(counterPath string, objectName string, instance string, counterName string, measurement string, includeTotal bool) error { func (m *Win_PerfCounters) AddItem(counterPath string, objectName string, instance string, counterName string, measurement string, includeTotal bool) error {
var err error var err error
var counterHandle PDH_HCOUNTER var counterHandle PDH_HCOUNTER
if !m.query.AddEnglishCounterSupported() { if !m.query.IsVistaOrNewer() {
counterHandle, err = m.query.AddCounterToQuery(counterPath) counterHandle, err = m.query.AddCounterToQuery(counterPath)
if err != nil { if err != nil {
return err return err
@@ -249,18 +258,15 @@ func (m *Win_PerfCounters) Gather(acc telegraf.Accumulator) error {
m.counters = m.counters[:0] m.counters = m.counters[:0]
} }
err = m.query.Open() if err = m.query.Open(); err != nil {
if err != nil {
return err return err
} }
err = m.ParseConfig() if err = m.ParseConfig(); err != nil {
if err != nil {
return err return err
} }
//some counters need two data samples before computing a value //some counters need two data samples before computing a value
err = m.query.CollectData() if err = m.query.CollectData(); err != nil {
if err != nil {
return err return err
} }
m.lastRefreshed = time.Now() m.lastRefreshed = time.Now()
@@ -268,37 +274,31 @@ func (m *Win_PerfCounters) Gather(acc telegraf.Accumulator) error {
time.Sleep(time.Second) time.Sleep(time.Second)
} }
type InstanceGrouping struct { var collectFields = make(map[instanceGrouping]map[string]interface{})
name string
instance string var timestamp time.Time
objectname string if m.UsePerfCounterTime && m.query.IsVistaOrNewer() {
timestamp, err = m.query.CollectDataWithTime()
if err != nil {
return err
}
} else {
timestamp = time.Now()
if err = m.query.CollectData(); err != nil {
return err
}
} }
var collectFields = make(map[InstanceGrouping]map[string]interface{})
err = m.query.CollectData()
if err != nil {
return err
}
// For iterate over the known metrics and get the samples. // For iterate over the known metrics and get the samples.
for _, metric := range m.counters { for _, metric := range m.counters {
// collect // collect
if m.UseWildcardsExpansion { if m.UseWildcardsExpansion {
value, err := m.query.GetFormattedCounterValueDouble(metric.counterHandle) value, err := m.query.GetFormattedCounterValueDouble(metric.counterHandle)
if err == nil { if err == nil {
measurement := sanitizedChars.Replace(metric.measurement) addCounterMeasurement(metric, metric.instance, value, collectFields)
if measurement == "" {
measurement = "win_perf_counters"
}
var instance = InstanceGrouping{measurement, metric.instance, metric.objectName}
if collectFields[instance] == nil {
collectFields[instance] = make(map[string]interface{})
}
collectFields[instance][sanitizedChars.Replace(metric.counter)] = float32(value)
} else { } else {
//ignore invalid data from as some counters from process instances returns this sometimes //ignore invalid data as some counters from process instances returns this sometimes
if phderr, ok := err.(*PdhError); ok && phderr.ErrorCode != PDH_INVALID_DATA && phderr.ErrorCode != PDH_CALC_NEGATIVE_VALUE { if !isKnownCounterDataError(err) {
return fmt.Errorf("error while getting value for counter %s: %v", metric.counterPath, err) return fmt.Errorf("error while getting value for counter %s: %v", metric.counterPath, err)
} }
} }
@@ -326,18 +326,14 @@ func (m *Win_PerfCounters) Gather(acc telegraf.Accumulator) error {
} }
if add { if add {
measurement := sanitizedChars.Replace(metric.measurement) addCounterMeasurement(metric, cValue.InstanceName, cValue.Value, collectFields)
if measurement == "" {
measurement = "win_perf_counters"
}
var instance = InstanceGrouping{measurement, cValue.InstanceName, metric.objectName}
if collectFields[instance] == nil {
collectFields[instance] = make(map[string]interface{})
}
collectFields[instance][sanitizedChars.Replace(metric.counter)] = float32(cValue.Value)
} }
} }
} else {
//ignore invalid data as some counters from process instances returns this sometimes
if !isKnownCounterDataError(err) {
return fmt.Errorf("error while getting value for counter %s: %v", metric.counterPath, err)
}
} }
} }
} }
@@ -349,12 +345,33 @@ func (m *Win_PerfCounters) Gather(acc telegraf.Accumulator) error {
if len(instance.instance) > 0 { if len(instance.instance) > 0 {
tags["instance"] = instance.instance tags["instance"] = instance.instance
} }
acc.AddFields(instance.name, fields, tags) acc.AddFields(instance.name, fields, tags, timestamp)
} }
return nil return nil
} }
func addCounterMeasurement(metric *counter, instanceName string, value float64, collectFields map[instanceGrouping]map[string]interface{}) {
measurement := sanitizedChars.Replace(metric.measurement)
if measurement == "" {
measurement = "win_perf_counters"
}
var instance = instanceGrouping{measurement, instanceName, metric.objectName}
if collectFields[instance] == nil {
collectFields[instance] = make(map[string]interface{})
}
collectFields[instance][sanitizedChars.Replace(metric.counter)] = float32(value)
}
func isKnownCounterDataError(err error) bool {
if pdhErr, ok := err.(*PdhError); ok && (pdhErr.ErrorCode == PDH_INVALID_DATA ||
pdhErr.ErrorCode == PDH_CALC_NEGATIVE_VALUE ||
pdhErr.ErrorCode == PDH_CSTATUS_INVALID_DATA) {
return true
}
return false
}
func init() { func init() {
inputs.Add("win_perf_counters", func() telegraf.Input { inputs.Add("win_perf_counters", func() telegraf.Input {
return &Win_PerfCounters{query: &PerformanceQueryImpl{}, CountersRefreshInterval: internal.Duration{Duration: time.Second * 60}} return &Win_PerfCounters{query: &PerformanceQueryImpl{}, CountersRefreshInterval: internal.Duration{Duration: time.Second * 60}}

View File

@@ -70,6 +70,11 @@ func TestWinPerformanceQueryImpl(t *testing.T) {
_, err = query.GetFormattedCounterValueDouble(hCounter) _, err = query.GetFormattedCounterValueDouble(hCounter)
require.NoError(t, err) require.NoError(t, err)
now := time.Now()
mtime, err := query.CollectDataWithTime()
require.NoError(t, err)
assert.True(t, mtime.Sub(now) < time.Second)
counterPath = "\\Process(*)\\% Processor Time" counterPath = "\\Process(*)\\% Processor Time"
paths, err := query.ExpandWildCardPath(counterPath) paths, err := query.ExpandWildCardPath(counterPath)
require.NoError(t, err) require.NoError(t, err)
@@ -98,6 +103,10 @@ func TestWinPerformanceQueryImpl(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
arr, err := query.GetFormattedCounterArrayDouble(hCounter) arr, err := query.GetFormattedCounterArrayDouble(hCounter)
if phderr, ok := err.(*PdhError); ok && phderr.ErrorCode != PDH_INVALID_DATA && phderr.ErrorCode != PDH_CALC_NEGATIVE_VALUE {
time.Sleep(time.Second)
arr, err = query.GetFormattedCounterArrayDouble(hCounter)
}
require.NoError(t, err) require.NoError(t, err)
assert.True(t, len(arr) > 0, "Too") assert.True(t, len(arr) > 0, "Too")
@@ -596,7 +605,7 @@ func TestWinPerfcountersCollect2(t *testing.T) {
perfobjects[0] = PerfObject perfobjects[0] = PerfObject
m := Win_PerfCounters{PrintValid: false, Object: perfobjects, query: &PerformanceQueryImpl{}, UseWildcardsExpansion: true} m := Win_PerfCounters{PrintValid: false, UsePerfCounterTime: true, Object: perfobjects, query: &PerformanceQueryImpl{}, UseWildcardsExpansion: true}
var acc testutil.Accumulator var acc testutil.Accumulator
err := m.Gather(&acc) err := m.Gather(&acc)
require.NoError(t, err) require.NoError(t, err)

View File

@@ -19,12 +19,14 @@ type testCounter struct {
value float64 value float64
} }
type FakePerformanceQuery struct { type FakePerformanceQuery struct {
counters map[string]testCounter counters map[string]testCounter
addEnglishSupported bool vistaAndNewer bool
expandPaths map[string][]string expandPaths map[string][]string
openCalled bool openCalled bool
} }
var MetricTime = time.Date(2018, 5, 28, 12, 0, 0, 0, time.UTC)
func (m *testCounter) ToCounterValue() *CounterValue { func (m *testCounter) ToCounterValue() *CounterValue {
_, inst, _, _ := extractObjectInstanceCounterFromQuery(m.path) _, inst, _, _ := extractObjectInstanceCounterFromQuery(m.path)
if inst == "" { if inst == "" {
@@ -102,8 +104,10 @@ func (m *FakePerformanceQuery) GetFormattedCounterValueDouble(counterHandle PDH_
} else { } else {
if counter.value == 0 { if counter.value == 0 {
return 0, NewPdhError(PDH_INVALID_DATA) return 0, NewPdhError(PDH_INVALID_DATA)
} else { } else if counter.value == -1 {
return 0, NewPdhError(PDH_CALC_NEGATIVE_VALUE) return 0, NewPdhError(PDH_CALC_NEGATIVE_VALUE)
} else {
return 0, NewPdhError(PDH_ACCESS_DENIED)
} }
} }
} }
@@ -138,8 +142,18 @@ func (m *FakePerformanceQuery) GetFormattedCounterArrayDouble(hCounter PDH_HCOUN
counters := make([]CounterValue, 0, len(e)) counters := make([]CounterValue, 0, len(e))
for _, p := range e { for _, p := range e {
counter := m.findCounterByPath(p) counter := m.findCounterByPath(p)
if counter != nil && counter.value > 0 { if counter != nil {
counters = append(counters, *counter.ToCounterValue()) if counter.value > 0 {
counters = append(counters, *counter.ToCounterValue())
} else {
if counter.value == 0 {
return nil, NewPdhError(PDH_INVALID_DATA)
} else if counter.value == -1 {
return nil, NewPdhError(PDH_CALC_NEGATIVE_VALUE)
} else {
return nil, NewPdhError(PDH_ACCESS_DENIED)
}
}
} else { } else {
return nil, fmt.Errorf("GetFormattedCounterArrayDouble: invalid counter : %s", p) return nil, fmt.Errorf("GetFormattedCounterArrayDouble: invalid counter : %s", p)
} }
@@ -160,8 +174,15 @@ func (m *FakePerformanceQuery) CollectData() error {
return nil return nil
} }
func (m *FakePerformanceQuery) AddEnglishCounterSupported() bool { func (m *FakePerformanceQuery) CollectDataWithTime() (time.Time, error) {
return m.addEnglishSupported if !m.openCalled {
return time.Now(), errors.New("CollectData: uninitialised query")
}
return MetricTime, nil
}
func (m *FakePerformanceQuery) IsVistaOrNewer() bool {
return m.vistaAndNewer
} }
func createPerfObject(measurement string, object string, instances []string, counters []string, failOnMissing bool, includeTotal bool) []perfobject { func createPerfObject(measurement string, object string, instances []string, counters []string, failOnMissing bool, includeTotal bool) []perfobject {
@@ -198,7 +219,7 @@ func TestAddItemSimple(t *testing.T) {
expandPaths: map[string][]string{ expandPaths: map[string][]string{
cps1[0]: cps1, cps1[0]: cps1,
}, },
addEnglishSupported: true, vistaAndNewer: true,
}} }}
err = m.query.Open() err = m.query.Open()
require.NoError(t, err) require.NoError(t, err)
@@ -216,7 +237,7 @@ func TestAddItemInvalidCountPath(t *testing.T) {
expandPaths: map[string][]string{ expandPaths: map[string][]string{
cps1[0]: {"\\O/C"}, cps1[0]: {"\\O/C"},
}, },
addEnglishSupported: true, vistaAndNewer: true,
}} }}
err = m.query.Open() err = m.query.Open()
require.NoError(t, err) require.NoError(t, err)
@@ -238,7 +259,7 @@ func TestParseConfigBasic(t *testing.T) {
cps1[2]: {cps1[2]}, cps1[2]: {cps1[2]},
cps1[3]: {cps1[3]}, cps1[3]: {cps1[3]},
}, },
addEnglishSupported: true, vistaAndNewer: true,
}} }}
err = m.query.Open() err = m.query.Open()
require.NoError(t, err) require.NoError(t, err)
@@ -270,7 +291,7 @@ func TestParseConfigNoInstance(t *testing.T) {
cps1[0]: {cps1[0]}, cps1[0]: {cps1[0]},
cps1[1]: {cps1[1]}, cps1[1]: {cps1[1]},
}, },
addEnglishSupported: true, vistaAndNewer: true,
}} }}
err = m.query.Open() err = m.query.Open()
require.NoError(t, err) require.NoError(t, err)
@@ -303,7 +324,7 @@ func TestParseConfigInvalidCounterError(t *testing.T) {
cps1[1]: {cps1[1]}, cps1[1]: {cps1[1]},
cps1[2]: {cps1[2]}, cps1[2]: {cps1[2]},
}, },
addEnglishSupported: true, vistaAndNewer: true,
}} }}
err = m.query.Open() err = m.query.Open()
require.NoError(t, err) require.NoError(t, err)
@@ -334,7 +355,7 @@ func TestParseConfigInvalidCounterNoError(t *testing.T) {
cps1[1]: {cps1[1]}, cps1[1]: {cps1[1]},
cps1[2]: {cps1[2]}, cps1[2]: {cps1[2]},
}, },
addEnglishSupported: true, vistaAndNewer: true,
}} }}
err = m.query.Open() err = m.query.Open()
require.NoError(t, err) require.NoError(t, err)
@@ -364,7 +385,7 @@ func TestParseConfigTotalExpansion(t *testing.T) {
expandPaths: map[string][]string{ expandPaths: map[string][]string{
"\\O(*)\\*": cps1, "\\O(*)\\*": cps1,
}, },
addEnglishSupported: true, vistaAndNewer: true,
}} }}
err = m.query.Open() err = m.query.Open()
require.NoError(t, err) require.NoError(t, err)
@@ -381,7 +402,7 @@ func TestParseConfigTotalExpansion(t *testing.T) {
expandPaths: map[string][]string{ expandPaths: map[string][]string{
"\\O(*)\\*": cps1, "\\O(*)\\*": cps1,
}, },
addEnglishSupported: true, vistaAndNewer: true,
}} }}
err = m.query.Open() err = m.query.Open()
require.NoError(t, err) require.NoError(t, err)
@@ -401,7 +422,7 @@ func TestParseConfigExpand(t *testing.T) {
expandPaths: map[string][]string{ expandPaths: map[string][]string{
"\\O(*)\\*": cps1, "\\O(*)\\*": cps1,
}, },
addEnglishSupported: true, vistaAndNewer: true,
}} }}
err = m.query.Open() err = m.query.Open()
require.NoError(t, err) require.NoError(t, err)
@@ -425,7 +446,7 @@ func TestSimpleGather(t *testing.T) {
expandPaths: map[string][]string{ expandPaths: map[string][]string{
cp1: {cp1}, cp1: {cp1},
}, },
addEnglishSupported: false, vistaAndNewer: false,
}} }}
var acc1 testutil.Accumulator var acc1 testutil.Accumulator
err = m.Gather(&acc1) err = m.Gather(&acc1)
@@ -449,7 +470,65 @@ func TestSimpleGather(t *testing.T) {
err = m.Gather(&acc2) err = m.Gather(&acc2)
require.NoError(t, err) require.NoError(t, err)
acc1.AssertContainsTaggedFields(t, measurement, fields1, tags1) acc1.AssertContainsTaggedFields(t, measurement, fields1, tags1)
}
func TestSimpleGatherWithTimestamp(t *testing.T) {
var err error
if testing.Short() {
t.Skip("Skipping long taking test in short mode")
}
measurement := "test"
perfObjects := createPerfObject(measurement, "O", []string{"I"}, []string{"C"}, false, false)
cp1 := "\\O(I)\\C"
m := Win_PerfCounters{PrintValid: false, UsePerfCounterTime: true, Object: perfObjects, query: &FakePerformanceQuery{
counters: createCounterMap([]string{cp1}, []float64{1.2}),
expandPaths: map[string][]string{
cp1: {cp1},
},
vistaAndNewer: true,
}}
var acc1 testutil.Accumulator
err = m.Gather(&acc1)
require.NoError(t, err)
fields1 := map[string]interface{}{
"C": float32(1.2),
}
tags1 := map[string]string{
"instance": "I",
"objectname": "O",
}
acc1.AssertContainsTaggedFields(t, measurement, fields1, tags1)
assert.True(t, acc1.HasTimestamp(measurement, MetricTime))
}
func TestGatherError(t *testing.T) {
var err error
if testing.Short() {
t.Skip("Skipping long taking test in short mode")
}
measurement := "test"
perfObjects := createPerfObject(measurement, "O", []string{"I"}, []string{"C"}, false, false)
cp1 := "\\O(I)\\C"
m := Win_PerfCounters{PrintValid: false, Object: perfObjects, query: &FakePerformanceQuery{
counters: createCounterMap([]string{cp1}, []float64{-2}),
expandPaths: map[string][]string{
cp1: {cp1},
},
vistaAndNewer: false,
}}
var acc1 testutil.Accumulator
err = m.Gather(&acc1)
require.Error(t, err)
m.UseWildcardsExpansion = true
m.counters = nil
m.lastRefreshed = time.Time{}
var acc2 testutil.Accumulator
err = m.Gather(&acc2)
require.Error(t, err)
} }
func TestGatherInvalidDataIgnore(t *testing.T) { func TestGatherInvalidDataIgnore(t *testing.T) {
@@ -467,7 +546,7 @@ func TestGatherInvalidDataIgnore(t *testing.T) {
cps1[1]: {cps1[1]}, cps1[1]: {cps1[1]},
cps1[2]: {cps1[2]}, cps1[2]: {cps1[2]},
}, },
addEnglishSupported: false, vistaAndNewer: false,
}} }}
var acc1 testutil.Accumulator var acc1 testutil.Accumulator
err = m.Gather(&acc1) err = m.Gather(&acc1)
@@ -506,7 +585,7 @@ func TestGatherRefreshingWithExpansion(t *testing.T) {
expandPaths: map[string][]string{ expandPaths: map[string][]string{
"\\O(*)\\*": cps1, "\\O(*)\\*": cps1,
}, },
addEnglishSupported: true, vistaAndNewer: true,
} }
m := Win_PerfCounters{PrintValid: false, Object: perfObjects, UseWildcardsExpansion: true, query: fpm, CountersRefreshInterval: internal.Duration{Duration: time.Second * 10}} m := Win_PerfCounters{PrintValid: false, Object: perfObjects, UseWildcardsExpansion: true, query: fpm, CountersRefreshInterval: internal.Duration{Duration: time.Second * 10}}
var acc1 testutil.Accumulator var acc1 testutil.Accumulator
@@ -540,7 +619,7 @@ func TestGatherRefreshingWithExpansion(t *testing.T) {
expandPaths: map[string][]string{ expandPaths: map[string][]string{
"\\O(*)\\*": cps2, "\\O(*)\\*": cps2,
}, },
addEnglishSupported: true, vistaAndNewer: true,
} }
m.query = fpm m.query = fpm
fpm.Open() fpm.Open()
@@ -592,7 +671,7 @@ func TestGatherRefreshingWithoutExpansion(t *testing.T) {
"\\O(*)\\C1": {cps1[0], cps1[2]}, "\\O(*)\\C1": {cps1[0], cps1[2]},
"\\O(*)\\C2": {cps1[1], cps1[3]}, "\\O(*)\\C2": {cps1[1], cps1[3]},
}, },
addEnglishSupported: true, vistaAndNewer: true,
} }
m := Win_PerfCounters{PrintValid: false, Object: perfObjects, UseWildcardsExpansion: false, query: fpm, CountersRefreshInterval: internal.Duration{Duration: time.Second * 10}} m := Win_PerfCounters{PrintValid: false, Object: perfObjects, UseWildcardsExpansion: false, query: fpm, CountersRefreshInterval: internal.Duration{Duration: time.Second * 10}}
var acc1 testutil.Accumulator var acc1 testutil.Accumulator
@@ -628,7 +707,7 @@ func TestGatherRefreshingWithoutExpansion(t *testing.T) {
"\\O(*)\\C1": {cps2[0], cps2[2], cps2[4]}, "\\O(*)\\C1": {cps2[0], cps2[2], cps2[4]},
"\\O(*)\\C2": {cps2[1], cps2[3], cps2[5]}, "\\O(*)\\C2": {cps2[1], cps2[3], cps2[5]},
}, },
addEnglishSupported: true, vistaAndNewer: true,
} }
m.query = fpm m.query = fpm
fpm.Open() fpm.Open()
@@ -662,7 +741,7 @@ func TestGatherRefreshingWithoutExpansion(t *testing.T) {
"\\O(*)\\C2": {cps3[1], cps3[4]}, "\\O(*)\\C2": {cps3[1], cps3[4]},
"\\O(*)\\C3": {cps3[2], cps3[5]}, "\\O(*)\\C3": {cps3[2], cps3[5]},
}, },
addEnglishSupported: true, vistaAndNewer: true,
} }
m.query = fpm m.query = fpm
m.Object = perfObjects m.Object = perfObjects
@@ -710,7 +789,7 @@ func TestGatherTotalNoExpansion(t *testing.T) {
"\\O(*)\\C1": {cps1[0], cps1[2]}, "\\O(*)\\C1": {cps1[0], cps1[2]},
"\\O(*)\\C2": {cps1[1], cps1[3]}, "\\O(*)\\C2": {cps1[1], cps1[3]},
}, },
addEnglishSupported: true, vistaAndNewer: true,
}} }}
var acc1 testutil.Accumulator var acc1 testutil.Accumulator
err = m.Gather(&acc1) err = m.Gather(&acc1)

View File

@@ -22,6 +22,7 @@ var (
`%`, "-", `%`, "-",
"#", "-", "#", "-",
"$", "-") "$", "-")
defaultHttpPath = "/api/put"
defaultSeperator = "_" defaultSeperator = "_"
) )
@@ -31,7 +32,8 @@ type OpenTSDB struct {
Host string Host string
Port int Port int
HttpBatchSize int HttpBatchSize int // deprecated httpBatchSize form in 1.8
HttpPath string
Debug bool Debug bool
@@ -52,7 +54,11 @@ var sampleConfig = `
## Number of data points to send to OpenTSDB in Http requests. ## Number of data points to send to OpenTSDB in Http requests.
## Not used with telnet API. ## Not used with telnet API.
httpBatchSize = 50 http_batch_size = 50
## URI Path for Http requests to OpenTSDB.
## Used in cases where OpenTSDB is located behind a reverse proxy.
http_path = "/api/put"
## Debug true - Prints OpenTSDB communication ## Debug true - Prints OpenTSDB communication
debug = false debug = false
@@ -121,6 +127,7 @@ func (o *OpenTSDB) WriteHttp(metrics []telegraf.Metric, u *url.URL) error {
Scheme: u.Scheme, Scheme: u.Scheme,
User: u.User, User: u.User,
BatchSize: o.HttpBatchSize, BatchSize: o.HttpBatchSize,
Path: o.HttpPath,
Debug: o.Debug, Debug: o.Debug,
} }
@@ -260,6 +267,7 @@ func sanitize(value string) string {
func init() { func init() {
outputs.Add("opentsdb", func() telegraf.Output { outputs.Add("opentsdb", func() telegraf.Output {
return &OpenTSDB{ return &OpenTSDB{
HttpPath: defaultHttpPath,
Separator: defaultSeperator, Separator: defaultSeperator,
} }
}) })

View File

@@ -26,6 +26,7 @@ type openTSDBHttp struct {
Scheme string Scheme string
User *url.Userinfo User *url.Userinfo
BatchSize int BatchSize int
Path string
Debug bool Debug bool
metricCounter int metricCounter int
@@ -123,7 +124,7 @@ func (o *openTSDBHttp) flush() error {
Scheme: o.Scheme, Scheme: o.Scheme,
User: o.User, User: o.User,
Host: fmt.Sprintf("%s:%d", o.Host, o.Port), Host: fmt.Sprintf("%s:%d", o.Host, o.Port),
Path: "/api/put", Path: o.Path,
} }
if o.Debug { if o.Debug {

View File

@@ -156,6 +156,7 @@ func BenchmarkHttpSend(b *testing.B) {
Port: port, Port: port,
Prefix: "", Prefix: "",
HttpBatchSize: BatchSize, HttpBatchSize: BatchSize,
HttpPath: "/api/put",
} }
b.ResetTimer() b.ResetTimer()