Compare commits

..

6 Commits

Author SHA1 Message Date
Greg Linton
6f4bd9ad82 Don't read/check msg length 2018-07-03 15:58:03 -06:00
Greg Linton
4ea618ea26 Enhance syslog reading and ensure proper length is read 2018-07-03 15:05:59 -06:00
Greg Linton
a7545e6cac Don't close connection if readTimeout exceeded 2018-07-02 18:56:58 -06:00
Greg Linton
58e815fdd1 Merge branch 'master' into bugfix/4335 2018-07-02 18:13:44 -06:00
Greg Linton
06682c6350 Cleanup syslog messages
Remove leading and trailing spaces and/or newlines from syslog message
field.
2018-07-02 18:11:58 -06:00
Greg Linton
839ca60b0e Improve syslog connection handling
Resolves #4335
2018-07-02 17:49:13 -06:00
15 changed files with 120 additions and 404 deletions

View File

@@ -6,10 +6,6 @@
- [tengine](./plugins/inputs/tengine/README.md) - Contributed by @ertaoxu
### New Processors
- [enum](./plugins/processors/enum/README.md) - Contributed by @KarstenSchnitter
### New Aggregators
- [valuecounter](./plugins/aggregators/valuecounter/README.md) - Contributed by @piotr1212
@@ -30,11 +26,8 @@
- [#4267](https://github.com/influxdata/telegraf/pull/4267): Add option to use of counter time in win perf counters.
- [#4343](https://github.com/influxdata/telegraf/pull/4343): Add energy and power field and device id tag to fibaro input.
- [#4347](https://github.com/influxdata/telegraf/pull/4347): Add http path configuration for OpenTSDB output.
- [#4352](https://github.com/influxdata/telegraf/pull/4352): Gather IPMI metrics concurrently.
- [#4362](https://github.com/influxdata/telegraf/pull/4362): Add mongo document and connection metrics.
- [#3772](https://github.com/influxdata/telegraf/pull/3772): Add Enum Processor.
## v1.7.1 [2018-07-03]
## v1.7.1 [unreleased]
### Bugfixes
@@ -45,7 +38,6 @@
- [#2910](https://github.com/influxdata/telegraf/issues/2910): Handle mysql input variations in the user_statistics collecting.
- [#4293](https://github.com/influxdata/telegraf/issues/4293): Fix minmax and basicstats aggregators to use uint64.
- [#4290](https://github.com/influxdata/telegraf/issues/4290): Document swap input plugin.
- [#4316](https://github.com/influxdata/telegraf/issues/4316): Fix incorrect precision being applied to metric in http_listener.
## v1.7 [2018-06-12]

View File

@@ -343,9 +343,6 @@ func (h *HTTPListener) serveWrite(res http.ResponseWriter, req *http.Request) {
}
func (h *HTTPListener) parse(b []byte, t time.Time, precision string) error {
h.mu.Lock()
defer h.mu.Unlock()
h.handler.SetTimePrecision(getPrecisionMultiplier(precision))
h.handler.SetTimeFunc(func() time.Time { return t })
metrics, err := h.parser.Parse(b)

View File

@@ -5,7 +5,6 @@ import (
"os/exec"
"strconv"
"strings"
"sync"
"time"
"github.com/influxdata/telegraf"
@@ -62,18 +61,13 @@ func (m *Ipmi) Gather(acc telegraf.Accumulator) error {
}
if len(m.Servers) > 0 {
wg := sync.WaitGroup{}
for _, server := range m.Servers {
wg.Add(1)
go func(a telegraf.Accumulator, s string) {
defer wg.Done()
err := m.parse(a, s)
err := m.parse(acc, server)
if err != nil {
a.AddError(err)
acc.AddError(err)
continue
}
}(acc, server)
}
wg.Wait()
} else {
err := m.parse(acc, "")
if err != nil {

View File

@@ -53,13 +53,6 @@ var DefaultStats = map[string]string{
"cursor_no_timeout": "NoTimeoutC",
"cursor_pinned": "PinnedC",
"cursor_total": "TotalC",
"document_deleted": "DeletedD",
"document_inserted": "InsertedD",
"document_returned": "ReturnedD",
"document_updated": "UpdatedD",
"connections_current": "CurrentC",
"connections_available": "AvailableC",
"connections_total_created": "TotalCreatedC",
}
var DefaultReplStats = map[string]string{

View File

@@ -38,13 +38,6 @@ func TestAddNonReplStats(t *testing.T) {
NoTimeoutC: 0,
PinnedC: 0,
TotalC: 0,
DeletedD: 0,
InsertedD: 0,
ReturnedD: 0,
UpdatedD: 0,
CurrentC: 0,
AvailableC: 0,
TotalCreatedC: 0,
},
tags,
)
@@ -226,13 +219,6 @@ func TestStateTag(t *testing.T) {
"cursor_no_timeout": int64(0),
"cursor_pinned": int64(0),
"cursor_total": int64(0),
"document_deleted": int64(0),
"document_inserted": int64(0),
"document_returned": int64(0),
"document_updated": int64(0),
"connections_current": int64(0),
"connections_available": int64(0),
"connections_total_created": int64(0),
}
acc.AssertContainsTaggedFields(t, "mongodb", fields, stateTags)
}

View File

@@ -225,7 +225,7 @@ type FlushStats struct {
type ConnectionStats struct {
Current int64 `bson:"current"`
Available int64 `bson:"available"`
TotalCreated int64 `bson:"total_created"`
TotalCreated int64 `bson:"totalCreated"`
}
// DurTiming stores information related to journaling.
@@ -291,7 +291,6 @@ type OpcountStats struct {
type MetricsStats struct {
TTL *TTLStats `bson:"ttl"`
Cursor *CursorStats `bson:"cursor"`
Document *DocumentStats `bson:"document"`
}
// TTLStats stores information related to documents with a ttl index.
@@ -306,14 +305,6 @@ type CursorStats struct {
Open *OpenCursorStats `bson:"open"`
}
// DocumentStats stores information related to document metrics.
type DocumentStats struct {
Deleted int64 `bson:"deleted"`
Inserted int64 `bson:"inserted"`
Returned int64 `bson:"returned"`
Updated int64 `bson:"updated"`
}
// OpenCursorStats stores information related to open cursor metrics
type OpenCursorStats struct {
NoTimeout int64 `bson:"noTimeout"`
@@ -466,12 +457,6 @@ type StatLine struct {
TimedOutC int64
NoTimeoutC, PinnedC, TotalC int64
// Document fields
DeletedD, InsertedD, ReturnedD, UpdatedD int64
// Connection fields
CurrentC, AvailableC, TotalCreatedC int64
// Collection locks (3.0 mmap only)
CollectionLocks *CollectionLockStatus

View File

@@ -1,8 +1,6 @@
# Procstat Input Plugin
The procstat plugin can be used to monitor the system resource usage of one or more processes.
The procstat_lookup metric displays the query information,
specifically the number of PIDs returned on a search
Processes can be selected for monitoring using one of several methods:
- pidfile
@@ -129,18 +127,7 @@ implemented as a WMI query. The pattern allows fuzzy matching using only
- voluntary_context_switches (int)
- write_bytes (int, *telegraf* may need to be ran as **root**)
- write_count (int, *telegraf* may need to be ran as **root**)
- procstat_lookup
- tags:
- exe (string)
- pid_finder (string)
- pid_file (string)
- pattern (string)
- prefix (string)
- user (string)
- systemd_unit (string)
- cgroup (string)
- fields:
- pid_count (int)
*NOTE: Resource limit > 2147483647 will be reported as 2147483647.*
### Example Output:

View File

@@ -1,6 +1,7 @@
package syslog
import (
"bytes"
"crypto/tls"
"fmt"
"io"
@@ -279,21 +280,52 @@ func (s *Syslog) handle(conn net.Conn, acc telegraf.Accumulator) {
conn.Close()
}()
for {
data := &bytes.Buffer{}
// read the data
if s.ReadTimeout != nil && s.ReadTimeout.Duration > 0 {
conn.SetReadDeadline(time.Now().Add(s.ReadTimeout.Duration))
}
n, err := io.Copy(data, conn)
if err != nil {
// read timeout reached, parse what we have
if er, ok := err.(net.Error); ok && er.Timeout() {
if n == 0 {
continue
}
goto parseMsg
}
// client has closed connection, return
if err == io.EOF {
if n > 0 {
goto parseMsg
}
return
}
// other error, log and return
s.store(rfc5425.Result{Error: fmt.Errorf("failed reading from syslog client - %s", err.Error())}, acc)
return
}
// handle client disconnect
if n == 0 {
return
}
parseMsg:
var p *rfc5425.Parser
if s.BestEffort {
p = rfc5425.NewParser(conn, rfc5425.WithBestEffort())
p = rfc5425.NewParser(data, rfc5425.WithBestEffort())
} else {
p = rfc5425.NewParser(conn)
p = rfc5425.NewParser(data)
}
p.ParseExecuting(func(r *rfc5425.Result) {
s.store(*r, acc)
})
}
}
func (s *Syslog) setKeepAlive(c *net.TCPConn) error {
if s.KeepAlivePeriod == nil {
@@ -361,7 +393,7 @@ func fields(msg rfc5424.SyslogMessage, s *Syslog) map[string]interface{} {
}
if msg.Message() != nil {
flds["message"] = *msg.Message()
flds["message"] = strings.TrimSpace(*msg.Message())
}
if msg.StructuredData() != nil {

View File

@@ -107,13 +107,13 @@ requests that are in the queue but not yet issued to the device driver.
#### Calculate percent IO utilization per disk and host:
```
SELECT non_negative_derivative(last("io_time"),1ms) FROM "diskio" WHERE time > now() - 30m GROUP BY "host","name",time(60s)
SELECT derivative(last("io_time"),1ms) FROM "diskio" WHERE time > now() - 30m GROUP BY "host","name",time(60s)
```
#### Calculate average queue depth:
`iops_in_progress` will give you an instantaneous value. This will give you the average between polling intervals.
```
SELECT non_negative_derivative(last("weighted_io_time",1ms)) from "diskio" WHERE time > now() - 30m GROUP BY "host","name",time(60s)
SELECT derivative(last("weighted_io_time",1ms)) from "diskio" WHERE time > now() - 30m GROUP BY "host","name",time(60s)
```
### Example Output:

View File

@@ -2,7 +2,7 @@
The swap plugin collects system swap metrics.
For more information on what swap memory is, read [All about Linux swap space](https://www.linux.com/news/all-about-linux-swap-space).
For a more information on what swap memory is, read [All about Linux swap space](https://www.linux.com/news/all-about-linux-swap-space).
### Configuration:

View File

@@ -2,7 +2,6 @@ package all
import (
_ "github.com/influxdata/telegraf/plugins/processors/converter"
_ "github.com/influxdata/telegraf/plugins/processors/enum"
_ "github.com/influxdata/telegraf/plugins/processors/override"
_ "github.com/influxdata/telegraf/plugins/processors/printer"
_ "github.com/influxdata/telegraf/plugins/processors/regex"

View File

@@ -1,33 +0,0 @@
# Enum Processor Plugin
The Enum Processor allows the configuration of value mappings for metric fields.
The main use-case for this is to rewrite status codes such as _red_, _amber_ and
_green_ by numeric values such as 0, 1, 2. The plugin supports string and bool
types for the field values. Multiple Fields can be configured with separate
value mappings for each field. Default mapping values can be configured to be
used for all values, which are not contained in the value_mappings. The
processor supports explicit configuration of a destination field. By default the
source field is overwritten.
### Configuration:
```toml
[[processors.enum]]
[[processors.enum.fields]]
## Name of the field to map
source = "name"
## Destination field to be used for the mapped value. By default the source
## field is used, overwriting the original value.
# destination = "mapped"
## Default value to be used for all values not contained in the mapping
## table. When unset, the unmodified value for the field will be used if no
## match is found.
# default = 0
## Table of mappings
[processors.enum.fields.value_mappings]
value1 = 1
value2 = 2
```

View File

@@ -1,104 +0,0 @@
package enum
import (
"strconv"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/processors"
)
var sampleConfig = `
[[processors.enum.fields]]
## Name of the field to map
source = "name"
## Destination field to be used for the mapped value. By default the source
## field is used, overwriting the original value.
# destination = "mapped"
## Default value to be used for all values not contained in the mapping
## table. When unset, the unmodified value for the field will be used if no
## match is found.
# default = 0
## Table of mappings
[processors.enum.fields.value_mappings]
value1 = 1
value2 = 2
`
type EnumMapper struct {
Fields []Mapping
}
type Mapping struct {
Source string
Destination string
Default interface{}
ValueMappings map[string]interface{}
}
func (mapper *EnumMapper) SampleConfig() string {
return sampleConfig
}
func (mapper *EnumMapper) Description() string {
return "Map enum values according to given table."
}
func (mapper *EnumMapper) Apply(in ...telegraf.Metric) []telegraf.Metric {
for i := 0; i < len(in); i++ {
in[i] = mapper.applyMappings(in[i])
}
return in
}
func (mapper *EnumMapper) applyMappings(metric telegraf.Metric) telegraf.Metric {
for _, mapping := range mapper.Fields {
if originalValue, isPresent := metric.GetField(mapping.Source); isPresent == true {
if adjustedValue, isString := adjustBoolValue(originalValue).(string); isString == true {
if mappedValue, isMappedValuePresent := mapping.mapValue(adjustedValue); isMappedValuePresent == true {
writeField(metric, mapping.getDestination(), mappedValue)
}
}
}
}
return metric
}
func adjustBoolValue(in interface{}) interface{} {
if mappedBool, isBool := in.(bool); isBool == true {
return strconv.FormatBool(mappedBool)
}
return in
}
func (mapping *Mapping) mapValue(original string) (interface{}, bool) {
if mapped, found := mapping.ValueMappings[original]; found == true {
return mapped, true
}
if mapping.Default != nil {
return mapping.Default, true
}
return original, false
}
func (mapping *Mapping) getDestination() string {
if mapping.Destination != "" {
return mapping.Destination
}
return mapping.Source
}
func writeField(metric telegraf.Metric, name string, value interface{}) {
if metric.HasField(name) {
metric.RemoveField(name)
}
metric.AddField(name, value)
}
func init() {
processors.Add("enum", func() telegraf.Processor {
return &EnumMapper{}
})
}

View File

@@ -1,106 +0,0 @@
package enum
import (
"testing"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
"github.com/stretchr/testify/assert"
)
func createTestMetric() telegraf.Metric {
metric, _ := metric.New("m1",
map[string]string{"tag": "tag_value"},
map[string]interface{}{
"string_value": "test",
"int_value": int(13),
"true_value": true,
},
time.Now(),
)
return metric
}
func calculateProcessedValues(mapper EnumMapper, metric telegraf.Metric) map[string]interface{} {
processed := mapper.Apply(metric)
return processed[0].Fields()
}
func assertFieldValue(t *testing.T, expected interface{}, field string, fields map[string]interface{}) {
value, present := fields[field]
assert.True(t, present, "value of field '"+field+"' was not present")
assert.EqualValues(t, expected, value)
}
func TestRetainsMetric(t *testing.T) {
mapper := EnumMapper{}
source := createTestMetric()
target := mapper.Apply(source)[0]
fields := target.Fields()
assertFieldValue(t, "test", "string_value", fields)
assertFieldValue(t, 13, "int_value", fields)
assertFieldValue(t, true, "true_value", fields)
assert.Equal(t, "m1", target.Name())
assert.Equal(t, source.Tags(), target.Tags())
assert.Equal(t, source.Time(), target.Time())
}
func TestMapsSingleStringValue(t *testing.T) {
mapper := EnumMapper{Fields: []Mapping{{Source: "string_value", ValueMappings: map[string]interface{}{"test": int64(1)}}}}
fields := calculateProcessedValues(mapper, createTestMetric())
assertFieldValue(t, 1, "string_value", fields)
}
func TestNoFailureOnMappingsOnNonStringValuedFields(t *testing.T) {
mapper := EnumMapper{Fields: []Mapping{{Source: "int_value", ValueMappings: map[string]interface{}{"13i": int64(7)}}}}
fields := calculateProcessedValues(mapper, createTestMetric())
assertFieldValue(t, 13, "int_value", fields)
}
func TestMapSingleBoolValue(t *testing.T) {
mapper := EnumMapper{Fields: []Mapping{{Source: "true_value", ValueMappings: map[string]interface{}{"true": int64(1)}}}}
fields := calculateProcessedValues(mapper, createTestMetric())
assertFieldValue(t, 1, "true_value", fields)
}
func TestMapsToDefaultValueOnUnknownSourceValue(t *testing.T) {
mapper := EnumMapper{Fields: []Mapping{{Source: "string_value", Default: int64(42), ValueMappings: map[string]interface{}{"other": int64(1)}}}}
fields := calculateProcessedValues(mapper, createTestMetric())
assertFieldValue(t, 42, "string_value", fields)
}
func TestDoNotMapToDefaultValueKnownSourceValue(t *testing.T) {
mapper := EnumMapper{Fields: []Mapping{{Source: "string_value", Default: int64(42), ValueMappings: map[string]interface{}{"test": int64(1)}}}}
fields := calculateProcessedValues(mapper, createTestMetric())
assertFieldValue(t, 1, "string_value", fields)
}
func TestNoMappingWithoutDefaultOrDefinedMappingValue(t *testing.T) {
mapper := EnumMapper{Fields: []Mapping{{Source: "string_value", ValueMappings: map[string]interface{}{"other": int64(1)}}}}
fields := calculateProcessedValues(mapper, createTestMetric())
assertFieldValue(t, "test", "string_value", fields)
}
func TestWritesToDestination(t *testing.T) {
mapper := EnumMapper{Fields: []Mapping{{Source: "string_value", Destination: "string_code", ValueMappings: map[string]interface{}{"test": int64(1)}}}}
fields := calculateProcessedValues(mapper, createTestMetric())
assertFieldValue(t, "test", "string_value", fields)
assertFieldValue(t, 1, "string_code", fields)
}

View File

@@ -67,10 +67,7 @@ func (r *Regex) Apply(in ...telegraf.Metric) []telegraf.Metric {
for _, metric := range in {
for _, converter := range r.Tags {
if value, ok := metric.GetTag(converter.Key); ok {
k, v := r.convert(converter, value)
if k != "" {
metric.AddTag(k, v)
}
metric.AddTag(r.convert(converter, value))
}
}
@@ -78,14 +75,11 @@ func (r *Regex) Apply(in ...telegraf.Metric) []telegraf.Metric {
if value, ok := metric.GetField(converter.Key); ok {
switch value := value.(type) {
case string:
k, _ := r.convert(converter, value)
if k != "" {
metric.AddField(r.convert(converter, value))
}
}
}
}
}
return in
}