Compare commits
16 Commits
bugfix/433
...
bugfix/437
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
71a93ed6b1 | ||
|
|
a7fb1c280f | ||
|
|
2e724d8d02 | ||
|
|
bf076dab73 | ||
|
|
500e136844 | ||
|
|
515ff03364 | ||
|
|
c389a68f19 | ||
|
|
d69f833917 | ||
|
|
9106011f58 | ||
|
|
b5abf2c577 | ||
|
|
38c0628190 | ||
|
|
0ef12f87af | ||
|
|
cba87212d3 | ||
|
|
c5fa6729d3 | ||
|
|
4e440b36fd | ||
|
|
2da223390a |
10
CHANGELOG.md
10
CHANGELOG.md
@@ -6,6 +6,10 @@
|
|||||||
|
|
||||||
- [tengine](./plugins/inputs/tengine/README.md) - Contributed by @ertaoxu
|
- [tengine](./plugins/inputs/tengine/README.md) - Contributed by @ertaoxu
|
||||||
|
|
||||||
|
### New Processors
|
||||||
|
|
||||||
|
- [enum](./plugins/processors/enum/README.md) - Contributed by @KarstenSchnitter
|
||||||
|
|
||||||
### New Aggregators
|
### New Aggregators
|
||||||
|
|
||||||
- [valuecounter](./plugins/aggregators/valuecounter/README.md) - Contributed by @piotr1212
|
- [valuecounter](./plugins/aggregators/valuecounter/README.md) - Contributed by @piotr1212
|
||||||
@@ -26,8 +30,11 @@
|
|||||||
- [#4267](https://github.com/influxdata/telegraf/pull/4267): Add option to use of counter time in win perf counters.
|
- [#4267](https://github.com/influxdata/telegraf/pull/4267): Add option to use of counter time in win perf counters.
|
||||||
- [#4343](https://github.com/influxdata/telegraf/pull/4343): Add energy and power field and device id tag to fibaro input.
|
- [#4343](https://github.com/influxdata/telegraf/pull/4343): Add energy and power field and device id tag to fibaro input.
|
||||||
- [#4347](https://github.com/influxdata/telegraf/pull/4347): Add http path configuration for OpenTSDB output.
|
- [#4347](https://github.com/influxdata/telegraf/pull/4347): Add http path configuration for OpenTSDB output.
|
||||||
|
- [#4352](https://github.com/influxdata/telegraf/pull/4352): Gather IPMI metrics concurrently.
|
||||||
|
- [#4362](https://github.com/influxdata/telegraf/pull/4362): Add mongo document and connection metrics.
|
||||||
|
- [#3772](https://github.com/influxdata/telegraf/pull/3772): Add Enum Processor.
|
||||||
|
|
||||||
## v1.7.1 [unreleased]
|
## v1.7.1 [2018-07-03]
|
||||||
|
|
||||||
### Bugfixes
|
### Bugfixes
|
||||||
|
|
||||||
@@ -38,6 +45,7 @@
|
|||||||
- [#2910](https://github.com/influxdata/telegraf/issues/2910): Handle mysql input variations in the user_statistics collecting.
|
- [#2910](https://github.com/influxdata/telegraf/issues/2910): Handle mysql input variations in the user_statistics collecting.
|
||||||
- [#4293](https://github.com/influxdata/telegraf/issues/4293): Fix minmax and basicstats aggregators to use uint64.
|
- [#4293](https://github.com/influxdata/telegraf/issues/4293): Fix minmax and basicstats aggregators to use uint64.
|
||||||
- [#4290](https://github.com/influxdata/telegraf/issues/4290): Document swap input plugin.
|
- [#4290](https://github.com/influxdata/telegraf/issues/4290): Document swap input plugin.
|
||||||
|
- [#4316](https://github.com/influxdata/telegraf/issues/4316): Fix incorrect precision being applied to metric in http_listener.
|
||||||
|
|
||||||
## v1.7 [2018-06-12]
|
## v1.7 [2018-06-12]
|
||||||
|
|
||||||
|
|||||||
@@ -343,6 +343,9 @@ func (h *HTTPListener) serveWrite(res http.ResponseWriter, req *http.Request) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (h *HTTPListener) parse(b []byte, t time.Time, precision string) error {
|
func (h *HTTPListener) parse(b []byte, t time.Time, precision string) error {
|
||||||
|
h.mu.Lock()
|
||||||
|
defer h.mu.Unlock()
|
||||||
|
|
||||||
h.handler.SetTimePrecision(getPrecisionMultiplier(precision))
|
h.handler.SetTimePrecision(getPrecisionMultiplier(precision))
|
||||||
h.handler.SetTimeFunc(func() time.Time { return t })
|
h.handler.SetTimeFunc(func() time.Time { return t })
|
||||||
metrics, err := h.parser.Parse(b)
|
metrics, err := h.parser.Parse(b)
|
||||||
|
|||||||
@@ -5,6 +5,7 @@ import (
|
|||||||
"os/exec"
|
"os/exec"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/influxdata/telegraf"
|
"github.com/influxdata/telegraf"
|
||||||
@@ -61,13 +62,18 @@ func (m *Ipmi) Gather(acc telegraf.Accumulator) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if len(m.Servers) > 0 {
|
if len(m.Servers) > 0 {
|
||||||
|
wg := sync.WaitGroup{}
|
||||||
for _, server := range m.Servers {
|
for _, server := range m.Servers {
|
||||||
err := m.parse(acc, server)
|
wg.Add(1)
|
||||||
|
go func(a telegraf.Accumulator, s string) {
|
||||||
|
defer wg.Done()
|
||||||
|
err := m.parse(a, s)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
acc.AddError(err)
|
a.AddError(err)
|
||||||
continue
|
|
||||||
}
|
}
|
||||||
|
}(acc, server)
|
||||||
}
|
}
|
||||||
|
wg.Wait()
|
||||||
} else {
|
} else {
|
||||||
err := m.parse(acc, "")
|
err := m.parse(acc, "")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|||||||
@@ -53,6 +53,13 @@ var DefaultStats = map[string]string{
|
|||||||
"cursor_no_timeout": "NoTimeoutC",
|
"cursor_no_timeout": "NoTimeoutC",
|
||||||
"cursor_pinned": "PinnedC",
|
"cursor_pinned": "PinnedC",
|
||||||
"cursor_total": "TotalC",
|
"cursor_total": "TotalC",
|
||||||
|
"document_deleted": "DeletedD",
|
||||||
|
"document_inserted": "InsertedD",
|
||||||
|
"document_returned": "ReturnedD",
|
||||||
|
"document_updated": "UpdatedD",
|
||||||
|
"connections_current": "CurrentC",
|
||||||
|
"connections_available": "AvailableC",
|
||||||
|
"connections_total_created": "TotalCreatedC",
|
||||||
}
|
}
|
||||||
|
|
||||||
var DefaultReplStats = map[string]string{
|
var DefaultReplStats = map[string]string{
|
||||||
|
|||||||
@@ -38,6 +38,13 @@ func TestAddNonReplStats(t *testing.T) {
|
|||||||
NoTimeoutC: 0,
|
NoTimeoutC: 0,
|
||||||
PinnedC: 0,
|
PinnedC: 0,
|
||||||
TotalC: 0,
|
TotalC: 0,
|
||||||
|
DeletedD: 0,
|
||||||
|
InsertedD: 0,
|
||||||
|
ReturnedD: 0,
|
||||||
|
UpdatedD: 0,
|
||||||
|
CurrentC: 0,
|
||||||
|
AvailableC: 0,
|
||||||
|
TotalCreatedC: 0,
|
||||||
},
|
},
|
||||||
tags,
|
tags,
|
||||||
)
|
)
|
||||||
@@ -219,6 +226,13 @@ func TestStateTag(t *testing.T) {
|
|||||||
"cursor_no_timeout": int64(0),
|
"cursor_no_timeout": int64(0),
|
||||||
"cursor_pinned": int64(0),
|
"cursor_pinned": int64(0),
|
||||||
"cursor_total": int64(0),
|
"cursor_total": int64(0),
|
||||||
|
"document_deleted": int64(0),
|
||||||
|
"document_inserted": int64(0),
|
||||||
|
"document_returned": int64(0),
|
||||||
|
"document_updated": int64(0),
|
||||||
|
"connections_current": int64(0),
|
||||||
|
"connections_available": int64(0),
|
||||||
|
"connections_total_created": int64(0),
|
||||||
}
|
}
|
||||||
acc.AssertContainsTaggedFields(t, "mongodb", fields, stateTags)
|
acc.AssertContainsTaggedFields(t, "mongodb", fields, stateTags)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -225,7 +225,7 @@ type FlushStats struct {
|
|||||||
type ConnectionStats struct {
|
type ConnectionStats struct {
|
||||||
Current int64 `bson:"current"`
|
Current int64 `bson:"current"`
|
||||||
Available int64 `bson:"available"`
|
Available int64 `bson:"available"`
|
||||||
TotalCreated int64 `bson:"totalCreated"`
|
TotalCreated int64 `bson:"total_created"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// DurTiming stores information related to journaling.
|
// DurTiming stores information related to journaling.
|
||||||
@@ -291,6 +291,7 @@ type OpcountStats struct {
|
|||||||
type MetricsStats struct {
|
type MetricsStats struct {
|
||||||
TTL *TTLStats `bson:"ttl"`
|
TTL *TTLStats `bson:"ttl"`
|
||||||
Cursor *CursorStats `bson:"cursor"`
|
Cursor *CursorStats `bson:"cursor"`
|
||||||
|
Document *DocumentStats `bson:"document"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// TTLStats stores information related to documents with a ttl index.
|
// TTLStats stores information related to documents with a ttl index.
|
||||||
@@ -305,6 +306,14 @@ type CursorStats struct {
|
|||||||
Open *OpenCursorStats `bson:"open"`
|
Open *OpenCursorStats `bson:"open"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// DocumentStats stores information related to document metrics.
|
||||||
|
type DocumentStats struct {
|
||||||
|
Deleted int64 `bson:"deleted"`
|
||||||
|
Inserted int64 `bson:"inserted"`
|
||||||
|
Returned int64 `bson:"returned"`
|
||||||
|
Updated int64 `bson:"updated"`
|
||||||
|
}
|
||||||
|
|
||||||
// OpenCursorStats stores information related to open cursor metrics
|
// OpenCursorStats stores information related to open cursor metrics
|
||||||
type OpenCursorStats struct {
|
type OpenCursorStats struct {
|
||||||
NoTimeout int64 `bson:"noTimeout"`
|
NoTimeout int64 `bson:"noTimeout"`
|
||||||
@@ -457,6 +466,12 @@ type StatLine struct {
|
|||||||
TimedOutC int64
|
TimedOutC int64
|
||||||
NoTimeoutC, PinnedC, TotalC int64
|
NoTimeoutC, PinnedC, TotalC int64
|
||||||
|
|
||||||
|
// Document fields
|
||||||
|
DeletedD, InsertedD, ReturnedD, UpdatedD int64
|
||||||
|
|
||||||
|
// Connection fields
|
||||||
|
CurrentC, AvailableC, TotalCreatedC int64
|
||||||
|
|
||||||
// Collection locks (3.0 mmap only)
|
// Collection locks (3.0 mmap only)
|
||||||
CollectionLocks *CollectionLockStatus
|
CollectionLocks *CollectionLockStatus
|
||||||
|
|
||||||
|
|||||||
@@ -1,6 +1,8 @@
|
|||||||
# Procstat Input Plugin
|
# Procstat Input Plugin
|
||||||
|
|
||||||
The procstat plugin can be used to monitor the system resource usage of one or more processes.
|
The procstat plugin can be used to monitor the system resource usage of one or more processes.
|
||||||
|
The procstat_lookup metric displays the query information,
|
||||||
|
specifically the number of PIDs returned on a search
|
||||||
|
|
||||||
Processes can be selected for monitoring using one of several methods:
|
Processes can be selected for monitoring using one of several methods:
|
||||||
- pidfile
|
- pidfile
|
||||||
@@ -127,7 +129,18 @@ implemented as a WMI query. The pattern allows fuzzy matching using only
|
|||||||
- voluntary_context_switches (int)
|
- voluntary_context_switches (int)
|
||||||
- write_bytes (int, *telegraf* may need to be ran as **root**)
|
- write_bytes (int, *telegraf* may need to be ran as **root**)
|
||||||
- write_count (int, *telegraf* may need to be ran as **root**)
|
- write_count (int, *telegraf* may need to be ran as **root**)
|
||||||
|
- procstat_lookup
|
||||||
|
- tags:
|
||||||
|
- exe (string)
|
||||||
|
- pid_finder (string)
|
||||||
|
- pid_file (string)
|
||||||
|
- pattern (string)
|
||||||
|
- prefix (string)
|
||||||
|
- user (string)
|
||||||
|
- systemd_unit (string)
|
||||||
|
- cgroup (string)
|
||||||
|
- fields:
|
||||||
|
- pid_count (int)
|
||||||
*NOTE: Resource limit > 2147483647 will be reported as 2147483647.*
|
*NOTE: Resource limit > 2147483647 will be reported as 2147483647.*
|
||||||
|
|
||||||
### Example Output:
|
### Example Output:
|
||||||
|
|||||||
@@ -1,7 +1,6 @@
|
|||||||
package syslog
|
package syslog
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
|
||||||
"crypto/tls"
|
"crypto/tls"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
@@ -280,51 +279,20 @@ func (s *Syslog) handle(conn net.Conn, acc telegraf.Accumulator) {
|
|||||||
conn.Close()
|
conn.Close()
|
||||||
}()
|
}()
|
||||||
|
|
||||||
for {
|
|
||||||
data := &bytes.Buffer{}
|
|
||||||
|
|
||||||
// read the data
|
|
||||||
if s.ReadTimeout != nil && s.ReadTimeout.Duration > 0 {
|
if s.ReadTimeout != nil && s.ReadTimeout.Duration > 0 {
|
||||||
conn.SetReadDeadline(time.Now().Add(s.ReadTimeout.Duration))
|
conn.SetReadDeadline(time.Now().Add(s.ReadTimeout.Duration))
|
||||||
}
|
}
|
||||||
|
|
||||||
n, err := io.Copy(data, conn)
|
|
||||||
if err != nil {
|
|
||||||
// read timeout reached, parse what we have
|
|
||||||
if er, ok := err.(net.Error); ok && er.Timeout() {
|
|
||||||
if n == 0 {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
goto parseMsg
|
|
||||||
}
|
|
||||||
// client has closed connection, return
|
|
||||||
if err == io.EOF {
|
|
||||||
if n > 0 {
|
|
||||||
goto parseMsg
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
// other error, log and return
|
|
||||||
s.store(rfc5425.Result{Error: fmt.Errorf("failed reading from syslog client - %s", err.Error())}, acc)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
// handle client disconnect
|
|
||||||
if n == 0 {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
parseMsg:
|
|
||||||
var p *rfc5425.Parser
|
var p *rfc5425.Parser
|
||||||
if s.BestEffort {
|
if s.BestEffort {
|
||||||
p = rfc5425.NewParser(data, rfc5425.WithBestEffort())
|
p = rfc5425.NewParser(conn, rfc5425.WithBestEffort())
|
||||||
} else {
|
} else {
|
||||||
p = rfc5425.NewParser(data)
|
p = rfc5425.NewParser(conn)
|
||||||
}
|
}
|
||||||
|
|
||||||
p.ParseExecuting(func(r *rfc5425.Result) {
|
p.ParseExecuting(func(r *rfc5425.Result) {
|
||||||
s.store(*r, acc)
|
s.store(*r, acc)
|
||||||
})
|
})
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Syslog) setKeepAlive(c *net.TCPConn) error {
|
func (s *Syslog) setKeepAlive(c *net.TCPConn) error {
|
||||||
@@ -393,7 +361,7 @@ func fields(msg rfc5424.SyslogMessage, s *Syslog) map[string]interface{} {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if msg.Message() != nil {
|
if msg.Message() != nil {
|
||||||
flds["message"] = strings.TrimSpace(*msg.Message())
|
flds["message"] = *msg.Message()
|
||||||
}
|
}
|
||||||
|
|
||||||
if msg.StructuredData() != nil {
|
if msg.StructuredData() != nil {
|
||||||
|
|||||||
@@ -107,13 +107,13 @@ requests that are in the queue but not yet issued to the device driver.
|
|||||||
|
|
||||||
#### Calculate percent IO utilization per disk and host:
|
#### Calculate percent IO utilization per disk and host:
|
||||||
```
|
```
|
||||||
SELECT derivative(last("io_time"),1ms) FROM "diskio" WHERE time > now() - 30m GROUP BY "host","name",time(60s)
|
SELECT non_negative_derivative(last("io_time"),1ms) FROM "diskio" WHERE time > now() - 30m GROUP BY "host","name",time(60s)
|
||||||
```
|
```
|
||||||
|
|
||||||
#### Calculate average queue depth:
|
#### Calculate average queue depth:
|
||||||
`iops_in_progress` will give you an instantaneous value. This will give you the average between polling intervals.
|
`iops_in_progress` will give you an instantaneous value. This will give you the average between polling intervals.
|
||||||
```
|
```
|
||||||
SELECT derivative(last("weighted_io_time",1ms)) from "diskio" WHERE time > now() - 30m GROUP BY "host","name",time(60s)
|
SELECT non_negative_derivative(last("weighted_io_time",1ms)) from "diskio" WHERE time > now() - 30m GROUP BY "host","name",time(60s)
|
||||||
```
|
```
|
||||||
|
|
||||||
### Example Output:
|
### Example Output:
|
||||||
|
|||||||
@@ -2,7 +2,7 @@
|
|||||||
|
|
||||||
The swap plugin collects system swap metrics.
|
The swap plugin collects system swap metrics.
|
||||||
|
|
||||||
For a more information on what swap memory is, read [All about Linux swap space](https://www.linux.com/news/all-about-linux-swap-space).
|
For more information on what swap memory is, read [All about Linux swap space](https://www.linux.com/news/all-about-linux-swap-space).
|
||||||
|
|
||||||
### Configuration:
|
### Configuration:
|
||||||
|
|
||||||
|
|||||||
@@ -2,6 +2,7 @@ package all
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
_ "github.com/influxdata/telegraf/plugins/processors/converter"
|
_ "github.com/influxdata/telegraf/plugins/processors/converter"
|
||||||
|
_ "github.com/influxdata/telegraf/plugins/processors/enum"
|
||||||
_ "github.com/influxdata/telegraf/plugins/processors/override"
|
_ "github.com/influxdata/telegraf/plugins/processors/override"
|
||||||
_ "github.com/influxdata/telegraf/plugins/processors/printer"
|
_ "github.com/influxdata/telegraf/plugins/processors/printer"
|
||||||
_ "github.com/influxdata/telegraf/plugins/processors/regex"
|
_ "github.com/influxdata/telegraf/plugins/processors/regex"
|
||||||
|
|||||||
33
plugins/processors/enum/README.md
Normal file
33
plugins/processors/enum/README.md
Normal file
@@ -0,0 +1,33 @@
|
|||||||
|
# Enum Processor Plugin
|
||||||
|
|
||||||
|
The Enum Processor allows the configuration of value mappings for metric fields.
|
||||||
|
The main use-case for this is to rewrite status codes such as _red_, _amber_ and
|
||||||
|
_green_ by numeric values such as 0, 1, 2. The plugin supports string and bool
|
||||||
|
types for the field values. Multiple Fields can be configured with separate
|
||||||
|
value mappings for each field. Default mapping values can be configured to be
|
||||||
|
used for all values, which are not contained in the value_mappings. The
|
||||||
|
processor supports explicit configuration of a destination field. By default the
|
||||||
|
source field is overwritten.
|
||||||
|
|
||||||
|
### Configuration:
|
||||||
|
|
||||||
|
```toml
|
||||||
|
[[processors.enum]]
|
||||||
|
[[processors.enum.fields]]
|
||||||
|
## Name of the field to map
|
||||||
|
source = "name"
|
||||||
|
|
||||||
|
## Destination field to be used for the mapped value. By default the source
|
||||||
|
## field is used, overwriting the original value.
|
||||||
|
# destination = "mapped"
|
||||||
|
|
||||||
|
## Default value to be used for all values not contained in the mapping
|
||||||
|
## table. When unset, the unmodified value for the field will be used if no
|
||||||
|
## match is found.
|
||||||
|
# default = 0
|
||||||
|
|
||||||
|
## Table of mappings
|
||||||
|
[processors.enum.fields.value_mappings]
|
||||||
|
value1 = 1
|
||||||
|
value2 = 2
|
||||||
|
```
|
||||||
104
plugins/processors/enum/enum.go
Normal file
104
plugins/processors/enum/enum.go
Normal file
@@ -0,0 +1,104 @@
|
|||||||
|
package enum
|
||||||
|
|
||||||
|
import (
|
||||||
|
"strconv"
|
||||||
|
|
||||||
|
"github.com/influxdata/telegraf"
|
||||||
|
"github.com/influxdata/telegraf/plugins/processors"
|
||||||
|
)
|
||||||
|
|
||||||
|
var sampleConfig = `
|
||||||
|
[[processors.enum.fields]]
|
||||||
|
## Name of the field to map
|
||||||
|
source = "name"
|
||||||
|
|
||||||
|
## Destination field to be used for the mapped value. By default the source
|
||||||
|
## field is used, overwriting the original value.
|
||||||
|
# destination = "mapped"
|
||||||
|
|
||||||
|
## Default value to be used for all values not contained in the mapping
|
||||||
|
## table. When unset, the unmodified value for the field will be used if no
|
||||||
|
## match is found.
|
||||||
|
# default = 0
|
||||||
|
|
||||||
|
## Table of mappings
|
||||||
|
[processors.enum.fields.value_mappings]
|
||||||
|
value1 = 1
|
||||||
|
value2 = 2
|
||||||
|
`
|
||||||
|
|
||||||
|
type EnumMapper struct {
|
||||||
|
Fields []Mapping
|
||||||
|
}
|
||||||
|
|
||||||
|
type Mapping struct {
|
||||||
|
Source string
|
||||||
|
Destination string
|
||||||
|
Default interface{}
|
||||||
|
ValueMappings map[string]interface{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mapper *EnumMapper) SampleConfig() string {
|
||||||
|
return sampleConfig
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mapper *EnumMapper) Description() string {
|
||||||
|
return "Map enum values according to given table."
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mapper *EnumMapper) Apply(in ...telegraf.Metric) []telegraf.Metric {
|
||||||
|
for i := 0; i < len(in); i++ {
|
||||||
|
in[i] = mapper.applyMappings(in[i])
|
||||||
|
}
|
||||||
|
return in
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mapper *EnumMapper) applyMappings(metric telegraf.Metric) telegraf.Metric {
|
||||||
|
for _, mapping := range mapper.Fields {
|
||||||
|
if originalValue, isPresent := metric.GetField(mapping.Source); isPresent == true {
|
||||||
|
if adjustedValue, isString := adjustBoolValue(originalValue).(string); isString == true {
|
||||||
|
if mappedValue, isMappedValuePresent := mapping.mapValue(adjustedValue); isMappedValuePresent == true {
|
||||||
|
writeField(metric, mapping.getDestination(), mappedValue)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return metric
|
||||||
|
}
|
||||||
|
|
||||||
|
func adjustBoolValue(in interface{}) interface{} {
|
||||||
|
if mappedBool, isBool := in.(bool); isBool == true {
|
||||||
|
return strconv.FormatBool(mappedBool)
|
||||||
|
}
|
||||||
|
return in
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mapping *Mapping) mapValue(original string) (interface{}, bool) {
|
||||||
|
if mapped, found := mapping.ValueMappings[original]; found == true {
|
||||||
|
return mapped, true
|
||||||
|
}
|
||||||
|
if mapping.Default != nil {
|
||||||
|
return mapping.Default, true
|
||||||
|
}
|
||||||
|
return original, false
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mapping *Mapping) getDestination() string {
|
||||||
|
if mapping.Destination != "" {
|
||||||
|
return mapping.Destination
|
||||||
|
}
|
||||||
|
return mapping.Source
|
||||||
|
}
|
||||||
|
|
||||||
|
func writeField(metric telegraf.Metric, name string, value interface{}) {
|
||||||
|
if metric.HasField(name) {
|
||||||
|
metric.RemoveField(name)
|
||||||
|
}
|
||||||
|
metric.AddField(name, value)
|
||||||
|
}
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
processors.Add("enum", func() telegraf.Processor {
|
||||||
|
return &EnumMapper{}
|
||||||
|
})
|
||||||
|
}
|
||||||
106
plugins/processors/enum/enum_test.go
Normal file
106
plugins/processors/enum/enum_test.go
Normal file
@@ -0,0 +1,106 @@
|
|||||||
|
package enum
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/influxdata/telegraf"
|
||||||
|
"github.com/influxdata/telegraf/metric"
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
)
|
||||||
|
|
||||||
|
func createTestMetric() telegraf.Metric {
|
||||||
|
metric, _ := metric.New("m1",
|
||||||
|
map[string]string{"tag": "tag_value"},
|
||||||
|
map[string]interface{}{
|
||||||
|
"string_value": "test",
|
||||||
|
"int_value": int(13),
|
||||||
|
"true_value": true,
|
||||||
|
},
|
||||||
|
time.Now(),
|
||||||
|
)
|
||||||
|
return metric
|
||||||
|
}
|
||||||
|
|
||||||
|
func calculateProcessedValues(mapper EnumMapper, metric telegraf.Metric) map[string]interface{} {
|
||||||
|
processed := mapper.Apply(metric)
|
||||||
|
return processed[0].Fields()
|
||||||
|
}
|
||||||
|
|
||||||
|
func assertFieldValue(t *testing.T, expected interface{}, field string, fields map[string]interface{}) {
|
||||||
|
value, present := fields[field]
|
||||||
|
assert.True(t, present, "value of field '"+field+"' was not present")
|
||||||
|
assert.EqualValues(t, expected, value)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestRetainsMetric(t *testing.T) {
|
||||||
|
mapper := EnumMapper{}
|
||||||
|
source := createTestMetric()
|
||||||
|
|
||||||
|
target := mapper.Apply(source)[0]
|
||||||
|
fields := target.Fields()
|
||||||
|
|
||||||
|
assertFieldValue(t, "test", "string_value", fields)
|
||||||
|
assertFieldValue(t, 13, "int_value", fields)
|
||||||
|
assertFieldValue(t, true, "true_value", fields)
|
||||||
|
assert.Equal(t, "m1", target.Name())
|
||||||
|
assert.Equal(t, source.Tags(), target.Tags())
|
||||||
|
assert.Equal(t, source.Time(), target.Time())
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMapsSingleStringValue(t *testing.T) {
|
||||||
|
mapper := EnumMapper{Fields: []Mapping{{Source: "string_value", ValueMappings: map[string]interface{}{"test": int64(1)}}}}
|
||||||
|
|
||||||
|
fields := calculateProcessedValues(mapper, createTestMetric())
|
||||||
|
|
||||||
|
assertFieldValue(t, 1, "string_value", fields)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestNoFailureOnMappingsOnNonStringValuedFields(t *testing.T) {
|
||||||
|
mapper := EnumMapper{Fields: []Mapping{{Source: "int_value", ValueMappings: map[string]interface{}{"13i": int64(7)}}}}
|
||||||
|
|
||||||
|
fields := calculateProcessedValues(mapper, createTestMetric())
|
||||||
|
|
||||||
|
assertFieldValue(t, 13, "int_value", fields)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMapSingleBoolValue(t *testing.T) {
|
||||||
|
mapper := EnumMapper{Fields: []Mapping{{Source: "true_value", ValueMappings: map[string]interface{}{"true": int64(1)}}}}
|
||||||
|
|
||||||
|
fields := calculateProcessedValues(mapper, createTestMetric())
|
||||||
|
|
||||||
|
assertFieldValue(t, 1, "true_value", fields)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMapsToDefaultValueOnUnknownSourceValue(t *testing.T) {
|
||||||
|
mapper := EnumMapper{Fields: []Mapping{{Source: "string_value", Default: int64(42), ValueMappings: map[string]interface{}{"other": int64(1)}}}}
|
||||||
|
|
||||||
|
fields := calculateProcessedValues(mapper, createTestMetric())
|
||||||
|
|
||||||
|
assertFieldValue(t, 42, "string_value", fields)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestDoNotMapToDefaultValueKnownSourceValue(t *testing.T) {
|
||||||
|
mapper := EnumMapper{Fields: []Mapping{{Source: "string_value", Default: int64(42), ValueMappings: map[string]interface{}{"test": int64(1)}}}}
|
||||||
|
|
||||||
|
fields := calculateProcessedValues(mapper, createTestMetric())
|
||||||
|
|
||||||
|
assertFieldValue(t, 1, "string_value", fields)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestNoMappingWithoutDefaultOrDefinedMappingValue(t *testing.T) {
|
||||||
|
mapper := EnumMapper{Fields: []Mapping{{Source: "string_value", ValueMappings: map[string]interface{}{"other": int64(1)}}}}
|
||||||
|
|
||||||
|
fields := calculateProcessedValues(mapper, createTestMetric())
|
||||||
|
|
||||||
|
assertFieldValue(t, "test", "string_value", fields)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestWritesToDestination(t *testing.T) {
|
||||||
|
mapper := EnumMapper{Fields: []Mapping{{Source: "string_value", Destination: "string_code", ValueMappings: map[string]interface{}{"test": int64(1)}}}}
|
||||||
|
|
||||||
|
fields := calculateProcessedValues(mapper, createTestMetric())
|
||||||
|
|
||||||
|
assertFieldValue(t, "test", "string_value", fields)
|
||||||
|
assertFieldValue(t, 1, "string_code", fields)
|
||||||
|
}
|
||||||
@@ -67,7 +67,10 @@ func (r *Regex) Apply(in ...telegraf.Metric) []telegraf.Metric {
|
|||||||
for _, metric := range in {
|
for _, metric := range in {
|
||||||
for _, converter := range r.Tags {
|
for _, converter := range r.Tags {
|
||||||
if value, ok := metric.GetTag(converter.Key); ok {
|
if value, ok := metric.GetTag(converter.Key); ok {
|
||||||
metric.AddTag(r.convert(converter, value))
|
k, v := r.convert(converter, value)
|
||||||
|
if k != "" {
|
||||||
|
metric.AddTag(k, v)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -75,11 +78,14 @@ func (r *Regex) Apply(in ...telegraf.Metric) []telegraf.Metric {
|
|||||||
if value, ok := metric.GetField(converter.Key); ok {
|
if value, ok := metric.GetField(converter.Key); ok {
|
||||||
switch value := value.(type) {
|
switch value := value.(type) {
|
||||||
case string:
|
case string:
|
||||||
|
k, _ := r.convert(converter, value)
|
||||||
|
if k != "" {
|
||||||
metric.AddField(r.convert(converter, value))
|
metric.AddField(r.convert(converter, value))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return in
|
return in
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user