Compare commits
4 Commits
bugfix/437 ... logparser-
| Author | SHA1 | Date |
|---|---|---|
| | f88f665cd9 | |
| | a2df042d92 | |
| | c7a72b9a9d | |
| | 09f884b4f0 | |
@@ -16,11 +16,11 @@ jobs:
     steps:
       - checkout
       - restore_cache:
-          key: vendor-{{ checksum "Gopkg.lock" }}
+          key: vendor-{{ .Branch }}-{{ checksum "Gopkg.lock" }}
       - run: 'make deps'
       - save_cache:
          name: 'vendored deps'
-         key: vendor-{{ checksum "Gopkg.lock" }}
+         key: vendor-{{ .Branch }}-{{ checksum "Gopkg.lock" }}
         paths:
           - './vendor'
      - persist_to_workspace:
CHANGELOG.md (18 lines changed)
@@ -6,10 +6,6 @@
-- [tengine](./plugins/inputs/tengine/README.md) - Contributed by @ertaoxu

 ### New Processors

-- [enum](./plugins/processors/enum/README.md) - Contributed by @KarstenSchnitter

 ### New Aggregators

-- [valuecounter](./plugins/aggregators/valuecounter/README.md) - Contributed by @piotr1212

@@ -26,26 +22,14 @@
-- [#4307](https://github.com/influxdata/telegraf/pull/4307): Add new measurement with results of pgrep lookup to procstat input.
-- [#4311](https://github.com/influxdata/telegraf/pull/4311): Add support for comma in logparser timestamp format.
-- [#4292](https://github.com/influxdata/telegraf/pull/4292): Add path tag to tail input plugin.
-- [#4322](https://github.com/influxdata/telegraf/pull/4322): Add log message when tail is added or removed from a file.
-- [#4267](https://github.com/influxdata/telegraf/pull/4267): Add option to use of counter time in win perf counters.
-- [#4343](https://github.com/influxdata/telegraf/pull/4343): Add energy and power field and device id tag to fibaro input.
-- [#4347](https://github.com/influxdata/telegraf/pull/4347): Add http path configuration for OpenTSDB output.
-- [#4352](https://github.com/influxdata/telegraf/pull/4352): Gather IPMI metrics concurrently.
-- [#4362](https://github.com/influxdata/telegraf/pull/4362): Add mongo document and connection metrics.
-- [#3772](https://github.com/influxdata/telegraf/pull/3772): Add Enum Processor.

-## v1.7.1 [2018-07-03]
+## v1.7.1 [unreleased]

 ### Bugfixes

 - [#4277](https://github.com/influxdata/telegraf/pull/4277): Treat sigterm as a clean shutdown signal.
 - [#4284](https://github.com/influxdata/telegraf/pull/4284): Fix selection of tags under nested objects in the JSON parser.
 - [#4135](https://github.com/influxdata/telegraf/issues/4135): Fix postfix input handling multi-level queues.
 - [#4334](https://github.com/influxdata/telegraf/pull/4334): Fix syslog timestamp parsing with single digit day of month.
 - [#2910](https://github.com/influxdata/telegraf/issues/2910): Handle mysql input variations in the user_statistics collecting.
 - [#4293](https://github.com/influxdata/telegraf/issues/4293): Fix minmax and basicstats aggregators to use uint64.
 - [#4290](https://github.com/influxdata/telegraf/issues/4290): Document swap input plugin.
 - [#4316](https://github.com/influxdata/telegraf/issues/4316): Fix incorrect precision being applied to metric in http_listener.

 ## v1.7 [2018-06-12]

@@ -100,13 +100,6 @@ func init() {
 }
 ```

 ### Input Plugin Development

-* Run `make static` followed by `make plugin-[pluginName]` to spin up a docker dev environment
-  using docker-compose.
-* ***[Optional]*** When developing a plugin, add a `dev` directory with a `docker-compose.yml` and `telegraf.conf`
-  as well as any other supporting files, where sensible.

 ## Adding Typed Metrics

 In addition the the `AddFields` function, the accumulator also supports an
Makefile (13 lines changed)
@@ -92,15 +92,4 @@ docker-image:
 plugins/parsers/influx/machine.go: plugins/parsers/influx/machine.go.rl
 	ragel -Z -G2 $^ -o $@

-static:
-	@echo "Building static linux binary..."
-	@CGO_ENABLED=0 \
-	GOOS=linux \
-	GOARCH=amd64 \
-	go build -ldflags "$(LDFLAGS)" ./cmd/telegraf
-
-plugin-%:
-	@echo "Starting dev environment for $${$(@)} input plugin..."
-	@docker-compose -f plugins/inputs/$${$(@)}/dev/docker-compose.yml up
-
-.PHONY: deps telegraf install test test-windows lint vet test-all package clean docker-image fmtcheck uint64 static
+.PHONY: deps telegraf install test test-windows lint vet test-all package clean docker-image fmtcheck uint64

@@ -723,10 +723,6 @@
 #   ## Not used with telnet API.
 #   httpBatchSize = 50
 #
-#   ## URI Path for Http requests to OpenTSDB.
-#   ## Used in cases where OpenTSDB is located behind a reverse proxy.
-#   httpPath = "/api/put"
-#
 #   ## Debug true - Prints OpenTSDB communication
 #   debug = false
 #

@@ -246,8 +246,6 @@ func convert(in interface{}) (float64, bool) {
 		return v, true
 	case int64:
 		return float64(v), true
-	case uint64:
-		return float64(v), true
 	default:
 		return 0, false
 	}

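For context, a minimal standalone sketch of the pattern this hunk touches — folding Telegraf's numeric field types into float64, with uint64 as the case this changeset concerns (illustrative only, not the aggregator file itself):

```go
package main

import "fmt"

// convert mirrors the basicstats helper shown above: any numeric
// field value is widened to float64; everything else is rejected.
func convert(in interface{}) (float64, bool) {
	switch v := in.(type) {
	case float64:
		return v, true
	case int64:
		return float64(v), true
	case uint64: // the case at issue in #4293
		return float64(v), true
	default:
		return 0, false
	}
}

func main() {
	for _, in := range []interface{}{int64(-3), uint64(200), "not a number"} {
		f, ok := convert(in)
		fmt.Println(f, ok)
	}
}
```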
@@ -28,7 +28,6 @@ var m2, _ = metric.New("m1",
|
||||
"c": float64(4),
|
||||
"d": float64(6),
|
||||
"e": float64(200),
|
||||
"f": uint64(200),
|
||||
"ignoreme": "string",
|
||||
"andme": true,
|
||||
},
|
||||
@@ -82,10 +81,6 @@ func TestBasicStatsWithPeriod(t *testing.T) {
|
||||
"e_max": float64(200),
|
||||
"e_min": float64(200),
|
||||
"e_mean": float64(200),
|
||||
"f_count": float64(1), //f
|
||||
"f_max": float64(200),
|
||||
"f_min": float64(200),
|
||||
"f_mean": float64(200),
|
||||
}
|
||||
expectedTags := map[string]string{
|
||||
"foo": "bar",
|
||||
@@ -149,10 +144,6 @@ func TestBasicStatsDifferentPeriods(t *testing.T) {
|
||||
"e_max": float64(200),
|
||||
"e_min": float64(200),
|
||||
"e_mean": float64(200),
|
||||
"f_count": float64(1), //f
|
||||
"f_max": float64(200),
|
||||
"f_min": float64(200),
|
||||
"f_mean": float64(200),
|
||||
}
|
||||
expectedTags = map[string]string{
|
||||
"foo": "bar",
|
||||
@@ -178,7 +169,6 @@ func TestBasicStatsWithOnlyCount(t *testing.T) {
|
||||
"c_count": float64(2),
|
||||
"d_count": float64(2),
|
||||
"e_count": float64(1),
|
||||
"f_count": float64(1),
|
||||
}
|
||||
expectedTags := map[string]string{
|
||||
"foo": "bar",
|
||||
@@ -204,7 +194,6 @@ func TestBasicStatsWithOnlyMin(t *testing.T) {
|
||||
"c_min": float64(2),
|
||||
"d_min": float64(2),
|
||||
"e_min": float64(200),
|
||||
"f_min": float64(200),
|
||||
}
|
||||
expectedTags := map[string]string{
|
||||
"foo": "bar",
|
||||
@@ -230,7 +219,6 @@ func TestBasicStatsWithOnlyMax(t *testing.T) {
|
||||
"c_max": float64(4),
|
||||
"d_max": float64(6),
|
||||
"e_max": float64(200),
|
||||
"f_max": float64(200),
|
||||
}
|
||||
expectedTags := map[string]string{
|
||||
"foo": "bar",
|
||||
@@ -256,7 +244,6 @@ func TestBasicStatsWithOnlyMean(t *testing.T) {
|
||||
"c_mean": float64(3),
|
||||
"d_mean": float64(4),
|
||||
"e_mean": float64(200),
|
||||
"f_mean": float64(200),
|
||||
}
|
||||
expectedTags := map[string]string{
|
||||
"foo": "bar",
|
||||
@@ -282,7 +269,6 @@ func TestBasicStatsWithOnlySum(t *testing.T) {
|
||||
"c_sum": float64(6),
|
||||
"d_sum": float64(8),
|
||||
"e_sum": float64(200),
|
||||
"f_sum": float64(200),
|
||||
}
|
||||
expectedTags := map[string]string{
|
||||
"foo": "bar",
|
||||
@@ -413,8 +399,6 @@ func TestBasicStatsWithMinAndMax(t *testing.T) {
|
||||
"d_min": float64(2),
|
||||
"e_max": float64(200), //e
|
||||
"e_min": float64(200),
|
||||
"f_max": float64(200), //f
|
||||
"f_min": float64(200),
|
||||
}
|
||||
expectedTags := map[string]string{
|
||||
"foo": "bar",
|
||||
@@ -466,11 +450,6 @@ func TestBasicStatsWithAllStats(t *testing.T) {
|
||||
"e_min": float64(200),
|
||||
"e_mean": float64(200),
|
||||
"e_sum": float64(200),
|
||||
"f_count": float64(1), //f
|
||||
"f_max": float64(200),
|
||||
"f_min": float64(200),
|
||||
"f_mean": float64(200),
|
||||
"f_sum": float64(200),
|
||||
}
|
||||
expectedTags := map[string]string{
|
||||
"foo": "bar",
|
||||
|
||||
@@ -107,8 +107,6 @@ func convert(in interface{}) (float64, bool) {
 		return v, true
 	case int64:
 		return float64(v), true
-	case uint64:
-		return float64(v), true
 	default:
 		return 0, false
 	}

@@ -38,7 +38,6 @@ var m2, _ = metric.New("m1",
 		"i":        float64(1),
 		"j":        float64(1),
 		"k":        float64(200),
-		"l":        uint64(200),
 		"ignoreme": "string",
 		"andme":    true,
 	},

@@ -86,8 +85,6 @@ func TestMinMaxWithPeriod(t *testing.T) {
 		"j_min": float64(1),
 		"k_max": float64(200),
 		"k_min": float64(200),
-		"l_max": float64(200),
-		"l_min": float64(200),
 	}
 	expectedTags := map[string]string{
 		"foo": "bar",

@@ -157,8 +154,6 @@ func TestMinMaxDifferentPeriods(t *testing.T) {
 		"j_min": float64(1),
 		"k_max": float64(200),
 		"k_min": float64(200),
-		"l_max": float64(200),
-		"l_min": float64(200),
 	}
 	expectedTags = map[string]string{
 		"foo": "bar",

@@ -366,22 +366,9 @@ func (d *Docker) gatherContainer(
 	var v *types.StatsJSON
 	// Parse container name
 	cname := "unknown"
-	match := false
-	if len(container.Names) == 0 { // for tests
-		match = true
-	}
-
-	for i := range container.Names {
-		if !match {
-			match = d.containerFilter.Match(strings.TrimPrefix(container.Names[i], "/"))
-			if match {
-				cname = strings.TrimPrefix(container.Names[i], "/")
-			}
-		}
-	}
-
-	if !match {
-		return nil
+	if len(container.Names) > 0 {
+		// Not sure what to do with other names, just take the first.
+		cname = strings.TrimPrefix(container.Names[0], "/")
+	}

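A small self-contained sketch of the naming logic on the simplified side of this hunk — take the first of Docker's reported names, strip the leading slash, then apply the name filter once on that canonical name (`matchFunc` below is a stand-in for Telegraf's `containerFilter.Match`; this is an illustration, not the plugin file):

```go
package main

import (
	"fmt"
	"strings"
)

// matchFunc stands in for d.containerFilter.Match in the hunk above.
type matchFunc func(name string) bool

// containerName picks the canonical container name: Docker reports
// names with a leading "/", and the simplified code takes the first.
func containerName(names []string) string {
	if len(names) > 0 {
		return strings.TrimPrefix(names[0], "/")
	}
	return "unknown"
}

func main() {
	match := matchFunc(func(n string) bool { return strings.HasPrefix(n, "web") })
	for _, names := range [][]string{{"/web-1", "/alias"}, {"/db-1"}, {}} {
		cname := containerName(names)
		// The filter check now happens once, on the resolved name.
		fmt.Printf("%-8s match=%v\n", cname, match(cname))
	}
}
```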
 	// the image name sometimes has a version part, or a private repo

@@ -404,6 +391,10 @@ func (d *Docker) gatherContainer(
 		"container_version": imageVersion,
 	}

+	if !d.containerFilter.Match(cname) {
+		return nil
+	}
+
 	ctx, cancel := context.WithTimeout(context.Background(), d.Timeout.Duration)
 	defer cancel()
 	r, err := d.client.ContainerStats(ctx, container.ID, false)

@@ -420,11 +411,6 @@ func (d *Docker) gatherContainer(
 	}
 	daemonOSType := r.OSType

-	// use common (printed at `docker ps`) name for container
-	if cname != strings.TrimPrefix(v.Name, "/") && v.Name != "" {
-		tags["container_name"] = strings.TrimPrefix(v.Name, "/")
-	}
-
 	// Add labels to tags
 	for k, label := range container.Labels {
 		if d.labelFilter.Match(k) {

@@ -475,12 +461,12 @@ func (d *Docker) gatherContainer(
 		acc.AddFields("docker_container_health", healthfields, tags, time.Now())
 	}

-	parseContainerStats(v, acc, tags, container.ID, d.PerDevice, d.Total, daemonOSType)
+	gatherContainerStats(v, acc, tags, container.ID, d.PerDevice, d.Total, daemonOSType)

 	return nil
 }

-func parseContainerStats(
+func gatherContainerStats(
 	stat *types.StatsJSON,
 	acc telegraf.Accumulator,
 	tags map[string]string,

@@ -107,7 +107,7 @@ func TestDockerGatherContainerStats(t *testing.T) {
 		"container_image": "redis/image",
 	}

-	parseContainerStats(stats, &acc, tags, "123456789", true, true, "linux")
+	gatherContainerStats(stats, &acc, tags, "123456789", true, true, "linux")

 	// test docker_container_net measurement
 	netfields := map[string]interface{}{

@@ -24,14 +24,11 @@ Those values could be true (1) or false (0) for switches, percentage for dimmers

 - fibaro
   - tags:
-    - deviceId (device id)
     - section (section name)
     - room (room name)
     - name (device name)
     - type (device type)
   - fields:
-    - energy (float, when available from device)
-    - power (float, when available from device)
     - value (float)
     - value2 (float, when available from device)

@@ -39,17 +36,16 @@ Those values could be true (1) or false (0) for switches, percentage for dimmers
 ### Example Output:

 ```
-fibaro,deviceId=9,host=vm1,name=Fenêtre\ haute,room=Cuisine,section=Cuisine,type=com.fibaro.FGRM222 energy=2.04,power=0.7,value=99,value2=99 1529996807000000000
-fibaro,deviceId=10,host=vm1,name=Escaliers,room=Dégagement,section=Pièces\ communes,type=com.fibaro.binarySwitch value=0 1529996807000000000
-fibaro,deviceId=13,host=vm1,name=Porte\ fenêtre,room=Salon,section=Pièces\ communes,type=com.fibaro.FGRM222 energy=4.33,power=0.7,value=99,value2=99 1529996807000000000
-fibaro,deviceId=21,host=vm1,name=LED\ îlot\ central,room=Cuisine,section=Cuisine,type=com.fibaro.binarySwitch value=0 1529996807000000000
-fibaro,deviceId=90,host=vm1,name=Détérioration,room=Entrée,section=Pièces\ communes,type=com.fibaro.heatDetector value=0 1529996807000000000
-fibaro,deviceId=163,host=vm1,name=Température,room=Cave,section=Cave,type=com.fibaro.temperatureSensor value=21.62 1529996807000000000
-fibaro,deviceId=191,host=vm1,name=Présence,room=Garde-manger,section=Cuisine,type=com.fibaro.FGMS001 value=1 1529996807000000000
-fibaro,deviceId=193,host=vm1,name=Luminosité,room=Garde-manger,section=Cuisine,type=com.fibaro.lightSensor value=195 1529996807000000000
-fibaro,deviceId=200,host=vm1,name=Etat,room=Garage,section=Extérieur,type=com.fibaro.doorSensor value=0 1529996807000000000
-fibaro,deviceId=220,host=vm1,name=CO2\ (ppm),room=Salon,section=Pièces\ communes,type=com.fibaro.multilevelSensor value=536 1529996807000000000
-fibaro,deviceId=221,host=vm1,name=Humidité\ (%),room=Salon,section=Pièces\ communes,type=com.fibaro.humiditySensor value=61 1529996807000000000
-fibaro,deviceId=222,host=vm1,name=Pression\ (mb),room=Salon,section=Pièces\ communes,type=com.fibaro.multilevelSensor value=1013.7 1529996807000000000
-fibaro,deviceId=223,host=vm1,name=Bruit\ (db),room=Salon,section=Pièces\ communes,type=com.fibaro.multilevelSensor value=44 1529996807000000000
+fibaro,host=vm1,name=Escaliers,room=Dégagement,section=Pièces\ communes,type=com.fibaro.binarySwitch value=0 1523351010000000000
+fibaro,host=vm1,name=Porte\ fenêtre,room=Salon,section=Pièces\ communes,type=com.fibaro.FGRM222 value=99,value2=99 1523351010000000000
+fibaro,host=vm1,name=LED\ îlot\ central,room=Cuisine,section=Cuisine,type=com.fibaro.binarySwitch value=0 1523351010000000000
+fibaro,host=vm1,name=Détérioration,room=Entrée,section=Pièces\ communes,type=com.fibaro.heatDetector value=0 1523351010000000000
+fibaro,host=vm1,name=Température,room=Cave,section=Cave,type=com.fibaro.temperatureSensor value=17.87 1523351010000000000
+fibaro,host=vm1,name=Présence,room=Garde-manger,section=Cuisine,type=com.fibaro.FGMS001 value=1 1523351010000000000
+fibaro,host=vm1,name=Luminosité,room=Garde-manger,section=Cuisine,type=com.fibaro.lightSensor value=92 1523351010000000000
+fibaro,host=vm1,name=Etat,room=Garage,section=Extérieur,type=com.fibaro.doorSensor value=0 1523351010000000000
+fibaro,host=vm1,name=CO2\ (ppm),room=Salon,section=Pièces\ communes,type=com.fibaro.multilevelSensor value=880 1523351010000000000
+fibaro,host=vm1,name=Humidité\ (%),room=Salon,section=Pièces\ communes,type=com.fibaro.humiditySensor value=53 1523351010000000000
+fibaro,host=vm1,name=Pression\ (mb),room=Salon,section=Pièces\ communes,type=com.fibaro.multilevelSensor value=1006.9 1523351010000000000
+fibaro,host=vm1,name=Bruit\ (db),room=Salon,section=Pièces\ communes,type=com.fibaro.multilevelSensor value=58 1523351010000000000
 ```

@@ -67,8 +67,6 @@ type Devices struct {
 	Enabled    bool `json:"enabled"`
 	Properties struct {
 		Dead   interface{} `json:"dead"`
-		Energy interface{} `json:"energy"`
-		Power  interface{} `json:"power"`
 		Value  interface{} `json:"value"`
 		Value2 interface{} `json:"value2"`
 	} `json:"properties"`

@@ -164,26 +162,13 @@ func (f *Fibaro) Gather(acc telegraf.Accumulator) error {
 		}

 		tags := map[string]string{
-			"deviceId": strconv.FormatUint(uint64(device.ID), 10),
-			"section":  sections[rooms[device.RoomID].SectionID],
-			"room":     rooms[device.RoomID].Name,
-			"name":     device.Name,
-			"type":     device.Type,
+			"section": sections[rooms[device.RoomID].SectionID],
+			"room":    rooms[device.RoomID].Name,
+			"name":    device.Name,
+			"type":    device.Type,
 		}
 		fields := make(map[string]interface{})

-		if device.Properties.Energy != nil {
-			if fValue, err := strconv.ParseFloat(device.Properties.Energy.(string), 64); err == nil {
-				fields["energy"] = fValue
-			}
-		}
-
-		if device.Properties.Power != nil {
-			if fValue, err := strconv.ParseFloat(device.Properties.Power.(string), 64); err == nil {
-				fields["power"] = fValue
-			}
-		}
-
 		if device.Properties.Value != nil {
 			value := device.Properties.Value
 			switch value {

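The energy and power values arrive from the Fibaro HTTP API as JSON strings behind `interface{}`; a compact sketch of the guarded assert-and-parse pattern the hunk above uses (illustrative only; the sketch adds a checked type assertion, which the plugin code elides):

```go
package main

import (
	"fmt"
	"strconv"
)

func main() {
	props := map[string]interface{}{
		"energy": "4.33", // present and parseable
		"power":  nil,    // device does not report power
	}

	fields := make(map[string]interface{})
	for _, key := range []string{"energy", "power"} {
		v := props[key]
		if v == nil {
			continue // fields are only emitted "when available from device"
		}
		// Assert to string, then parse; a failed parse simply drops
		// the field rather than failing the whole gather.
		if s, ok := v.(string); ok {
			if f, err := strconv.ParseFloat(s, 64); err == nil {
				fields[key] = f
			}
		}
	}
	fmt.Println(fields) // map[energy:4.33]
}
```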
@@ -119,8 +119,6 @@ const devicesJSON = `
 		"type": "com.fibaro.FGRM222",
 		"enabled": true,
 		"properties": {
-			"energy": "4.33",
-			"power": "0.7",
 			"dead": "false",
 			"value": "50",
 			"value2": "75"

@@ -180,27 +178,27 @@ func TestJSONSuccess(t *testing.T) {
 	assert.Equal(t, uint64(5), acc.NMetrics())

 	// Ensure fields / values are correct - Device 1
-	tags := map[string]string{"deviceId": "1", "section": "Section 1", "room": "Room 1", "name": "Device 1", "type": "com.fibaro.binarySwitch"}
+	tags := map[string]string{"section": "Section 1", "room": "Room 1", "name": "Device 1", "type": "com.fibaro.binarySwitch"}
 	fields := map[string]interface{}{"value": float64(0)}
 	acc.AssertContainsTaggedFields(t, "fibaro", fields, tags)

 	// Ensure fields / values are correct - Device 2
-	tags = map[string]string{"deviceId": "2", "section": "Section 2", "room": "Room 2", "name": "Device 2", "type": "com.fibaro.binarySwitch"}
+	tags = map[string]string{"section": "Section 2", "room": "Room 2", "name": "Device 2", "type": "com.fibaro.binarySwitch"}
 	fields = map[string]interface{}{"value": float64(1)}
 	acc.AssertContainsTaggedFields(t, "fibaro", fields, tags)

 	// Ensure fields / values are correct - Device 3
-	tags = map[string]string{"deviceId": "3", "section": "Section 3", "room": "Room 3", "name": "Device 3", "type": "com.fibaro.multilevelSwitch"}
+	tags = map[string]string{"section": "Section 3", "room": "Room 3", "name": "Device 3", "type": "com.fibaro.multilevelSwitch"}
 	fields = map[string]interface{}{"value": float64(67)}
 	acc.AssertContainsTaggedFields(t, "fibaro", fields, tags)

 	// Ensure fields / values are correct - Device 4
-	tags = map[string]string{"deviceId": "4", "section": "Section 3", "room": "Room 4", "name": "Device 4", "type": "com.fibaro.temperatureSensor"}
+	tags = map[string]string{"section": "Section 3", "room": "Room 4", "name": "Device 4", "type": "com.fibaro.temperatureSensor"}
 	fields = map[string]interface{}{"value": float64(22.8)}
 	acc.AssertContainsTaggedFields(t, "fibaro", fields, tags)

 	// Ensure fields / values are correct - Device 5
-	tags = map[string]string{"deviceId": "5", "section": "Section 3", "room": "Room 4", "name": "Device 5", "type": "com.fibaro.FGRM222"}
-	fields = map[string]interface{}{"energy": float64(4.33), "power": float64(0.7), "value": float64(50), "value2": float64(75)}
+	tags = map[string]string{"section": "Section 3", "room": "Room 4", "name": "Device 5", "type": "com.fibaro.FGRM222"}
+	fields = map[string]interface{}{"value": float64(50), "value2": float64(75)}
 	acc.AssertContainsTaggedFields(t, "fibaro", fields, tags)
 }

@@ -343,9 +343,6 @@ func (h *HTTPListener) serveWrite(res http.ResponseWriter, req *http.Request) {
 }

 func (h *HTTPListener) parse(b []byte, t time.Time, precision string) error {
-	h.mu.Lock()
-	defer h.mu.Unlock()
-
 	h.handler.SetTimePrecision(getPrecisionMultiplier(precision))
 	h.handler.SetTimeFunc(func() time.Time { return t })
 	metrics, err := h.parser.Parse(b)

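For orientation, a hedged sketch of what a precision-to-duration mapping like the `getPrecisionMultiplier` call above typically looks like for the write endpoint's `precision` query parameter; the real helper's body is not part of this diff, so names and cases here are assumptions:

```go
package main

import (
	"fmt"
	"time"
)

// precisionMultiplier maps a ?precision= value to the duration each
// integer timestamp unit represents. Illustrative only: the actual
// getPrecisionMultiplier implementation is not shown in this compare.
func precisionMultiplier(precision string) time.Duration {
	switch precision {
	case "u":
		return time.Microsecond
	case "ms":
		return time.Millisecond
	case "s":
		return time.Second
	default:
		return time.Nanosecond // line protocol default
	}
}

func main() {
	for _, p := range []string{"ns", "u", "ms", "s"} {
		fmt.Println(p, "->", precisionMultiplier(p))
	}
}
```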
@@ -5,7 +5,6 @@ import (
 	"os/exec"
 	"strconv"
 	"strings"
-	"sync"
 	"time"

 	"github.com/influxdata/telegraf"

@@ -62,18 +61,13 @@ func (m *Ipmi) Gather(acc telegraf.Accumulator) error {
 	}

 	if len(m.Servers) > 0 {
-		wg := sync.WaitGroup{}
 		for _, server := range m.Servers {
-			wg.Add(1)
-			go func(a telegraf.Accumulator, s string) {
-				defer wg.Done()
-				err := m.parse(a, s)
-				if err != nil {
-					a.AddError(err)
-				}
-			}(acc, server)
+			err := m.parse(acc, server)
+			if err != nil {
+				acc.AddError(err)
+				continue
+			}
 		}
-		wg.Wait()
 	} else {
 		err := m.parse(acc, "")
 		if err != nil {

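One side of this hunk fans IPMI collection out per server, which is what the "Gather IPMI metrics concurrently" changelog entry refers to. A self-contained sketch of the same WaitGroup pattern, with a dummy `parse` standing in for `m.parse`:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// parse stands in for Ipmi.parse, which shells out to ipmitool per BMC.
func parse(server string) error {
	time.Sleep(100 * time.Millisecond) // simulate a slow BMC
	fmt.Println("gathered", server)
	return nil
}

func main() {
	servers := []string{"bmc-1", "bmc-2", "bmc-3"}
	var wg sync.WaitGroup
	for _, server := range servers {
		wg.Add(1)
		// Each server runs in its own goroutine, so one slow BMC no
		// longer serializes the entire gather cycle.
		go func(s string) {
			defer wg.Done()
			if err := parse(s); err != nil {
				fmt.Println("error:", err) // the plugin uses acc.AddError(err)
			}
		}(server)
	}
	wg.Wait()
}
```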
@@ -1,13 +0,0 @@
 version: '3'

 services:
   telegraf:
     image: glinton/scratch
     volumes:
       - ./telegraf.conf:/telegraf.conf
       - ../../../../telegraf:/telegraf
       - ./test.log:/var/log/test.log
     entrypoint:
       - /telegraf
       - --config
       - /telegraf.conf

@@ -1,12 +0,0 @@
 [agent]
   interval="1s"
   flush_interval="1s"

 [[inputs.logparser]]
   files = ["/var/log/test.log"]
   from_beginning = true
   [inputs.logparser.grok]
     patterns = [ "%{COMBINED_LOG_FORMAT}", "%{CLIENT:client_ip} %{NOTSPACE:ident} %{NOTSPACE:auth} \\[%{TIMESTAMP_ISO8601:timestamp}\\] \"(?:%{WORD:verb:tag} %{NOTSPACE:request}(?: HTTP/%{NUMBER:http_version:float})?|%{DATA})\" %{NUMBER:resp_code:tag} (?:%{NUMBER:resp_bytes:int}|-) %{QS:referrer} %{QS:agent}"]

 [[outputs.file]]
   files = ["stdout"]

@@ -1,2 +0,0 @@
 127.0.0.1 ident auth [10/Oct/2000:13:55:36 -0700] "GET /anything HTTP/1.0" 200 2326 "http://localhost:8083/" "Chrome/51.0.2704.84"
 127.0.0.1 ident auth [2018-02-21 13:10:34,555] "GET /peter HTTP/1.0" 200 2326 "http://localhost:8083/" "Chrome/51.0.2704.84"

@@ -293,7 +293,7 @@ func (p *Parser) ParseLine(line string) (telegraf.Metric, error) {
 				timestamp = time.Unix(0, iv)
 			}
 		case SYSLOG_TIMESTAMP:
-			ts, err := time.ParseInLocation(time.Stamp, v, p.loc)
+			ts, err := time.ParseInLocation("Jan 02 15:04:05", v, p.loc)
 			if err == nil {
 				if ts.Year() == 0 {
 					ts = ts.AddDate(timestamp.Year(), 0, 0)

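The two layouts in this hunk differ only in day padding, which is what the single-digit-day bugfix in the changelog is about: Go's `time.Stamp` constant is `"Jan _2 15:04:05"`, whose `_2` accepts a space-padded day ("Sep  2"), while the literal `"Jan 02 15:04:05"` expects zero padding. A standalone sketch (not the plugin code) of that behavior plus the year-0 backfill seen above:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// time.Stamp is "Jan _2 15:04:05": the underscore lets the day be
	// space-padded, as classic syslog emits for days 1-9.
	for _, line := range []string{"Sep 25 09:01:55", "Sep  2 09:01:55"} {
		ts, err := time.ParseInLocation(time.Stamp, line, time.UTC)
		if err != nil {
			fmt.Println(line, "->", err)
			continue
		}
		// Syslog timestamps carry no year, so parsing yields year 0;
		// the parser backfills it, as the hunk does with
		// ts.AddDate(timestamp.Year(), 0, 0).
		if ts.Year() == 0 {
			ts = ts.AddDate(time.Now().Year(), 0, 0)
		}
		fmt.Println(line, "->", ts)
	}
}
```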
@@ -340,6 +340,9 @@ func (p *Parser) ParseLine(line string) (telegraf.Metric, error) {

 			ts, err := time.ParseInLocation(t, v, p.loc)
 			if err == nil {
+				if ts.Year() == 0 {
+					ts = ts.AddDate(timestamp.Year(), 0, 0)
+				}
 				timestamp = ts
 			} else {
 				log.Printf("E! Error parsing %s to time layout [%s]: %s", v, t, err)

@@ -469,7 +472,6 @@ func (t *tsModder) tsMod(ts time.Time) time.Time {
 		t.rollover = 0
 		return ts
 	}

 	if ts.Equal(t.last) {
 		t.dupe = ts
 	}

@@ -971,41 +971,16 @@ func TestNewlineInPatterns(t *testing.T) {
 	require.NotNil(t, m)
 }

-func TestSyslogTimestamp(t *testing.T) {
-	tests := []struct {
-		name     string
-		line     string
-		expected time.Time
-	}{
-		{
-			name:     "two digit day of month",
-			line:     "Sep 25 09:01:55 value=42",
-			expected: time.Date(2018, time.September, 25, 9, 1, 55, 0, time.UTC),
-		},
-		{
-			name:     "one digit day of month single space",
-			line:     "Sep 2 09:01:55 value=42",
-			expected: time.Date(2018, time.September, 2, 9, 1, 55, 0, time.UTC),
-		},
-		{
-			name:     "one digit day of month double space",
-			line:     "Sep  2 09:01:55 value=42",
-			expected: time.Date(2018, time.September, 2, 9, 1, 55, 0, time.UTC),
-		},
-	}
-	for _, tt := range tests {
-		t.Run(tt.name, func(t *testing.T) {
-			p := &Parser{
-				Patterns: []string{`%{SYSLOGTIMESTAMP:timestamp:ts-syslog} value=%{NUMBER:value:int}`},
-				timeFunc: func() time.Time { return time.Date(2017, time.April, 1, 0, 0, 0, 0, time.UTC) },
-			}
-			require.NoError(t, p.Compile())
-			m, err := p.ParseLine(tt.line)
-			require.NoError(t, err)
-			require.NotNil(t, m)
-			require.Equal(t, tt.expected, m.Time())
-		})
+func TestSyslogTimestampParser(t *testing.T) {
+	p := &Parser{
+		Patterns: []string{`%{SYSLOGTIMESTAMP:timestamp:ts-syslog} value=%{NUMBER:value:int}`},
+		timeFunc: func() time.Time { return time.Date(2018, time.April, 1, 0, 0, 0, 0, nil) },
+	}
+	require.NoError(t, p.Compile())
+	m, err := p.ParseLine("Sep 25 09:01:55 value=42")
+	require.NoError(t, err)
+	require.NotNil(t, m)
+	require.Equal(t, 2018, m.Time().Year())
 }

 func TestReplaceTimestampComma(t *testing.T) {

@@ -1025,3 +1000,20 @@ func TestReplaceTimestampComma(t *testing.T) {
 	//Convert Nanosecond to milisecond for compare
 	require.Equal(t, 555, m.Time().Nanosecond()/1000000)
 }
+
+func TestEmptyYearInTimestamp(t *testing.T) {
+	p := &Parser{
+		Patterns: []string{`%{APPLE_SYSLOG_TIME_SHORT:timestamp:ts-"Jan 2 15:04:05"} %{HOSTNAME} %{APP_NAME:app_name}\[%{NUMBER:pid:int}\]%{GREEDYDATA:message}`},
+		CustomPatterns: `
+			APPLE_SYSLOG_TIME_SHORT %{MONTH} +%{MONTHDAY} %{TIME}
+			APP_NAME [a-zA-Z0-9\.]+
+		`,
+	}
+	require.NoError(t, p.Compile())
+	p.ParseLine("Nov 6 13:57:03 generic iTunes[6504]: info> Scale factor of main display = 2.0")
+	m, err := p.ParseLine("Nov 6 13:57:03 generic iTunes[6504]: objc[6504]: Object descriptor was null.")
+	require.NoError(t, err)
+	require.NotNil(t, m)
+	require.Equal(t, 2018, m.Time().Year())
+}

@@ -203,10 +203,6 @@ func (l *LogParserPlugin) tailNewfiles(fromBeginning bool) error {
 				Poll:   poll,
 				Logger: tail.DiscardingLogger,
 			})
-
-		//add message saying a new tailer was added for the file
-		log.Printf("D! tail added for file: %v", file)

 		if err != nil {
 			l.acc.AddError(err)
 			continue

@@ -291,10 +287,6 @@ func (l *LogParserPlugin) Stop() {

 	for _, t := range l.tailers {
 		err := t.Stop()
-
-		//message for a stopped tailer
-		log.Printf("D! tail dropped for file: %v", t.Filename)

 		if err != nil {
 			log.Printf("E! Error stopping tail on file %s\n", t.Filename)
 		}

@@ -1,16 +0,0 @@
 version: '3'
 services:
   mongodb:
     image: mongo

   telegraf:
     image: glinton/scratch
     volumes:
       - ./telegraf.conf:/telegraf.conf
       - ../../../../telegraf:/telegraf
     depends_on:
       - mongodb
     entrypoint:
       - /telegraf
       - --config
       - /telegraf.conf

@@ -1,9 +0,0 @@
 [agent]
   interval="1s"
   flush_interval="3s"

 [[inputs.mongodb]]
   servers = ["mongodb://mongodb:27017"]

 [[outputs.file]]
   files = ["stdout"]

@@ -31,35 +31,28 @@ func NewMongodbData(statLine *StatLine, tags map[string]string) *MongodbData {
 }

 var DefaultStats = map[string]string{
-	"inserts_per_sec":            "Insert",
-	"queries_per_sec":            "Query",
-	"updates_per_sec":            "Update",
-	"deletes_per_sec":            "Delete",
-	"getmores_per_sec":           "GetMore",
-	"commands_per_sec":           "Command",
-	"flushes_per_sec":            "Flushes",
-	"vsize_megabytes":            "Virtual",
-	"resident_megabytes":         "Resident",
-	"queued_reads":               "QueuedReaders",
-	"queued_writes":              "QueuedWriters",
-	"active_reads":               "ActiveReaders",
-	"active_writes":              "ActiveWriters",
-	"net_in_bytes":               "NetIn",
-	"net_out_bytes":              "NetOut",
-	"open_connections":           "NumConnections",
-	"ttl_deletes_per_sec":        "DeletedDocuments",
-	"ttl_passes_per_sec":         "Passes",
-	"cursor_timed_out":           "TimedOutC",
-	"cursor_no_timeout":          "NoTimeoutC",
-	"cursor_pinned":              "PinnedC",
-	"cursor_total":               "TotalC",
-	"document_deleted":           "DeletedD",
-	"document_inserted":          "InsertedD",
-	"document_returned":          "ReturnedD",
-	"document_updated":           "UpdatedD",
-	"connections_current":        "CurrentC",
-	"connections_available":      "AvailableC",
-	"connections_total_created":  "TotalCreatedC",
+	"inserts_per_sec":     "Insert",
+	"queries_per_sec":     "Query",
+	"updates_per_sec":     "Update",
+	"deletes_per_sec":     "Delete",
+	"getmores_per_sec":    "GetMore",
+	"commands_per_sec":    "Command",
+	"flushes_per_sec":     "Flushes",
+	"vsize_megabytes":     "Virtual",
+	"resident_megabytes":  "Resident",
+	"queued_reads":        "QueuedReaders",
+	"queued_writes":       "QueuedWriters",
+	"active_reads":        "ActiveReaders",
+	"active_writes":       "ActiveWriters",
+	"net_in_bytes":        "NetIn",
+	"net_out_bytes":       "NetOut",
+	"open_connections":    "NumConnections",
+	"ttl_deletes_per_sec": "DeletedDocuments",
+	"ttl_passes_per_sec":  "Passes",
+	"cursor_timed_out":    "TimedOutC",
+	"cursor_no_timeout":   "NoTimeoutC",
+	"cursor_pinned":       "PinnedC",
+	"cursor_total":        "TotalC",
 }

 var DefaultReplStats = map[string]string{

@@ -38,13 +38,6 @@ func TestAddNonReplStats(t *testing.T) {
 			NoTimeoutC:    0,
 			PinnedC:       0,
 			TotalC:        0,
-			DeletedD:      0,
-			InsertedD:     0,
-			ReturnedD:     0,
-			UpdatedD:      0,
-			CurrentC:      0,
-			AvailableC:    0,
-			TotalCreatedC: 0,
 		},
 		tags,
 	)

@@ -189,50 +182,43 @@ func TestStateTag(t *testing.T) {
 	d.AddDefaultStats()
 	d.flush(&acc)
 	fields := map[string]interface{}{
-		"active_reads":               int64(0),
-		"active_writes":              int64(0),
-		"commands_per_sec":           int64(0),
-		"deletes_per_sec":            int64(0),
-		"flushes_per_sec":            int64(0),
-		"getmores_per_sec":           int64(0),
-		"inserts_per_sec":            int64(0),
-		"member_status":              "PRI",
-		"state":                      "PRIMARY",
-		"net_in_bytes":               int64(0),
-		"net_out_bytes":              int64(0),
-		"open_connections":           int64(0),
-		"queries_per_sec":            int64(0),
-		"queued_reads":               int64(0),
-		"queued_writes":              int64(0),
-		"repl_commands_per_sec":      int64(0),
-		"repl_deletes_per_sec":       int64(0),
-		"repl_getmores_per_sec":      int64(0),
-		"repl_inserts_per_sec":       int64(0),
-		"repl_queries_per_sec":       int64(0),
-		"repl_updates_per_sec":       int64(0),
-		"repl_lag":                   int64(0),
-		"repl_oplog_window_sec":      int64(0),
-		"resident_megabytes":         int64(0),
-		"updates_per_sec":            int64(0),
-		"vsize_megabytes":            int64(0),
-		"ttl_deletes_per_sec":        int64(0),
-		"ttl_passes_per_sec":         int64(0),
-		"jumbo_chunks":               int64(0),
-		"total_in_use":               int64(0),
-		"total_available":            int64(0),
-		"total_created":              int64(0),
-		"total_refreshing":           int64(0),
-		"cursor_timed_out":           int64(0),
-		"cursor_no_timeout":          int64(0),
-		"cursor_pinned":              int64(0),
-		"cursor_total":               int64(0),
-		"document_deleted":           int64(0),
-		"document_inserted":          int64(0),
-		"document_returned":          int64(0),
-		"document_updated":           int64(0),
-		"connections_current":        int64(0),
-		"connections_available":      int64(0),
-		"connections_total_created":  int64(0),
+		"active_reads":          int64(0),
+		"active_writes":         int64(0),
+		"commands_per_sec":      int64(0),
+		"deletes_per_sec":       int64(0),
+		"flushes_per_sec":       int64(0),
+		"getmores_per_sec":      int64(0),
+		"inserts_per_sec":       int64(0),
+		"member_status":         "PRI",
+		"state":                 "PRIMARY",
+		"net_in_bytes":          int64(0),
+		"net_out_bytes":         int64(0),
+		"open_connections":      int64(0),
+		"queries_per_sec":       int64(0),
+		"queued_reads":          int64(0),
+		"queued_writes":         int64(0),
+		"repl_commands_per_sec": int64(0),
+		"repl_deletes_per_sec":  int64(0),
+		"repl_getmores_per_sec": int64(0),
+		"repl_inserts_per_sec":  int64(0),
+		"repl_queries_per_sec":  int64(0),
+		"repl_updates_per_sec":  int64(0),
+		"repl_lag":              int64(0),
+		"repl_oplog_window_sec": int64(0),
+		"resident_megabytes":    int64(0),
+		"updates_per_sec":       int64(0),
+		"vsize_megabytes":       int64(0),
+		"ttl_deletes_per_sec":   int64(0),
+		"ttl_passes_per_sec":    int64(0),
+		"jumbo_chunks":          int64(0),
+		"total_in_use":          int64(0),
+		"total_available":       int64(0),
+		"total_created":         int64(0),
+		"total_refreshing":      int64(0),
+		"cursor_timed_out":      int64(0),
+		"cursor_no_timeout":     int64(0),
+		"cursor_pinned":         int64(0),
+		"cursor_total":          int64(0),
 	}
 	acc.AssertContainsTaggedFields(t, "mongodb", fields, stateTags)
 }

@@ -225,7 +225,7 @@ type FlushStats struct {
 type ConnectionStats struct {
 	Current      int64 `bson:"current"`
 	Available    int64 `bson:"available"`
-	TotalCreated int64 `bson:"total_created"`
+	TotalCreated int64 `bson:"totalCreated"`
 }

 // DurTiming stores information related to journaling.

@@ -289,9 +289,8 @@ type OpcountStats struct {

 // MetricsStats stores information related to metrics
 type MetricsStats struct {
-	TTL      *TTLStats      `bson:"ttl"`
-	Cursor   *CursorStats   `bson:"cursor"`
-	Document *DocumentStats `bson:"document"`
+	TTL    *TTLStats    `bson:"ttl"`
+	Cursor *CursorStats `bson:"cursor"`
 }

 // TTLStats stores information related to documents with a ttl index.

@@ -306,14 +305,6 @@ type CursorStats struct {
 	Open *OpenCursorStats `bson:"open"`
 }

-// DocumentStats stores information related to document metrics.
-type DocumentStats struct {
-	Deleted  int64 `bson:"deleted"`
-	Inserted int64 `bson:"inserted"`
-	Returned int64 `bson:"returned"`
-	Updated  int64 `bson:"updated"`
-}
-
 // OpenCursorStats stores information related to open cursor metrics
 type OpenCursorStats struct {
 	NoTimeout int64 `bson:"noTimeout"`

@@ -466,12 +457,6 @@ type StatLine struct {
 	TimedOutC                   int64
 	NoTimeoutC, PinnedC, TotalC int64

-	// Document fields
-	DeletedD, InsertedD, ReturnedD, UpdatedD int64
-
-	// Connection fields
-	CurrentC, AvailableC, TotalCreatedC int64
-
 	// Collection locks (3.0 mmap only)
 	CollectionLocks *CollectionLockStatus

@@ -1,42 +0,0 @@
 version: '3'

 services:
   mysql:
     image: mysql:5.7
     restart: always
     environment:
       MYSQL_ROOT_PASSWORD: telegraf
       MYSQL_DATABASE: telegraf
       MYSQL_USER: telegraf
       MYSQL_PASSWORD: telegraf
   maria:
     image: mariadb
     restart: always
     environment:
       MYSQL_ROOT_PASSWORD: telegraf
       MYSQL_DATABASE: telegraf
       MYSQL_USER: telegraf
       MYSQL_PASSWORD: telegraf
     command: mysqld --userstat=1
   percona:
     image: percona
     restart: always
     environment:
       MYSQL_ROOT_PASSWORD: telegraf
       MYSQL_DATABASE: telegraf
       MYSQL_USER: telegraf
       MYSQL_PASSWORD: telegraf

   telegraf:
     image: glinton/scratch
     depends_on:
       - mysql
       - maria
       - percona
     volumes:
       - ./telegraf.conf:/telegraf.conf
       - ../../../../telegraf:/telegraf
     entrypoint:
       - /telegraf
       - --config
       - /telegraf.conf

@@ -1,61 +0,0 @@
 # Uncomment each input as needed to test plugin

 ## mysql
 #[[inputs.mysql]]
 #  servers = ["root:telegraf@tcp(mysql:3306)/"]
 #  gather_table_schema = true
 #  gather_process_list = true
 #  gather_user_statistics = true
 #  gather_info_schema_auto_inc = true
 #  gather_innodb_metrics = true
 #  gather_slave_status = true
 #  gather_binary_logs = false
 #  gather_table_io_waits = true
 #  gather_table_lock_waits = true
 #  gather_index_io_waits = true
 #  gather_event_waits = true
 #  gather_file_events_stats = true
 #  gather_perf_events_statements = true
 #  interval_slow = "30m"
 #  table_schema_databases = []
 #
 ## mariadb
 #[[inputs.mysql]]
 #  servers = ["root:telegraf@tcp(maria:3306)/"]
 #  gather_table_schema = true
 #  gather_process_list = true
 #  gather_user_statistics = true
 #  gather_info_schema_auto_inc = true
 #  gather_innodb_metrics = true
 #  gather_slave_status = true
 #  gather_binary_logs = false
 #  gather_table_io_waits = true
 #  gather_table_lock_waits = true
 #  gather_index_io_waits = true
 #  gather_event_waits = true
 #  gather_file_events_stats = true
 #  gather_perf_events_statements = true
 #  interval_slow = "30m"
 #  table_schema_databases = []

 # percona
 [[inputs.mysql]]
   servers = ["root:telegraf@tcp(percona:3306)/"]
   gather_table_schema = true
   gather_process_list = true
   gather_user_statistics = true
   gather_info_schema_auto_inc = true
   gather_innodb_metrics = true
   gather_slave_status = true
   gather_binary_logs = false
   gather_table_io_waits = true
   gather_table_lock_waits = true
   gather_index_io_waits = true
   gather_event_waits = true
   gather_file_events_stats = true
   gather_perf_events_statements = true
   interval_slow = "30m"
   table_schema_databases = []

 [[outputs.file]]
   files = ["stdout"]

@@ -4,6 +4,7 @@ import (
 	"bytes"
 	"database/sql"
 	"fmt"
+	"log"
 	"strconv"
 	"strings"
 	"sync"

@@ -79,7 +80,7 @@ var sampleConfig = `
   ## gather thread state counts from INFORMATION_SCHEMA.PROCESSLIST
   gather_process_list = true
   #
-  ## gather user statistics from INFORMATION_SCHEMA.USER_STATISTICS
+  ## gather thread state counts from INFORMATION_SCHEMA.USER_STATISTICS
   gather_user_statistics = true
   #
   ## gather auto_increment columns and max values from information schema

@@ -281,8 +282,9 @@ const (
         GROUP BY command,state
         ORDER BY null`
 	infoSchemaUserStatisticsQuery = `
-        SELECT *
-        FROM information_schema.user_statistics`
+        SELECT *,count(*)
+        FROM information_schema.user_statistics
+        GROUP BY user`
 	infoSchemaAutoIncQuery = `
         SELECT table_schema, table_name, column_name, auto_increment,
           CAST(pow(2, case data_type

@@ -759,6 +761,103 @@ func (m *Mysql) gatherGlobalStatuses(db *sql.DB, serv string, acc telegraf.Accum
 	if len(fields) > 0 {
 		acc.AddFields("mysql", fields, tags)
 	}
+	// gather connection metrics from processlist for each user
+	if m.GatherProcessList {
+		conn_rows, err := db.Query("SELECT user, sum(1) FROM INFORMATION_SCHEMA.PROCESSLIST GROUP BY user")
+		if err != nil {
+			log.Printf("E! MySQL Error gathering process list: %s", err)
+		} else {
+			for conn_rows.Next() {
+				var user string
+				var connections int64
+
+				err = conn_rows.Scan(&user, &connections)
+				if err != nil {
+					return err
+				}
+
+				tags := map[string]string{"server": servtag, "user": user}
+				fields := make(map[string]interface{})
+
+				if err != nil {
+					return err
+				}
+				fields["connections"] = connections
+				acc.AddFields("mysql_users", fields, tags)
+			}
+		}
+	}
+
+	// gather connection metrics from user_statistics for each user
+	if m.GatherUserStatistics {
+		conn_rows, err := db.Query("select user, total_connections, concurrent_connections, connected_time, busy_time, cpu_time, bytes_received, bytes_sent, binlog_bytes_written, rows_fetched, rows_updated, table_rows_read, select_commands, update_commands, other_commands, commit_transactions, rollback_transactions, denied_connections, lost_connections, access_denied, empty_queries, total_ssl_connections FROM INFORMATION_SCHEMA.USER_STATISTICS GROUP BY user")
+		if err != nil {
+			log.Printf("E! MySQL Error gathering user stats: %s", err)
+		} else {
+			for conn_rows.Next() {
+				var user string
+				var total_connections int64
+				var concurrent_connections int64
+				var connected_time int64
+				var busy_time int64
+				var cpu_time int64
+				var bytes_received int64
+				var bytes_sent int64
+				var binlog_bytes_written int64
+				var rows_fetched int64
+				var rows_updated int64
+				var table_rows_read int64
+				var select_commands int64
+				var update_commands int64
+				var other_commands int64
+				var commit_transactions int64
+				var rollback_transactions int64
+				var denied_connections int64
+				var lost_connections int64
+				var access_denied int64
+				var empty_queries int64
+				var total_ssl_connections int64
+
+				err = conn_rows.Scan(&user, &total_connections, &concurrent_connections,
+					&connected_time, &busy_time, &cpu_time, &bytes_received, &bytes_sent, &binlog_bytes_written,
+					&rows_fetched, &rows_updated, &table_rows_read, &select_commands, &update_commands, &other_commands,
+					&commit_transactions, &rollback_transactions, &denied_connections, &lost_connections, &access_denied,
+					&empty_queries, &total_ssl_connections,
+				)
+
+				if err != nil {
+					return err
+				}
+
+				tags := map[string]string{"server": servtag, "user": user}
+				fields := map[string]interface{}{
+					"total_connections":      total_connections,
+					"concurrent_connections": concurrent_connections,
+					"connected_time":         connected_time,
+					"busy_time":              busy_time,
+					"cpu_time":               cpu_time,
+					"bytes_received":         bytes_received,
+					"bytes_sent":             bytes_sent,
+					"binlog_bytes_written":   binlog_bytes_written,
+					"rows_fetched":           rows_fetched,
+					"rows_updated":           rows_updated,
+					"table_rows_read":        table_rows_read,
+					"select_commands":        select_commands,
+					"update_commands":        update_commands,
+					"other_commands":         other_commands,
+					"commit_transactions":    commit_transactions,
+					"rollback_transactions":  rollback_transactions,
+					"denied_connections":     denied_connections,
+					"lost_connections":       lost_connections,
+					"access_denied":          access_denied,
+					"empty_queries":          empty_queries,
+					"total_ssl_connections":  total_ssl_connections,
+				}
+
+				acc.AddFields("mysql_user_stats", fields, tags)
+			}
+		}
+	}

 	return nil
 }

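A compact, self-contained sketch of the fixed-column Scan pattern used in the user-statistics path above — an explicit column list in the query, matched one-to-one by typed Scan destinations (trimmed to three columns here; the real query selects 22, and no driver is opened in this sketch):

```go
package main

import (
	"database/sql"
	"fmt"
	"log"
)

// gatherUserStats shows the shape of the code path above: each column
// in the SELECT list gets exactly one typed destination in Scan, in order.
func gatherUserStats(db *sql.DB) error {
	rows, err := db.Query(
		"SELECT user, total_connections, bytes_sent " +
			"FROM INFORMATION_SCHEMA.USER_STATISTICS GROUP BY user")
	if err != nil {
		return err
	}
	defer rows.Close()

	for rows.Next() {
		var (
			user             string
			totalConnections int64
			bytesSent        int64
		)
		if err := rows.Scan(&user, &totalConnections, &bytesSent); err != nil {
			return err
		}
		fmt.Println(user, totalConnections, bytesSent)
	}
	return rows.Err()
}

func main() {
	// Connecting to a real server is out of scope for this sketch;
	// gatherUserStats is what the pattern looks like when wired up.
	log.Println("see gatherUserStats for the Scan pattern")
}
```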
@@ -809,29 +908,6 @@ func (m *Mysql) GatherProcessListStatuses(db *sql.DB, serv string, acc telegraf.
 	} else {
 		acc.AddFields("mysql_process_list", fields, tags)
 	}

-	// get count of connections from each user
-	conn_rows, err := db.Query("SELECT user, sum(1) AS connections FROM INFORMATION_SCHEMA.PROCESSLIST GROUP BY user")
-	if err != nil {
-		return err
-	}
-
-	for conn_rows.Next() {
-		var user string
-		var connections int64
-
-		err = conn_rows.Scan(&user, &connections)
-		if err != nil {
-			return err
-		}
-
-		tags := map[string]string{"server": servtag, "user": user}
-		fields := make(map[string]interface{})
-
-		fields["connections"] = connections
-		acc.AddFields("mysql_users", fields, tags)
-	}

 	return nil
 }

@@ -841,190 +917,77 @@ func (m *Mysql) GatherUserStatisticsStatuses(db *sql.DB, serv string, acc telegr
 	// run query
 	rows, err := db.Query(infoSchemaUserStatisticsQuery)
 	if err != nil {
 		// disable collecting if table is not found (mysql specific error)
 		// (suppresses repeat errors)
 		if strings.Contains(err.Error(), "nknown table 'user_statistics'") {
 			m.GatherUserStatistics = false
 		}
 		return err
 	}
 	defer rows.Close()

-	cols, err := columnsToLower(rows.Columns())
-	if err != nil {
-		return err
-	}
-
-	read, err := getColSlice(len(cols))
-	if err != nil {
-		return err
-	}
+	var (
+		user                   string
+		total_connections      int64
+		concurrent_connections int64
+		connected_time         int64
+		busy_time              int64
+		cpu_time               int64
+		bytes_received         int64
+		bytes_sent             int64
+		binlog_bytes_written   int64
+		rows_fetched           int64
+		rows_updated           int64
+		table_rows_read        int64
+		select_commands        int64
+		update_commands        int64
+		other_commands         int64
+		commit_transactions    int64
+		rollback_transactions  int64
+		denied_connections     int64
+		lost_connections       int64
+		access_denied          int64
+		empty_queries          int64
+		total_ssl_connections  int64
+		count                  uint32
+	)

 	servtag := getDSNTag(serv)
 	for rows.Next() {
-		err = rows.Scan(read...)
+		err = rows.Scan(&user, &total_connections, &concurrent_connections,
+			&connected_time, &busy_time, &cpu_time, &bytes_received, &bytes_sent, &binlog_bytes_written,
+			&rows_fetched, &rows_updated, &table_rows_read, &select_commands, &update_commands, &other_commands,
+			&commit_transactions, &rollback_transactions, &denied_connections, &lost_connections, &access_denied,
+			&empty_queries, &total_ssl_connections, &count,
+		)
 		if err != nil {
 			return err
 		}

-		tags := map[string]string{"server": servtag, "user": *read[0].(*string)}
-		fields := map[string]interface{}{}
+		tags := map[string]string{"server": servtag, "user": user}
+		fields := map[string]interface{}{
-
-		for i := range cols {
-			if i == 0 {
-				continue // skip "user"
-			}
-			switch v := read[i].(type) {
-			case *int64:
-				fields[cols[i]] = *v
-			case *float64:
-				fields[cols[i]] = *v
-			case *string:
-				fields[cols[i]] = *v
-			default:
-				return fmt.Errorf("Unknown column type - %T", v)
-			}
+			"total_connections":      total_connections,
+			"concurrent_connections": concurrent_connections,
+			"connected_time":         connected_time,
+			"busy_time":              busy_time,
+			"cpu_time":               cpu_time,
+			"bytes_received":         bytes_received,
+			"bytes_sent":             bytes_sent,
+			"binlog_bytes_written":   binlog_bytes_written,
+			"rows_fetched":           rows_fetched,
+			"rows_updated":           rows_updated,
+			"table_rows_read":        table_rows_read,
+			"select_commands":        select_commands,
+			"update_commands":        update_commands,
+			"other_commands":         other_commands,
+			"commit_transactions":    commit_transactions,
+			"rollback_transactions":  rollback_transactions,
+			"denied_connections":     denied_connections,
+			"lost_connections":       lost_connections,
+			"access_denied":          access_denied,
+			"empty_queries":          empty_queries,
+			"total_ssl_connections":  total_ssl_connections,
 		}
 		acc.AddFields("mysql_user_stats", fields, tags)
 	}
 	return nil
 }

-// columnsToLower converts selected column names to lowercase.
-func columnsToLower(s []string, e error) ([]string, error) {
-	if e != nil {
-		return nil, e
-	}
-	d := make([]string, len(s))
-
-	for i := range s {
-		d[i] = strings.ToLower(s[i])
-	}
-	return d, nil
-}
-
-// getColSlice returns an in interface slice that can be used in the row.Scan().
-func getColSlice(l int) ([]interface{}, error) {
-	// list of all possible column names
-	var (
-		user                        string
-		total_connections           int64
-		concurrent_connections      int64
-		connected_time              int64
-		busy_time                   int64
-		cpu_time                    int64
-		bytes_received              int64
-		bytes_sent                  int64
-		binlog_bytes_written        int64
-		rows_read                   int64
-		rows_sent                   int64
-		rows_deleted                int64
-		rows_inserted               int64
-		rows_updated                int64
-		select_commands             int64
-		update_commands             int64
-		other_commands              int64
-		commit_transactions         int64
-		rollback_transactions       int64
-		denied_connections          int64
-		lost_connections            int64
-		access_denied               int64
-		empty_queries               int64
-		total_ssl_connections       int64
-		max_statement_time_exceeded int64
-		// maria specific
-		fbusy_time float64
-		fcpu_time  float64
-		// percona specific
-		rows_fetched    int64
-		table_rows_read int64
-	)
-
-	switch l {
-	case 23: // maria5
-		return []interface{}{
-			&user,
-			&total_connections,
-			&concurrent_connections,
-			&connected_time,
-			&fbusy_time,
-			&fcpu_time,
-			&bytes_received,
-			&bytes_sent,
-			&binlog_bytes_written,
-			&rows_read,
-			&rows_sent,
-			&rows_deleted,
-			&rows_inserted,
-			&rows_updated,
-			&select_commands,
-			&update_commands,
-			&other_commands,
-			&commit_transactions,
-			&rollback_transactions,
-			&denied_connections,
-			&lost_connections,
-			&access_denied,
-			&empty_queries,
-		}, nil
-	case 25: // maria10
-		return []interface{}{
-			&user,
-			&total_connections,
-			&concurrent_connections,
-			&connected_time,
-			&fbusy_time,
-			&fcpu_time,
-			&bytes_received,
-			&bytes_sent,
-			&binlog_bytes_written,
-			&rows_read,
-			&rows_sent,
-			&rows_deleted,
-			&rows_inserted,
-			&rows_updated,
-			&select_commands,
-			&update_commands,
-			&other_commands,
-			&commit_transactions,
-			&rollback_transactions,
-			&denied_connections,
-			&lost_connections,
-			&access_denied,
-			&empty_queries,
-			&total_ssl_connections,
-			&max_statement_time_exceeded,
-		}, nil
-	case 22: // percona
-		return []interface{}{
-			&user,
-			&total_connections,
-			&concurrent_connections,
-			&connected_time,
-			&busy_time,
-			&cpu_time,
-			&bytes_received,
-			&bytes_sent,
-			&binlog_bytes_written,
-			&rows_fetched,
-			&rows_updated,
-			&table_rows_read,
-			&select_commands,
-			&update_commands,
-			&other_commands,
-			&commit_transactions,
-			&rollback_transactions,
-			&denied_connections,
-			&lost_connections,
-			&access_denied,
-			&empty_queries,
-			&total_ssl_connections,
-		}, nil
-	}
-
-	return nil, fmt.Errorf("Not Supported - %d columns", l)
-}

 // gatherPerfTableIOWaits can be used to get total count and time
 // of I/O wait event for each table and process
 func (m *Mysql) gatherPerfTableIOWaits(db *sql.DB, serv string, acc telegraf.Accumulator) error {

@@ -1,8 +1,6 @@
 # Procstat Input Plugin

 The procstat plugin can be used to monitor the system resource usage of one or more processes.
-The procstat_lookup metric displays the query information,
-specifically the number of PIDs returned on a search

 Processes can be selected for monitoring using one of several methods:
 - pidfile

@@ -129,18 +127,7 @@ implemented as a WMI query. The pattern allows fuzzy matching using only
 - voluntary_context_switches (int)
 - write_bytes (int, *telegraf* may need to be ran as **root**)
 - write_count (int, *telegraf* may need to be ran as **root**)
-- procstat_lookup
-  - tags:
-    - exe (string)
-    - pid_finder (string)
-    - pid_file (string)
-    - pattern (string)
-    - prefix (string)
-    - user (string)
-    - systemd_unit (string)
-    - cgroup (string)
-  - fields:
-    - pid_count (int)

 *NOTE: Resource limit > 2147483647 will be reported as 2147483647.*

 ### Example Output:

@@ -107,13 +107,13 @@ requests that are in the queue but not yet issued to the device driver.

 #### Calculate percent IO utilization per disk and host:
 ```
-SELECT non_negative_derivative(last("io_time"),1ms) FROM "diskio" WHERE time > now() - 30m GROUP BY "host","name",time(60s)
+SELECT derivative(last("io_time"),1ms) FROM "diskio" WHERE time > now() - 30m GROUP BY "host","name",time(60s)
 ```

 #### Calculate average queue depth:
 `iops_in_progress` will give you an instantaneous value. This will give you the average between polling intervals.
 ```
-SELECT non_negative_derivative(last("weighted_io_time",1ms)) from "diskio" WHERE time > now() - 30m GROUP BY "host","name",time(60s)
+SELECT derivative(last("weighted_io_time",1ms)) from "diskio" WHERE time > now() - 30m GROUP BY "host","name",time(60s)
 ```

 ### Example Output:

@@ -1,30 +0,0 @@
 # Swap Input Plugin

 The swap plugin collects system swap metrics.

 For more information on what swap memory is, read [All about Linux swap space](https://www.linux.com/news/all-about-linux-swap-space).

 ### Configuration:

 ```toml
 # Read metrics about swap memory usage
 [[inputs.swap]]
   # no configuration
 ```

 ### Metrics:

 - swap
   - fields:
     - free (int)
     - total (int)
     - used (int)
     - used_percent (float)
     - in (int)
     - out (int)

 ### Example Output:

 ```
 swap total=20855394304i,used_percent=45.43883523785713,used=9476448256i,free=1715331072i 1511894782000000000
 ```

@@ -42,9 +42,45 @@ func (s *MemStats) Gather(acc telegraf.Accumulator) error {
 	return nil
 }

+type SwapStats struct {
+	ps PS
+}
+
+func (_ *SwapStats) Description() string {
+	return "Read metrics about swap memory usage"
+}
+
+func (_ *SwapStats) SampleConfig() string { return "" }
+
+func (s *SwapStats) Gather(acc telegraf.Accumulator) error {
+	swap, err := s.ps.SwapStat()
+	if err != nil {
+		return fmt.Errorf("error getting swap memory info: %s", err)
+	}
+
+	fieldsG := map[string]interface{}{
+		"total":        swap.Total,
+		"used":         swap.Used,
+		"free":         swap.Free,
+		"used_percent": swap.UsedPercent,
+	}
+	fieldsC := map[string]interface{}{
+		"in":  swap.Sin,
+		"out": swap.Sout,
+	}
+	acc.AddGauge("swap", fieldsG, nil)
+	acc.AddCounter("swap", fieldsC, nil)
+
+	return nil
+}
+
 func init() {
 	ps := newSystemPS()
 	inputs.Add("mem", func() telegraf.Input {
 		return &MemStats{ps: ps}
 	})
+
+	inputs.Add("swap", func() telegraf.Input {
+		return &SwapStats{ps: ps}
+	})
 }

@@ -30,6 +30,17 @@ func TestMemStats(t *testing.T) {

 	mps.On("VMStat").Return(vms, nil)

+	sms := &mem.SwapMemoryStat{
+		Total:       8123,
+		Used:        1232,
+		Free:        6412,
+		UsedPercent: 12.2,
+		Sin:         7,
+		Sout:        830,
+	}
+
+	mps.On("SwapStat").Return(sms, nil)
+
 	err = (&MemStats{&mps}).Gather(&acc)
 	require.NoError(t, err)

@@ -50,4 +61,15 @@ func TestMemStats(t *testing.T) {
 	acc.AssertContainsTaggedFields(t, "mem", memfields, make(map[string]string))

+	acc.Metrics = nil
+
+	err = (&SwapStats{&mps}).Gather(&acc)
+	require.NoError(t, err)
+
+	swapfields := map[string]interface{}{
+		"total":        uint64(8123),
+		"used":         uint64(1232),
+		"used_percent": float64(12.2),
+		"free":         uint64(6412),
+	}
+	acc.AssertContainsTaggedFields(t, "swap", swapfields, make(map[string]string))
 }

@@ -1,47 +0,0 @@
 package system

 import (
 	"fmt"

 	"github.com/influxdata/telegraf"
 	"github.com/influxdata/telegraf/plugins/inputs"
 )

 type SwapStats struct {
 	ps PS
 }

 func (_ *SwapStats) Description() string {
 	return "Read metrics about swap memory usage"
 }

 func (_ *SwapStats) SampleConfig() string { return "" }

 func (s *SwapStats) Gather(acc telegraf.Accumulator) error {
 	swap, err := s.ps.SwapStat()
 	if err != nil {
 		return fmt.Errorf("error getting swap memory info: %s", err)
 	}

 	fieldsG := map[string]interface{}{
 		"total":        swap.Total,
 		"used":         swap.Used,
 		"free":         swap.Free,
 		"used_percent": swap.UsedPercent,
 	}
 	fieldsC := map[string]interface{}{
 		"in":  swap.Sin,
 		"out": swap.Sout,
 	}
 	acc.AddGauge("swap", fieldsG, nil)
 	acc.AddCounter("swap", fieldsC, nil)

 	return nil
 }

 func init() {
 	ps := newSystemPS()
 	inputs.Add("swap", func() telegraf.Input {
 		return &SwapStats{ps: ps}
 	})
 }

@@ -1,38 +0,0 @@
|
||||
package system
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/influxdata/telegraf/testutil"
|
||||
"github.com/shirou/gopsutil/mem"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestSwapStats(t *testing.T) {
|
||||
var mps MockPS
|
||||
var err error
|
||||
defer mps.AssertExpectations(t)
|
||||
var acc testutil.Accumulator
|
||||
|
||||
sms := &mem.SwapMemoryStat{
|
||||
Total: 8123,
|
||||
Used: 1232,
|
||||
Free: 6412,
|
||||
UsedPercent: 12.2,
|
||||
Sin: 7,
|
||||
Sout: 830,
|
||||
}
|
||||
|
||||
mps.On("SwapStat").Return(sms, nil)
|
||||
|
||||
err = (&SwapStats{&mps}).Gather(&acc)
|
||||
require.NoError(t, err)
|
||||
|
||||
swapfields := map[string]interface{}{
|
||||
"total": uint64(8123),
|
||||
"used": uint64(1232),
|
||||
"used_percent": float64(12.2),
|
||||
"free": uint64(6412),
|
||||
}
|
||||
acc.AssertContainsTaggedFields(t, "swap", swapfields, make(map[string]string))
|
||||
}
|
||||
@@ -72,15 +72,6 @@ It is recommended NOT to use this on OSes starting with Vista and newer because
|
||||
Example for Windows Server 2003, this would be set to true:
|
||||
`PreVistaSupport=true`
|
||||
|
||||
#### UsePerfCounterTime
|
||||
|
||||
Bool, if set to `true` will request a timestamp along with the PerfCounter data.
|
||||
If se to `false`, current time will be used.
|
||||
|
||||
Supported on Windows Vista/Windows Server 2008 and newer
|
||||
Example:
|
||||
`UsePerfCounterTime=true`
|
||||
|
||||
### Object
|
||||
|
||||
See Entry below.
|
||||
|
||||
@@ -1,73 +0,0 @@
|
||||
// Copyright (c) 2010 The win Authors. All rights reserved.
|
||||
//
|
||||
// Redistribution and use in source and binary forms, with or without
|
||||
// modification, are permitted provided that the following conditions
|
||||
// are met:
|
||||
// 1. Redistributions of source code must retain the above copyright
|
||||
// notice, this list of conditions and the following disclaimer.
|
||||
// 2. Redistributions in binary form must reproduce the above copyright
|
||||
// notice, this list of conditions and the following disclaimer in the
|
||||
// documentation and/or other materials provided with the distribution.
|
||||
// 3. The names of the authors may not be used to endorse or promote products
|
||||
// derived from this software without specific prior written permission.
|
||||
//
|
||||
// THIS SOFTWARE IS PROVIDED BY THE AUTHORS ``AS IS'' AND ANY EXPRESS OR
|
||||
// IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
|
||||
// OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
|
||||
// IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY DIRECT, INDIRECT,
|
||||
// INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
|
||||
// NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
|
||||
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
|
||||
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
|
||||
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
|
||||
// THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
||||
//
|
||||
// This is the official list of 'win' authors for copyright purposes.
|
||||
//
|
||||
// Alexander Neumann <an2048@googlemail.com>
|
||||
// Joseph Watson <jtwatson@linux-consulting.us>
|
||||
// Kevin Pors <krpors@gmail.com>
|
||||
|
||||
// +build windows
|
||||
|
||||
package win_perf_counters
|
||||
|
||||
import (
|
||||
"syscall"
|
||||
)
|
||||
|
||||
type SYSTEMTIME struct {
|
||||
wYear uint16
|
||||
wMonth uint16
|
||||
wDayOfWeek uint16
|
||||
wDay uint16
|
||||
wHour uint16
|
||||
wMinute uint16
|
||||
wSecond uint16
|
||||
wMilliseconds uint16
|
||||
}
|
||||
|
||||
type FILETIME struct {
|
||||
dwLowDateTime uint32
|
||||
dwHighDateTime uint32
|
||||
}
|
||||
|
||||
var (
|
||||
// Library
|
||||
libkrnDll *syscall.DLL
|
||||
|
||||
// Functions
|
||||
krn_FileTimeToSystemTime *syscall.Proc
|
||||
krn_FileTimeToLocalFileTime *syscall.Proc
|
||||
krn_LocalFileTimeToFileTime *syscall.Proc
|
||||
krn_WideCharToMultiByte *syscall.Proc
|
||||
)
|
||||
|
||||
func init() {
|
||||
libkrnDll = syscall.MustLoadDLL("Kernel32.dll")
|
||||
|
||||
krn_FileTimeToSystemTime = libkrnDll.MustFindProc("FileTimeToSystemTime")
|
||||
krn_FileTimeToLocalFileTime = libkrnDll.MustFindProc("FileTimeToLocalFileTime")
|
||||
krn_LocalFileTimeToFileTime = libkrnDll.MustFindProc("LocalFileTimeToFileTime")
|
||||
krn_WideCharToMultiByte = libkrnDll.MustFindProc("WideCharToMultiByte")
|
||||
}
|
||||
@@ -38,15 +38,12 @@ import (
|
||||
"unsafe"
|
||||
|
||||
"golang.org/x/sys/windows"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Error codes
|
||||
const (
|
||||
ERROR_SUCCESS = 0
|
||||
ERROR_FAILURE = 1
|
||||
ERROR_INVALID_FUNCTION = 1
|
||||
EPOCH_DIFFERENCE_MICROS int64 = 11644473600000000
|
||||
ERROR_SUCCESS = 0
|
||||
ERROR_INVALID_FUNCTION = 1
|
||||
)
|
||||
|
||||
type (
|
||||
@@ -173,7 +170,6 @@ var (
|
||||
pdh_AddEnglishCounterW *syscall.Proc
|
||||
pdh_CloseQuery *syscall.Proc
|
||||
pdh_CollectQueryData *syscall.Proc
|
||||
pdh_CollectQueryDataWithTime *syscall.Proc
|
||||
pdh_GetFormattedCounterValue *syscall.Proc
|
||||
pdh_GetFormattedCounterArrayW *syscall.Proc
|
||||
pdh_OpenQuery *syscall.Proc
|
||||
@@ -191,7 +187,6 @@ func init() {
|
||||
pdh_AddEnglishCounterW, _ = libpdhDll.FindProc("PdhAddEnglishCounterW") // XXX: only supported on versions > Vista.
|
||||
pdh_CloseQuery = libpdhDll.MustFindProc("PdhCloseQuery")
|
||||
pdh_CollectQueryData = libpdhDll.MustFindProc("PdhCollectQueryData")
|
||||
pdh_CollectQueryDataWithTime, _ = libpdhDll.FindProc("PdhCollectQueryDataWithTime")
|
||||
pdh_GetFormattedCounterValue = libpdhDll.MustFindProc("PdhGetFormattedCounterValue")
|
||||
pdh_GetFormattedCounterArrayW = libpdhDll.MustFindProc("PdhGetFormattedCounterArrayW")
|
||||
pdh_OpenQuery = libpdhDll.MustFindProc("PdhOpenQuery")
|
||||
@@ -308,37 +303,6 @@ func PdhCollectQueryData(hQuery PDH_HQUERY) uint32 {
|
||||
return uint32(ret)
|
||||
}
|
||||
|
||||
// PdhCollectQueryDataWithTime queries data from perfmon, retrieving the device/windows timestamp from the node it was collected on.
|
||||
// Converts the filetime structure to a GO time class and returns the native time.
|
||||
//
|
||||
func PdhCollectQueryDataWithTime(hQuery PDH_HQUERY) (uint32, time.Time) {
|
||||
var localFileTime FILETIME
|
||||
ret, _, _ := pdh_CollectQueryDataWithTime.Call(uintptr(hQuery), uintptr(unsafe.Pointer(&localFileTime)))
|
||||
|
||||
if ret == ERROR_SUCCESS {
|
||||
var utcFileTime FILETIME
|
||||
ret, _, _ := krn_LocalFileTimeToFileTime.Call(
|
||||
uintptr(unsafe.Pointer(&localFileTime)),
|
||||
uintptr(unsafe.Pointer(&utcFileTime)))
|
||||
|
||||
if ret == 0 {
|
||||
return uint32(ERROR_FAILURE), time.Now()
|
||||
}
|
||||
|
||||
// First convert 100-ns intervals to microseconds, then adjust for the
|
||||
// epoch difference
|
||||
var totalMicroSeconds int64
|
||||
totalMicroSeconds = ((int64(utcFileTime.dwHighDateTime) << 32) | int64(utcFileTime.dwLowDateTime)) / 10
|
||||
totalMicroSeconds -= EPOCH_DIFFERENCE_MICROS
|
||||
|
||||
retTime := time.Unix(0, totalMicroSeconds*1000)
|
||||
|
||||
return uint32(ERROR_SUCCESS), retTime
|
||||
}
|
||||
|
||||
return uint32(ret), time.Now()
|
||||
}
|
||||
|
||||
// PdhGetFormattedCounterValueDouble formats the given hCounter using a 'double'. The result is set into the specialized union struct pValue.
|
||||
// This function does not directly translate to a Windows counterpart due to union specialization tricks.
|
||||
func PdhGetFormattedCounterValueDouble(hCounter PDH_HCOUNTER, lpdwType *uint32, pValue *PDH_FMT_COUNTERVALUE_DOUBLE) uint32 {
|
||||
|
||||
@@ -6,7 +6,6 @@ package win_perf_counters
|
||||
import (
|
||||
"errors"
|
||||
"syscall"
|
||||
"time"
|
||||
"unsafe"
|
||||
)
|
||||
|
||||
@@ -27,8 +26,7 @@ type PerformanceQuery interface {
|
||||
GetFormattedCounterValueDouble(hCounter PDH_HCOUNTER) (float64, error)
|
||||
GetFormattedCounterArrayDouble(hCounter PDH_HCOUNTER) ([]CounterValue, error)
|
||||
CollectData() error
|
||||
CollectDataWithTime() (time.Time, error)
|
||||
IsVistaOrNewer() bool
|
||||
AddEnglishCounterSupported() bool
|
||||
}
|
||||
|
||||
//PdhError represents error returned from Performance Counters API
|
||||
@@ -63,8 +61,8 @@ func (m *PerformanceQueryImpl) Open() error {
|
||||
}
|
||||
}
|
||||
var handle PDH_HQUERY
|
||||
|
||||
if ret := PdhOpenQuery(0, 0, &handle); ret != ERROR_SUCCESS {
|
||||
ret := PdhOpenQuery(0, 0, &handle)
|
||||
if ret != ERROR_SUCCESS {
|
||||
return NewPdhError(ret)
|
||||
}
|
||||
m.query = handle
|
||||
@@ -76,8 +74,8 @@ func (m *PerformanceQueryImpl) Close() error {
|
||||
if m.query == 0 {
|
||||
return errors.New("uninitialised query")
|
||||
}
|
||||
|
||||
if ret := PdhCloseQuery(m.query); ret != ERROR_SUCCESS {
|
||||
ret := PdhCloseQuery(m.query)
|
||||
if ret != ERROR_SUCCESS {
|
||||
return NewPdhError(ret)
|
||||
}
|
||||
m.query = 0
|
||||
@@ -89,8 +87,8 @@ func (m *PerformanceQueryImpl) AddCounterToQuery(counterPath string) (PDH_HCOUNT
|
||||
if m.query == 0 {
|
||||
return 0, errors.New("uninitialised query")
|
||||
}
|
||||
|
||||
if ret := PdhAddCounter(m.query, counterPath, 0, &counterHandle); ret != ERROR_SUCCESS {
|
||||
ret := PdhAddCounter(m.query, counterPath, 0, &counterHandle)
|
||||
if ret != ERROR_SUCCESS {
|
||||
return 0, NewPdhError(ret)
|
||||
}
|
||||
return counterHandle, nil
|
||||
@@ -101,7 +99,8 @@ func (m *PerformanceQueryImpl) AddEnglishCounterToQuery(counterPath string) (PDH
|
||||
if m.query == 0 {
|
||||
return 0, errors.New("uninitialised query")
|
||||
}
|
||||
if ret := PdhAddEnglishCounter(m.query, counterPath, 0, &counterHandle); ret != ERROR_SUCCESS {
|
||||
ret := PdhAddEnglishCounter(m.query, counterPath, 0, &counterHandle)
|
||||
if ret != ERROR_SUCCESS {
|
||||
return 0, NewPdhError(ret)
|
||||
}
|
||||
return counterHandle, nil
|
||||
@@ -111,11 +110,13 @@ func (m *PerformanceQueryImpl) AddEnglishCounterToQuery(counterPath string) (PDH
|
||||
func (m *PerformanceQueryImpl) GetCounterPath(counterHandle PDH_HCOUNTER) (string, error) {
|
||||
var bufSize uint32
|
||||
var buff []byte
|
||||
var ret uint32
|
||||
if ret = PdhGetCounterInfo(counterHandle, 0, &bufSize, nil); ret == PDH_MORE_DATA {
|
||||
|
||||
ret := PdhGetCounterInfo(counterHandle, 0, &bufSize, nil)
|
||||
if ret == PDH_MORE_DATA {
|
||||
buff = make([]byte, bufSize)
|
||||
bufSize = uint32(len(buff))
|
||||
if ret = PdhGetCounterInfo(counterHandle, 0, &bufSize, &buff[0]); ret == ERROR_SUCCESS {
|
||||
ret = PdhGetCounterInfo(counterHandle, 0, &bufSize, &buff[0])
|
||||
if ret == ERROR_SUCCESS {
|
||||
ci := (*PDH_COUNTER_INFO)(unsafe.Pointer(&buff[0]))
|
||||
return UTF16PtrToString(ci.SzFullPath), nil
|
||||
}
|
||||
@@ -127,9 +128,9 @@ func (m *PerformanceQueryImpl) GetCounterPath(counterHandle PDH_HCOUNTER) (strin
|
||||
func (m *PerformanceQueryImpl) ExpandWildCardPath(counterPath string) ([]string, error) {
|
||||
var bufSize uint32
|
||||
var buff []uint16
|
||||
var ret uint32
|
||||
|
||||
if ret = PdhExpandWildCardPath(counterPath, nil, &bufSize); ret == PDH_MORE_DATA {
|
||||
ret := PdhExpandWildCardPath(counterPath, nil, &bufSize)
|
||||
if ret == PDH_MORE_DATA {
|
||||
buff = make([]uint16, bufSize)
|
||||
bufSize = uint32(len(buff))
|
||||
ret = PdhExpandWildCardPath(counterPath, &buff[0], &bufSize)
|
||||
@@ -145,9 +146,8 @@ func (m *PerformanceQueryImpl) ExpandWildCardPath(counterPath string) ([]string,
|
||||
func (m *PerformanceQueryImpl) GetFormattedCounterValueDouble(hCounter PDH_HCOUNTER) (float64, error) {
|
||||
var counterType uint32
|
||||
var value PDH_FMT_COUNTERVALUE_DOUBLE
|
||||
var ret uint32
|
||||
|
||||
if ret = PdhGetFormattedCounterValueDouble(hCounter, &counterType, &value); ret == ERROR_SUCCESS {
|
||||
ret := PdhGetFormattedCounterValueDouble(hCounter, &counterType, &value)
|
||||
if ret == ERROR_SUCCESS {
|
||||
if value.CStatus == PDH_CSTATUS_VALID_DATA || value.CStatus == PDH_CSTATUS_NEW_DATA {
|
||||
return value.DoubleValue, nil
|
||||
} else {
|
||||
@@ -161,12 +161,11 @@ func (m *PerformanceQueryImpl) GetFormattedCounterValueDouble(hCounter PDH_HCOUN
|
||||
func (m *PerformanceQueryImpl) GetFormattedCounterArrayDouble(hCounter PDH_HCOUNTER) ([]CounterValue, error) {
|
||||
var buffSize uint32
|
||||
var itemCount uint32
|
||||
var ret uint32
|
||||
|
||||
if ret = PdhGetFormattedCounterArrayDouble(hCounter, &buffSize, &itemCount, nil); ret == PDH_MORE_DATA {
|
||||
ret := PdhGetFormattedCounterArrayDouble(hCounter, &buffSize, &itemCount, nil)
|
||||
if ret == PDH_MORE_DATA {
|
||||
buff := make([]byte, buffSize)
|
||||
|
||||
if ret = PdhGetFormattedCounterArrayDouble(hCounter, &buffSize, &itemCount, &buff[0]); ret == ERROR_SUCCESS {
|
||||
ret = PdhGetFormattedCounterArrayDouble(hCounter, &buffSize, &itemCount, &buff[0])
|
||||
if ret == ERROR_SUCCESS {
|
||||
items := (*[1 << 20]PDH_FMT_COUNTERVALUE_ITEM_DOUBLE)(unsafe.Pointer(&buff[0]))[:itemCount]
|
||||
values := make([]CounterValue, 0, itemCount)
|
||||
for _, item := range items {
|
||||
@@ -182,29 +181,17 @@ func (m *PerformanceQueryImpl) GetFormattedCounterArrayDouble(hCounter PDH_HCOUN
|
||||
}
|
||||
|
||||
func (m *PerformanceQueryImpl) CollectData() error {
|
||||
var ret uint32
|
||||
if m.query == 0 {
|
||||
return errors.New("uninitialised query")
|
||||
}
|
||||
|
||||
if ret = PdhCollectQueryData(m.query); ret != ERROR_SUCCESS {
|
||||
ret := PdhCollectQueryData(m.query)
|
||||
if ret != ERROR_SUCCESS {
|
||||
return NewPdhError(ret)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *PerformanceQueryImpl) CollectDataWithTime() (time.Time, error) {
|
||||
if m.query == 0 {
|
||||
return time.Now(), errors.New("uninitialised query")
|
||||
}
|
||||
ret, mtime := PdhCollectQueryDataWithTime(m.query)
|
||||
if ret != ERROR_SUCCESS {
|
||||
return time.Now(), NewPdhError(ret)
|
||||
}
|
||||
return mtime, nil
|
||||
}
|
||||
|
||||
func (m *PerformanceQueryImpl) IsVistaOrNewer() bool {
|
||||
func (m *PerformanceQueryImpl) AddEnglishCounterSupported() bool {
|
||||
return PdhAddEnglishCounterSupported()
|
||||
}
|
||||
|
||||
|
||||
@@ -23,8 +23,6 @@ var sampleConfig = `
|
||||
## agent, it will not be gathered.
|
||||
## Settings:
|
||||
# PrintValid = false # Print All matching performance counters
|
||||
# Whether request a timestamp along with the PerfCounter data or just use current time
|
||||
# UsePerfCounterTime=true
|
||||
# If UseWildcardsExpansion params is set to true, wildcards (partial wildcards in instance names and wildcards in counters names) in configured counter paths will be expanded
|
||||
# and in case of localized Windows, counter paths will be also localized. It also returns instance indexes in instance names.
|
||||
# If false, wildcards (not partial) in instance names will still be expanded, but instance indexes will not be returned in instance names.
|
||||
@@ -80,7 +78,6 @@ type Win_PerfCounters struct {
|
||||
PrintValid bool
|
||||
//deprecated: determined dynamically
|
||||
PreVistaSupport bool
|
||||
UsePerfCounterTime bool
|
||||
Object []perfobject
|
||||
CountersRefreshInterval internal.Duration
|
||||
UseWildcardsExpansion bool
|
||||
@@ -110,12 +107,6 @@ type counter struct {
|
||||
counterHandle PDH_HCOUNTER
|
||||
}
|
||||
|
||||
type instanceGrouping struct {
|
||||
name string
|
||||
instance string
|
||||
objectname string
|
||||
}
|
||||
|
||||
var sanitizedChars = strings.NewReplacer("/sec", "_persec", "/Sec", "_persec",
|
||||
" ", "_", "%", "Percent", `\`, "")
|
||||
|
||||
@@ -156,7 +147,7 @@ func (m *Win_PerfCounters) SampleConfig() string {
|
||||
func (m *Win_PerfCounters) AddItem(counterPath string, objectName string, instance string, counterName string, measurement string, includeTotal bool) error {
|
||||
var err error
|
||||
var counterHandle PDH_HCOUNTER
|
||||
if !m.query.IsVistaOrNewer() {
|
||||
if !m.query.AddEnglishCounterSupported() {
|
||||
counterHandle, err = m.query.AddCounterToQuery(counterPath)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -258,15 +249,18 @@ func (m *Win_PerfCounters) Gather(acc telegraf.Accumulator) error {
|
||||
m.counters = m.counters[:0]
|
||||
}
|
||||
|
||||
if err = m.query.Open(); err != nil {
|
||||
err = m.query.Open()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err = m.ParseConfig(); err != nil {
|
||||
err = m.ParseConfig()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
//some counters need two data samples before computing a value
|
||||
if err = m.query.CollectData(); err != nil {
|
||||
err = m.query.CollectData()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
m.lastRefreshed = time.Now()
|
||||
@@ -274,31 +268,37 @@ func (m *Win_PerfCounters) Gather(acc telegraf.Accumulator) error {
|
||||
time.Sleep(time.Second)
|
||||
}
|
||||
|
||||
var collectFields = make(map[instanceGrouping]map[string]interface{})
|
||||
|
||||
var timestamp time.Time
|
||||
if m.UsePerfCounterTime && m.query.IsVistaOrNewer() {
|
||||
timestamp, err = m.query.CollectDataWithTime()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
timestamp = time.Now()
|
||||
if err = m.query.CollectData(); err != nil {
|
||||
return err
|
||||
}
|
||||
type InstanceGrouping struct {
|
||||
name string
|
||||
instance string
|
||||
objectname string
|
||||
}
|
||||
|
||||
var collectFields = make(map[InstanceGrouping]map[string]interface{})
|
||||
|
||||
err = m.query.CollectData()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// For iterate over the known metrics and get the samples.
|
||||
for _, metric := range m.counters {
|
||||
// collect
|
||||
if m.UseWildcardsExpansion {
|
||||
value, err := m.query.GetFormattedCounterValueDouble(metric.counterHandle)
|
||||
if err == nil {
|
||||
addCounterMeasurement(metric, metric.instance, value, collectFields)
|
||||
measurement := sanitizedChars.Replace(metric.measurement)
|
||||
if measurement == "" {
|
||||
measurement = "win_perf_counters"
|
||||
}
|
||||
|
||||
var instance = InstanceGrouping{measurement, metric.instance, metric.objectName}
|
||||
if collectFields[instance] == nil {
|
||||
collectFields[instance] = make(map[string]interface{})
|
||||
}
|
||||
collectFields[instance][sanitizedChars.Replace(metric.counter)] = float32(value)
|
||||
} else {
|
||||
//ignore invalid data as some counters from process instances returns this sometimes
|
||||
if !isKnownCounterDataError(err) {
|
||||
//ignore invalid data from as some counters from process instances returns this sometimes
|
||||
if phderr, ok := err.(*PdhError); ok && phderr.ErrorCode != PDH_INVALID_DATA && phderr.ErrorCode != PDH_CALC_NEGATIVE_VALUE {
|
||||
return fmt.Errorf("error while getting value for counter %s: %v", metric.counterPath, err)
|
||||
}
|
||||
}
|
||||
@@ -326,14 +326,18 @@ func (m *Win_PerfCounters) Gather(acc telegraf.Accumulator) error {
|
||||
}
|
||||
|
||||
if add {
|
||||
addCounterMeasurement(metric, cValue.InstanceName, cValue.Value, collectFields)
|
||||
measurement := sanitizedChars.Replace(metric.measurement)
|
||||
if measurement == "" {
|
||||
measurement = "win_perf_counters"
|
||||
}
|
||||
var instance = InstanceGrouping{measurement, cValue.InstanceName, metric.objectName}
|
||||
|
||||
if collectFields[instance] == nil {
|
||||
collectFields[instance] = make(map[string]interface{})
|
||||
}
|
||||
collectFields[instance][sanitizedChars.Replace(metric.counter)] = float32(cValue.Value)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
//ignore invalid data as some counters from process instances returns this sometimes
|
||||
if !isKnownCounterDataError(err) {
|
||||
return fmt.Errorf("error while getting value for counter %s: %v", metric.counterPath, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -345,33 +349,12 @@ func (m *Win_PerfCounters) Gather(acc telegraf.Accumulator) error {
|
||||
if len(instance.instance) > 0 {
|
||||
tags["instance"] = instance.instance
|
||||
}
|
||||
acc.AddFields(instance.name, fields, tags, timestamp)
|
||||
acc.AddFields(instance.name, fields, tags)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func addCounterMeasurement(metric *counter, instanceName string, value float64, collectFields map[instanceGrouping]map[string]interface{}) {
|
||||
measurement := sanitizedChars.Replace(metric.measurement)
|
||||
if measurement == "" {
|
||||
measurement = "win_perf_counters"
|
||||
}
|
||||
var instance = instanceGrouping{measurement, instanceName, metric.objectName}
|
||||
if collectFields[instance] == nil {
|
||||
collectFields[instance] = make(map[string]interface{})
|
||||
}
|
||||
collectFields[instance][sanitizedChars.Replace(metric.counter)] = float32(value)
|
||||
}
|
||||
|
||||
func isKnownCounterDataError(err error) bool {
|
||||
if pdhErr, ok := err.(*PdhError); ok && (pdhErr.ErrorCode == PDH_INVALID_DATA ||
|
||||
pdhErr.ErrorCode == PDH_CALC_NEGATIVE_VALUE ||
|
||||
pdhErr.ErrorCode == PDH_CSTATUS_INVALID_DATA) {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func init() {
|
||||
inputs.Add("win_perf_counters", func() telegraf.Input {
|
||||
return &Win_PerfCounters{query: &PerformanceQueryImpl{}, CountersRefreshInterval: internal.Duration{Duration: time.Second * 60}}
|
||||
|
||||
@@ -70,11 +70,6 @@ func TestWinPerformanceQueryImpl(t *testing.T) {
|
||||
_, err = query.GetFormattedCounterValueDouble(hCounter)
|
||||
require.NoError(t, err)
|
||||
|
||||
now := time.Now()
|
||||
mtime, err := query.CollectDataWithTime()
|
||||
require.NoError(t, err)
|
||||
assert.True(t, mtime.Sub(now) < time.Second)
|
||||
|
||||
counterPath = "\\Process(*)\\% Processor Time"
|
||||
paths, err := query.ExpandWildCardPath(counterPath)
|
||||
require.NoError(t, err)
|
||||
@@ -103,10 +98,6 @@ func TestWinPerformanceQueryImpl(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
|
||||
arr, err := query.GetFormattedCounterArrayDouble(hCounter)
|
||||
if phderr, ok := err.(*PdhError); ok && phderr.ErrorCode != PDH_INVALID_DATA && phderr.ErrorCode != PDH_CALC_NEGATIVE_VALUE {
|
||||
time.Sleep(time.Second)
|
||||
arr, err = query.GetFormattedCounterArrayDouble(hCounter)
|
||||
}
|
||||
require.NoError(t, err)
|
||||
assert.True(t, len(arr) > 0, "Too")
|
||||
|
||||
@@ -605,7 +596,7 @@ func TestWinPerfcountersCollect2(t *testing.T) {
|
||||
|
||||
perfobjects[0] = PerfObject
|
||||
|
||||
m := Win_PerfCounters{PrintValid: false, UsePerfCounterTime: true, Object: perfobjects, query: &PerformanceQueryImpl{}, UseWildcardsExpansion: true}
|
||||
m := Win_PerfCounters{PrintValid: false, Object: perfobjects, query: &PerformanceQueryImpl{}, UseWildcardsExpansion: true}
|
||||
var acc testutil.Accumulator
|
||||
err := m.Gather(&acc)
|
||||
require.NoError(t, err)
|
||||
|
||||
@@ -19,14 +19,12 @@ type testCounter struct {
|
||||
value float64
|
||||
}
|
||||
type FakePerformanceQuery struct {
|
||||
counters map[string]testCounter
|
||||
vistaAndNewer bool
|
||||
expandPaths map[string][]string
|
||||
openCalled bool
|
||||
counters map[string]testCounter
|
||||
addEnglishSupported bool
|
||||
expandPaths map[string][]string
|
||||
openCalled bool
|
||||
}
|
||||
|
||||
var MetricTime = time.Date(2018, 5, 28, 12, 0, 0, 0, time.UTC)
|
||||
|
||||
func (m *testCounter) ToCounterValue() *CounterValue {
|
||||
_, inst, _, _ := extractObjectInstanceCounterFromQuery(m.path)
|
||||
if inst == "" {
|
||||
@@ -104,10 +102,8 @@ func (m *FakePerformanceQuery) GetFormattedCounterValueDouble(counterHandle PDH_
|
||||
} else {
|
||||
if counter.value == 0 {
|
||||
return 0, NewPdhError(PDH_INVALID_DATA)
|
||||
} else if counter.value == -1 {
|
||||
return 0, NewPdhError(PDH_CALC_NEGATIVE_VALUE)
|
||||
} else {
|
||||
return 0, NewPdhError(PDH_ACCESS_DENIED)
|
||||
return 0, NewPdhError(PDH_CALC_NEGATIVE_VALUE)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -142,18 +138,8 @@ func (m *FakePerformanceQuery) GetFormattedCounterArrayDouble(hCounter PDH_HCOUN
|
||||
counters := make([]CounterValue, 0, len(e))
|
||||
for _, p := range e {
|
||||
counter := m.findCounterByPath(p)
|
||||
if counter != nil {
|
||||
if counter.value > 0 {
|
||||
counters = append(counters, *counter.ToCounterValue())
|
||||
} else {
|
||||
if counter.value == 0 {
|
||||
return nil, NewPdhError(PDH_INVALID_DATA)
|
||||
} else if counter.value == -1 {
|
||||
return nil, NewPdhError(PDH_CALC_NEGATIVE_VALUE)
|
||||
} else {
|
||||
return nil, NewPdhError(PDH_ACCESS_DENIED)
|
||||
}
|
||||
}
|
||||
if counter != nil && counter.value > 0 {
|
||||
counters = append(counters, *counter.ToCounterValue())
|
||||
} else {
|
||||
return nil, fmt.Errorf("GetFormattedCounterArrayDouble: invalid counter : %s", p)
|
||||
}
|
||||
@@ -174,15 +160,8 @@ func (m *FakePerformanceQuery) CollectData() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *FakePerformanceQuery) CollectDataWithTime() (time.Time, error) {
|
||||
if !m.openCalled {
|
||||
return time.Now(), errors.New("CollectData: uninitialised query")
|
||||
}
|
||||
return MetricTime, nil
|
||||
}
|
||||
|
||||
func (m *FakePerformanceQuery) IsVistaOrNewer() bool {
|
||||
return m.vistaAndNewer
|
||||
func (m *FakePerformanceQuery) AddEnglishCounterSupported() bool {
|
||||
return m.addEnglishSupported
|
||||
}
|
||||
|
||||
func createPerfObject(measurement string, object string, instances []string, counters []string, failOnMissing bool, includeTotal bool) []perfobject {
|
||||
@@ -219,7 +198,7 @@ func TestAddItemSimple(t *testing.T) {
|
||||
expandPaths: map[string][]string{
|
||||
cps1[0]: cps1,
|
||||
},
|
||||
vistaAndNewer: true,
|
||||
addEnglishSupported: true,
|
||||
}}
|
||||
err = m.query.Open()
|
||||
require.NoError(t, err)
|
||||
@@ -237,7 +216,7 @@ func TestAddItemInvalidCountPath(t *testing.T) {
|
||||
expandPaths: map[string][]string{
|
||||
cps1[0]: {"\\O/C"},
|
||||
},
|
||||
vistaAndNewer: true,
|
||||
addEnglishSupported: true,
|
||||
}}
|
||||
err = m.query.Open()
|
||||
require.NoError(t, err)
|
||||
@@ -259,7 +238,7 @@ func TestParseConfigBasic(t *testing.T) {
|
||||
cps1[2]: {cps1[2]},
|
||||
cps1[3]: {cps1[3]},
|
||||
},
|
||||
vistaAndNewer: true,
|
||||
addEnglishSupported: true,
|
||||
}}
|
||||
err = m.query.Open()
|
||||
require.NoError(t, err)
|
||||
@@ -291,7 +270,7 @@ func TestParseConfigNoInstance(t *testing.T) {
|
||||
cps1[0]: {cps1[0]},
|
||||
cps1[1]: {cps1[1]},
|
||||
},
|
||||
vistaAndNewer: true,
|
||||
addEnglishSupported: true,
|
||||
}}
|
||||
err = m.query.Open()
|
||||
require.NoError(t, err)
|
||||
@@ -324,7 +303,7 @@ func TestParseConfigInvalidCounterError(t *testing.T) {
|
||||
cps1[1]: {cps1[1]},
|
||||
cps1[2]: {cps1[2]},
|
||||
},
|
||||
vistaAndNewer: true,
|
||||
addEnglishSupported: true,
|
||||
}}
|
||||
err = m.query.Open()
|
||||
require.NoError(t, err)
|
||||
@@ -355,7 +334,7 @@ func TestParseConfigInvalidCounterNoError(t *testing.T) {
|
||||
cps1[1]: {cps1[1]},
|
||||
cps1[2]: {cps1[2]},
|
||||
},
|
||||
vistaAndNewer: true,
|
||||
addEnglishSupported: true,
|
||||
}}
|
||||
err = m.query.Open()
|
||||
require.NoError(t, err)
|
||||
@@ -385,7 +364,7 @@ func TestParseConfigTotalExpansion(t *testing.T) {
|
||||
expandPaths: map[string][]string{
|
||||
"\\O(*)\\*": cps1,
|
||||
},
|
||||
vistaAndNewer: true,
|
||||
addEnglishSupported: true,
|
||||
}}
|
||||
err = m.query.Open()
|
||||
require.NoError(t, err)
|
||||
@@ -402,7 +381,7 @@ func TestParseConfigTotalExpansion(t *testing.T) {
|
||||
expandPaths: map[string][]string{
|
||||
"\\O(*)\\*": cps1,
|
||||
},
|
||||
vistaAndNewer: true,
|
||||
addEnglishSupported: true,
|
||||
}}
|
||||
err = m.query.Open()
|
||||
require.NoError(t, err)
|
||||
@@ -422,7 +401,7 @@ func TestParseConfigExpand(t *testing.T) {
|
||||
expandPaths: map[string][]string{
|
||||
"\\O(*)\\*": cps1,
|
||||
},
|
||||
vistaAndNewer: true,
|
||||
addEnglishSupported: true,
|
||||
}}
|
||||
err = m.query.Open()
|
||||
require.NoError(t, err)
|
||||
@@ -446,7 +425,7 @@ func TestSimpleGather(t *testing.T) {
|
||||
expandPaths: map[string][]string{
|
||||
cp1: {cp1},
|
||||
},
|
||||
vistaAndNewer: false,
|
||||
addEnglishSupported: false,
|
||||
}}
|
||||
var acc1 testutil.Accumulator
|
||||
err = m.Gather(&acc1)
|
||||
@@ -470,65 +449,7 @@ func TestSimpleGather(t *testing.T) {
|
||||
err = m.Gather(&acc2)
|
||||
require.NoError(t, err)
|
||||
acc1.AssertContainsTaggedFields(t, measurement, fields1, tags1)
|
||||
}
|
||||
|
||||
func TestSimpleGatherWithTimestamp(t *testing.T) {
|
||||
var err error
|
||||
if testing.Short() {
|
||||
t.Skip("Skipping long taking test in short mode")
|
||||
}
|
||||
measurement := "test"
|
||||
perfObjects := createPerfObject(measurement, "O", []string{"I"}, []string{"C"}, false, false)
|
||||
cp1 := "\\O(I)\\C"
|
||||
m := Win_PerfCounters{PrintValid: false, UsePerfCounterTime: true, Object: perfObjects, query: &FakePerformanceQuery{
|
||||
counters: createCounterMap([]string{cp1}, []float64{1.2}),
|
||||
expandPaths: map[string][]string{
|
||||
cp1: {cp1},
|
||||
},
|
||||
vistaAndNewer: true,
|
||||
}}
|
||||
var acc1 testutil.Accumulator
|
||||
err = m.Gather(&acc1)
|
||||
require.NoError(t, err)
|
||||
|
||||
fields1 := map[string]interface{}{
|
||||
"C": float32(1.2),
|
||||
}
|
||||
tags1 := map[string]string{
|
||||
"instance": "I",
|
||||
"objectname": "O",
|
||||
}
|
||||
acc1.AssertContainsTaggedFields(t, measurement, fields1, tags1)
|
||||
assert.True(t, acc1.HasTimestamp(measurement, MetricTime))
|
||||
}
|
||||
|
||||
func TestGatherError(t *testing.T) {
|
||||
var err error
|
||||
if testing.Short() {
|
||||
t.Skip("Skipping long taking test in short mode")
|
||||
}
|
||||
measurement := "test"
|
||||
perfObjects := createPerfObject(measurement, "O", []string{"I"}, []string{"C"}, false, false)
|
||||
cp1 := "\\O(I)\\C"
|
||||
m := Win_PerfCounters{PrintValid: false, Object: perfObjects, query: &FakePerformanceQuery{
|
||||
counters: createCounterMap([]string{cp1}, []float64{-2}),
|
||||
expandPaths: map[string][]string{
|
||||
cp1: {cp1},
|
||||
},
|
||||
vistaAndNewer: false,
|
||||
}}
|
||||
var acc1 testutil.Accumulator
|
||||
err = m.Gather(&acc1)
|
||||
require.Error(t, err)
|
||||
|
||||
m.UseWildcardsExpansion = true
|
||||
m.counters = nil
|
||||
m.lastRefreshed = time.Time{}
|
||||
|
||||
var acc2 testutil.Accumulator
|
||||
|
||||
err = m.Gather(&acc2)
|
||||
require.Error(t, err)
|
||||
}
|
||||
|
||||
func TestGatherInvalidDataIgnore(t *testing.T) {
|
||||
@@ -546,7 +467,7 @@ func TestGatherInvalidDataIgnore(t *testing.T) {
|
||||
cps1[1]: {cps1[1]},
|
||||
cps1[2]: {cps1[2]},
|
||||
},
|
||||
vistaAndNewer: false,
|
||||
addEnglishSupported: false,
|
||||
}}
|
||||
var acc1 testutil.Accumulator
|
||||
err = m.Gather(&acc1)
|
||||
@@ -585,7 +506,7 @@ func TestGatherRefreshingWithExpansion(t *testing.T) {
|
||||
expandPaths: map[string][]string{
|
||||
"\\O(*)\\*": cps1,
|
||||
},
|
||||
vistaAndNewer: true,
|
||||
addEnglishSupported: true,
|
||||
}
|
||||
m := Win_PerfCounters{PrintValid: false, Object: perfObjects, UseWildcardsExpansion: true, query: fpm, CountersRefreshInterval: internal.Duration{Duration: time.Second * 10}}
|
||||
var acc1 testutil.Accumulator
|
||||
@@ -619,7 +540,7 @@ func TestGatherRefreshingWithExpansion(t *testing.T) {
|
||||
expandPaths: map[string][]string{
|
||||
"\\O(*)\\*": cps2,
|
||||
},
|
||||
vistaAndNewer: true,
|
||||
addEnglishSupported: true,
|
||||
}
|
||||
m.query = fpm
|
||||
fpm.Open()
|
||||
@@ -671,7 +592,7 @@ func TestGatherRefreshingWithoutExpansion(t *testing.T) {
|
||||
"\\O(*)\\C1": {cps1[0], cps1[2]},
|
||||
"\\O(*)\\C2": {cps1[1], cps1[3]},
|
||||
},
|
||||
vistaAndNewer: true,
|
||||
addEnglishSupported: true,
|
||||
}
|
||||
m := Win_PerfCounters{PrintValid: false, Object: perfObjects, UseWildcardsExpansion: false, query: fpm, CountersRefreshInterval: internal.Duration{Duration: time.Second * 10}}
|
||||
var acc1 testutil.Accumulator
|
||||
@@ -707,7 +628,7 @@ func TestGatherRefreshingWithoutExpansion(t *testing.T) {
|
||||
"\\O(*)\\C1": {cps2[0], cps2[2], cps2[4]},
|
||||
"\\O(*)\\C2": {cps2[1], cps2[3], cps2[5]},
|
||||
},
|
||||
vistaAndNewer: true,
|
||||
addEnglishSupported: true,
|
||||
}
|
||||
m.query = fpm
|
||||
fpm.Open()
|
||||
@@ -741,7 +662,7 @@ func TestGatherRefreshingWithoutExpansion(t *testing.T) {
|
||||
"\\O(*)\\C2": {cps3[1], cps3[4]},
|
||||
"\\O(*)\\C3": {cps3[2], cps3[5]},
|
||||
},
|
||||
vistaAndNewer: true,
|
||||
addEnglishSupported: true,
|
||||
}
|
||||
m.query = fpm
|
||||
m.Object = perfObjects
|
||||
@@ -789,7 +710,7 @@ func TestGatherTotalNoExpansion(t *testing.T) {
|
||||
"\\O(*)\\C1": {cps1[0], cps1[2]},
|
||||
"\\O(*)\\C2": {cps1[1], cps1[3]},
|
||||
},
|
||||
vistaAndNewer: true,
|
||||
addEnglishSupported: true,
|
||||
}}
|
||||
var acc1 testutil.Accumulator
|
||||
err = m.Gather(&acc1)
|
||||
|
||||
@@ -22,7 +22,6 @@ var (
|
||||
`%`, "-",
|
||||
"#", "-",
|
||||
"$", "-")
|
||||
defaultHttpPath = "/api/put"
|
||||
defaultSeperator = "_"
|
||||
)
|
||||
|
||||
@@ -32,8 +31,7 @@ type OpenTSDB struct {
|
||||
Host string
|
||||
Port int
|
||||
|
||||
HttpBatchSize int // deprecated httpBatchSize form in 1.8
|
||||
HttpPath string
|
||||
HttpBatchSize int
|
||||
|
||||
Debug bool
|
||||
|
||||
@@ -54,11 +52,7 @@ var sampleConfig = `
|
||||
|
||||
## Number of data points to send to OpenTSDB in Http requests.
|
||||
## Not used with telnet API.
|
||||
http_batch_size = 50
|
||||
|
||||
## URI Path for Http requests to OpenTSDB.
|
||||
## Used in cases where OpenTSDB is located behind a reverse proxy.
|
||||
http_path = "/api/put"
|
||||
httpBatchSize = 50
|
||||
|
||||
## Debug true - Prints OpenTSDB communication
|
||||
debug = false
|
||||
@@ -127,7 +121,6 @@ func (o *OpenTSDB) WriteHttp(metrics []telegraf.Metric, u *url.URL) error {
|
||||
Scheme: u.Scheme,
|
||||
User: u.User,
|
||||
BatchSize: o.HttpBatchSize,
|
||||
Path: o.HttpPath,
|
||||
Debug: o.Debug,
|
||||
}
|
||||
|
||||
@@ -267,7 +260,6 @@ func sanitize(value string) string {
|
||||
func init() {
|
||||
outputs.Add("opentsdb", func() telegraf.Output {
|
||||
return &OpenTSDB{
|
||||
HttpPath: defaultHttpPath,
|
||||
Separator: defaultSeperator,
|
||||
}
|
||||
})
|
||||
|
||||
@@ -26,7 +26,6 @@ type openTSDBHttp struct {
|
||||
Scheme string
|
||||
User *url.Userinfo
|
||||
BatchSize int
|
||||
Path string
|
||||
Debug bool
|
||||
|
||||
metricCounter int
|
||||
@@ -124,7 +123,7 @@ func (o *openTSDBHttp) flush() error {
|
||||
Scheme: o.Scheme,
|
||||
User: o.User,
|
||||
Host: fmt.Sprintf("%s:%d", o.Host, o.Port),
|
||||
Path: o.Path,
|
||||
Path: "/api/put",
|
||||
}
|
||||
|
||||
if o.Debug {
|
||||
|
||||
@@ -156,7 +156,6 @@ func BenchmarkHttpSend(b *testing.B) {
|
||||
Port: port,
|
||||
Prefix: "",
|
||||
HttpBatchSize: BatchSize,
|
||||
HttpPath: "/api/put",
|
||||
}
|
||||
|
||||
b.ResetTimer()
|
||||
|
||||
@@ -2,7 +2,6 @@ package all
|
||||
|
||||
import (
|
||||
_ "github.com/influxdata/telegraf/plugins/processors/converter"
|
||||
_ "github.com/influxdata/telegraf/plugins/processors/enum"
|
||||
_ "github.com/influxdata/telegraf/plugins/processors/override"
|
||||
_ "github.com/influxdata/telegraf/plugins/processors/printer"
|
||||
_ "github.com/influxdata/telegraf/plugins/processors/regex"
|
||||
|
||||
@@ -1,33 +0,0 @@
|
||||
# Enum Processor Plugin
|
||||
|
||||
The Enum Processor allows the configuration of value mappings for metric fields.
|
||||
The main use-case for this is to rewrite status codes such as _red_, _amber_ and
|
||||
_green_ by numeric values such as 0, 1, 2. The plugin supports string and bool
|
||||
types for the field values. Multiple Fields can be configured with separate
|
||||
value mappings for each field. Default mapping values can be configured to be
|
||||
used for all values, which are not contained in the value_mappings. The
|
||||
processor supports explicit configuration of a destination field. By default the
|
||||
source field is overwritten.
|
||||
|
||||
### Configuration:
|
||||
|
||||
```toml
|
||||
[[processors.enum]]
|
||||
[[processors.enum.fields]]
|
||||
## Name of the field to map
|
||||
source = "name"
|
||||
|
||||
## Destination field to be used for the mapped value. By default the source
|
||||
## field is used, overwriting the original value.
|
||||
# destination = "mapped"
|
||||
|
||||
## Default value to be used for all values not contained in the mapping
|
||||
## table. When unset, the unmodified value for the field will be used if no
|
||||
## match is found.
|
||||
# default = 0
|
||||
|
||||
## Table of mappings
|
||||
[processors.enum.fields.value_mappings]
|
||||
value1 = 1
|
||||
value2 = 2
|
||||
```
|
||||
@@ -1,104 +0,0 @@
|
||||
package enum
|
||||
|
||||
import (
|
||||
"strconv"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/plugins/processors"
|
||||
)
|
||||
|
||||
var sampleConfig = `
|
||||
[[processors.enum.fields]]
|
||||
## Name of the field to map
|
||||
source = "name"
|
||||
|
||||
## Destination field to be used for the mapped value. By default the source
|
||||
## field is used, overwriting the original value.
|
||||
# destination = "mapped"
|
||||
|
||||
## Default value to be used for all values not contained in the mapping
|
||||
## table. When unset, the unmodified value for the field will be used if no
|
||||
## match is found.
|
||||
# default = 0
|
||||
|
||||
## Table of mappings
|
||||
[processors.enum.fields.value_mappings]
|
||||
value1 = 1
|
||||
value2 = 2
|
||||
`
|
||||
|
||||
type EnumMapper struct {
|
||||
Fields []Mapping
|
||||
}
|
||||
|
||||
type Mapping struct {
|
||||
Source string
|
||||
Destination string
|
||||
Default interface{}
|
||||
ValueMappings map[string]interface{}
|
||||
}
|
||||
|
||||
func (mapper *EnumMapper) SampleConfig() string {
|
||||
return sampleConfig
|
||||
}
|
||||
|
||||
func (mapper *EnumMapper) Description() string {
|
||||
return "Map enum values according to given table."
|
||||
}
|
||||
|
||||
func (mapper *EnumMapper) Apply(in ...telegraf.Metric) []telegraf.Metric {
|
||||
for i := 0; i < len(in); i++ {
|
||||
in[i] = mapper.applyMappings(in[i])
|
||||
}
|
||||
return in
|
||||
}
|
||||
|
||||
func (mapper *EnumMapper) applyMappings(metric telegraf.Metric) telegraf.Metric {
|
||||
for _, mapping := range mapper.Fields {
|
||||
if originalValue, isPresent := metric.GetField(mapping.Source); isPresent == true {
|
||||
if adjustedValue, isString := adjustBoolValue(originalValue).(string); isString == true {
|
||||
if mappedValue, isMappedValuePresent := mapping.mapValue(adjustedValue); isMappedValuePresent == true {
|
||||
writeField(metric, mapping.getDestination(), mappedValue)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return metric
|
||||
}
|
||||
|
||||
func adjustBoolValue(in interface{}) interface{} {
|
||||
if mappedBool, isBool := in.(bool); isBool == true {
|
||||
return strconv.FormatBool(mappedBool)
|
||||
}
|
||||
return in
|
||||
}
|
||||
|
||||
func (mapping *Mapping) mapValue(original string) (interface{}, bool) {
|
||||
if mapped, found := mapping.ValueMappings[original]; found == true {
|
||||
return mapped, true
|
||||
}
|
||||
if mapping.Default != nil {
|
||||
return mapping.Default, true
|
||||
}
|
||||
return original, false
|
||||
}
|
||||
|
||||
func (mapping *Mapping) getDestination() string {
|
||||
if mapping.Destination != "" {
|
||||
return mapping.Destination
|
||||
}
|
||||
return mapping.Source
|
||||
}
|
||||
|
||||
func writeField(metric telegraf.Metric, name string, value interface{}) {
|
||||
if metric.HasField(name) {
|
||||
metric.RemoveField(name)
|
||||
}
|
||||
metric.AddField(name, value)
|
||||
}
|
||||
|
||||
func init() {
|
||||
processors.Add("enum", func() telegraf.Processor {
|
||||
return &EnumMapper{}
|
||||
})
|
||||
}
|
||||
@@ -1,106 +0,0 @@
|
||||
package enum
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/metric"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func createTestMetric() telegraf.Metric {
|
||||
metric, _ := metric.New("m1",
|
||||
map[string]string{"tag": "tag_value"},
|
||||
map[string]interface{}{
|
||||
"string_value": "test",
|
||||
"int_value": int(13),
|
||||
"true_value": true,
|
||||
},
|
||||
time.Now(),
|
||||
)
|
||||
return metric
|
||||
}
|
||||
|
||||
func calculateProcessedValues(mapper EnumMapper, metric telegraf.Metric) map[string]interface{} {
|
||||
processed := mapper.Apply(metric)
|
||||
return processed[0].Fields()
|
||||
}
|
||||
|
||||
func assertFieldValue(t *testing.T, expected interface{}, field string, fields map[string]interface{}) {
|
||||
value, present := fields[field]
|
||||
assert.True(t, present, "value of field '"+field+"' was not present")
|
||||
assert.EqualValues(t, expected, value)
|
||||
}
|
||||
|
||||
func TestRetainsMetric(t *testing.T) {
|
||||
mapper := EnumMapper{}
|
||||
source := createTestMetric()
|
||||
|
||||
target := mapper.Apply(source)[0]
|
||||
fields := target.Fields()
|
||||
|
||||
assertFieldValue(t, "test", "string_value", fields)
|
||||
assertFieldValue(t, 13, "int_value", fields)
|
||||
assertFieldValue(t, true, "true_value", fields)
|
||||
assert.Equal(t, "m1", target.Name())
|
||||
assert.Equal(t, source.Tags(), target.Tags())
|
||||
assert.Equal(t, source.Time(), target.Time())
|
||||
}
|
||||
|
||||
func TestMapsSingleStringValue(t *testing.T) {
|
||||
mapper := EnumMapper{Fields: []Mapping{{Source: "string_value", ValueMappings: map[string]interface{}{"test": int64(1)}}}}
|
||||
|
||||
fields := calculateProcessedValues(mapper, createTestMetric())
|
||||
|
||||
assertFieldValue(t, 1, "string_value", fields)
|
||||
}
|
||||
|
||||
func TestNoFailureOnMappingsOnNonStringValuedFields(t *testing.T) {
|
||||
mapper := EnumMapper{Fields: []Mapping{{Source: "int_value", ValueMappings: map[string]interface{}{"13i": int64(7)}}}}
|
||||
|
||||
fields := calculateProcessedValues(mapper, createTestMetric())
|
||||
|
||||
assertFieldValue(t, 13, "int_value", fields)
|
||||
}
|
||||
|
||||
func TestMapSingleBoolValue(t *testing.T) {
|
||||
mapper := EnumMapper{Fields: []Mapping{{Source: "true_value", ValueMappings: map[string]interface{}{"true": int64(1)}}}}
|
||||
|
||||
fields := calculateProcessedValues(mapper, createTestMetric())
|
||||
|
||||
assertFieldValue(t, 1, "true_value", fields)
|
||||
}
|
||||
|
||||
func TestMapsToDefaultValueOnUnknownSourceValue(t *testing.T) {
|
||||
mapper := EnumMapper{Fields: []Mapping{{Source: "string_value", Default: int64(42), ValueMappings: map[string]interface{}{"other": int64(1)}}}}
|
||||
|
||||
fields := calculateProcessedValues(mapper, createTestMetric())
|
||||
|
||||
assertFieldValue(t, 42, "string_value", fields)
|
||||
}
|
||||
|
||||
func TestDoNotMapToDefaultValueKnownSourceValue(t *testing.T) {
|
||||
mapper := EnumMapper{Fields: []Mapping{{Source: "string_value", Default: int64(42), ValueMappings: map[string]interface{}{"test": int64(1)}}}}
|
||||
|
||||
fields := calculateProcessedValues(mapper, createTestMetric())
|
||||
|
||||
assertFieldValue(t, 1, "string_value", fields)
|
||||
}
|
||||
|
||||
func TestNoMappingWithoutDefaultOrDefinedMappingValue(t *testing.T) {
|
||||
mapper := EnumMapper{Fields: []Mapping{{Source: "string_value", ValueMappings: map[string]interface{}{"other": int64(1)}}}}
|
||||
|
||||
fields := calculateProcessedValues(mapper, createTestMetric())
|
||||
|
||||
assertFieldValue(t, "test", "string_value", fields)
|
||||
}
|
||||
|
||||
func TestWritesToDestination(t *testing.T) {
|
||||
mapper := EnumMapper{Fields: []Mapping{{Source: "string_value", Destination: "string_code", ValueMappings: map[string]interface{}{"test": int64(1)}}}}
|
||||
|
||||
fields := calculateProcessedValues(mapper, createTestMetric())
|
||||
|
||||
assertFieldValue(t, "test", "string_value", fields)
|
||||
assertFieldValue(t, 1, "string_code", fields)
|
||||
}
|
||||
Reference in New Issue
Block a user