Add Ipset input plugin (#3346)

This commit is contained in:
Laurent Sesquès 2018-01-31 21:25:27 +01:00 committed by Daniel Nelson
parent 32732d42f8
commit 7b365180d0
4 changed files with 324 additions and 0 deletions

View File

@ -36,6 +36,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/inputs/internal"
_ "github.com/influxdata/telegraf/plugins/inputs/interrupts"
_ "github.com/influxdata/telegraf/plugins/inputs/ipmi_sensor"
_ "github.com/influxdata/telegraf/plugins/inputs/ipset"
_ "github.com/influxdata/telegraf/plugins/inputs/iptables"
_ "github.com/influxdata/telegraf/plugins/inputs/jolokia"
_ "github.com/influxdata/telegraf/plugins/inputs/jolokia2"

View File

@ -0,0 +1,62 @@
# Ipset Plugin
The ipset plugin gathers packets and bytes counters from Linux ipset.
It uses the output of the command "ipset save".
Ipsets created without the "counters" option are ignored.
Results are tagged with:
- ipset name
- ipset entry
There are 3 ways to grant telegraf the right to run ipset:
* Run as root (strongly discouraged)
* Use sudo
* Configure systemd to run telegraf with CAP_NET_ADMIN and CAP_NET_RAW capabilities.
### Using systemd capabilities
You may run `systemctl edit telegraf.service` and add the following:
```
[Service]
CapabilityBoundingSet=CAP_NET_RAW CAP_NET_ADMIN
AmbientCapabilities=CAP_NET_RAW CAP_NET_ADMIN
```
### Using sudo
You may edit your sudo configuration with the following:
```sudo
telegraf ALL=(root) NOPASSWD: /sbin/ipset save
```
### Configuration
```toml
[[inputs.ipset]]
## By default, we only show sets which have already matched at least 1 packet.
## set include_unmatched_sets = true to gather them all.
include_unmatched_sets = false
## Adjust your sudo settings appropriately if using this option ("sudo ipset save")
## You can avoid using sudo or root, by setting appropriate privileges for
## the telegraf.service systemd service.
use_sudo = false
## The default timeout of 1s for ipset execution can be overridden here:
# timeout = "1s"
```
### Example Output
```
$ sudo ipset save
create myset hash:net family inet hashsize 1024 maxelem 65536 counters comment
add myset 10.69.152.1 packets 8 bytes 672 comment "machine A"
```
```
$ telegraf --config telegraf.conf --input-filter ipset --test --debug
* Plugin: inputs.ipset, Collection 1
> ipset,rule=10.69.152.1,host=trashme,set=myset bytes_total=8i,packets_total=672i 1507615028000000000
```

View File

@ -0,0 +1,126 @@
package ipset
import (
"bufio"
"bytes"
"fmt"
"os/exec"
"strconv"
"strings"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/inputs"
)
// Ipsets is a telegraf plugin to gather packets and bytes counters from ipset
type Ipset struct {
IncludeUnmatchedSets bool
UseSudo bool
Timeout internal.Duration
lister setLister
}
type setLister func(Timeout internal.Duration, UseSudo bool) (*bytes.Buffer, error)
const measurement = "ipset"
var defaultTimeout = internal.Duration{Duration: time.Second}
// Description returns a short description of the plugin
func (ipset *Ipset) Description() string {
return "Gather packets and bytes counters from Linux ipsets"
}
// SampleConfig returns sample configuration options.
func (ipset *Ipset) SampleConfig() string {
return `
## By default, we only show sets which have already matched at least 1 packet.
## set include_unmatched_sets = true to gather them all.
include_unmatched_sets = false
## Adjust your sudo settings appropriately if using this option ("sudo ipset save")
use_sudo = false
## The default timeout of 1s for ipset execution can be overridden here:
# timeout = "1s"
`
}
func (ips *Ipset) Gather(acc telegraf.Accumulator) error {
out, e := ips.lister(ips.Timeout, ips.UseSudo)
if e != nil {
acc.AddError(e)
}
scanner := bufio.NewScanner(out)
for scanner.Scan() {
line := scanner.Text()
// Ignore sets created without the "counters" option
nocomment := strings.Split(line, "\"")[0]
if !(strings.Contains(nocomment, "packets") &&
strings.Contains(nocomment, "bytes")) {
continue
}
data := strings.Fields(line)
if len(data) < 7 {
acc.AddError(fmt.Errorf("Error parsing line (expected at least 7 fields): %s", line))
continue
}
if data[0] == "add" && (data[4] != "0" || ips.IncludeUnmatchedSets) {
tags := map[string]string{
"set": data[1],
"rule": data[2],
}
packets_total, err := strconv.ParseUint(data[4], 10, 64)
if err != nil {
acc.AddError(err)
}
bytes_total, err := strconv.ParseUint(data[6], 10, 64)
if err != nil {
acc.AddError(err)
}
fields := map[string]interface{}{
"packets_total": packets_total,
"bytes_total": bytes_total,
}
acc.AddCounter(measurement, fields, tags)
}
}
return nil
}
func setList(Timeout internal.Duration, UseSudo bool) (*bytes.Buffer, error) {
// Is ipset installed ?
ipsetPath, err := exec.LookPath("ipset")
if err != nil {
return nil, err
}
var args []string
cmdName := ipsetPath
if UseSudo {
cmdName = "sudo"
args = append(args, ipsetPath)
}
args = append(args, "save")
cmd := exec.Command(cmdName, args...)
var out bytes.Buffer
cmd.Stdout = &out
err = internal.RunTimeout(cmd, Timeout.Duration)
if err != nil {
return &out, fmt.Errorf("error running ipset save: %s", err)
}
return &out, nil
}
func init() {
inputs.Add("ipset", func() telegraf.Input {
return &Ipset{
lister: setList,
Timeout: defaultTimeout,
}
})
}

View File

@ -0,0 +1,135 @@
package ipset
import (
"bytes"
"errors"
"fmt"
"reflect"
"testing"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/testutil"
)
func TestIpset(t *testing.T) {
tests := []struct {
name string
value string
tags []map[string]string
fields [][]map[string]interface{}
err error
}{
{
name: "0 sets, no results",
value: "",
},
{
name: "Empty sets, no values",
value: `create myset hash:net family inet hashsize 1024 maxelem 65536
create myset2 hash:net,port family inet hashsize 16384 maxelem 524288 counters comment
`,
},
{
name: "Non-empty sets, but no counters, no results",
value: `create myset hash:net family inet hashsize 1024 maxelem 65536
add myset 1.2.3.4
`,
},
{
name: "Line with data but not enough fields",
value: `create hash:net family inet hashsize 1024 maxelem 65536 counters
add myset 4.5.6.7 packets 123 bytes
`,
err: fmt.Errorf("Error parsing line (expected at least 7 fields): \t\t\t\tadd myset 4.5.6.7 packets 123 bytes"),
},
{
name: "Non-empty sets, counters, no comment",
value: `create myset hash:net family inet hashsize 1024 maxelem 65536 counters
add myset 1.2.3.4 packets 1328 bytes 79680
add myset 2.3.4.5 packets 0 bytes 0
add myset 3.4.5.6 packets 3 bytes 222
`,
tags: []map[string]string{
map[string]string{"set": "myset", "rule": "1.2.3.4"},
map[string]string{"set": "myset", "rule": "3.4.5.6"},
},
fields: [][]map[string]interface{}{
{map[string]interface{}{"packets_total": uint64(1328), "bytes_total": uint64(79680)}},
{map[string]interface{}{"packets_total": uint64(3), "bytes_total": uint64(222)}},
},
},
{
name: "Sets with counters and comment",
value: `create myset hash:net family inet hashsize 1024 maxelem 65536 counters comment
add myset 1.2.3.4 packets 1328 bytes 79680 comment "first IP"
add myset 2.3.4.5 packets 0 bytes 0 comment "2nd IP"
add myset 3.4.5.6 packets 3 bytes 222 "3rd IP"
`,
tags: []map[string]string{
map[string]string{"set": "myset", "rule": "1.2.3.4"},
map[string]string{"set": "myset", "rule": "3.4.5.6"},
},
fields: [][]map[string]interface{}{
{map[string]interface{}{"packets_total": uint64(1328), "bytes_total": uint64(79680)}},
{map[string]interface{}{"packets_total": uint64(3), "bytes_total": uint64(222)}},
},
},
}
for i, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
i++
ips := &Ipset{
lister: func(Timeout internal.Duration, UseSudo bool) (*bytes.Buffer, error) {
return bytes.NewBufferString(tt.value), nil
},
}
acc := new(testutil.Accumulator)
err := acc.GatherError(ips.Gather)
if !reflect.DeepEqual(tt.err, err) {
t.Errorf("%d: expected error '%#v' got '%#v'", i, tt.err, err)
}
if len(tt.tags) == 0 {
n := acc.NFields()
if n != 0 {
t.Errorf("%d: expected 0 values got %d", i, n)
}
return
}
n := 0
for j, tags := range tt.tags {
for k, fields := range tt.fields[j] {
if len(acc.Metrics) < n+1 {
t.Errorf("%d: expected at least %d values got %d", i, n+1, len(acc.Metrics))
break
}
m := acc.Metrics[n]
if !reflect.DeepEqual(m.Measurement, measurement) {
t.Errorf("%d %d %d: expected measurement '%#v' got '%#v'\n", i, j, k, measurement, m.Measurement)
}
if !reflect.DeepEqual(m.Tags, tags) {
t.Errorf("%d %d %d: expected tags\n%#v got\n%#v\n", i, j, k, tags, m.Tags)
}
if !reflect.DeepEqual(m.Fields, fields) {
t.Errorf("%d %d %d: expected fields\n%#v got\n%#v\n", i, j, k, fields, m.Fields)
}
n++
}
}
})
}
}
func TestIpset_Gather_listerError(t *testing.T) {
errFoo := errors.New("error foobar")
ips := &Ipset{
lister: func(Timeout internal.Duration, UseSudo bool) (*bytes.Buffer, error) {
return new(bytes.Buffer), errFoo
},
}
acc := new(testutil.Accumulator)
err := acc.GatherError(ips.Gather)
if !reflect.DeepEqual(err, errFoo) {
t.Errorf("Expected error %#v got\n%#v\n", errFoo, err)
}
}