Add multifile input plugin (#5256)
This commit is contained in:
parent
3de473721d
commit
7a031c48cd
|
@ -76,6 +76,7 @@ import (
|
||||||
_ "github.com/influxdata/telegraf/plugins/inputs/minecraft"
|
_ "github.com/influxdata/telegraf/plugins/inputs/minecraft"
|
||||||
_ "github.com/influxdata/telegraf/plugins/inputs/mongodb"
|
_ "github.com/influxdata/telegraf/plugins/inputs/mongodb"
|
||||||
_ "github.com/influxdata/telegraf/plugins/inputs/mqtt_consumer"
|
_ "github.com/influxdata/telegraf/plugins/inputs/mqtt_consumer"
|
||||||
|
_ "github.com/influxdata/telegraf/plugins/inputs/multifile"
|
||||||
_ "github.com/influxdata/telegraf/plugins/inputs/mysql"
|
_ "github.com/influxdata/telegraf/plugins/inputs/mysql"
|
||||||
_ "github.com/influxdata/telegraf/plugins/inputs/nats"
|
_ "github.com/influxdata/telegraf/plugins/inputs/nats"
|
||||||
_ "github.com/influxdata/telegraf/plugins/inputs/nats_consumer"
|
_ "github.com/influxdata/telegraf/plugins/inputs/nats_consumer"
|
||||||
|
|
|
@ -0,0 +1,59 @@
|
||||||
|
# Multifile Input Plugin
|
||||||
|
|
||||||
|
### Description
|
||||||
|
The multifile input plugin allows telegraf to gather data from multiple files into a single point, creating one field or tag per file.
|
||||||
|
|
||||||
|
### Configuration
|
||||||
|
```
|
||||||
|
[[inputs.multifile]]
|
||||||
|
## Base directory where telegraf will look for files.
|
||||||
|
## Omit this option to use absolute paths.
|
||||||
|
base_dir = "/sys/bus/i2c/devices/1-0076/iio:device0"
|
||||||
|
|
||||||
|
## If true, Telegraf discard all data when a single file can't be read.
|
||||||
|
## Else, Telegraf omits the field generated from this file.
|
||||||
|
# fail_early = true
|
||||||
|
|
||||||
|
## Files to parse each interval.
|
||||||
|
[[inputs.multifile.file]]
|
||||||
|
file = "in_pressure_input"
|
||||||
|
dest = "pressure"
|
||||||
|
conversion = "float"
|
||||||
|
[[inputs.multifile.file]]
|
||||||
|
file = "in_temp_input"
|
||||||
|
dest = "temperature"
|
||||||
|
conversion = "float(3)"
|
||||||
|
[[inputs.multifile.file]]
|
||||||
|
file = "in_humidityrelative_input"
|
||||||
|
dest = "humidityrelative"
|
||||||
|
conversion = "float(3)"
|
||||||
|
```
|
||||||
|
* `file.file`:
|
||||||
|
Path of the file to be parsed
|
||||||
|
* `file.dest`:
|
||||||
|
Name of the field/tag created, defaults to `$(basename file)`
|
||||||
|
* `file.conversion`:
|
||||||
|
Data format used to parse the file contents
|
||||||
|
* `float(X)`: Converts the input value into a float and divides by the Xth power of 10. Efficively just moves the decimal left X places. For example a value of `123` with `float(2)` will result in `1.23`.
|
||||||
|
* `float`: Converts the value into a float with no adjustment. Same as `float(0)`.
|
||||||
|
* `int`: Convertes the value into an integer.
|
||||||
|
* `string`, `""`: No conversion
|
||||||
|
* `bool`: Convertes the value into a boolean
|
||||||
|
* `tag`: File content is used as a tag
|
||||||
|
|
||||||
|
### Example Output
|
||||||
|
This example shows a BME280 connected to a Raspberry Pi, using the sample config.
|
||||||
|
```
|
||||||
|
multifile pressure=101.343285156,temperature=20.4,humidityrelative=48.9 1547202076000000000
|
||||||
|
```
|
||||||
|
|
||||||
|
To reproduce this, connect a BMP280 to the board's GPIO pins and register the BME280 device driver
|
||||||
|
```
|
||||||
|
cd /sys/bus/i2c/devices/i2c-1
|
||||||
|
echo bme280 0x76 > new_device
|
||||||
|
```
|
||||||
|
|
||||||
|
The kernel driver provides the following files in `/sys/bus/i2c/devices/1-0076/iio:device0`:
|
||||||
|
* `in_humidityrelative_input`: `48900`
|
||||||
|
* `in_pressure_input`: `101.343285156`
|
||||||
|
* `in_temp_input`: `20400`
|
|
@ -0,0 +1,149 @@
|
||||||
|
package multifile
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"io/ioutil"
|
||||||
|
"math"
|
||||||
|
"path"
|
||||||
|
"strconv"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/influxdata/telegraf"
|
||||||
|
"github.com/influxdata/telegraf/plugins/inputs"
|
||||||
|
)
|
||||||
|
|
||||||
|
type MultiFile struct {
|
||||||
|
BaseDir string
|
||||||
|
FailEarly bool
|
||||||
|
Files []File `toml:"file"`
|
||||||
|
|
||||||
|
initialized bool
|
||||||
|
}
|
||||||
|
|
||||||
|
type File struct {
|
||||||
|
Name string `toml:"file"`
|
||||||
|
Dest string
|
||||||
|
Conversion string
|
||||||
|
}
|
||||||
|
|
||||||
|
const sampleConfig = `
|
||||||
|
## Base directory where telegraf will look for files.
|
||||||
|
## Omit this option to use absolute paths.
|
||||||
|
base_dir = "/sys/bus/i2c/devices/1-0076/iio:device0"
|
||||||
|
|
||||||
|
## If true, Telegraf discard all data when a single file can't be read.
|
||||||
|
## Else, Telegraf omits the field generated from this file.
|
||||||
|
# fail_early = true
|
||||||
|
|
||||||
|
## Files to parse each interval.
|
||||||
|
[[inputs.multifile.file]]
|
||||||
|
file = "in_pressure_input"
|
||||||
|
dest = "pressure"
|
||||||
|
conversion = "float"
|
||||||
|
[[inputs.multifile.file]]
|
||||||
|
file = "in_temp_input"
|
||||||
|
dest = "temperature"
|
||||||
|
conversion = "float(3)"
|
||||||
|
[[inputs.multifile.file]]
|
||||||
|
file = "in_humidityrelative_input"
|
||||||
|
dest = "humidityrelative"
|
||||||
|
conversion = "float(3)"
|
||||||
|
`
|
||||||
|
|
||||||
|
// SampleConfig returns the default configuration of the Input
|
||||||
|
func (m *MultiFile) SampleConfig() string {
|
||||||
|
return sampleConfig
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *MultiFile) Description() string {
|
||||||
|
return "Aggregates the contents of multiple files into a single point"
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *MultiFile) init() {
|
||||||
|
if m.initialized {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
for i, file := range m.Files {
|
||||||
|
if m.BaseDir != "" {
|
||||||
|
m.Files[i].Name = path.Join(m.BaseDir, file.Name)
|
||||||
|
}
|
||||||
|
if file.Dest == "" {
|
||||||
|
m.Files[i].Dest = path.Base(file.Name)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
m.initialized = true
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *MultiFile) Gather(acc telegraf.Accumulator) error {
|
||||||
|
m.init()
|
||||||
|
now := time.Now()
|
||||||
|
fields := make(map[string]interface{})
|
||||||
|
tags := make(map[string]string)
|
||||||
|
|
||||||
|
for _, file := range m.Files {
|
||||||
|
fileContents, err := ioutil.ReadFile(file.Name)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
if m.FailEarly {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
vStr := string(bytes.TrimSpace(bytes.Trim(fileContents, "\x00")))
|
||||||
|
|
||||||
|
if file.Conversion == "tag" {
|
||||||
|
tags[file.Dest] = vStr
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
var value interface{}
|
||||||
|
|
||||||
|
var d int = 0
|
||||||
|
if _, errfmt := fmt.Sscanf(file.Conversion, "float(%d)", &d); errfmt == nil || file.Conversion == "float" {
|
||||||
|
var v float64
|
||||||
|
v, err = strconv.ParseFloat(vStr, 64)
|
||||||
|
value = v / math.Pow10(d)
|
||||||
|
}
|
||||||
|
|
||||||
|
if file.Conversion == "int" {
|
||||||
|
value, err = strconv.ParseInt(vStr, 10, 64)
|
||||||
|
}
|
||||||
|
|
||||||
|
if file.Conversion == "string" || file.Conversion == "" {
|
||||||
|
value = vStr
|
||||||
|
}
|
||||||
|
|
||||||
|
if file.Conversion == "bool" {
|
||||||
|
value, err = strconv.ParseBool(vStr)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
if m.FailEarly {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if value == nil {
|
||||||
|
return errors.New(fmt.Sprintf("invalid conversion %v", file.Conversion))
|
||||||
|
}
|
||||||
|
|
||||||
|
fields[file.Dest] = value
|
||||||
|
}
|
||||||
|
|
||||||
|
acc.AddGauge("multifile", fields, tags, now)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
inputs.Add("multifile", func() telegraf.Input {
|
||||||
|
return &MultiFile{
|
||||||
|
FailEarly: true,
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
|
@ -0,0 +1,76 @@
|
||||||
|
package multifile
|
||||||
|
|
||||||
|
import (
|
||||||
|
"os"
|
||||||
|
"path"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/influxdata/telegraf/testutil"
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestFileTypes(t *testing.T) {
|
||||||
|
wd, _ := os.Getwd()
|
||||||
|
|
||||||
|
m := MultiFile{
|
||||||
|
BaseDir: path.Join(wd, `testdata`),
|
||||||
|
FailEarly: true,
|
||||||
|
Files: []File{
|
||||||
|
{Name: `bool.txt`, Dest: `examplebool`, Conversion: `bool`},
|
||||||
|
{Name: `float.txt`, Dest: `examplefloat`, Conversion: `float`},
|
||||||
|
{Name: `int.txt`, Dest: `examplefloatX`, Conversion: `float(3)`},
|
||||||
|
{Name: `int.txt`, Dest: `exampleint`, Conversion: `int`},
|
||||||
|
{Name: `string.txt`, Dest: `examplestring`},
|
||||||
|
{Name: `tag.txt`, Dest: `exampletag`, Conversion: `tag`},
|
||||||
|
{Name: `int.txt`, Conversion: `int`},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
var acc testutil.Accumulator
|
||||||
|
|
||||||
|
err := m.Gather(&acc)
|
||||||
|
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.Equal(t, map[string]string{"exampletag": "test"}, acc.Metrics[0].Tags)
|
||||||
|
assert.Equal(t, map[string]interface{}{
|
||||||
|
"examplebool": true,
|
||||||
|
"examplestring": "hello world",
|
||||||
|
"exampleint": int64(123456),
|
||||||
|
"int.txt": int64(123456),
|
||||||
|
"examplefloat": 123.456,
|
||||||
|
"examplefloatX": 123.456,
|
||||||
|
}, acc.Metrics[0].Fields)
|
||||||
|
}
|
||||||
|
|
||||||
|
func FailEarly(failEarly bool, t *testing.T) error {
|
||||||
|
wd, _ := os.Getwd()
|
||||||
|
|
||||||
|
m := MultiFile{
|
||||||
|
BaseDir: path.Join(wd, `testdata`),
|
||||||
|
FailEarly: failEarly,
|
||||||
|
Files: []File{
|
||||||
|
{Name: `int.txt`, Dest: `exampleint`, Conversion: `int`},
|
||||||
|
{Name: `int.txt`, Dest: `exampleerror`, Conversion: `bool`},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
var acc testutil.Accumulator
|
||||||
|
|
||||||
|
err := m.Gather(&acc)
|
||||||
|
|
||||||
|
if err == nil {
|
||||||
|
assert.Equal(t, map[string]interface{}{
|
||||||
|
"exampleint": int64(123456),
|
||||||
|
}, acc.Metrics[0].Fields)
|
||||||
|
}
|
||||||
|
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestFailEarly(t *testing.T) {
|
||||||
|
err := FailEarly(false, t)
|
||||||
|
require.NoError(t, err)
|
||||||
|
err = FailEarly(true, t)
|
||||||
|
require.Error(t, err)
|
||||||
|
}
|
|
@ -0,0 +1 @@
|
||||||
|
true
|
|
@ -0,0 +1 @@
|
||||||
|
123.456
|
|
@ -0,0 +1 @@
|
||||||
|
123456
|
|
@ -0,0 +1 @@
|
||||||
|
hello world
|
|
@ -0,0 +1 @@
|
||||||
|
test
|
Loading…
Reference in New Issue