Compare commits

..

34 Commits

Author SHA1 Message Date
Greg Linton
6f4bd9ad82 Don't read/check msg length 2018-07-03 15:58:03 -06:00
Greg Linton
4ea618ea26 Enhance syslog reading and ensure proper length is read 2018-07-03 15:05:59 -06:00
Greg Linton
a7545e6cac Don't close connection if readTimeout exceeded 2018-07-02 18:56:58 -06:00
Greg Linton
58e815fdd1 Merge branch 'master' into bugfix/4335 2018-07-02 18:13:44 -06:00
Greg Linton
06682c6350 Cleanup syslog messages
Remove leading and trailing spaces and/or newlines from syslog message
field.
2018-07-02 18:11:58 -06:00
Greg Linton
839ca60b0e Improve syslog connection handling
Resolves #4335
2018-07-02 17:49:13 -06:00
Ayrdrie
9fe90d71f4 Add plugin development framework (#4324) 2018-07-02 16:34:40 -07:00
Daniel Nelson
2ee374cf50 Deprecate camelCase config options in opentsdb output 2018-07-02 15:10:10 -07:00
Daniel Nelson
220e6c5361 Update changelog 2018-07-02 15:06:35 -07:00
Jacob Lisi
c7cfc2ec39 Add http path configuration for OpenTSDB output (#4347) 2018-07-02 15:04:01 -07:00
Greg Linton
9bc63c2f7a Improve CircleCI dependency caching 2018-07-02 14:23:29 -06:00
Daniel Nelson
56ea2eb57b Update changelog 2018-07-02 13:10:39 -07:00
Daniel Nelson
210dfcee83 Fix field name typo in swap documentation 2018-07-02 13:08:43 -07:00
Steve Domino
b7a02c73b3 Document swap input plugin and move to separate file (#4342) 2018-07-02 13:07:57 -07:00
Daniel Nelson
73e2e6afc5 Update changelog 2018-07-02 11:58:14 -07:00
Pierrick Brossin
b2586a7eaf Add energy and power field and device id tag to fibaro input (#4343) 2018-07-02 11:57:05 -07:00
Daniel Nelson
abfbf4f4f2 Update changelog 2018-06-29 19:08:09 -07:00
Adrián López
85eacf268b Fix minmax and basicstats aggregators to use uint64 (#4294) 2018-06-29 19:07:08 -07:00
Daniel Nelson
1a781a5851 Update changelog 2018-06-29 19:02:53 -07:00
Vlasta Hajek
ed2bc1151b Allow use of counter time in win perf counters (#4267) 2018-06-29 19:01:28 -07:00
Daniel Nelson
b2e972cd81 Update changelog 2018-06-29 18:18:58 -07:00
Greg
54056f3808 Handle mysql input variations in the user_statistics collecting (#4306) 2018-06-29 18:16:52 -07:00
Daniel Nelson
5aa199e2b3 Update changelog 2018-06-29 18:06:45 -07:00
Daniel Nelson
9bd5e10133 Fix syslog timestamp parsing with single digit day of month (#4334) 2018-06-29 18:05:46 -07:00
Daniel Nelson
beaef8e3da Update changelog 2018-06-29 16:20:03 -07:00
maxunt
a10262c5d6 Add log message when tail is added or removed from a file (#4322) 2018-06-29 16:15:33 -07:00
Daniel Nelson
8bf18d6ac7 Fix name of hadoop example config 2018-06-29 16:07:30 -07:00
Daniel Nelson
23523ffd10 Document path tag in tail input 2018-06-21 18:02:34 -07:00
Daniel Nelson
523d761f34 Update changelog 2018-06-21 17:59:31 -07:00
JongHyok Lee
3f28add025 Added path tag to tail input plugin (#4292) 2018-06-21 17:55:54 -07:00
Daniel Nelson
ee6e4b0afd Run windows tests with -short 2018-06-21 17:46:58 -07:00
Patrick Hemmer
16454e25ba Fix postfix input handling of multi-level queues (#4333) 2018-06-21 16:01:38 -07:00
Daniel Nelson
2a1feb6db9 Update changelog 2018-06-21 14:20:35 -07:00
Ayrdrie
61e197d254 Add support for comma in logparser timestamp format (#4311) 2018-06-21 14:19:15 -07:00
55 changed files with 1066 additions and 1338 deletions

View File

@@ -16,11 +16,11 @@ jobs:
steps: steps:
- checkout - checkout
- restore_cache: - restore_cache:
key: vendor-{{ .Branch }}-{{ checksum "Gopkg.lock" }} key: vendor-{{ checksum "Gopkg.lock" }}
- run: 'make deps' - run: 'make deps'
- save_cache: - save_cache:
name: 'vendored deps' name: 'vendored deps'
key: vendor-{{ .Branch }}-{{ checksum "Gopkg.lock" }} key: vendor-{{ checksum "Gopkg.lock" }}
paths: paths:
- './vendor' - './vendor'
- persist_to_workspace: - persist_to_workspace:

View File

@@ -20,6 +20,12 @@
- [#4259](https://github.com/influxdata/telegraf/pull/4259): Add container status tag to docker input. - [#4259](https://github.com/influxdata/telegraf/pull/4259): Add container status tag to docker input.
- [#3523](https://github.com/influxdata/telegraf/pull/3523): Add valuecounter aggregator plugin. - [#3523](https://github.com/influxdata/telegraf/pull/3523): Add valuecounter aggregator plugin.
- [#4307](https://github.com/influxdata/telegraf/pull/4307): Add new measurement with results of pgrep lookup to procstat input. - [#4307](https://github.com/influxdata/telegraf/pull/4307): Add new measurement with results of pgrep lookup to procstat input.
- [#4311](https://github.com/influxdata/telegraf/pull/4311): Add support for comma in logparser timestamp format.
- [#4292](https://github.com/influxdata/telegraf/pull/4292): Add path tag to tail input plugin.
- [#4322](https://github.com/influxdata/telegraf/pull/4322): Add log message when tail is added or removed from a file.
- [#4267](https://github.com/influxdata/telegraf/pull/4267): Add option to use counter time in win perf counters.
- [#4343](https://github.com/influxdata/telegraf/pull/4343): Add energy and power field and device id tag to fibaro input.
- [#4347](https://github.com/influxdata/telegraf/pull/4347): Add http path configuration for OpenTSDB output.
## v1.7.1 [unreleased] ## v1.7.1 [unreleased]
@@ -27,6 +33,11 @@
- [#4277](https://github.com/influxdata/telegraf/pull/4277): Treat sigterm as a clean shutdown signal. - [#4277](https://github.com/influxdata/telegraf/pull/4277): Treat sigterm as a clean shutdown signal.
- [#4284](https://github.com/influxdata/telegraf/pull/4284): Fix selection of tags under nested objects in the JSON parser. - [#4284](https://github.com/influxdata/telegraf/pull/4284): Fix selection of tags under nested objects in the JSON parser.
- [#4135](https://github.com/influxdata/telegraf/issues/4135): Fix postfix input handling multi-level queues.
- [#4334](https://github.com/influxdata/telegraf/pull/4334): Fix syslog timestamp parsing with single digit day of month.
- [#2910](https://github.com/influxdata/telegraf/issues/2910): Handle mysql input variations in the user_statistics collecting.
- [#4293](https://github.com/influxdata/telegraf/issues/4293): Fix minmax and basicstats aggregators to use uint64.
- [#4290](https://github.com/influxdata/telegraf/issues/4290): Document swap input plugin.
## v1.7 [2018-06-12] ## v1.7 [2018-06-12]

View File

@@ -100,6 +100,13 @@ func init() {
} }
``` ```
### Input Plugin Development
* Run `make static` followed by `make plugin-[pluginName]` to spin up a docker dev environment
using docker-compose.
* ***[Optional]*** When developing a plugin, add a `dev` directory with a `docker-compose.yml` and `telegraf.conf`
as well as any other supporting files, where sensible.
## Adding Typed Metrics ## Adding Typed Metrics
In addition to the `AddFields` function, the accumulator also supports an In addition to the `AddFields` function, the accumulator also supports an

View File

@@ -54,11 +54,11 @@ fmtcheck:
@echo '[INFO] done.' @echo '[INFO] done.'
test-windows: test-windows:
go test ./plugins/inputs/ping/... go test -short ./plugins/inputs/ping/...
go test ./plugins/inputs/win_perf_counters/... go test -short ./plugins/inputs/win_perf_counters/...
go test ./plugins/inputs/win_services/... go test -short ./plugins/inputs/win_services/...
go test ./plugins/inputs/procstat/... go test -short ./plugins/inputs/procstat/...
go test ./plugins/inputs/ntpq/... go test -short ./plugins/inputs/ntpq/...
# vet runs the Go source code static analysis tool `vet` to find # vet runs the Go source code static analysis tool `vet` to find
# any common errors. # any common errors.
@@ -92,4 +92,15 @@ docker-image:
plugins/parsers/influx/machine.go: plugins/parsers/influx/machine.go.rl plugins/parsers/influx/machine.go: plugins/parsers/influx/machine.go.rl
ragel -Z -G2 $^ -o $@ ragel -Z -G2 $^ -o $@
.PHONY: deps telegraf install test test-windows lint vet test-all package clean docker-image fmtcheck uint64 static:
@echo "Building static linux binary..."
@CGO_ENABLED=0 \
GOOS=linux \
GOARCH=amd64 \
go build -ldflags "$(LDFLAGS)" ./cmd/telegraf
plugin-%:
@echo "Starting dev environment for $${$(@)} input plugin..."
@docker-compose -f plugins/inputs/$${$(@)}/dev/docker-compose.yml up
.PHONY: deps telegraf install test test-windows lint vet test-all package clean docker-image fmtcheck uint64 static

View File

@@ -723,6 +723,10 @@
# ## Not used with telnet API. # ## Not used with telnet API.
# httpBatchSize = 50 # httpBatchSize = 50
# #
# ## URI Path for Http requests to OpenTSDB.
# ## Used in cases where OpenTSDB is located behind a reverse proxy.
# httpPath = "/api/put"
#
# ## Debug true - Prints OpenTSDB communication # ## Debug true - Prints OpenTSDB communication
# debug = false # debug = false
# #

View File

@@ -246,6 +246,8 @@ func convert(in interface{}) (float64, bool) {
return v, true return v, true
case int64: case int64:
return float64(v), true return float64(v), true
case uint64:
return float64(v), true
default: default:
return 0, false return 0, false
} }

View File

@@ -28,6 +28,7 @@ var m2, _ = metric.New("m1",
"c": float64(4), "c": float64(4),
"d": float64(6), "d": float64(6),
"e": float64(200), "e": float64(200),
"f": uint64(200),
"ignoreme": "string", "ignoreme": "string",
"andme": true, "andme": true,
}, },
@@ -81,6 +82,10 @@ func TestBasicStatsWithPeriod(t *testing.T) {
"e_max": float64(200), "e_max": float64(200),
"e_min": float64(200), "e_min": float64(200),
"e_mean": float64(200), "e_mean": float64(200),
"f_count": float64(1), //f
"f_max": float64(200),
"f_min": float64(200),
"f_mean": float64(200),
} }
expectedTags := map[string]string{ expectedTags := map[string]string{
"foo": "bar", "foo": "bar",
@@ -144,6 +149,10 @@ func TestBasicStatsDifferentPeriods(t *testing.T) {
"e_max": float64(200), "e_max": float64(200),
"e_min": float64(200), "e_min": float64(200),
"e_mean": float64(200), "e_mean": float64(200),
"f_count": float64(1), //f
"f_max": float64(200),
"f_min": float64(200),
"f_mean": float64(200),
} }
expectedTags = map[string]string{ expectedTags = map[string]string{
"foo": "bar", "foo": "bar",
@@ -169,6 +178,7 @@ func TestBasicStatsWithOnlyCount(t *testing.T) {
"c_count": float64(2), "c_count": float64(2),
"d_count": float64(2), "d_count": float64(2),
"e_count": float64(1), "e_count": float64(1),
"f_count": float64(1),
} }
expectedTags := map[string]string{ expectedTags := map[string]string{
"foo": "bar", "foo": "bar",
@@ -194,6 +204,7 @@ func TestBasicStatsWithOnlyMin(t *testing.T) {
"c_min": float64(2), "c_min": float64(2),
"d_min": float64(2), "d_min": float64(2),
"e_min": float64(200), "e_min": float64(200),
"f_min": float64(200),
} }
expectedTags := map[string]string{ expectedTags := map[string]string{
"foo": "bar", "foo": "bar",
@@ -219,6 +230,7 @@ func TestBasicStatsWithOnlyMax(t *testing.T) {
"c_max": float64(4), "c_max": float64(4),
"d_max": float64(6), "d_max": float64(6),
"e_max": float64(200), "e_max": float64(200),
"f_max": float64(200),
} }
expectedTags := map[string]string{ expectedTags := map[string]string{
"foo": "bar", "foo": "bar",
@@ -244,6 +256,7 @@ func TestBasicStatsWithOnlyMean(t *testing.T) {
"c_mean": float64(3), "c_mean": float64(3),
"d_mean": float64(4), "d_mean": float64(4),
"e_mean": float64(200), "e_mean": float64(200),
"f_mean": float64(200),
} }
expectedTags := map[string]string{ expectedTags := map[string]string{
"foo": "bar", "foo": "bar",
@@ -269,6 +282,7 @@ func TestBasicStatsWithOnlySum(t *testing.T) {
"c_sum": float64(6), "c_sum": float64(6),
"d_sum": float64(8), "d_sum": float64(8),
"e_sum": float64(200), "e_sum": float64(200),
"f_sum": float64(200),
} }
expectedTags := map[string]string{ expectedTags := map[string]string{
"foo": "bar", "foo": "bar",
@@ -399,6 +413,8 @@ func TestBasicStatsWithMinAndMax(t *testing.T) {
"d_min": float64(2), "d_min": float64(2),
"e_max": float64(200), //e "e_max": float64(200), //e
"e_min": float64(200), "e_min": float64(200),
"f_max": float64(200), //f
"f_min": float64(200),
} }
expectedTags := map[string]string{ expectedTags := map[string]string{
"foo": "bar", "foo": "bar",
@@ -450,6 +466,11 @@ func TestBasicStatsWithAllStats(t *testing.T) {
"e_min": float64(200), "e_min": float64(200),
"e_mean": float64(200), "e_mean": float64(200),
"e_sum": float64(200), "e_sum": float64(200),
"f_count": float64(1), //f
"f_max": float64(200),
"f_min": float64(200),
"f_mean": float64(200),
"f_sum": float64(200),
} }
expectedTags := map[string]string{ expectedTags := map[string]string{
"foo": "bar", "foo": "bar",

View File

@@ -107,6 +107,8 @@ func convert(in interface{}) (float64, bool) {
return v, true return v, true
case int64: case int64:
return float64(v), true return float64(v), true
case uint64:
return float64(v), true
default: default:
return 0, false return 0, false
} }

View File

@@ -38,6 +38,7 @@ var m2, _ = metric.New("m1",
"i": float64(1), "i": float64(1),
"j": float64(1), "j": float64(1),
"k": float64(200), "k": float64(200),
"l": uint64(200),
"ignoreme": "string", "ignoreme": "string",
"andme": true, "andme": true,
}, },
@@ -85,6 +86,8 @@ func TestMinMaxWithPeriod(t *testing.T) {
"j_min": float64(1), "j_min": float64(1),
"k_max": float64(200), "k_max": float64(200),
"k_min": float64(200), "k_min": float64(200),
"l_max": float64(200),
"l_min": float64(200),
} }
expectedTags := map[string]string{ expectedTags := map[string]string{
"foo": "bar", "foo": "bar",
@@ -154,6 +157,8 @@ func TestMinMaxDifferentPeriods(t *testing.T) {
"j_min": float64(1), "j_min": float64(1),
"k_max": float64(200), "k_max": float64(200),
"k_min": float64(200), "k_min": float64(200),
"l_max": float64(200),
"l_min": float64(200),
} }
expectedTags = map[string]string{ expectedTags = map[string]string{
"foo": "bar", "foo": "bar",

View File

@@ -24,11 +24,14 @@ Those values could be true (1) or false (0) for switches, percentage for dimmers
- fibaro - fibaro
- tags: - tags:
- deviceId (device id)
- section (section name) - section (section name)
- room (room name) - room (room name)
- name (device name) - name (device name)
- type (device type) - type (device type)
- fields: - fields:
- energy (float, when available from device)
- power (float, when available from device)
- value (float) - value (float)
- value2 (float, when available from device) - value2 (float, when available from device)
@@ -36,16 +39,17 @@ Those values could be true (1) or false (0) for switches, percentage for dimmers
### Example Output: ### Example Output:
``` ```
fibaro,host=vm1,name=Escaliers,room=Dégagement,section=Pièces\ communes,type=com.fibaro.binarySwitch value=0 1523351010000000000 fibaro,deviceId=9,host=vm1,name=Fenêtre\ haute,room=Cuisine,section=Cuisine,type=com.fibaro.FGRM222 energy=2.04,power=0.7,value=99,value2=99 1529996807000000000
fibaro,host=vm1,name=Porte\ fenêtre,room=Salon,section=Pièces\ communes,type=com.fibaro.FGRM222 value=99,value2=99 1523351010000000000 fibaro,deviceId=10,host=vm1,name=Escaliers,room=Dégagement,section=Pièces\ communes,type=com.fibaro.binarySwitch value=0 1529996807000000000
fibaro,host=vm1,name=LED\ îlot\ central,room=Cuisine,section=Cuisine,type=com.fibaro.binarySwitch value=0 1523351010000000000 fibaro,deviceId=13,host=vm1,name=Porte\ fenêtre,room=Salon,section=Pièces\ communes,type=com.fibaro.FGRM222 energy=4.33,power=0.7,value=99,value2=99 1529996807000000000
fibaro,host=vm1,name=Détérioration,room=Entrée,section=Pièces\ communes,type=com.fibaro.heatDetector value=0 1523351010000000000 fibaro,deviceId=21,host=vm1,name=LED\ îlot\ central,room=Cuisine,section=Cuisine,type=com.fibaro.binarySwitch value=0 1529996807000000000
fibaro,host=vm1,name=Température,room=Cave,section=Cave,type=com.fibaro.temperatureSensor value=17.87 1523351010000000000 fibaro,deviceId=90,host=vm1,name=Détérioration,room=Entrée,section=Pièces\ communes,type=com.fibaro.heatDetector value=0 1529996807000000000
fibaro,host=vm1,name=Présence,room=Garde-manger,section=Cuisine,type=com.fibaro.FGMS001 value=1 1523351010000000000 fibaro,deviceId=163,host=vm1,name=Température,room=Cave,section=Cave,type=com.fibaro.temperatureSensor value=21.62 1529996807000000000
fibaro,host=vm1,name=Luminosité,room=Garde-manger,section=Cuisine,type=com.fibaro.lightSensor value=92 1523351010000000000 fibaro,deviceId=191,host=vm1,name=Présence,room=Garde-manger,section=Cuisine,type=com.fibaro.FGMS001 value=1 1529996807000000000
fibaro,host=vm1,name=Etat,room=Garage,section=Extérieur,type=com.fibaro.doorSensor value=0 1523351010000000000 fibaro,deviceId=193,host=vm1,name=Luminosité,room=Garde-manger,section=Cuisine,type=com.fibaro.lightSensor value=195 1529996807000000000
fibaro,host=vm1,name=CO2\ (ppm),room=Salon,section=Pièces\ communes,type=com.fibaro.multilevelSensor value=880 1523351010000000000 fibaro,deviceId=200,host=vm1,name=Etat,room=Garage,section=Extérieur,type=com.fibaro.doorSensor value=0 1529996807000000000
fibaro,host=vm1,name=Humidité\ (%),room=Salon,section=Pièces\ communes,type=com.fibaro.humiditySensor value=53 1523351010000000000 fibaro,deviceId=220,host=vm1,name=CO2\ (ppm),room=Salon,section=Pièces\ communes,type=com.fibaro.multilevelSensor value=536 1529996807000000000
fibaro,host=vm1,name=Pression\ (mb),room=Salon,section=Pièces\ communes,type=com.fibaro.multilevelSensor value=1006.9 1523351010000000000 fibaro,deviceId=221,host=vm1,name=Humidité\ (%),room=Salon,section=Pièces\ communes,type=com.fibaro.humiditySensor value=61 1529996807000000000
fibaro,host=vm1,name=Bruit\ (db),room=Salon,section=Pièces\ communes,type=com.fibaro.multilevelSensor value=58 1523351010000000000 fibaro,deviceId=222,host=vm1,name=Pression\ (mb),room=Salon,section=Pièces\ communes,type=com.fibaro.multilevelSensor value=1013.7 1529996807000000000
fibaro,deviceId=223,host=vm1,name=Bruit\ (db),room=Salon,section=Pièces\ communes,type=com.fibaro.multilevelSensor value=44 1529996807000000000
``` ```

View File

@@ -67,6 +67,8 @@ type Devices struct {
Enabled bool `json:"enabled"` Enabled bool `json:"enabled"`
Properties struct { Properties struct {
Dead interface{} `json:"dead"` Dead interface{} `json:"dead"`
Energy interface{} `json:"energy"`
Power interface{} `json:"power"`
Value interface{} `json:"value"` Value interface{} `json:"value"`
Value2 interface{} `json:"value2"` Value2 interface{} `json:"value2"`
} `json:"properties"` } `json:"properties"`
@@ -162,13 +164,26 @@ func (f *Fibaro) Gather(acc telegraf.Accumulator) error {
} }
tags := map[string]string{ tags := map[string]string{
"section": sections[rooms[device.RoomID].SectionID], "deviceId": strconv.FormatUint(uint64(device.ID), 10),
"room": rooms[device.RoomID].Name, "section": sections[rooms[device.RoomID].SectionID],
"name": device.Name, "room": rooms[device.RoomID].Name,
"type": device.Type, "name": device.Name,
"type": device.Type,
} }
fields := make(map[string]interface{}) fields := make(map[string]interface{})
if device.Properties.Energy != nil {
if fValue, err := strconv.ParseFloat(device.Properties.Energy.(string), 64); err == nil {
fields["energy"] = fValue
}
}
if device.Properties.Power != nil {
if fValue, err := strconv.ParseFloat(device.Properties.Power.(string), 64); err == nil {
fields["power"] = fValue
}
}
if device.Properties.Value != nil { if device.Properties.Value != nil {
value := device.Properties.Value value := device.Properties.Value
switch value { switch value {

View File

@@ -119,6 +119,8 @@ const devicesJSON = `
"type": "com.fibaro.FGRM222", "type": "com.fibaro.FGRM222",
"enabled": true, "enabled": true,
"properties": { "properties": {
"energy": "4.33",
"power": "0.7",
"dead": "false", "dead": "false",
"value": "50", "value": "50",
"value2": "75" "value2": "75"
@@ -178,27 +180,27 @@ func TestJSONSuccess(t *testing.T) {
assert.Equal(t, uint64(5), acc.NMetrics()) assert.Equal(t, uint64(5), acc.NMetrics())
// Ensure fields / values are correct - Device 1 // Ensure fields / values are correct - Device 1
tags := map[string]string{"section": "Section 1", "room": "Room 1", "name": "Device 1", "type": "com.fibaro.binarySwitch"} tags := map[string]string{"deviceId": "1", "section": "Section 1", "room": "Room 1", "name": "Device 1", "type": "com.fibaro.binarySwitch"}
fields := map[string]interface{}{"value": float64(0)} fields := map[string]interface{}{"value": float64(0)}
acc.AssertContainsTaggedFields(t, "fibaro", fields, tags) acc.AssertContainsTaggedFields(t, "fibaro", fields, tags)
// Ensure fields / values are correct - Device 2 // Ensure fields / values are correct - Device 2
tags = map[string]string{"section": "Section 2", "room": "Room 2", "name": "Device 2", "type": "com.fibaro.binarySwitch"} tags = map[string]string{"deviceId": "2", "section": "Section 2", "room": "Room 2", "name": "Device 2", "type": "com.fibaro.binarySwitch"}
fields = map[string]interface{}{"value": float64(1)} fields = map[string]interface{}{"value": float64(1)}
acc.AssertContainsTaggedFields(t, "fibaro", fields, tags) acc.AssertContainsTaggedFields(t, "fibaro", fields, tags)
// Ensure fields / values are correct - Device 3 // Ensure fields / values are correct - Device 3
tags = map[string]string{"section": "Section 3", "room": "Room 3", "name": "Device 3", "type": "com.fibaro.multilevelSwitch"} tags = map[string]string{"deviceId": "3", "section": "Section 3", "room": "Room 3", "name": "Device 3", "type": "com.fibaro.multilevelSwitch"}
fields = map[string]interface{}{"value": float64(67)} fields = map[string]interface{}{"value": float64(67)}
acc.AssertContainsTaggedFields(t, "fibaro", fields, tags) acc.AssertContainsTaggedFields(t, "fibaro", fields, tags)
// Ensure fields / values are correct - Device 4 // Ensure fields / values are correct - Device 4
tags = map[string]string{"section": "Section 3", "room": "Room 4", "name": "Device 4", "type": "com.fibaro.temperatureSensor"} tags = map[string]string{"deviceId": "4", "section": "Section 3", "room": "Room 4", "name": "Device 4", "type": "com.fibaro.temperatureSensor"}
fields = map[string]interface{}{"value": float64(22.8)} fields = map[string]interface{}{"value": float64(22.8)}
acc.AssertContainsTaggedFields(t, "fibaro", fields, tags) acc.AssertContainsTaggedFields(t, "fibaro", fields, tags)
// Ensure fields / values are correct - Device 5 // Ensure fields / values are correct - Device 5
tags = map[string]string{"section": "Section 3", "room": "Room 4", "name": "Device 5", "type": "com.fibaro.FGRM222"} tags = map[string]string{"deviceId": "5", "section": "Section 3", "room": "Room 4", "name": "Device 5", "type": "com.fibaro.FGRM222"}
fields = map[string]interface{}{"value": float64(50), "value2": float64(75)} fields = map[string]interface{}{"energy": float64(4.33), "power": float64(0.7), "value": float64(50), "value2": float64(75)}
acc.AssertContainsTaggedFields(t, "fibaro", fields, tags) acc.AssertContainsTaggedFields(t, "fibaro", fields, tags)
} }

View File

@@ -108,7 +108,9 @@ You must capture at least one field per line.
- ts-"CUSTOM" - ts-"CUSTOM"
CUSTOM time layouts must be within quotes and be the representation of the CUSTOM time layouts must be within quotes and be the representation of the
"reference time", which is `Mon Jan 2 15:04:05 -0700 MST 2006` "reference time", which is `Mon Jan 2 15:04:05 -0700 MST 2006`.
To match a comma decimal point you can use a period. For example `%{TIMESTAMP:timestamp:ts-"2006-01-02 15:04:05.000"}` can be used to match `"2018-01-02 15:04:05,000"`
To match a comma decimal point you can use a period in the pattern string.
See https://golang.org/pkg/time/#Parse for more details. See https://golang.org/pkg/time/#Parse for more details.
Telegraf has many of its own [built-in patterns](./grok/patterns/influx-patterns), Telegraf has many of its own [built-in patterns](./grok/patterns/influx-patterns),

View File

@@ -0,0 +1,13 @@
version: '3'
services:
telegraf:
image: glinton/scratch
volumes:
- ./telegraf.conf:/telegraf.conf
- ../../../../telegraf:/telegraf
- ./test.log:/var/log/test.log
entrypoint:
- /telegraf
- --config
- /telegraf.conf

View File

@@ -0,0 +1,12 @@
[agent]
interval="1s"
flush_interval="1s"
[[inputs.logparser]]
files = ["/var/log/test.log"]
from_beginning = true
[inputs.logparser.grok]
patterns = [ "%{COMBINED_LOG_FORMAT}", "%{CLIENT:client_ip} %{NOTSPACE:ident} %{NOTSPACE:auth} \\[%{TIMESTAMP_ISO8601:timestamp}\\] \"(?:%{WORD:verb:tag} %{NOTSPACE:request}(?: HTTP/%{NUMBER:http_version:float})?|%{DATA})\" %{NUMBER:resp_code:tag} (?:%{NUMBER:resp_bytes:int}|-) %{QS:referrer} %{QS:agent}"]
[[outputs.file]]
files = ["stdout"]

View File

@@ -0,0 +1,2 @@
127.0.0.1 ident auth [10/Oct/2000:13:55:36 -0700] "GET /anything HTTP/1.0" 200 2326 "http://localhost:8083/" "Chrome/51.0.2704.84"
127.0.0.1 ident auth [2018-02-21 13:10:34,555] "GET /peter HTTP/1.0" 200 2326 "http://localhost:8083/" "Chrome/51.0.2704.84"

View File

@@ -293,7 +293,7 @@ func (p *Parser) ParseLine(line string) (telegraf.Metric, error) {
timestamp = time.Unix(0, iv) timestamp = time.Unix(0, iv)
} }
case SYSLOG_TIMESTAMP: case SYSLOG_TIMESTAMP:
ts, err := time.ParseInLocation("Jan 02 15:04:05", v, p.loc) ts, err := time.ParseInLocation(time.Stamp, v, p.loc)
if err == nil { if err == nil {
if ts.Year() == 0 { if ts.Year() == 0 {
ts = ts.AddDate(timestamp.Year(), 0, 0) ts = ts.AddDate(timestamp.Year(), 0, 0)
@@ -335,6 +335,9 @@ func (p *Parser) ParseLine(line string) (telegraf.Metric, error) {
case DROP: case DROP:
// goodbye! // goodbye!
default: default:
// Replace commas with dot character
v = strings.Replace(v, ",", ".", -1)
ts, err := time.ParseInLocation(t, v, p.loc) ts, err := time.ParseInLocation(t, v, p.loc)
if err == nil { if err == nil {
timestamp = ts timestamp = ts

View File

@@ -971,14 +971,57 @@ func TestNewlineInPatterns(t *testing.T) {
require.NotNil(t, m) require.NotNil(t, m)
} }
func TestSyslogTimestampParser(t *testing.T) { func TestSyslogTimestamp(t *testing.T) {
p := &Parser{ tests := []struct {
Patterns: []string{`%{SYSLOGTIMESTAMP:timestamp:ts-syslog} value=%{NUMBER:value:int}`}, name string
timeFunc: func() time.Time { return time.Date(2018, time.April, 1, 0, 0, 0, 0, nil) }, line string
expected time.Time
}{
{
name: "two digit day of month",
line: "Sep 25 09:01:55 value=42",
expected: time.Date(2018, time.September, 25, 9, 1, 55, 0, time.UTC),
},
{
name: "one digit day of month single space",
line: "Sep 2 09:01:55 value=42",
expected: time.Date(2018, time.September, 2, 9, 1, 55, 0, time.UTC),
},
{
name: "one digit day of month double space",
line: "Sep 2 09:01:55 value=42",
expected: time.Date(2018, time.September, 2, 9, 1, 55, 0, time.UTC),
},
} }
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
p := &Parser{
Patterns: []string{`%{SYSLOGTIMESTAMP:timestamp:ts-syslog} value=%{NUMBER:value:int}`},
timeFunc: func() time.Time { return time.Date(2017, time.April, 1, 0, 0, 0, 0, time.UTC) },
}
require.NoError(t, p.Compile())
m, err := p.ParseLine(tt.line)
require.NoError(t, err)
require.NotNil(t, m)
require.Equal(t, tt.expected, m.Time())
})
}
}
func TestReplaceTimestampComma(t *testing.T) {
p := &Parser{
Patterns: []string{`%{TIMESTAMP_ISO8601:timestamp:ts-"2006-01-02 15:04:05.000"} successfulMatches=%{NUMBER:value:int}`},
}
require.NoError(t, p.Compile()) require.NoError(t, p.Compile())
m, err := p.ParseLine("Sep 25 09:01:55 value=42") m, err := p.ParseLine("2018-02-21 13:10:34,555 successfulMatches=1")
require.NoError(t, err) require.NoError(t, err)
require.NotNil(t, m) require.NotNil(t, m)
require.Equal(t, 2018, m.Time().Year()) require.Equal(t, 2018, m.Time().Year())
require.Equal(t, 13, m.Time().Hour())
require.Equal(t, 34, m.Time().Second())
//Convert Nanosecond to milisecond for compare
require.Equal(t, 555, m.Time().Nanosecond()/1000000) // convert nanosecond to millisecond for compare
} }

View File

@@ -203,6 +203,10 @@ func (l *LogParserPlugin) tailNewfiles(fromBeginning bool) error {
Poll: poll, Poll: poll,
Logger: tail.DiscardingLogger, Logger: tail.DiscardingLogger,
}) })
//add message saying a new tailer was added for the file
log.Printf("D! tail added for file: %v", file)
if err != nil { if err != nil {
l.acc.AddError(err) l.acc.AddError(err)
continue continue
@@ -287,6 +291,10 @@ func (l *LogParserPlugin) Stop() {
for _, t := range l.tailers { for _, t := range l.tailers {
err := t.Stop() err := t.Stop()
//message for a stopped tailer
log.Printf("D! tail dropped for file: %v", t.Filename)
if err != nil { if err != nil {
log.Printf("E! Error stopping tail on file %s\n", t.Filename) log.Printf("E! Error stopping tail on file %s\n", t.Filename)
} }

View File

@@ -0,0 +1,16 @@
version: '3'
services:
mongodb:
image: mongo
telegraf:
image: glinton/scratch
volumes:
- ./telegraf.conf:/telegraf.conf
- ../../../../telegraf:/telegraf
depends_on:
- mongodb
entrypoint:
- /telegraf
- --config
- /telegraf.conf

View File

@@ -0,0 +1,9 @@
[agent]
interval="1s"
flush_interval="3s"
[[inputs.mongodb]]
servers = ["mongodb://mongodb:27017"]
[[outputs.file]]
files = ["stdout"]

View File

@@ -0,0 +1,42 @@
version: '3'
services:
mysql:
image: mysql:5.7
restart: always
environment:
MYSQL_ROOT_PASSWORD: telegraf
MYSQL_DATABASE: telegraf
MYSQL_USER: telegraf
MYSQL_PASSWORD: telegraf
maria:
image: mariadb
restart: always
environment:
MYSQL_ROOT_PASSWORD: telegraf
MYSQL_DATABASE: telegraf
MYSQL_USER: telegraf
MYSQL_PASSWORD: telegraf
command: mysqld --userstat=1
percona:
image: percona
restart: always
environment:
MYSQL_ROOT_PASSWORD: telegraf
MYSQL_DATABASE: telegraf
MYSQL_USER: telegraf
MYSQL_PASSWORD: telegraf
telegraf:
image: glinton/scratch
depends_on:
- mysql
- maria
- percona
volumes:
- ./telegraf.conf:/telegraf.conf
- ../../../../telegraf:/telegraf
entrypoint:
- /telegraf
- --config
- /telegraf.conf

View File

@@ -0,0 +1,61 @@
# Uncomment each input as needed to test plugin
## mysql
#[[inputs.mysql]]
# servers = ["root:telegraf@tcp(mysql:3306)/"]
# gather_table_schema = true
# gather_process_list = true
# gather_user_statistics = true
# gather_info_schema_auto_inc = true
# gather_innodb_metrics = true
# gather_slave_status = true
# gather_binary_logs = false
# gather_table_io_waits = true
# gather_table_lock_waits = true
# gather_index_io_waits = true
# gather_event_waits = true
# gather_file_events_stats = true
# gather_perf_events_statements = true
# interval_slow = "30m"
# table_schema_databases = []
#
## mariadb
#[[inputs.mysql]]
# servers = ["root:telegraf@tcp(maria:3306)/"]
# gather_table_schema = true
# gather_process_list = true
# gather_user_statistics = true
# gather_info_schema_auto_inc = true
# gather_innodb_metrics = true
# gather_slave_status = true
# gather_binary_logs = false
# gather_table_io_waits = true
# gather_table_lock_waits = true
# gather_index_io_waits = true
# gather_event_waits = true
# gather_file_events_stats = true
# gather_perf_events_statements = true
# interval_slow = "30m"
# table_schema_databases = []
# percona
[[inputs.mysql]]
servers = ["root:telegraf@tcp(percona:3306)/"]
gather_table_schema = true
gather_process_list = true
gather_user_statistics = true
gather_info_schema_auto_inc = true
gather_innodb_metrics = true
gather_slave_status = true
gather_binary_logs = false
gather_table_io_waits = true
gather_table_lock_waits = true
gather_index_io_waits = true
gather_event_waits = true
gather_file_events_stats = true
gather_perf_events_statements = true
interval_slow = "30m"
table_schema_databases = []
[[outputs.file]]
files = ["stdout"]

View File

@@ -4,7 +4,6 @@ import (
"bytes" "bytes"
"database/sql" "database/sql"
"fmt" "fmt"
"log"
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
@@ -80,7 +79,7 @@ var sampleConfig = `
## gather thread state counts from INFORMATION_SCHEMA.PROCESSLIST ## gather thread state counts from INFORMATION_SCHEMA.PROCESSLIST
gather_process_list = true gather_process_list = true
# #
## gather thread state counts from INFORMATION_SCHEMA.USER_STATISTICS ## gather user statistics from INFORMATION_SCHEMA.USER_STATISTICS
gather_user_statistics = true gather_user_statistics = true
# #
## gather auto_increment columns and max values from information schema ## gather auto_increment columns and max values from information schema
@@ -282,9 +281,8 @@ const (
GROUP BY command,state GROUP BY command,state
ORDER BY null` ORDER BY null`
infoSchemaUserStatisticsQuery = ` infoSchemaUserStatisticsQuery = `
SELECT *,count(*) SELECT *
FROM information_schema.user_statistics FROM information_schema.user_statistics`
GROUP BY user`
infoSchemaAutoIncQuery = ` infoSchemaAutoIncQuery = `
SELECT table_schema, table_name, column_name, auto_increment, SELECT table_schema, table_name, column_name, auto_increment,
CAST(pow(2, case data_type CAST(pow(2, case data_type
@@ -761,103 +759,6 @@ func (m *Mysql) gatherGlobalStatuses(db *sql.DB, serv string, acc telegraf.Accum
if len(fields) > 0 { if len(fields) > 0 {
acc.AddFields("mysql", fields, tags) acc.AddFields("mysql", fields, tags)
} }
// gather connection metrics from processlist for each user
if m.GatherProcessList {
conn_rows, err := db.Query("SELECT user, sum(1) FROM INFORMATION_SCHEMA.PROCESSLIST GROUP BY user")
if err != nil {
log.Printf("E! MySQL Error gathering process list: %s", err)
} else {
for conn_rows.Next() {
var user string
var connections int64
err = conn_rows.Scan(&user, &connections)
if err != nil {
return err
}
tags := map[string]string{"server": servtag, "user": user}
fields := make(map[string]interface{})
if err != nil {
return err
}
fields["connections"] = connections
acc.AddFields("mysql_users", fields, tags)
}
}
}
// gather connection metrics from user_statistics for each user
if m.GatherUserStatistics {
conn_rows, err := db.Query("select user, total_connections, concurrent_connections, connected_time, busy_time, cpu_time, bytes_received, bytes_sent, binlog_bytes_written, rows_fetched, rows_updated, table_rows_read, select_commands, update_commands, other_commands, commit_transactions, rollback_transactions, denied_connections, lost_connections, access_denied, empty_queries, total_ssl_connections FROM INFORMATION_SCHEMA.USER_STATISTICS GROUP BY user")
if err != nil {
log.Printf("E! MySQL Error gathering user stats: %s", err)
} else {
for conn_rows.Next() {
var user string
var total_connections int64
var concurrent_connections int64
var connected_time int64
var busy_time int64
var cpu_time int64
var bytes_received int64
var bytes_sent int64
var binlog_bytes_written int64
var rows_fetched int64
var rows_updated int64
var table_rows_read int64
var select_commands int64
var update_commands int64
var other_commands int64
var commit_transactions int64
var rollback_transactions int64
var denied_connections int64
var lost_connections int64
var access_denied int64
var empty_queries int64
var total_ssl_connections int64
err = conn_rows.Scan(&user, &total_connections, &concurrent_connections,
&connected_time, &busy_time, &cpu_time, &bytes_received, &bytes_sent, &binlog_bytes_written,
&rows_fetched, &rows_updated, &table_rows_read, &select_commands, &update_commands, &other_commands,
&commit_transactions, &rollback_transactions, &denied_connections, &lost_connections, &access_denied,
&empty_queries, &total_ssl_connections,
)
if err != nil {
return err
}
tags := map[string]string{"server": servtag, "user": user}
fields := map[string]interface{}{
"total_connections": total_connections,
"concurrent_connections": concurrent_connections,
"connected_time": connected_time,
"busy_time": busy_time,
"cpu_time": cpu_time,
"bytes_received": bytes_received,
"bytes_sent": bytes_sent,
"binlog_bytes_written": binlog_bytes_written,
"rows_fetched": rows_fetched,
"rows_updated": rows_updated,
"table_rows_read": table_rows_read,
"select_commands": select_commands,
"update_commands": update_commands,
"other_commands": other_commands,
"commit_transactions": commit_transactions,
"rollback_transactions": rollback_transactions,
"denied_connections": denied_connections,
"lost_connections": lost_connections,
"access_denied": access_denied,
"empty_queries": empty_queries,
"total_ssl_connections": total_ssl_connections,
}
acc.AddFields("mysql_user_stats", fields, tags)
}
}
}
return nil return nil
} }
@@ -908,6 +809,29 @@ func (m *Mysql) GatherProcessListStatuses(db *sql.DB, serv string, acc telegraf.
} else { } else {
acc.AddFields("mysql_process_list", fields, tags) acc.AddFields("mysql_process_list", fields, tags)
} }
// get count of connections from each user
conn_rows, err := db.Query("SELECT user, sum(1) AS connections FROM INFORMATION_SCHEMA.PROCESSLIST GROUP BY user")
if err != nil {
return err
}
for conn_rows.Next() {
var user string
var connections int64
err = conn_rows.Scan(&user, &connections)
if err != nil {
return err
}
tags := map[string]string{"server": servtag, "user": user}
fields := make(map[string]interface{})
fields["connections"] = connections
acc.AddFields("mysql_users", fields, tags)
}
return nil return nil
} }
@@ -917,77 +841,190 @@ func (m *Mysql) GatherUserStatisticsStatuses(db *sql.DB, serv string, acc telegr
// run query // run query
rows, err := db.Query(infoSchemaUserStatisticsQuery) rows, err := db.Query(infoSchemaUserStatisticsQuery)
if err != nil { if err != nil {
// disable collecting if table is not found (mysql specific error)
// (suppresses repeat errors)
if strings.Contains(err.Error(), "nknown table 'user_statistics'") {
m.GatherUserStatistics = false
}
return err return err
} }
defer rows.Close() defer rows.Close()
var (
user string cols, err := columnsToLower(rows.Columns())
total_connections int64 if err != nil {
concurrent_connections int64 return err
connected_time int64 }
busy_time int64
cpu_time int64 read, err := getColSlice(len(cols))
bytes_received int64 if err != nil {
bytes_sent int64 return err
binlog_bytes_written int64 }
rows_fetched int64
rows_updated int64
table_rows_read int64
select_commands int64
update_commands int64
other_commands int64
commit_transactions int64
rollback_transactions int64
denied_connections int64
lost_connections int64
access_denied int64
empty_queries int64
total_ssl_connections int64
count uint32
)
servtag := getDSNTag(serv) servtag := getDSNTag(serv)
for rows.Next() { for rows.Next() {
err = rows.Scan(&user, &total_connections, &concurrent_connections, err = rows.Scan(read...)
&connected_time, &busy_time, &cpu_time, &bytes_received, &bytes_sent, &binlog_bytes_written,
&rows_fetched, &rows_updated, &table_rows_read, &select_commands, &update_commands, &other_commands,
&commit_transactions, &rollback_transactions, &denied_connections, &lost_connections, &access_denied,
&empty_queries, &total_ssl_connections, &count,
)
if err != nil { if err != nil {
return err return err
} }
tags := map[string]string{"server": servtag, "user": user} tags := map[string]string{"server": servtag, "user": *read[0].(*string)}
fields := map[string]interface{}{ fields := map[string]interface{}{}
"total_connections": total_connections, for i := range cols {
"concurrent_connections": concurrent_connections, if i == 0 {
"connected_time": connected_time, continue // skip "user"
"busy_time": busy_time, }
"cpu_time": cpu_time, switch v := read[i].(type) {
"bytes_received": bytes_received, case *int64:
"bytes_sent": bytes_sent, fields[cols[i]] = *v
"binlog_bytes_written": binlog_bytes_written, case *float64:
"rows_fetched": rows_fetched, fields[cols[i]] = *v
"rows_updated": rows_updated, case *string:
"table_rows_read": table_rows_read, fields[cols[i]] = *v
"select_commands": select_commands, default:
"update_commands": update_commands, return fmt.Errorf("Unknown column type - %T", v)
"other_commands": other_commands, }
"commit_transactions": commit_transactions,
"rollback_transactions": rollback_transactions,
"denied_connections": denied_connections,
"lost_connections": lost_connections,
"access_denied": access_denied,
"empty_queries": empty_queries,
"total_ssl_connections": total_ssl_connections,
} }
acc.AddFields("mysql_user_stats", fields, tags) acc.AddFields("mysql_user_stats", fields, tags)
} }
return nil return nil
} }
// columnsToLower converts selected column names to lowercase.
func columnsToLower(s []string, e error) ([]string, error) {
if e != nil {
return nil, e
}
d := make([]string, len(s))
for i := range s {
d[i] = strings.ToLower(s[i])
}
return d, nil
}
// getColSlice returns an in interface slice that can be used in the row.Scan().
func getColSlice(l int) ([]interface{}, error) {
// list of all possible column names
var (
user string
total_connections int64
concurrent_connections int64
connected_time int64
busy_time int64
cpu_time int64
bytes_received int64
bytes_sent int64
binlog_bytes_written int64
rows_read int64
rows_sent int64
rows_deleted int64
rows_inserted int64
rows_updated int64
select_commands int64
update_commands int64
other_commands int64
commit_transactions int64
rollback_transactions int64
denied_connections int64
lost_connections int64
access_denied int64
empty_queries int64
total_ssl_connections int64
max_statement_time_exceeded int64
// maria specific
fbusy_time float64
fcpu_time float64
// percona specific
rows_fetched int64
table_rows_read int64
)
switch l {
case 23: // maria5
return []interface{}{
&user,
&total_connections,
&concurrent_connections,
&connected_time,
&fbusy_time,
&fcpu_time,
&bytes_received,
&bytes_sent,
&binlog_bytes_written,
&rows_read,
&rows_sent,
&rows_deleted,
&rows_inserted,
&rows_updated,
&select_commands,
&update_commands,
&other_commands,
&commit_transactions,
&rollback_transactions,
&denied_connections,
&lost_connections,
&access_denied,
&empty_queries,
}, nil
case 25: // maria10
return []interface{}{
&user,
&total_connections,
&concurrent_connections,
&connected_time,
&fbusy_time,
&fcpu_time,
&bytes_received,
&bytes_sent,
&binlog_bytes_written,
&rows_read,
&rows_sent,
&rows_deleted,
&rows_inserted,
&rows_updated,
&select_commands,
&update_commands,
&other_commands,
&commit_transactions,
&rollback_transactions,
&denied_connections,
&lost_connections,
&access_denied,
&empty_queries,
&total_ssl_connections,
&max_statement_time_exceeded,
}, nil
case 22: // percona
return []interface{}{
&user,
&total_connections,
&concurrent_connections,
&connected_time,
&busy_time,
&cpu_time,
&bytes_received,
&bytes_sent,
&binlog_bytes_written,
&rows_fetched,
&rows_updated,
&table_rows_read,
&select_commands,
&update_commands,
&other_commands,
&commit_transactions,
&rollback_transactions,
&denied_connections,
&lost_connections,
&access_denied,
&empty_queries,
&total_ssl_connections,
}, nil
}
return nil, fmt.Errorf("Not Supported - %d columns", l)
}
// gatherPerfTableIOWaits can be used to get total count and time // gatherPerfTableIOWaits can be used to get total count and time
// of I/O wait event for each table and process // of I/O wait event for each table and process
func (m *Mysql) gatherPerfTableIOWaits(db *sql.DB, serv string, acc telegraf.Accumulator) error { func (m *Mysql) gatherPerfTableIOWaits(db *sql.DB, serv string, acc telegraf.Accumulator) error {

View File

@@ -4,7 +4,7 @@ import (
"fmt" "fmt"
"os" "os"
"os/exec" "os/exec"
"path" "path/filepath"
"strings" "strings"
"time" "time"
@@ -28,36 +28,37 @@ func getQueueDirectory() (string, error) {
return strings.TrimSpace(string(qd)), nil return strings.TrimSpace(string(qd)), nil
} }
func qScan(path string) (int64, int64, int64, error) { func qScan(path string, acc telegraf.Accumulator) (int64, int64, int64, error) {
f, err := os.Open(path)
if err != nil {
return 0, 0, 0, err
}
finfos, err := f.Readdir(-1)
f.Close()
if err != nil {
return 0, 0, 0, err
}
var length, size int64 var length, size int64
var oldest time.Time var oldest time.Time
for _, finfo := range finfos { err := filepath.Walk(path, func(_ string, finfo os.FileInfo, err error) error {
if err != nil {
acc.AddError(fmt.Errorf("error scanning %s: %s", path, err))
return nil
}
if finfo.IsDir() {
return nil
}
length++ length++
size += finfo.Size() size += finfo.Size()
ctime := statCTime(finfo.Sys()) ctime := statCTime(finfo.Sys())
if ctime.IsZero() { if ctime.IsZero() {
continue return nil
} }
if oldest.IsZero() || ctime.Before(oldest) { if oldest.IsZero() || ctime.Before(oldest) {
oldest = ctime oldest = ctime
} }
return nil
})
if err != nil {
return 0, 0, 0, err
} }
var age int64 var age int64
if !oldest.IsZero() { if !oldest.IsZero() {
age = int64(time.Now().Sub(oldest) / time.Second) age = int64(time.Now().Sub(oldest) / time.Second)
} else if len(finfos) != 0 { } else if length != 0 {
// system doesn't support ctime // system doesn't support ctime
age = -1 age = -1
} }
@@ -77,8 +78,8 @@ func (p *Postfix) Gather(acc telegraf.Accumulator) error {
} }
} }
for _, q := range []string{"active", "hold", "incoming", "maildrop"} { for _, q := range []string{"active", "hold", "incoming", "maildrop", "deferred"} {
length, size, age, err := qScan(path.Join(p.QueueDirectory, q)) length, size, age, err := qScan(filepath.Join(p.QueueDirectory, q), acc)
if err != nil { if err != nil {
acc.AddError(fmt.Errorf("error scanning queue %s: %s", q, err)) acc.AddError(fmt.Errorf("error scanning queue %s: %s", q, err))
continue continue
@@ -90,30 +91,6 @@ func (p *Postfix) Gather(acc telegraf.Accumulator) error {
acc.AddFields("postfix_queue", fields, map[string]string{"queue": q}) acc.AddFields("postfix_queue", fields, map[string]string{"queue": q})
} }
var dLength, dSize int64
dAge := int64(-1)
for _, q := range []string{"0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "A", "B", "C", "D", "E", "F"} {
length, size, age, err := qScan(path.Join(p.QueueDirectory, "deferred", q))
if err != nil {
if os.IsNotExist(err) {
// the directories are created on first use
continue
}
acc.AddError(fmt.Errorf("error scanning queue deferred/%s: %s", q, err))
return nil
}
dLength += length
dSize += size
if age > dAge {
dAge = age
}
}
fields := map[string]interface{}{"length": dLength, "size": dSize}
if dAge != -1 {
fields["age"] = dAge
}
acc.AddFields("postfix_queue", fields, map[string]string{"queue": "deferred"})
return nil return nil
} }

View File

@@ -3,7 +3,7 @@ package postfix
import ( import (
"io/ioutil" "io/ioutil"
"os" "os"
"path" "path/filepath"
"testing" "testing"
"github.com/influxdata/telegraf/testutil" "github.com/influxdata/telegraf/testutil"
@@ -16,19 +16,16 @@ func TestGather(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
defer os.RemoveAll(td) defer os.RemoveAll(td)
for _, q := range []string{"active", "hold", "incoming", "maildrop", "deferred"} { for _, q := range []string{"active", "hold", "incoming", "maildrop", "deferred/0/0", "deferred/F/F"} {
require.NoError(t, os.Mkdir(path.Join(td, q), 0755)) require.NoError(t, os.MkdirAll(filepath.FromSlash(td+"/"+q), 0755))
}
for _, q := range []string{"0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "A", "B", "C", "D", "F"} { // "E" deliberately left off
require.NoError(t, os.Mkdir(path.Join(td, "deferred", q), 0755))
} }
require.NoError(t, ioutil.WriteFile(path.Join(td, "active", "01"), []byte("abc"), 0644)) require.NoError(t, ioutil.WriteFile(filepath.FromSlash(td+"/active/01"), []byte("abc"), 0644))
require.NoError(t, ioutil.WriteFile(path.Join(td, "active", "02"), []byte("defg"), 0644)) require.NoError(t, ioutil.WriteFile(filepath.FromSlash(td+"/active/02"), []byte("defg"), 0644))
require.NoError(t, ioutil.WriteFile(path.Join(td, "hold", "01"), []byte("abc"), 0644)) require.NoError(t, ioutil.WriteFile(filepath.FromSlash(td+"/hold/01"), []byte("abc"), 0644))
require.NoError(t, ioutil.WriteFile(path.Join(td, "incoming", "01"), []byte("abcd"), 0644)) require.NoError(t, ioutil.WriteFile(filepath.FromSlash(td+"/incoming/01"), []byte("abcd"), 0644))
require.NoError(t, ioutil.WriteFile(path.Join(td, "deferred", "0", "01"), []byte("abc"), 0644)) require.NoError(t, ioutil.WriteFile(filepath.FromSlash(td+"/deferred/0/0/01"), []byte("abc"), 0644))
require.NoError(t, ioutil.WriteFile(path.Join(td, "deferred", "F", "F1"), []byte("abc"), 0644)) require.NoError(t, ioutil.WriteFile(filepath.FromSlash(td+"/deferred/F/F/F1"), []byte("abc"), 0644))
p := Postfix{ p := Postfix{
QueueDirectory: td, QueueDirectory: td,

View File

@@ -1,8 +1,6 @@
# Procstat Input Plugin # Procstat Input Plugin
The procstat plugin can be used to monitor the system resource usage of one or more processes. The procstat plugin can be used to monitor the system resource usage of one or more processes.
The procstat_lookup metric displays the query information,
specifically the number of PIDs returned on a search
Processes can be selected for monitoring using one of several methods: Processes can be selected for monitoring using one of several methods:
- pidfile - pidfile

View File

@@ -1,117 +0,0 @@
package reader
import (
"io/ioutil"
"log"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal/globpath"
"github.com/influxdata/telegraf/plugins/parsers"
)
type Reader struct {
Filepaths []string `toml:"files"`
FromBeginning bool
DataFormat string `toml:"data_format"`
ParserConfig parsers.Config
Parser parsers.Parser
Tags []string
Filenames []string
//for grok parser
Patterns []string
namedPatterns []string
CustomPatterns string
CustomPatternFiles []string
}
const sampleConfig = `## Files to parse.
## These accept standard unix glob matching rules, but with the addition of
## ** as a "super asterisk". ie:
## /var/log/**.log -> recursively find all .log files in /var/log
## /var/log/*/*.log -> find all .log files with a parent dir in /var/log
## /var/log/apache.log -> only tail the apache log file
files = ["/var/log/apache/access.log"]
## The dataformat to be read from files
## Each data format has its own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = ""`
// SampleConfig returns the default configuration of the Input
func (r *Reader) SampleConfig() string {
return sampleConfig
}
func (r *Reader) Description() string {
return "reload and gather from file[s] on telegraf's interval"
}
func (r *Reader) Gather(acc telegraf.Accumulator) error {
r.refreshFilePaths()
for _, k := range r.Filenames {
metrics, err := r.readMetric(k)
if err != nil {
return err
}
for _, m := range metrics {
acc.AddFields(m.Name(), m.Fields(), m.Tags())
}
}
return nil
}
func (r *Reader) compileParser() {
if r.DataFormat == "" {
log.Printf("E! No data_format specified")
return
}
r.ParserConfig = parsers.Config{
DataFormat: r.DataFormat,
TagKeys: r.Tags,
//grok settings
Patterns: r.Patterns,
NamedPatterns: r.namedPatterns,
CustomPatterns: r.CustomPatterns,
CustomPatternFiles: r.CustomPatternFiles,
}
nParser, err := parsers.NewParser(&r.ParserConfig)
if err != nil {
log.Printf("E! Error building parser: %v", err)
}
r.Parser = nParser
}
func (r *Reader) refreshFilePaths() {
var allFiles []string
for _, filepath := range r.Filepaths {
g, err := globpath.Compile(filepath)
if err != nil {
log.Printf("E! Error Glob %s failed to compile, %s", filepath, err)
continue
}
files := g.Match()
for k := range files {
allFiles = append(allFiles, k)
}
}
r.Filenames = allFiles
}
//requires that Parser has been compiled
func (r *Reader) readMetric(filename string) ([]telegraf.Metric, error) {
fileContents, err := ioutil.ReadFile(filename)
if err != nil {
log.Printf("E! File could not be opened: %v", filename)
}
return r.Parser.Parse(fileContents)
}

View File

@@ -1,58 +0,0 @@
package reader
import (
"log"
"runtime"
"strings"
"testing"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert"
)
func TestRefreshFilePaths(t *testing.T) {
testDir := getPluginDir()
r := Reader{
Filepaths: []string{testDir + "/logparser/grok/testdata/**.log"},
}
r.refreshFilePaths()
//log.Printf("filenames: %v", filenames)
assert.Equal(t, len(r.Filenames), 2)
}
func TestJSONParserCompile(t *testing.T) {
testDir := getPluginDir()
var acc testutil.Accumulator
r := Reader{
Filepaths: []string{testDir + "/reader/testfiles/json_a.log"},
DataFormat: "json",
Tags: []string{"parent_ignored_child"},
}
r.compileParser()
r.Gather(&acc)
log.Printf("acc: %v", acc.Metrics[0].Tags)
assert.Equal(t, map[string]string{"parent_ignored_child": "hi"}, acc.Metrics[0].Tags)
assert.Equal(t, 5, len(acc.Metrics[0].Fields))
}
func TestGrokParser(t *testing.T) {
testDir := getPluginDir()
var acc testutil.Accumulator
r := Reader{
Filepaths: []string{testDir + "/reader/testfiles/grok_a.log"},
DataFormat: "grok",
Patterns: []string{"%{COMMON_LOG_FORMAT}"},
}
r.compileParser()
err := r.Gather(&acc)
log.Printf("err: %v", err)
log.Printf("metric[0]_tags: %v, metric[0]_fields: %v", acc.Metrics[0].Tags, acc.Metrics[0].Fields)
log.Printf("metric[1]_tags: %v, metric[1]_fields: %v", acc.Metrics[1].Tags, acc.Metrics[1].Fields)
assert.Equal(t, 2, len(acc.Metrics))
}
func getPluginDir() string {
_, filename, _, _ := runtime.Caller(1)
return strings.Replace(filename, "/reader/reader_test.go", "", 1)
}

View File

@@ -1,2 +0,0 @@
127.0.0.1 user-identifier frank [10/Oct/2000:13:55:36 -0700] "GET /apache_pb.gif HTTP/1.0" 200 2326
128.0.0.1 user-identifier tony [10/Oct/2000:13:55:36 -0800] "GET /apache_pb.gif HTTP/1.0" 300 45

View File

@@ -1,14 +0,0 @@
{
"parent": {
"child": 3.0,
"ignored_child": "hi"
},
"ignored_null": null,
"integer": 4,
"list": [3, 4],
"ignored_parent": {
"another_ignored_null": null,
"ignored_string": "hello, world!"
},
"another_list": [4]
}

View File

@@ -1,6 +1,7 @@
package syslog package syslog
import ( import (
"bytes"
"crypto/tls" "crypto/tls"
"fmt" "fmt"
"io" "io"
@@ -279,20 +280,51 @@ func (s *Syslog) handle(conn net.Conn, acc telegraf.Accumulator) {
conn.Close() conn.Close()
}() }()
if s.ReadTimeout != nil && s.ReadTimeout.Duration > 0 { for {
conn.SetReadDeadline(time.Now().Add(s.ReadTimeout.Duration)) data := &bytes.Buffer{}
}
var p *rfc5425.Parser // read the data
if s.BestEffort { if s.ReadTimeout != nil && s.ReadTimeout.Duration > 0 {
p = rfc5425.NewParser(conn, rfc5425.WithBestEffort()) conn.SetReadDeadline(time.Now().Add(s.ReadTimeout.Duration))
} else { }
p = rfc5425.NewParser(conn)
}
p.ParseExecuting(func(r *rfc5425.Result) { n, err := io.Copy(data, conn)
s.store(*r, acc) if err != nil {
}) // read timeout reached, parse what we have
if er, ok := err.(net.Error); ok && er.Timeout() {
if n == 0 {
continue
}
goto parseMsg
}
// client has closed connection, return
if err == io.EOF {
if n > 0 {
goto parseMsg
}
return
}
// other error, log and return
s.store(rfc5425.Result{Error: fmt.Errorf("failed reading from syslog client - %s", err.Error())}, acc)
return
}
// handle client disconnect
if n == 0 {
return
}
parseMsg:
var p *rfc5425.Parser
if s.BestEffort {
p = rfc5425.NewParser(data, rfc5425.WithBestEffort())
} else {
p = rfc5425.NewParser(data)
}
p.ParseExecuting(func(r *rfc5425.Result) {
s.store(*r, acc)
})
}
} }
func (s *Syslog) setKeepAlive(c *net.TCPConn) error { func (s *Syslog) setKeepAlive(c *net.TCPConn) error {
@@ -361,7 +393,7 @@ func fields(msg rfc5424.SyslogMessage, s *Syslog) map[string]interface{} {
} }
if msg.Message() != nil { if msg.Message() != nil {
flds["message"] = *msg.Message() flds["message"] = strings.TrimSpace(*msg.Message())
} }
if msg.StructuredData() != nil { if msg.StructuredData() != nil {

View File

@@ -0,0 +1,30 @@
# Swap Input Plugin
The swap plugin collects system swap metrics.
For a more information on what swap memory is, read [All about Linux swap space](https://www.linux.com/news/all-about-linux-swap-space).
### Configuration:
```toml
# Read metrics about swap memory usage
[[inputs.swap]]
# no configuration
```
### Metrics:
- swap
- fields:
- free (int)
- total (int)
- used (int)
- used_percent (float)
- in (int)
- out (int)
### Example Output:
```
swap total=20855394304i,used_percent=45.43883523785713,used=9476448256i,free=1715331072i 1511894782000000000
```

View File

@@ -42,45 +42,9 @@ func (s *MemStats) Gather(acc telegraf.Accumulator) error {
return nil return nil
} }
type SwapStats struct {
ps PS
}
func (_ *SwapStats) Description() string {
return "Read metrics about swap memory usage"
}
func (_ *SwapStats) SampleConfig() string { return "" }
func (s *SwapStats) Gather(acc telegraf.Accumulator) error {
swap, err := s.ps.SwapStat()
if err != nil {
return fmt.Errorf("error getting swap memory info: %s", err)
}
fieldsG := map[string]interface{}{
"total": swap.Total,
"used": swap.Used,
"free": swap.Free,
"used_percent": swap.UsedPercent,
}
fieldsC := map[string]interface{}{
"in": swap.Sin,
"out": swap.Sout,
}
acc.AddGauge("swap", fieldsG, nil)
acc.AddCounter("swap", fieldsC, nil)
return nil
}
func init() { func init() {
ps := newSystemPS() ps := newSystemPS()
inputs.Add("mem", func() telegraf.Input { inputs.Add("mem", func() telegraf.Input {
return &MemStats{ps: ps} return &MemStats{ps: ps}
}) })
inputs.Add("swap", func() telegraf.Input {
return &SwapStats{ps: ps}
})
} }

View File

@@ -30,17 +30,6 @@ func TestMemStats(t *testing.T) {
mps.On("VMStat").Return(vms, nil) mps.On("VMStat").Return(vms, nil)
sms := &mem.SwapMemoryStat{
Total: 8123,
Used: 1232,
Free: 6412,
UsedPercent: 12.2,
Sin: 7,
Sout: 830,
}
mps.On("SwapStat").Return(sms, nil)
err = (&MemStats{&mps}).Gather(&acc) err = (&MemStats{&mps}).Gather(&acc)
require.NoError(t, err) require.NoError(t, err)
@@ -61,15 +50,4 @@ func TestMemStats(t *testing.T) {
acc.AssertContainsTaggedFields(t, "mem", memfields, make(map[string]string)) acc.AssertContainsTaggedFields(t, "mem", memfields, make(map[string]string))
acc.Metrics = nil acc.Metrics = nil
err = (&SwapStats{&mps}).Gather(&acc)
require.NoError(t, err)
swapfields := map[string]interface{}{
"total": uint64(8123),
"used": uint64(1232),
"used_percent": float64(12.2),
"free": uint64(6412),
}
acc.AssertContainsTaggedFields(t, "swap", swapfields, make(map[string]string))
} }

View File

@@ -0,0 +1,47 @@
package system
import (
"fmt"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
)
type SwapStats struct {
ps PS
}
func (_ *SwapStats) Description() string {
return "Read metrics about swap memory usage"
}
func (_ *SwapStats) SampleConfig() string { return "" }
func (s *SwapStats) Gather(acc telegraf.Accumulator) error {
swap, err := s.ps.SwapStat()
if err != nil {
return fmt.Errorf("error getting swap memory info: %s", err)
}
fieldsG := map[string]interface{}{
"total": swap.Total,
"used": swap.Used,
"free": swap.Free,
"used_percent": swap.UsedPercent,
}
fieldsC := map[string]interface{}{
"in": swap.Sin,
"out": swap.Sout,
}
acc.AddGauge("swap", fieldsG, nil)
acc.AddCounter("swap", fieldsC, nil)
return nil
}
func init() {
ps := newSystemPS()
inputs.Add("swap", func() telegraf.Input {
return &SwapStats{ps: ps}
})
}

View File

@@ -0,0 +1,38 @@
package system
import (
"testing"
"github.com/influxdata/telegraf/testutil"
"github.com/shirou/gopsutil/mem"
"github.com/stretchr/testify/require"
)
func TestSwapStats(t *testing.T) {
var mps MockPS
var err error
defer mps.AssertExpectations(t)
var acc testutil.Accumulator
sms := &mem.SwapMemoryStat{
Total: 8123,
Used: 1232,
Free: 6412,
UsedPercent: 12.2,
Sin: 7,
Sout: 830,
}
mps.On("SwapStat").Return(sms, nil)
err = (&SwapStats{&mps}).Gather(&acc)
require.NoError(t, err)
swapfields := map[string]interface{}{
"total": uint64(8123),
"used": uint64(1232),
"used_percent": float64(12.2),
"free": uint64(6412),
}
acc.AssertContainsTaggedFields(t, "swap", swapfields, make(map[string]string))
}

View File

@@ -1,4 +1,4 @@
# tail Input Plugin # Tail Input Plugin
The tail plugin "tails" a logfile and parses each log message. The tail plugin "tails" a logfile and parses each log message.
@@ -49,3 +49,7 @@ The plugin expects messages in one of the
data_format = "influx" data_format = "influx"
``` ```
### Metrics:
Metrics are produced according to the `data_format` option. Additionally a
tag labeled `path` is added to the metric containing the filename being tailed.

View File

@@ -146,7 +146,11 @@ func (t *Tail) receiver(tailer *tail.Tail) {
m, err = t.parser.ParseLine(text) m, err = t.parser.ParseLine(text)
if err == nil { if err == nil {
t.acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time()) if m != nil {
tags := m.Tags()
tags["path"] = tailer.Filename
t.acc.AddFields(m.Name(), m.Fields(), tags, m.Time())
}
} else { } else {
t.acc.AddError(fmt.Errorf("E! Malformed log line in %s: [%s], Error: %s\n", t.acc.AddError(fmt.Errorf("E! Malformed log line in %s: [%s], Error: %s\n",
tailer.Filename, line.Text, err)) tailer.Filename, line.Text, err))

View File

@@ -72,6 +72,15 @@ It is recommended NOT to use this on OSes starting with Vista and newer because
Example for Windows Server 2003, this would be set to true: Example for Windows Server 2003, this would be set to true:
`PreVistaSupport=true` `PreVistaSupport=true`
#### UsePerfCounterTime
Bool, if set to `true` will request a timestamp along with the PerfCounter data.
If se to `false`, current time will be used.
Supported on Windows Vista/Windows Server 2008 and newer
Example:
`UsePerfCounterTime=true`
### Object ### Object
See Entry below. See Entry below.

View File

@@ -0,0 +1,73 @@
// Copyright (c) 2010 The win Authors. All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions
// are met:
// 1. Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// 2. Redistributions in binary form must reproduce the above copyright
// notice, this list of conditions and the following disclaimer in the
// documentation and/or other materials provided with the distribution.
// 3. The names of the authors may not be used to endorse or promote products
// derived from this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE AUTHORS ``AS IS'' AND ANY EXPRESS OR
// IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
// OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
// IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY DIRECT, INDIRECT,
// INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
// NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
// THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
//
// This is the official list of 'win' authors for copyright purposes.
//
// Alexander Neumann <an2048@googlemail.com>
// Joseph Watson <jtwatson@linux-consulting.us>
// Kevin Pors <krpors@gmail.com>
// +build windows
package win_perf_counters
import (
"syscall"
)
type SYSTEMTIME struct {
wYear uint16
wMonth uint16
wDayOfWeek uint16
wDay uint16
wHour uint16
wMinute uint16
wSecond uint16
wMilliseconds uint16
}
type FILETIME struct {
dwLowDateTime uint32
dwHighDateTime uint32
}
var (
// Library
libkrnDll *syscall.DLL
// Functions
krn_FileTimeToSystemTime *syscall.Proc
krn_FileTimeToLocalFileTime *syscall.Proc
krn_LocalFileTimeToFileTime *syscall.Proc
krn_WideCharToMultiByte *syscall.Proc
)
func init() {
libkrnDll = syscall.MustLoadDLL("Kernel32.dll")
krn_FileTimeToSystemTime = libkrnDll.MustFindProc("FileTimeToSystemTime")
krn_FileTimeToLocalFileTime = libkrnDll.MustFindProc("FileTimeToLocalFileTime")
krn_LocalFileTimeToFileTime = libkrnDll.MustFindProc("LocalFileTimeToFileTime")
krn_WideCharToMultiByte = libkrnDll.MustFindProc("WideCharToMultiByte")
}

View File

@@ -38,12 +38,15 @@ import (
"unsafe" "unsafe"
"golang.org/x/sys/windows" "golang.org/x/sys/windows"
"time"
) )
// Error codes // Error codes
const ( const (
ERROR_SUCCESS = 0 ERROR_SUCCESS = 0
ERROR_INVALID_FUNCTION = 1 ERROR_FAILURE = 1
ERROR_INVALID_FUNCTION = 1
EPOCH_DIFFERENCE_MICROS int64 = 11644473600000000
) )
type ( type (
@@ -170,6 +173,7 @@ var (
pdh_AddEnglishCounterW *syscall.Proc pdh_AddEnglishCounterW *syscall.Proc
pdh_CloseQuery *syscall.Proc pdh_CloseQuery *syscall.Proc
pdh_CollectQueryData *syscall.Proc pdh_CollectQueryData *syscall.Proc
pdh_CollectQueryDataWithTime *syscall.Proc
pdh_GetFormattedCounterValue *syscall.Proc pdh_GetFormattedCounterValue *syscall.Proc
pdh_GetFormattedCounterArrayW *syscall.Proc pdh_GetFormattedCounterArrayW *syscall.Proc
pdh_OpenQuery *syscall.Proc pdh_OpenQuery *syscall.Proc
@@ -187,6 +191,7 @@ func init() {
pdh_AddEnglishCounterW, _ = libpdhDll.FindProc("PdhAddEnglishCounterW") // XXX: only supported on versions > Vista. pdh_AddEnglishCounterW, _ = libpdhDll.FindProc("PdhAddEnglishCounterW") // XXX: only supported on versions > Vista.
pdh_CloseQuery = libpdhDll.MustFindProc("PdhCloseQuery") pdh_CloseQuery = libpdhDll.MustFindProc("PdhCloseQuery")
pdh_CollectQueryData = libpdhDll.MustFindProc("PdhCollectQueryData") pdh_CollectQueryData = libpdhDll.MustFindProc("PdhCollectQueryData")
pdh_CollectQueryDataWithTime, _ = libpdhDll.FindProc("PdhCollectQueryDataWithTime")
pdh_GetFormattedCounterValue = libpdhDll.MustFindProc("PdhGetFormattedCounterValue") pdh_GetFormattedCounterValue = libpdhDll.MustFindProc("PdhGetFormattedCounterValue")
pdh_GetFormattedCounterArrayW = libpdhDll.MustFindProc("PdhGetFormattedCounterArrayW") pdh_GetFormattedCounterArrayW = libpdhDll.MustFindProc("PdhGetFormattedCounterArrayW")
pdh_OpenQuery = libpdhDll.MustFindProc("PdhOpenQuery") pdh_OpenQuery = libpdhDll.MustFindProc("PdhOpenQuery")
@@ -303,6 +308,37 @@ func PdhCollectQueryData(hQuery PDH_HQUERY) uint32 {
return uint32(ret) return uint32(ret)
} }
// PdhCollectQueryDataWithTime queries data from perfmon, retrieving the
// device/windows timestamp from the node it was collected on, and
// converts the returned FILETIME into a native Go time.Time.
//
// On success it returns ERROR_SUCCESS and the collection timestamp; on
// any failure it returns the error status together with the current
// wall-clock time as a fallback value.
func PdhCollectQueryDataWithTime(hQuery PDH_HQUERY) (uint32, time.Time) {
	var localFileTime FILETIME
	status, _, _ := pdh_CollectQueryDataWithTime.Call(
		uintptr(hQuery),
		uintptr(unsafe.Pointer(&localFileTime)))
	if status != ERROR_SUCCESS {
		return uint32(status), time.Now()
	}

	// PDH reports the timestamp in local time; normalize to UTC first.
	var utcFileTime FILETIME
	ok, _, _ := krn_LocalFileTimeToFileTime.Call(
		uintptr(unsafe.Pointer(&localFileTime)),
		uintptr(unsafe.Pointer(&utcFileTime)))
	if ok == 0 { // Win32 BOOL: zero signals the conversion failed
		return uint32(ERROR_FAILURE), time.Now()
	}

	// FILETIME counts 100-ns intervals since 1601-01-01. First convert
	// to microseconds, then rebase onto the Unix epoch.
	hundredNanos := (int64(utcFileTime.dwHighDateTime) << 32) | int64(utcFileTime.dwLowDateTime)
	totalMicros := hundredNanos/10 - EPOCH_DIFFERENCE_MICROS
	return uint32(ERROR_SUCCESS), time.Unix(0, totalMicros*1000)
}
// PdhGetFormattedCounterValueDouble formats the given hCounter using a 'double'. The result is set into the specialized union struct pValue. // PdhGetFormattedCounterValueDouble formats the given hCounter using a 'double'. The result is set into the specialized union struct pValue.
// This function does not directly translate to a Windows counterpart due to union specialization tricks. // This function does not directly translate to a Windows counterpart due to union specialization tricks.
func PdhGetFormattedCounterValueDouble(hCounter PDH_HCOUNTER, lpdwType *uint32, pValue *PDH_FMT_COUNTERVALUE_DOUBLE) uint32 { func PdhGetFormattedCounterValueDouble(hCounter PDH_HCOUNTER, lpdwType *uint32, pValue *PDH_FMT_COUNTERVALUE_DOUBLE) uint32 {

View File

@@ -6,6 +6,7 @@ package win_perf_counters
import ( import (
"errors" "errors"
"syscall" "syscall"
"time"
"unsafe" "unsafe"
) )
@@ -26,7 +27,8 @@ type PerformanceQuery interface {
GetFormattedCounterValueDouble(hCounter PDH_HCOUNTER) (float64, error) GetFormattedCounterValueDouble(hCounter PDH_HCOUNTER) (float64, error)
GetFormattedCounterArrayDouble(hCounter PDH_HCOUNTER) ([]CounterValue, error) GetFormattedCounterArrayDouble(hCounter PDH_HCOUNTER) ([]CounterValue, error)
CollectData() error CollectData() error
AddEnglishCounterSupported() bool CollectDataWithTime() (time.Time, error)
IsVistaOrNewer() bool
} }
//PdhError represents error returned from Performance Counters API //PdhError represents error returned from Performance Counters API
@@ -61,8 +63,8 @@ func (m *PerformanceQueryImpl) Open() error {
} }
} }
var handle PDH_HQUERY var handle PDH_HQUERY
ret := PdhOpenQuery(0, 0, &handle)
if ret != ERROR_SUCCESS { if ret := PdhOpenQuery(0, 0, &handle); ret != ERROR_SUCCESS {
return NewPdhError(ret) return NewPdhError(ret)
} }
m.query = handle m.query = handle
@@ -74,8 +76,8 @@ func (m *PerformanceQueryImpl) Close() error {
if m.query == 0 { if m.query == 0 {
return errors.New("uninitialised query") return errors.New("uninitialised query")
} }
ret := PdhCloseQuery(m.query)
if ret != ERROR_SUCCESS { if ret := PdhCloseQuery(m.query); ret != ERROR_SUCCESS {
return NewPdhError(ret) return NewPdhError(ret)
} }
m.query = 0 m.query = 0
@@ -87,8 +89,8 @@ func (m *PerformanceQueryImpl) AddCounterToQuery(counterPath string) (PDH_HCOUNT
if m.query == 0 { if m.query == 0 {
return 0, errors.New("uninitialised query") return 0, errors.New("uninitialised query")
} }
ret := PdhAddCounter(m.query, counterPath, 0, &counterHandle)
if ret != ERROR_SUCCESS { if ret := PdhAddCounter(m.query, counterPath, 0, &counterHandle); ret != ERROR_SUCCESS {
return 0, NewPdhError(ret) return 0, NewPdhError(ret)
} }
return counterHandle, nil return counterHandle, nil
@@ -99,8 +101,7 @@ func (m *PerformanceQueryImpl) AddEnglishCounterToQuery(counterPath string) (PDH
if m.query == 0 { if m.query == 0 {
return 0, errors.New("uninitialised query") return 0, errors.New("uninitialised query")
} }
ret := PdhAddEnglishCounter(m.query, counterPath, 0, &counterHandle) if ret := PdhAddEnglishCounter(m.query, counterPath, 0, &counterHandle); ret != ERROR_SUCCESS {
if ret != ERROR_SUCCESS {
return 0, NewPdhError(ret) return 0, NewPdhError(ret)
} }
return counterHandle, nil return counterHandle, nil
@@ -110,13 +111,11 @@ func (m *PerformanceQueryImpl) AddEnglishCounterToQuery(counterPath string) (PDH
func (m *PerformanceQueryImpl) GetCounterPath(counterHandle PDH_HCOUNTER) (string, error) { func (m *PerformanceQueryImpl) GetCounterPath(counterHandle PDH_HCOUNTER) (string, error) {
var bufSize uint32 var bufSize uint32
var buff []byte var buff []byte
var ret uint32
ret := PdhGetCounterInfo(counterHandle, 0, &bufSize, nil) if ret = PdhGetCounterInfo(counterHandle, 0, &bufSize, nil); ret == PDH_MORE_DATA {
if ret == PDH_MORE_DATA {
buff = make([]byte, bufSize) buff = make([]byte, bufSize)
bufSize = uint32(len(buff)) bufSize = uint32(len(buff))
ret = PdhGetCounterInfo(counterHandle, 0, &bufSize, &buff[0]) if ret = PdhGetCounterInfo(counterHandle, 0, &bufSize, &buff[0]); ret == ERROR_SUCCESS {
if ret == ERROR_SUCCESS {
ci := (*PDH_COUNTER_INFO)(unsafe.Pointer(&buff[0])) ci := (*PDH_COUNTER_INFO)(unsafe.Pointer(&buff[0]))
return UTF16PtrToString(ci.SzFullPath), nil return UTF16PtrToString(ci.SzFullPath), nil
} }
@@ -128,9 +127,9 @@ func (m *PerformanceQueryImpl) GetCounterPath(counterHandle PDH_HCOUNTER) (strin
func (m *PerformanceQueryImpl) ExpandWildCardPath(counterPath string) ([]string, error) { func (m *PerformanceQueryImpl) ExpandWildCardPath(counterPath string) ([]string, error) {
var bufSize uint32 var bufSize uint32
var buff []uint16 var buff []uint16
var ret uint32
ret := PdhExpandWildCardPath(counterPath, nil, &bufSize) if ret = PdhExpandWildCardPath(counterPath, nil, &bufSize); ret == PDH_MORE_DATA {
if ret == PDH_MORE_DATA {
buff = make([]uint16, bufSize) buff = make([]uint16, bufSize)
bufSize = uint32(len(buff)) bufSize = uint32(len(buff))
ret = PdhExpandWildCardPath(counterPath, &buff[0], &bufSize) ret = PdhExpandWildCardPath(counterPath, &buff[0], &bufSize)
@@ -146,8 +145,9 @@ func (m *PerformanceQueryImpl) ExpandWildCardPath(counterPath string) ([]string,
func (m *PerformanceQueryImpl) GetFormattedCounterValueDouble(hCounter PDH_HCOUNTER) (float64, error) { func (m *PerformanceQueryImpl) GetFormattedCounterValueDouble(hCounter PDH_HCOUNTER) (float64, error) {
var counterType uint32 var counterType uint32
var value PDH_FMT_COUNTERVALUE_DOUBLE var value PDH_FMT_COUNTERVALUE_DOUBLE
ret := PdhGetFormattedCounterValueDouble(hCounter, &counterType, &value) var ret uint32
if ret == ERROR_SUCCESS {
if ret = PdhGetFormattedCounterValueDouble(hCounter, &counterType, &value); ret == ERROR_SUCCESS {
if value.CStatus == PDH_CSTATUS_VALID_DATA || value.CStatus == PDH_CSTATUS_NEW_DATA { if value.CStatus == PDH_CSTATUS_VALID_DATA || value.CStatus == PDH_CSTATUS_NEW_DATA {
return value.DoubleValue, nil return value.DoubleValue, nil
} else { } else {
@@ -161,11 +161,12 @@ func (m *PerformanceQueryImpl) GetFormattedCounterValueDouble(hCounter PDH_HCOUN
func (m *PerformanceQueryImpl) GetFormattedCounterArrayDouble(hCounter PDH_HCOUNTER) ([]CounterValue, error) { func (m *PerformanceQueryImpl) GetFormattedCounterArrayDouble(hCounter PDH_HCOUNTER) ([]CounterValue, error) {
var buffSize uint32 var buffSize uint32
var itemCount uint32 var itemCount uint32
ret := PdhGetFormattedCounterArrayDouble(hCounter, &buffSize, &itemCount, nil) var ret uint32
if ret == PDH_MORE_DATA {
if ret = PdhGetFormattedCounterArrayDouble(hCounter, &buffSize, &itemCount, nil); ret == PDH_MORE_DATA {
buff := make([]byte, buffSize) buff := make([]byte, buffSize)
ret = PdhGetFormattedCounterArrayDouble(hCounter, &buffSize, &itemCount, &buff[0])
if ret == ERROR_SUCCESS { if ret = PdhGetFormattedCounterArrayDouble(hCounter, &buffSize, &itemCount, &buff[0]); ret == ERROR_SUCCESS {
items := (*[1 << 20]PDH_FMT_COUNTERVALUE_ITEM_DOUBLE)(unsafe.Pointer(&buff[0]))[:itemCount] items := (*[1 << 20]PDH_FMT_COUNTERVALUE_ITEM_DOUBLE)(unsafe.Pointer(&buff[0]))[:itemCount]
values := make([]CounterValue, 0, itemCount) values := make([]CounterValue, 0, itemCount)
for _, item := range items { for _, item := range items {
@@ -181,17 +182,29 @@ func (m *PerformanceQueryImpl) GetFormattedCounterArrayDouble(hCounter PDH_HCOUN
} }
func (m *PerformanceQueryImpl) CollectData() error { func (m *PerformanceQueryImpl) CollectData() error {
var ret uint32
if m.query == 0 { if m.query == 0 {
return errors.New("uninitialised query") return errors.New("uninitialised query")
} }
ret := PdhCollectQueryData(m.query)
if ret != ERROR_SUCCESS { if ret = PdhCollectQueryData(m.query); ret != ERROR_SUCCESS {
return NewPdhError(ret) return NewPdhError(ret)
} }
return nil return nil
} }
func (m *PerformanceQueryImpl) AddEnglishCounterSupported() bool { func (m *PerformanceQueryImpl) CollectDataWithTime() (time.Time, error) {
if m.query == 0 {
return time.Now(), errors.New("uninitialised query")
}
ret, mtime := PdhCollectQueryDataWithTime(m.query)
if ret != ERROR_SUCCESS {
return time.Now(), NewPdhError(ret)
}
return mtime, nil
}
func (m *PerformanceQueryImpl) IsVistaOrNewer() bool {
return PdhAddEnglishCounterSupported() return PdhAddEnglishCounterSupported()
} }

View File

@@ -23,6 +23,8 @@ var sampleConfig = `
## agent, it will not be gathered. ## agent, it will not be gathered.
## Settings: ## Settings:
# PrintValid = false # Print All matching performance counters # PrintValid = false # Print All matching performance counters
# Whether request a timestamp along with the PerfCounter data or just use current time
# UsePerfCounterTime=true
# If UseWildcardsExpansion params is set to true, wildcards (partial wildcards in instance names and wildcards in counters names) in configured counter paths will be expanded # If UseWildcardsExpansion params is set to true, wildcards (partial wildcards in instance names and wildcards in counters names) in configured counter paths will be expanded
# and in case of localized Windows, counter paths will be also localized. It also returns instance indexes in instance names. # and in case of localized Windows, counter paths will be also localized. It also returns instance indexes in instance names.
# If false, wildcards (not partial) in instance names will still be expanded, but instance indexes will not be returned in instance names. # If false, wildcards (not partial) in instance names will still be expanded, but instance indexes will not be returned in instance names.
@@ -78,6 +80,7 @@ type Win_PerfCounters struct {
PrintValid bool PrintValid bool
//deprecated: determined dynamically //deprecated: determined dynamically
PreVistaSupport bool PreVistaSupport bool
UsePerfCounterTime bool
Object []perfobject Object []perfobject
CountersRefreshInterval internal.Duration CountersRefreshInterval internal.Duration
UseWildcardsExpansion bool UseWildcardsExpansion bool
@@ -107,6 +110,12 @@ type counter struct {
counterHandle PDH_HCOUNTER counterHandle PDH_HCOUNTER
} }
// instanceGrouping identifies a unique (measurement name, counter
// instance, performance object) combination; it is the map key under
// which addCounterMeasurement groups collected fields so each group is
// emitted as a single accumulator point.
type instanceGrouping struct {
	name       string
	instance   string
	objectname string
}
var sanitizedChars = strings.NewReplacer("/sec", "_persec", "/Sec", "_persec", var sanitizedChars = strings.NewReplacer("/sec", "_persec", "/Sec", "_persec",
" ", "_", "%", "Percent", `\`, "") " ", "_", "%", "Percent", `\`, "")
@@ -147,7 +156,7 @@ func (m *Win_PerfCounters) SampleConfig() string {
func (m *Win_PerfCounters) AddItem(counterPath string, objectName string, instance string, counterName string, measurement string, includeTotal bool) error { func (m *Win_PerfCounters) AddItem(counterPath string, objectName string, instance string, counterName string, measurement string, includeTotal bool) error {
var err error var err error
var counterHandle PDH_HCOUNTER var counterHandle PDH_HCOUNTER
if !m.query.AddEnglishCounterSupported() { if !m.query.IsVistaOrNewer() {
counterHandle, err = m.query.AddCounterToQuery(counterPath) counterHandle, err = m.query.AddCounterToQuery(counterPath)
if err != nil { if err != nil {
return err return err
@@ -249,18 +258,15 @@ func (m *Win_PerfCounters) Gather(acc telegraf.Accumulator) error {
m.counters = m.counters[:0] m.counters = m.counters[:0]
} }
err = m.query.Open() if err = m.query.Open(); err != nil {
if err != nil {
return err return err
} }
err = m.ParseConfig() if err = m.ParseConfig(); err != nil {
if err != nil {
return err return err
} }
//some counters need two data samples before computing a value //some counters need two data samples before computing a value
err = m.query.CollectData() if err = m.query.CollectData(); err != nil {
if err != nil {
return err return err
} }
m.lastRefreshed = time.Now() m.lastRefreshed = time.Now()
@@ -268,37 +274,31 @@ func (m *Win_PerfCounters) Gather(acc telegraf.Accumulator) error {
time.Sleep(time.Second) time.Sleep(time.Second)
} }
type InstanceGrouping struct { var collectFields = make(map[instanceGrouping]map[string]interface{})
name string
instance string var timestamp time.Time
objectname string if m.UsePerfCounterTime && m.query.IsVistaOrNewer() {
timestamp, err = m.query.CollectDataWithTime()
if err != nil {
return err
}
} else {
timestamp = time.Now()
if err = m.query.CollectData(); err != nil {
return err
}
} }
var collectFields = make(map[InstanceGrouping]map[string]interface{})
err = m.query.CollectData()
if err != nil {
return err
}
// For iterate over the known metrics and get the samples. // For iterate over the known metrics and get the samples.
for _, metric := range m.counters { for _, metric := range m.counters {
// collect // collect
if m.UseWildcardsExpansion { if m.UseWildcardsExpansion {
value, err := m.query.GetFormattedCounterValueDouble(metric.counterHandle) value, err := m.query.GetFormattedCounterValueDouble(metric.counterHandle)
if err == nil { if err == nil {
measurement := sanitizedChars.Replace(metric.measurement) addCounterMeasurement(metric, metric.instance, value, collectFields)
if measurement == "" {
measurement = "win_perf_counters"
}
var instance = InstanceGrouping{measurement, metric.instance, metric.objectName}
if collectFields[instance] == nil {
collectFields[instance] = make(map[string]interface{})
}
collectFields[instance][sanitizedChars.Replace(metric.counter)] = float32(value)
} else { } else {
//ignore invalid data from as some counters from process instances returns this sometimes //ignore invalid data as some counters from process instances returns this sometimes
if phderr, ok := err.(*PdhError); ok && phderr.ErrorCode != PDH_INVALID_DATA && phderr.ErrorCode != PDH_CALC_NEGATIVE_VALUE { if !isKnownCounterDataError(err) {
return fmt.Errorf("error while getting value for counter %s: %v", metric.counterPath, err) return fmt.Errorf("error while getting value for counter %s: %v", metric.counterPath, err)
} }
} }
@@ -326,18 +326,14 @@ func (m *Win_PerfCounters) Gather(acc telegraf.Accumulator) error {
} }
if add { if add {
measurement := sanitizedChars.Replace(metric.measurement) addCounterMeasurement(metric, cValue.InstanceName, cValue.Value, collectFields)
if measurement == "" {
measurement = "win_perf_counters"
}
var instance = InstanceGrouping{measurement, cValue.InstanceName, metric.objectName}
if collectFields[instance] == nil {
collectFields[instance] = make(map[string]interface{})
}
collectFields[instance][sanitizedChars.Replace(metric.counter)] = float32(cValue.Value)
} }
} }
} else {
//ignore invalid data as some counters from process instances returns this sometimes
if !isKnownCounterDataError(err) {
return fmt.Errorf("error while getting value for counter %s: %v", metric.counterPath, err)
}
} }
} }
} }
@@ -349,12 +345,33 @@ func (m *Win_PerfCounters) Gather(acc telegraf.Accumulator) error {
if len(instance.instance) > 0 { if len(instance.instance) > 0 {
tags["instance"] = instance.instance tags["instance"] = instance.instance
} }
acc.AddFields(instance.name, fields, tags) acc.AddFields(instance.name, fields, tags, timestamp)
} }
return nil return nil
} }
// addCounterMeasurement records one sampled counter value into the
// collectFields accumulator, grouped by (measurement, instance,
// object). An empty measurement name falls back to the default
// "win_perf_counters". Counter and measurement names are passed through
// sanitizedChars to strip characters invalid in field names.
func addCounterMeasurement(metric *counter, instanceName string, value float64, collectFields map[instanceGrouping]map[string]interface{}) {
	name := sanitizedChars.Replace(metric.measurement)
	if name == "" {
		name = "win_perf_counters"
	}
	group := instanceGrouping{name, instanceName, metric.objectName}
	// Lazily create the field map for a group we have not seen yet.
	if collectFields[group] == nil {
		collectFields[group] = make(map[string]interface{})
	}
	collectFields[group][sanitizedChars.Replace(metric.counter)] = float32(value)
}
// isKnownCounterDataError reports whether err is a PdhError carrying
// one of the PDH status codes that some counters (e.g. per-process
// instances) return transiently; callers treat these as ignorable
// rather than fatal.
func isKnownCounterDataError(err error) bool {
	pdhErr, ok := err.(*PdhError)
	if !ok {
		return false
	}
	switch pdhErr.ErrorCode {
	case PDH_INVALID_DATA, PDH_CALC_NEGATIVE_VALUE, PDH_CSTATUS_INVALID_DATA:
		return true
	}
	return false
}
func init() { func init() {
inputs.Add("win_perf_counters", func() telegraf.Input { inputs.Add("win_perf_counters", func() telegraf.Input {
return &Win_PerfCounters{query: &PerformanceQueryImpl{}, CountersRefreshInterval: internal.Duration{Duration: time.Second * 60}} return &Win_PerfCounters{query: &PerformanceQueryImpl{}, CountersRefreshInterval: internal.Duration{Duration: time.Second * 60}}

View File

@@ -70,6 +70,11 @@ func TestWinPerformanceQueryImpl(t *testing.T) {
_, err = query.GetFormattedCounterValueDouble(hCounter) _, err = query.GetFormattedCounterValueDouble(hCounter)
require.NoError(t, err) require.NoError(t, err)
now := time.Now()
mtime, err := query.CollectDataWithTime()
require.NoError(t, err)
assert.True(t, mtime.Sub(now) < time.Second)
counterPath = "\\Process(*)\\% Processor Time" counterPath = "\\Process(*)\\% Processor Time"
paths, err := query.ExpandWildCardPath(counterPath) paths, err := query.ExpandWildCardPath(counterPath)
require.NoError(t, err) require.NoError(t, err)
@@ -98,6 +103,10 @@ func TestWinPerformanceQueryImpl(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
arr, err := query.GetFormattedCounterArrayDouble(hCounter) arr, err := query.GetFormattedCounterArrayDouble(hCounter)
if phderr, ok := err.(*PdhError); ok && phderr.ErrorCode != PDH_INVALID_DATA && phderr.ErrorCode != PDH_CALC_NEGATIVE_VALUE {
time.Sleep(time.Second)
arr, err = query.GetFormattedCounterArrayDouble(hCounter)
}
require.NoError(t, err) require.NoError(t, err)
assert.True(t, len(arr) > 0, "Too") assert.True(t, len(arr) > 0, "Too")
@@ -596,7 +605,7 @@ func TestWinPerfcountersCollect2(t *testing.T) {
perfobjects[0] = PerfObject perfobjects[0] = PerfObject
m := Win_PerfCounters{PrintValid: false, Object: perfobjects, query: &PerformanceQueryImpl{}, UseWildcardsExpansion: true} m := Win_PerfCounters{PrintValid: false, UsePerfCounterTime: true, Object: perfobjects, query: &PerformanceQueryImpl{}, UseWildcardsExpansion: true}
var acc testutil.Accumulator var acc testutil.Accumulator
err := m.Gather(&acc) err := m.Gather(&acc)
require.NoError(t, err) require.NoError(t, err)

View File

@@ -19,12 +19,14 @@ type testCounter struct {
value float64 value float64
} }
type FakePerformanceQuery struct { type FakePerformanceQuery struct {
counters map[string]testCounter counters map[string]testCounter
addEnglishSupported bool vistaAndNewer bool
expandPaths map[string][]string expandPaths map[string][]string
openCalled bool openCalled bool
} }
// MetricTime is the fixed timestamp returned by
// FakePerformanceQuery.CollectDataWithTime, giving tests a
// deterministic collection time to assert against.
var MetricTime = time.Date(2018, 5, 28, 12, 0, 0, 0, time.UTC)
func (m *testCounter) ToCounterValue() *CounterValue { func (m *testCounter) ToCounterValue() *CounterValue {
_, inst, _, _ := extractObjectInstanceCounterFromQuery(m.path) _, inst, _, _ := extractObjectInstanceCounterFromQuery(m.path)
if inst == "" { if inst == "" {
@@ -102,8 +104,10 @@ func (m *FakePerformanceQuery) GetFormattedCounterValueDouble(counterHandle PDH_
} else { } else {
if counter.value == 0 { if counter.value == 0 {
return 0, NewPdhError(PDH_INVALID_DATA) return 0, NewPdhError(PDH_INVALID_DATA)
} else { } else if counter.value == -1 {
return 0, NewPdhError(PDH_CALC_NEGATIVE_VALUE) return 0, NewPdhError(PDH_CALC_NEGATIVE_VALUE)
} else {
return 0, NewPdhError(PDH_ACCESS_DENIED)
} }
} }
} }
@@ -138,8 +142,18 @@ func (m *FakePerformanceQuery) GetFormattedCounterArrayDouble(hCounter PDH_HCOUN
counters := make([]CounterValue, 0, len(e)) counters := make([]CounterValue, 0, len(e))
for _, p := range e { for _, p := range e {
counter := m.findCounterByPath(p) counter := m.findCounterByPath(p)
if counter != nil && counter.value > 0 { if counter != nil {
counters = append(counters, *counter.ToCounterValue()) if counter.value > 0 {
counters = append(counters, *counter.ToCounterValue())
} else {
if counter.value == 0 {
return nil, NewPdhError(PDH_INVALID_DATA)
} else if counter.value == -1 {
return nil, NewPdhError(PDH_CALC_NEGATIVE_VALUE)
} else {
return nil, NewPdhError(PDH_ACCESS_DENIED)
}
}
} else { } else {
return nil, fmt.Errorf("GetFormattedCounterArrayDouble: invalid counter : %s", p) return nil, fmt.Errorf("GetFormattedCounterArrayDouble: invalid counter : %s", p)
} }
@@ -160,8 +174,15 @@ func (m *FakePerformanceQuery) CollectData() error {
return nil return nil
} }
func (m *FakePerformanceQuery) AddEnglishCounterSupported() bool { func (m *FakePerformanceQuery) CollectDataWithTime() (time.Time, error) {
return m.addEnglishSupported if !m.openCalled {
return time.Now(), errors.New("CollectData: uninitialised query")
}
return MetricTime, nil
}
func (m *FakePerformanceQuery) IsVistaOrNewer() bool {
return m.vistaAndNewer
} }
func createPerfObject(measurement string, object string, instances []string, counters []string, failOnMissing bool, includeTotal bool) []perfobject { func createPerfObject(measurement string, object string, instances []string, counters []string, failOnMissing bool, includeTotal bool) []perfobject {
@@ -198,7 +219,7 @@ func TestAddItemSimple(t *testing.T) {
expandPaths: map[string][]string{ expandPaths: map[string][]string{
cps1[0]: cps1, cps1[0]: cps1,
}, },
addEnglishSupported: true, vistaAndNewer: true,
}} }}
err = m.query.Open() err = m.query.Open()
require.NoError(t, err) require.NoError(t, err)
@@ -216,7 +237,7 @@ func TestAddItemInvalidCountPath(t *testing.T) {
expandPaths: map[string][]string{ expandPaths: map[string][]string{
cps1[0]: {"\\O/C"}, cps1[0]: {"\\O/C"},
}, },
addEnglishSupported: true, vistaAndNewer: true,
}} }}
err = m.query.Open() err = m.query.Open()
require.NoError(t, err) require.NoError(t, err)
@@ -238,7 +259,7 @@ func TestParseConfigBasic(t *testing.T) {
cps1[2]: {cps1[2]}, cps1[2]: {cps1[2]},
cps1[3]: {cps1[3]}, cps1[3]: {cps1[3]},
}, },
addEnglishSupported: true, vistaAndNewer: true,
}} }}
err = m.query.Open() err = m.query.Open()
require.NoError(t, err) require.NoError(t, err)
@@ -270,7 +291,7 @@ func TestParseConfigNoInstance(t *testing.T) {
cps1[0]: {cps1[0]}, cps1[0]: {cps1[0]},
cps1[1]: {cps1[1]}, cps1[1]: {cps1[1]},
}, },
addEnglishSupported: true, vistaAndNewer: true,
}} }}
err = m.query.Open() err = m.query.Open()
require.NoError(t, err) require.NoError(t, err)
@@ -303,7 +324,7 @@ func TestParseConfigInvalidCounterError(t *testing.T) {
cps1[1]: {cps1[1]}, cps1[1]: {cps1[1]},
cps1[2]: {cps1[2]}, cps1[2]: {cps1[2]},
}, },
addEnglishSupported: true, vistaAndNewer: true,
}} }}
err = m.query.Open() err = m.query.Open()
require.NoError(t, err) require.NoError(t, err)
@@ -334,7 +355,7 @@ func TestParseConfigInvalidCounterNoError(t *testing.T) {
cps1[1]: {cps1[1]}, cps1[1]: {cps1[1]},
cps1[2]: {cps1[2]}, cps1[2]: {cps1[2]},
}, },
addEnglishSupported: true, vistaAndNewer: true,
}} }}
err = m.query.Open() err = m.query.Open()
require.NoError(t, err) require.NoError(t, err)
@@ -364,7 +385,7 @@ func TestParseConfigTotalExpansion(t *testing.T) {
expandPaths: map[string][]string{ expandPaths: map[string][]string{
"\\O(*)\\*": cps1, "\\O(*)\\*": cps1,
}, },
addEnglishSupported: true, vistaAndNewer: true,
}} }}
err = m.query.Open() err = m.query.Open()
require.NoError(t, err) require.NoError(t, err)
@@ -381,7 +402,7 @@ func TestParseConfigTotalExpansion(t *testing.T) {
expandPaths: map[string][]string{ expandPaths: map[string][]string{
"\\O(*)\\*": cps1, "\\O(*)\\*": cps1,
}, },
addEnglishSupported: true, vistaAndNewer: true,
}} }}
err = m.query.Open() err = m.query.Open()
require.NoError(t, err) require.NoError(t, err)
@@ -401,7 +422,7 @@ func TestParseConfigExpand(t *testing.T) {
expandPaths: map[string][]string{ expandPaths: map[string][]string{
"\\O(*)\\*": cps1, "\\O(*)\\*": cps1,
}, },
addEnglishSupported: true, vistaAndNewer: true,
}} }}
err = m.query.Open() err = m.query.Open()
require.NoError(t, err) require.NoError(t, err)
@@ -425,7 +446,7 @@ func TestSimpleGather(t *testing.T) {
expandPaths: map[string][]string{ expandPaths: map[string][]string{
cp1: {cp1}, cp1: {cp1},
}, },
addEnglishSupported: false, vistaAndNewer: false,
}} }}
var acc1 testutil.Accumulator var acc1 testutil.Accumulator
err = m.Gather(&acc1) err = m.Gather(&acc1)
@@ -449,7 +470,65 @@ func TestSimpleGather(t *testing.T) {
err = m.Gather(&acc2) err = m.Gather(&acc2)
require.NoError(t, err) require.NoError(t, err)
acc1.AssertContainsTaggedFields(t, measurement, fields1, tags1) acc1.AssertContainsTaggedFields(t, measurement, fields1, tags1)
}
// TestSimpleGatherWithTimestamp verifies that when UsePerfCounterTime
// is enabled (and the fake query reports Vista-or-newer), Gather stamps
// the emitted point with the query-provided MetricTime instead of the
// current wall clock.
func TestSimpleGatherWithTimestamp(t *testing.T) {
	if testing.Short() {
		t.Skip("Skipping long taking test in short mode")
	}
	measurement := "test"
	perfObjects := createPerfObject(measurement, "O", []string{"I"}, []string{"C"}, false, false)
	counterPath := "\\O(I)\\C"
	m := Win_PerfCounters{
		PrintValid:         false,
		UsePerfCounterTime: true,
		Object:             perfObjects,
		query: &FakePerformanceQuery{
			counters: createCounterMap([]string{counterPath}, []float64{1.2}),
			expandPaths: map[string][]string{
				counterPath: {counterPath},
			},
			vistaAndNewer: true,
		},
	}

	var acc testutil.Accumulator
	require.NoError(t, m.Gather(&acc))

	expectedFields := map[string]interface{}{
		"C": float32(1.2),
	}
	expectedTags := map[string]string{
		"instance":   "I",
		"objectname": "O",
	}
	acc.AssertContainsTaggedFields(t, measurement, expectedFields, expectedTags)
	assert.True(t, acc.HasTimestamp(measurement, MetricTime))
}
func TestGatherError(t *testing.T) {
var err error
if testing.Short() {
t.Skip("Skipping long taking test in short mode")
}
measurement := "test"
perfObjects := createPerfObject(measurement, "O", []string{"I"}, []string{"C"}, false, false)
cp1 := "\\O(I)\\C"
m := Win_PerfCounters{PrintValid: false, Object: perfObjects, query: &FakePerformanceQuery{
counters: createCounterMap([]string{cp1}, []float64{-2}),
expandPaths: map[string][]string{
cp1: {cp1},
},
vistaAndNewer: false,
}}
var acc1 testutil.Accumulator
err = m.Gather(&acc1)
require.Error(t, err)
m.UseWildcardsExpansion = true
m.counters = nil
m.lastRefreshed = time.Time{}
var acc2 testutil.Accumulator
err = m.Gather(&acc2)
require.Error(t, err)
} }
func TestGatherInvalidDataIgnore(t *testing.T) { func TestGatherInvalidDataIgnore(t *testing.T) {
@@ -467,7 +546,7 @@ func TestGatherInvalidDataIgnore(t *testing.T) {
cps1[1]: {cps1[1]}, cps1[1]: {cps1[1]},
cps1[2]: {cps1[2]}, cps1[2]: {cps1[2]},
}, },
addEnglishSupported: false, vistaAndNewer: false,
}} }}
var acc1 testutil.Accumulator var acc1 testutil.Accumulator
err = m.Gather(&acc1) err = m.Gather(&acc1)
@@ -506,7 +585,7 @@ func TestGatherRefreshingWithExpansion(t *testing.T) {
expandPaths: map[string][]string{ expandPaths: map[string][]string{
"\\O(*)\\*": cps1, "\\O(*)\\*": cps1,
}, },
addEnglishSupported: true, vistaAndNewer: true,
} }
m := Win_PerfCounters{PrintValid: false, Object: perfObjects, UseWildcardsExpansion: true, query: fpm, CountersRefreshInterval: internal.Duration{Duration: time.Second * 10}} m := Win_PerfCounters{PrintValid: false, Object: perfObjects, UseWildcardsExpansion: true, query: fpm, CountersRefreshInterval: internal.Duration{Duration: time.Second * 10}}
var acc1 testutil.Accumulator var acc1 testutil.Accumulator
@@ -540,7 +619,7 @@ func TestGatherRefreshingWithExpansion(t *testing.T) {
expandPaths: map[string][]string{ expandPaths: map[string][]string{
"\\O(*)\\*": cps2, "\\O(*)\\*": cps2,
}, },
addEnglishSupported: true, vistaAndNewer: true,
} }
m.query = fpm m.query = fpm
fpm.Open() fpm.Open()
@@ -592,7 +671,7 @@ func TestGatherRefreshingWithoutExpansion(t *testing.T) {
"\\O(*)\\C1": {cps1[0], cps1[2]}, "\\O(*)\\C1": {cps1[0], cps1[2]},
"\\O(*)\\C2": {cps1[1], cps1[3]}, "\\O(*)\\C2": {cps1[1], cps1[3]},
}, },
addEnglishSupported: true, vistaAndNewer: true,
} }
m := Win_PerfCounters{PrintValid: false, Object: perfObjects, UseWildcardsExpansion: false, query: fpm, CountersRefreshInterval: internal.Duration{Duration: time.Second * 10}} m := Win_PerfCounters{PrintValid: false, Object: perfObjects, UseWildcardsExpansion: false, query: fpm, CountersRefreshInterval: internal.Duration{Duration: time.Second * 10}}
var acc1 testutil.Accumulator var acc1 testutil.Accumulator
@@ -628,7 +707,7 @@ func TestGatherRefreshingWithoutExpansion(t *testing.T) {
"\\O(*)\\C1": {cps2[0], cps2[2], cps2[4]}, "\\O(*)\\C1": {cps2[0], cps2[2], cps2[4]},
"\\O(*)\\C2": {cps2[1], cps2[3], cps2[5]}, "\\O(*)\\C2": {cps2[1], cps2[3], cps2[5]},
}, },
addEnglishSupported: true, vistaAndNewer: true,
} }
m.query = fpm m.query = fpm
fpm.Open() fpm.Open()
@@ -662,7 +741,7 @@ func TestGatherRefreshingWithoutExpansion(t *testing.T) {
"\\O(*)\\C2": {cps3[1], cps3[4]}, "\\O(*)\\C2": {cps3[1], cps3[4]},
"\\O(*)\\C3": {cps3[2], cps3[5]}, "\\O(*)\\C3": {cps3[2], cps3[5]},
}, },
addEnglishSupported: true, vistaAndNewer: true,
} }
m.query = fpm m.query = fpm
m.Object = perfObjects m.Object = perfObjects
@@ -710,7 +789,7 @@ func TestGatherTotalNoExpansion(t *testing.T) {
"\\O(*)\\C1": {cps1[0], cps1[2]}, "\\O(*)\\C1": {cps1[0], cps1[2]},
"\\O(*)\\C2": {cps1[1], cps1[3]}, "\\O(*)\\C2": {cps1[1], cps1[3]},
}, },
addEnglishSupported: true, vistaAndNewer: true,
}} }}
var acc1 testutil.Accumulator var acc1 testutil.Accumulator
err = m.Gather(&acc1) err = m.Gather(&acc1)

View File

@@ -22,6 +22,7 @@ var (
`%`, "-", `%`, "-",
"#", "-", "#", "-",
"$", "-") "$", "-")
defaultHttpPath = "/api/put"
defaultSeperator = "_" defaultSeperator = "_"
) )
@@ -31,7 +32,8 @@ type OpenTSDB struct {
Host string Host string
Port int Port int
HttpBatchSize int HttpBatchSize int // deprecated httpBatchSize form in 1.8
HttpPath string
Debug bool Debug bool
@@ -52,7 +54,11 @@ var sampleConfig = `
## Number of data points to send to OpenTSDB in Http requests. ## Number of data points to send to OpenTSDB in Http requests.
## Not used with telnet API. ## Not used with telnet API.
httpBatchSize = 50 http_batch_size = 50
## URI Path for Http requests to OpenTSDB.
## Used in cases where OpenTSDB is located behind a reverse proxy.
http_path = "/api/put"
## Debug true - Prints OpenTSDB communication ## Debug true - Prints OpenTSDB communication
debug = false debug = false
@@ -121,6 +127,7 @@ func (o *OpenTSDB) WriteHttp(metrics []telegraf.Metric, u *url.URL) error {
Scheme: u.Scheme, Scheme: u.Scheme,
User: u.User, User: u.User,
BatchSize: o.HttpBatchSize, BatchSize: o.HttpBatchSize,
Path: o.HttpPath,
Debug: o.Debug, Debug: o.Debug,
} }
@@ -260,6 +267,7 @@ func sanitize(value string) string {
func init() { func init() {
outputs.Add("opentsdb", func() telegraf.Output { outputs.Add("opentsdb", func() telegraf.Output {
return &OpenTSDB{ return &OpenTSDB{
HttpPath: defaultHttpPath,
Separator: defaultSeperator, Separator: defaultSeperator,
} }
}) })

View File

@@ -26,6 +26,7 @@ type openTSDBHttp struct {
Scheme string Scheme string
User *url.Userinfo User *url.Userinfo
BatchSize int BatchSize int
Path string
Debug bool Debug bool
metricCounter int metricCounter int
@@ -123,7 +124,7 @@ func (o *openTSDBHttp) flush() error {
Scheme: o.Scheme, Scheme: o.Scheme,
User: o.User, User: o.User,
Host: fmt.Sprintf("%s:%d", o.Host, o.Port), Host: fmt.Sprintf("%s:%d", o.Host, o.Port),
Path: "/api/put", Path: o.Path,
} }
if o.Debug { if o.Debug {

View File

@@ -156,6 +156,7 @@ func BenchmarkHttpSend(b *testing.B) {
Port: port, Port: port,
Prefix: "", Prefix: "",
HttpBatchSize: BatchSize, HttpBatchSize: BatchSize,
HttpPath: "/api/put",
} }
b.ResetTimer() b.ResetTimer()

View File

@@ -1,73 +0,0 @@
# Captures are a slightly modified version of logstash "grok" patterns, with
# the format %{<capture syntax>[:<semantic name>][:<modifier>]}
# By default all named captures are converted into string fields.
# Modifiers can be used to convert captures to other types or tags.
# Timestamp modifiers can be used to convert captures to the timestamp of the
# parsed metric.
# View logstash grok pattern docs here:
# https://www.elastic.co/guide/en/logstash/current/plugins-filters-grok.html
# All default logstash patterns are supported, these can be viewed here:
# https://github.com/logstash-plugins/logstash-patterns-core/blob/master/patterns/grok-patterns
# Available modifiers:
# string (default if nothing is specified)
# int
# float
# duration (ie, 5.23ms gets converted to int nanoseconds)
# tag (converts the field into a tag)
# drop (drops the field completely)
# Timestamp modifiers:
# ts-ansic ("Mon Jan _2 15:04:05 2006")
# ts-unix ("Mon Jan _2 15:04:05 MST 2006")
# ts-ruby ("Mon Jan 02 15:04:05 -0700 2006")
# ts-rfc822 ("02 Jan 06 15:04 MST")
# ts-rfc822z ("02 Jan 06 15:04 -0700")
# ts-rfc850 ("Monday, 02-Jan-06 15:04:05 MST")
# ts-rfc1123 ("Mon, 02 Jan 2006 15:04:05 MST")
# ts-rfc1123z ("Mon, 02 Jan 2006 15:04:05 -0700")
# ts-rfc3339 ("2006-01-02T15:04:05Z07:00")
# ts-rfc3339nano ("2006-01-02T15:04:05.999999999Z07:00")
# ts-httpd ("02/Jan/2006:15:04:05 -0700")
# ts-epoch (seconds since unix epoch)
# ts-epochnano (nanoseconds since unix epoch)
# ts-"CUSTOM"
# CUSTOM time layouts must be within quotes and be the representation of the
# "reference time", which is Mon Jan 2 15:04:05 -0700 MST 2006
# See https://golang.org/pkg/time/#Parse for more details.
# Example log file pattern, example log looks like this:
# [04/Jun/2016:12:41:45 +0100] 1.25 200 192.168.1.1 5.432µs
# Breakdown of the DURATION pattern below:
# NUMBER is a builtin logstash grok pattern matching float & int numbers.
# [nuµm]? is a regex specifying 0 or 1 of the characters within brackets.
# s is also regex, this pattern must end in "s".
# so DURATION will match something like '5.324ms' or '6.1µs' or '10s'
DURATION %{NUMBER}[nuµm]?s
RESPONSE_CODE %{NUMBER:response_code:tag}
RESPONSE_TIME %{DURATION:response_time_ns:duration}
EXAMPLE_LOG \[%{HTTPDATE:ts:ts-httpd}\] %{NUMBER:myfloat:float} %{RESPONSE_CODE} %{IPORHOST:clientip} %{RESPONSE_TIME}
# Wider-ranging username matching vs. logstash built-in %{USER}
NGUSERNAME [a-zA-Z0-9\.\@\-\+_%]+
NGUSER %{NGUSERNAME}
# Wider-ranging client IP matching
CLIENT (?:%{IPORHOST}|%{HOSTPORT}|::1)
##
## COMMON LOG PATTERNS
##
# apache & nginx logs, this is also known as the "common log format"
# see https://en.wikipedia.org/wiki/Common_Log_Format
COMMON_LOG_FORMAT %{CLIENT:client_ip} %{NOTSPACE:ident} %{NOTSPACE:auth} \[%{HTTPDATE:ts:ts-httpd}\] "(?:%{WORD:verb:tag} %{NOTSPACE:request}(?: HTTP/%{NUMBER:http_version:float})?|%{DATA})" %{NUMBER:resp_code:tag} (?:%{NUMBER:resp_bytes:int}|-)
# Combined log format is the same as the common log format but with the addition
# of two quoted strings at the end for "referrer" and "agent"
# See Examples at http://httpd.apache.org/docs/current/mod/mod_log_config.html
COMBINED_LOG_FORMAT %{COMMON_LOG_FORMAT} %{QS:referrer} %{QS:agent}
# HTTPD log formats
HTTPD20_ERRORLOG \[%{HTTPDERROR_DATE:timestamp}\] \[%{LOGLEVEL:loglevel:tag}\] (?:\[client %{IPORHOST:clientip}\] ){0,1}%{GREEDYDATA:errormsg}
HTTPD24_ERRORLOG \[%{HTTPDERROR_DATE:timestamp}\] \[%{WORD:module}:%{LOGLEVEL:loglevel:tag}\] \[pid %{POSINT:pid:int}:tid %{NUMBER:tid:int}\]( \(%{POSINT:proxy_errorcode:int}\)%{DATA:proxy_errormessage}:)?( \[client %{IPORHOST:client}:%{POSINT:clientport}\])? %{DATA:errorcode}: %{GREEDYDATA:message}
HTTPD_ERRORLOG %{HTTPD20_ERRORLOG}|%{HTTPD24_ERRORLOG}

View File

@@ -1,78 +0,0 @@
package grok
// DEFAULT_PATTERNS SHOULD BE KEPT IN-SYNC WITH patterns/influx-patterns
const DEFAULT_PATTERNS = `
# Captures are a slightly modified version of logstash "grok" patterns, with
# the format %{<capture syntax>[:<semantic name>][:<modifier>]}
# By default all named captures are converted into string fields.
# Modifiers can be used to convert captures to other types or tags.
# Timestamp modifiers can be used to convert captures to the timestamp of the
# parsed metric.
# View logstash grok pattern docs here:
# https://www.elastic.co/guide/en/logstash/current/plugins-filters-grok.html
# All default logstash patterns are supported, these can be viewed here:
# https://github.com/logstash-plugins/logstash-patterns-core/blob/master/patterns/grok-patterns
# Available modifiers:
# string (default if nothing is specified)
# int
# float
# duration (ie, 5.23ms gets converted to int nanoseconds)
# tag (converts the field into a tag)
# drop (drops the field completely)
# Timestamp modifiers:
# ts-ansic ("Mon Jan _2 15:04:05 2006")
# ts-unix ("Mon Jan _2 15:04:05 MST 2006")
# ts-ruby ("Mon Jan 02 15:04:05 -0700 2006")
# ts-rfc822 ("02 Jan 06 15:04 MST")
# ts-rfc822z ("02 Jan 06 15:04 -0700")
# ts-rfc850 ("Monday, 02-Jan-06 15:04:05 MST")
# ts-rfc1123 ("Mon, 02 Jan 2006 15:04:05 MST")
# ts-rfc1123z ("Mon, 02 Jan 2006 15:04:05 -0700")
# ts-rfc3339 ("2006-01-02T15:04:05Z07:00")
# ts-rfc3339nano ("2006-01-02T15:04:05.999999999Z07:00")
# ts-httpd ("02/Jan/2006:15:04:05 -0700")
# ts-epoch (seconds since unix epoch)
# ts-epochnano (nanoseconds since unix epoch)
# ts-"CUSTOM"
# CUSTOM time layouts must be within quotes and be the representation of the
# "reference time", which is Mon Jan 2 15:04:05 -0700 MST 2006
# See https://golang.org/pkg/time/#Parse for more details.
# Example log file pattern, example log looks like this:
# [04/Jun/2016:12:41:45 +0100] 1.25 200 192.168.1.1 5.432µs
# Breakdown of the DURATION pattern below:
# NUMBER is a builtin logstash grok pattern matching float & int numbers.
# [nuµm]? is a regex specifying 0 or 1 of the characters within brackets.
# s is also regex, this pattern must end in "s".
# so DURATION will match something like '5.324ms' or '6.1µs' or '10s'
DURATION %{NUMBER}[nuµm]?s
RESPONSE_CODE %{NUMBER:response_code:tag}
RESPONSE_TIME %{DURATION:response_time_ns:duration}
EXAMPLE_LOG \[%{HTTPDATE:ts:ts-httpd}\] %{NUMBER:myfloat:float} %{RESPONSE_CODE} %{IPORHOST:clientip} %{RESPONSE_TIME}
# Wider-ranging username matching vs. logstash built-in %{USER}
NGUSERNAME [a-zA-Z0-9\.\@\-\+_%]+
NGUSER %{NGUSERNAME}
# Wider-ranging client IP matching
CLIENT (?:%{IPV6}|%{IPV4}|%{HOSTNAME}|%{HOSTPORT})
##
## COMMON LOG PATTERNS
##
# apache & nginx logs, this is also known as the "common log format"
# see https://en.wikipedia.org/wiki/Common_Log_Format
COMMON_LOG_FORMAT %{CLIENT:client_ip} %{NOTSPACE:ident} %{NOTSPACE:auth} \[%{HTTPDATE:ts:ts-httpd}\] "(?:%{WORD:verb:tag} %{NOTSPACE:request}(?: HTTP/%{NUMBER:http_version:float})?|%{DATA})" %{NUMBER:resp_code:tag} (?:%{NUMBER:resp_bytes:int}|-)
# Combined log format is the same as the common log format but with the addition
# of two quoted strings at the end for "referrer" and "agent"
# See Examples at http://httpd.apache.org/docs/current/mod/mod_log_config.html
COMBINED_LOG_FORMAT %{COMMON_LOG_FORMAT} %{QS:referrer} %{QS:agent}
# HTTPD log formats
HTTPD20_ERRORLOG \[%{HTTPDERROR_DATE:timestamp}\] \[%{LOGLEVEL:loglevel:tag}\] (?:\[client %{IPORHOST:clientip}\] ){0,1}%{GREEDYDATA:errormsg}
HTTPD24_ERRORLOG \[%{HTTPDERROR_DATE:timestamp}\] \[%{WORD:module}:%{LOGLEVEL:loglevel:tag}\] \[pid %{POSINT:pid:int}:tid %{NUMBER:tid:int}\]( \(%{POSINT:proxy_errorcode:int}\)%{DATA:proxy_errormessage}:)?( \[client %{IPORHOST:client}:%{POSINT:clientport}\])? %{DATA:errorcode}: %{GREEDYDATA:message}
HTTPD_ERRORLOG %{HTTPD20_ERRORLOG}|%{HTTPD24_ERRORLOG}
`

View File

@@ -1,527 +0,0 @@
package grok
import (
"bufio"
"fmt"
"log"
"os"
"regexp"
"strconv"
"strings"
"time"
"github.com/vjeantet/grok"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
)
var timeLayouts = map[string]string{
"ts-ansic": "Mon Jan _2 15:04:05 2006",
"ts-unix": "Mon Jan _2 15:04:05 MST 2006",
"ts-ruby": "Mon Jan 02 15:04:05 -0700 2006",
"ts-rfc822": "02 Jan 06 15:04 MST",
"ts-rfc822z": "02 Jan 06 15:04 -0700", // RFC822 with numeric zone
"ts-rfc850": "Monday, 02-Jan-06 15:04:05 MST",
"ts-rfc1123": "Mon, 02 Jan 2006 15:04:05 MST",
"ts-rfc1123z": "Mon, 02 Jan 2006 15:04:05 -0700", // RFC1123 with numeric zone
"ts-rfc3339": "2006-01-02T15:04:05Z07:00",
"ts-rfc3339nano": "2006-01-02T15:04:05.999999999Z07:00",
"ts-httpd": "02/Jan/2006:15:04:05 -0700",
// These three are not exactly "layouts", but they are special cases that
// will get handled in the ParseLine function.
"ts-epoch": "EPOCH",
"ts-epochnano": "EPOCH_NANO",
"ts-syslog": "SYSLOG_TIMESTAMP",
"ts": "GENERIC_TIMESTAMP", // try parsing all known timestamp layouts.
}
const (
INT = "int"
TAG = "tag"
FLOAT = "float"
STRING = "string"
DURATION = "duration"
DROP = "drop"
EPOCH = "EPOCH"
EPOCH_NANO = "EPOCH_NANO"
SYSLOG_TIMESTAMP = "SYSLOG_TIMESTAMP"
GENERIC_TIMESTAMP = "GENERIC_TIMESTAMP"
)
var (
// matches named captures that contain a modifier.
// ie,
// %{NUMBER:bytes:int}
// %{IPORHOST:clientip:tag}
// %{HTTPDATE:ts1:ts-http}
// %{HTTPDATE:ts2:ts-"02 Jan 06 15:04"}
modifierRe = regexp.MustCompile(`%{\w+:(\w+):(ts-".+"|t?s?-?\w+)}`)
// matches a plain pattern name. ie, %{NUMBER}
patternOnlyRe = regexp.MustCompile(`%{(\w+)}`)
)
// Parser is the primary struct to handle and grok-patterns defined in the config toml
type Parser struct {
Patterns []string
// namedPatterns is a list of internally-assigned names to the patterns
// specified by the user in Patterns.
// They will look like:
// GROK_INTERNAL_PATTERN_0, GROK_INTERNAL_PATTERN_1, etc.
NamedPatterns []string
CustomPatterns string
CustomPatternFiles []string
Measurement string
// Timezone is an optional component to help render log dates to
// your chosen zone.
// Default: "" which renders UTC
// Options are as follows:
// 1. Local -- interpret based on machine localtime
// 2. "America/Chicago" -- Unix TZ values like those found in https://en.wikipedia.org/wiki/List_of_tz_database_time_zones
// 3. UTC -- or blank/unspecified, will return timestamp in UTC
Timezone string
loc *time.Location
// typeMap is a map of patterns -> capture name -> modifier,
// ie, {
// "%{TESTLOG}":
// {
// "bytes": "int",
// "clientip": "tag"
// }
// }
typeMap map[string]map[string]string
// tsMap is a map of patterns -> capture name -> timestamp layout.
// ie, {
// "%{TESTLOG}":
// {
// "httptime": "02/Jan/2006:15:04:05 -0700"
// }
// }
tsMap map[string]map[string]string
// patterns is a map of all of the parsed patterns from CustomPatterns
// and CustomPatternFiles.
// ie, {
// "DURATION": "%{NUMBER}[nuµm]?s"
// "RESPONSE_CODE": "%{NUMBER:rc:tag}"
// }
patterns map[string]string
// foundTsLayouts is a slice of timestamp patterns that have been found
// in the log lines. This slice gets updated if the user uses the generic
// 'ts' modifier for timestamps. This slice is checked first for matches,
// so that previously-matched layouts get priority over all other timestamp
// layouts.
foundTsLayouts []string
timeFunc func() time.Time
g *grok.Grok
tsModder *tsModder
}
// Compile is a bound method to Parser which will process the options for our parser
func (p *Parser) Compile() error {
p.typeMap = make(map[string]map[string]string)
p.tsMap = make(map[string]map[string]string)
p.patterns = make(map[string]string)
p.tsModder = &tsModder{}
var err error
p.g, err = grok.NewWithConfig(&grok.Config{NamedCapturesOnly: true})
if err != nil {
return err
}
// Give Patterns fake names so that they can be treated as named
// "custom patterns"
p.NamedPatterns = make([]string, 0, len(p.Patterns))
for i, pattern := range p.Patterns {
pattern = strings.TrimSpace(pattern)
if pattern == "" {
continue
}
name := fmt.Sprintf("GROK_INTERNAL_PATTERN_%d", i)
p.CustomPatterns += "\n" + name + " " + pattern + "\n"
p.NamedPatterns = append(p.NamedPatterns, "%{"+name+"}")
}
if len(p.NamedPatterns) == 0 {
return fmt.Errorf("pattern required")
}
// Combine user-supplied CustomPatterns with DEFAULT_PATTERNS and parse
// them together as the same type of pattern.
p.CustomPatterns = DEFAULT_PATTERNS + p.CustomPatterns
if len(p.CustomPatterns) != 0 {
scanner := bufio.NewScanner(strings.NewReader(p.CustomPatterns))
p.addCustomPatterns(scanner)
}
// Parse any custom pattern files supplied.
for _, filename := range p.CustomPatternFiles {
file, fileErr := os.Open(filename)
if fileErr != nil {
return fileErr
}
scanner := bufio.NewScanner(bufio.NewReader(file))
p.addCustomPatterns(scanner)
}
if p.Measurement == "" {
p.Measurement = "logparser_grok"
}
p.loc, err = time.LoadLocation(p.Timezone)
if err != nil {
log.Printf("W! improper timezone supplied (%s), setting loc to UTC", p.Timezone)
p.loc, _ = time.LoadLocation("UTC")
}
if p.timeFunc == nil {
p.timeFunc = time.Now
}
return p.compileCustomPatterns()
}
// ParseLine is the primary function to process individual lines, returning the metrics
func (p *Parser) ParseLine(line string) (telegraf.Metric, error) {
var err error
// values are the parsed fields from the log line
var values map[string]string
// the matching pattern string
var patternName string
for _, pattern := range p.NamedPatterns {
if values, err = p.g.Parse(pattern, line); err != nil {
return nil, err
}
if len(values) != 0 {
patternName = pattern
break
}
}
if len(values) == 0 {
log.Printf("D! Grok no match found for: %q", line)
return nil, nil
}
fields := make(map[string]interface{})
tags := make(map[string]string)
timestamp := time.Now()
for k, v := range values {
if k == "" || v == "" {
continue
}
// t is the modifier of the field
var t string
// check if pattern has some modifiers
if types, ok := p.typeMap[patternName]; ok {
t = types[k]
}
// if we didn't find a modifier, check if we have a timestamp layout
if t == "" {
if ts, ok := p.tsMap[patternName]; ok {
// check if the modifier is a timestamp layout
if layout, ok := ts[k]; ok {
t = layout
}
}
}
// if we didn't find a type OR timestamp modifier, assume string
if t == "" {
t = STRING
}
switch t {
case INT:
iv, err := strconv.ParseInt(v, 10, 64)
if err != nil {
log.Printf("E! Error parsing %s to int: %s", v, err)
} else {
fields[k] = iv
}
case FLOAT:
fv, err := strconv.ParseFloat(v, 64)
if err != nil {
log.Printf("E! Error parsing %s to float: %s", v, err)
} else {
fields[k] = fv
}
case DURATION:
d, err := time.ParseDuration(v)
if err != nil {
log.Printf("E! Error parsing %s to duration: %s", v, err)
} else {
fields[k] = int64(d)
}
case TAG:
tags[k] = v
case STRING:
fields[k] = strings.Trim(v, `"`)
case EPOCH:
parts := strings.SplitN(v, ".", 2)
if len(parts) == 0 {
log.Printf("E! Error parsing %s to timestamp: %s", v, err)
break
}
sec, err := strconv.ParseInt(parts[0], 10, 64)
if err != nil {
log.Printf("E! Error parsing %s to timestamp: %s", v, err)
break
}
ts := time.Unix(sec, 0)
if len(parts) == 2 {
padded := fmt.Sprintf("%-9s", parts[1])
nsString := strings.Replace(padded[:9], " ", "0", -1)
nanosec, err := strconv.ParseInt(nsString, 10, 64)
if err != nil {
log.Printf("E! Error parsing %s to timestamp: %s", v, err)
break
}
ts = ts.Add(time.Duration(nanosec) * time.Nanosecond)
}
timestamp = ts
case EPOCH_NANO:
iv, err := strconv.ParseInt(v, 10, 64)
if err != nil {
log.Printf("E! Error parsing %s to int: %s", v, err)
} else {
timestamp = time.Unix(0, iv)
}
case SYSLOG_TIMESTAMP:
ts, err := time.ParseInLocation("Jan 02 15:04:05", v, p.loc)
if err == nil {
if ts.Year() == 0 {
ts = ts.AddDate(timestamp.Year(), 0, 0)
}
timestamp = ts
} else {
log.Printf("E! Error parsing %s to time layout [%s]: %s", v, t, err)
}
case GENERIC_TIMESTAMP:
var foundTs bool
// first try timestamp layouts that we've already found
for _, layout := range p.foundTsLayouts {
ts, err := time.ParseInLocation(layout, v, p.loc)
if err == nil {
timestamp = ts
foundTs = true
break
}
}
// if we haven't found a timestamp layout yet, try all timestamp
// layouts.
if !foundTs {
for _, layout := range timeLayouts {
ts, err := time.ParseInLocation(layout, v, p.loc)
if err == nil {
timestamp = ts
foundTs = true
p.foundTsLayouts = append(p.foundTsLayouts, layout)
break
}
}
}
// if we still haven't found a timestamp layout, log it and we will
// just use time.Now()
if !foundTs {
log.Printf("E! Error parsing timestamp [%s], could not find any "+
"suitable time layouts.", v)
}
case DROP:
// goodbye!
default:
ts, err := time.ParseInLocation(t, v, p.loc)
if err == nil {
timestamp = ts
} else {
log.Printf("E! Error parsing %s to time layout [%s]: %s", v, t, err)
}
}
}
if len(fields) == 0 {
return nil, fmt.Errorf("logparser_grok: must have one or more fields")
}
return metric.New(p.Measurement, tags, fields, p.tsModder.tsMod(timestamp))
}
func (p *Parser) Parse(buf []byte) ([]telegraf.Metric, error) {
lines := strings.Split(string(buf), "\n")
var metrics []telegraf.Metric
for _, line := range lines {
m, err := p.ParseLine(line)
if err != nil {
return nil, err
}
metrics = append(metrics, m)
}
return metrics, nil
}
func (p *Parser) SetDefaultTags(tags map[string]string) {
//needs implementation
}
func (p *Parser) addCustomPatterns(scanner *bufio.Scanner) {
for scanner.Scan() {
line := strings.TrimSpace(scanner.Text())
if len(line) > 0 && line[0] != '#' {
names := strings.SplitN(line, " ", 2)
p.patterns[names[0]] = names[1]
}
}
}
func (p *Parser) compileCustomPatterns() error {
var err error
// check if the pattern contains a subpattern that is already defined
// replace it with the subpattern for modifier inheritance.
for i := 0; i < 2; i++ {
for name, pattern := range p.patterns {
subNames := patternOnlyRe.FindAllStringSubmatch(pattern, -1)
for _, subName := range subNames {
if subPattern, ok := p.patterns[subName[1]]; ok {
pattern = strings.Replace(pattern, subName[0], subPattern, 1)
}
}
p.patterns[name] = pattern
}
}
// check if pattern contains modifiers. Parse them out if it does.
for name, pattern := range p.patterns {
if modifierRe.MatchString(pattern) {
// this pattern has modifiers, so parse out the modifiers
pattern, err = p.parseTypedCaptures(name, pattern)
if err != nil {
return err
}
p.patterns[name] = pattern
}
}
return p.g.AddPatternsFromMap(p.patterns)
}
// parseTypedCaptures parses the capture modifiers, and then deletes the
// modifier from the line so that it is a valid "grok" pattern again.
// ie,
// %{NUMBER:bytes:int} => %{NUMBER:bytes} (stores %{NUMBER}->bytes->int)
// %{IPORHOST:clientip:tag} => %{IPORHOST:clientip} (stores %{IPORHOST}->clientip->tag)
func (p *Parser) parseTypedCaptures(name, pattern string) (string, error) {
matches := modifierRe.FindAllStringSubmatch(pattern, -1)
// grab the name of the capture pattern
patternName := "%{" + name + "}"
// create type map for this pattern
p.typeMap[patternName] = make(map[string]string)
p.tsMap[patternName] = make(map[string]string)
// boolean to verify that each pattern only has a single ts- data type.
hasTimestamp := false
for _, match := range matches {
// regex capture 1 is the name of the capture
// regex capture 2 is the modifier of the capture
if strings.HasPrefix(match[2], "ts") {
if hasTimestamp {
return pattern, fmt.Errorf("logparser pattern compile error: "+
"Each pattern is allowed only one named "+
"timestamp data type. pattern: %s", pattern)
}
if layout, ok := timeLayouts[match[2]]; ok {
// built-in time format
p.tsMap[patternName][match[1]] = layout
} else {
// custom time format
p.tsMap[patternName][match[1]] = strings.TrimSuffix(strings.TrimPrefix(match[2], `ts-"`), `"`)
}
hasTimestamp = true
} else {
p.typeMap[patternName][match[1]] = match[2]
}
// the modifier is not a valid part of a "grok" pattern, so remove it
// from the pattern.
pattern = strings.Replace(pattern, ":"+match[2]+"}", "}", 1)
}
return pattern, nil
}
// tsModder is a struct for incrementing identical timestamps of log lines
// so that we don't push identical metrics that will get overwritten.
type tsModder struct {
dupe time.Time
last time.Time
incr time.Duration
incrn time.Duration
rollover time.Duration
}
// tsMod increments the given timestamp one unit more from the previous
// duplicate timestamp.
// the increment unit is determined as the next smallest time unit below the
// most significant time unit of ts.
// ie, if the input is at ms precision, it will increment it 1µs.
func (t *tsModder) tsMod(ts time.Time) time.Time {
defer func() { t.last = ts }()
// don't mod the time if we don't need to
if t.last.IsZero() || ts.IsZero() {
t.incrn = 0
t.rollover = 0
return ts
}
if !ts.Equal(t.last) && !ts.Equal(t.dupe) {
t.incr = 0
t.incrn = 0
t.rollover = 0
return ts
}
if ts.Equal(t.last) {
t.dupe = ts
}
if ts.Equal(t.dupe) && t.incr == time.Duration(0) {
tsNano := ts.UnixNano()
d := int64(10)
counter := 1
for {
a := tsNano % d
if a > 0 {
break
}
d = d * 10
counter++
}
switch {
case counter <= 6:
t.incr = time.Nanosecond
case counter <= 9:
t.incr = time.Microsecond
case counter > 9:
t.incr = time.Millisecond
}
}
t.incrn++
if t.incrn == 999 && t.incr > time.Nanosecond {
t.rollover = t.incr * t.incrn
t.incrn = 1
t.incr = t.incr / 1000
if t.incr < time.Nanosecond {
t.incr = time.Nanosecond
}
}
return ts.Add(t.incr*t.incrn + t.rollover)
}

View File

@@ -1,19 +0,0 @@
package grok
import (
"log"
"testing"
"github.com/stretchr/testify/assert"
)
func TestGrokParse(t *testing.T) {
parser := Parser{
Measurement: "t_met",
Patterns: []string{"%{COMMON_LOG_FORMAT}"},
}
parser.Compile()
metrics, err := parser.Parse([]byte(`127.0.0.1 user-identifier frank [10/Oct/2000:13:55:36 -0700] "GET /apache_pb.gif HTTP/1.0" 200 2326`))
log.Printf("metric_tags: %v, metric_fields: %v", metrics[0].Tags(), metrics[0].Fields())
assert.NoError(t, err)
}

View File

@@ -8,7 +8,6 @@ import (
"github.com/influxdata/telegraf/plugins/parsers/collectd" "github.com/influxdata/telegraf/plugins/parsers/collectd"
"github.com/influxdata/telegraf/plugins/parsers/dropwizard" "github.com/influxdata/telegraf/plugins/parsers/dropwizard"
"github.com/influxdata/telegraf/plugins/parsers/graphite" "github.com/influxdata/telegraf/plugins/parsers/graphite"
"github.com/influxdata/telegraf/plugins/parsers/grok"
"github.com/influxdata/telegraf/plugins/parsers/influx" "github.com/influxdata/telegraf/plugins/parsers/influx"
"github.com/influxdata/telegraf/plugins/parsers/json" "github.com/influxdata/telegraf/plugins/parsers/json"
"github.com/influxdata/telegraf/plugins/parsers/nagios" "github.com/influxdata/telegraf/plugins/parsers/nagios"
@@ -88,12 +87,6 @@ type Config struct {
// an optional map containing tag names as keys and json paths to retrieve the tag values from as values // an optional map containing tag names as keys and json paths to retrieve the tag values from as values
// used if TagsPath is empty or doesn't return any tags // used if TagsPath is empty or doesn't return any tags
DropwizardTagPathsMap map[string]string DropwizardTagPathsMap map[string]string
//grok patterns
Patterns []string
NamedPatterns []string
CustomPatterns string
CustomPatternFiles []string
} }
// NewParser returns a Parser interface based on the given config. // NewParser returns a Parser interface based on the given config.
@@ -127,36 +120,12 @@ func NewParser(config *Config) (Parser, error) {
config.DefaultTags, config.DefaultTags,
config.Separator, config.Separator,
config.Templates) config.Templates)
case "grok":
parser, err = NewGrokParser(
config.MetricName,
config.Patterns,
config.NamedPatterns,
config.CustomPatterns,
config.CustomPatternFiles)
default: default:
err = fmt.Errorf("Invalid data format: %s", config.DataFormat) err = fmt.Errorf("Invalid data format: %s", config.DataFormat)
} }
return parser, err return parser, err
} }
func NewGrokParser(metricName string,
patterns []string,
nPatterns []string,
cPatterns string,
cPatternFiles []string) (Parser, error) {
parser := grok.Parser{
Measurement: metricName,
Patterns: patterns,
NamedPatterns: nPatterns,
CustomPatterns: cPatterns,
CustomPatternFiles: cPatternFiles,
}
parser.Compile()
return &parser, nil
}
func NewJSONParser( func NewJSONParser(
metricName string, metricName string,
tagKeys []string, tagKeys []string,