Add processor to look up service name by port (#7540)
This commit is contained in:
parent
a438678d5b
commit
71b0b96241
1
Makefile
1
Makefile
|
@ -72,6 +72,7 @@ test-windows:
|
||||||
go test -short ./plugins/inputs/win_services/...
|
go test -short ./plugins/inputs/win_services/...
|
||||||
go test -short ./plugins/inputs/procstat/...
|
go test -short ./plugins/inputs/procstat/...
|
||||||
go test -short ./plugins/inputs/ntpq/...
|
go test -short ./plugins/inputs/ntpq/...
|
||||||
|
go test -short ./plugins/processors/port_name/...
|
||||||
|
|
||||||
.PHONY: vet
|
.PHONY: vet
|
||||||
vet:
|
vet:
|
||||||
|
|
|
@ -11,6 +11,7 @@ import (
|
||||||
_ "github.com/influxdata/telegraf/plugins/processors/override"
|
_ "github.com/influxdata/telegraf/plugins/processors/override"
|
||||||
_ "github.com/influxdata/telegraf/plugins/processors/parser"
|
_ "github.com/influxdata/telegraf/plugins/processors/parser"
|
||||||
_ "github.com/influxdata/telegraf/plugins/processors/pivot"
|
_ "github.com/influxdata/telegraf/plugins/processors/pivot"
|
||||||
|
_ "github.com/influxdata/telegraf/plugins/processors/port_name"
|
||||||
_ "github.com/influxdata/telegraf/plugins/processors/printer"
|
_ "github.com/influxdata/telegraf/plugins/processors/printer"
|
||||||
_ "github.com/influxdata/telegraf/plugins/processors/regex"
|
_ "github.com/influxdata/telegraf/plugins/processors/regex"
|
||||||
_ "github.com/influxdata/telegraf/plugins/processors/rename"
|
_ "github.com/influxdata/telegraf/plugins/processors/rename"
|
||||||
|
|
|
@ -0,0 +1,26 @@
|
||||||
|
# Port Name Lookup Processor Plugin
|
||||||
|
|
||||||
|
Use the `port_name` processor to convert a tag containing a well-known port number to the registered service name.
|
||||||
|
|
||||||
|
Tag can contain a number ("80") or number and protocol separated by slash ("443/tcp"). If protocol is not provided it defaults to tcp but can be changed with the default_protocol setting.
|
||||||
|
|
||||||
|
### Configuration
|
||||||
|
|
||||||
|
```toml
|
||||||
|
[[processors.port_name]]
|
||||||
|
## Name of tag holding the port number
|
||||||
|
# tag = "port"
|
||||||
|
|
||||||
|
## Name of output tag where service name will be added
|
||||||
|
# dest = "service"
|
||||||
|
|
||||||
|
## Default tcp or udp
|
||||||
|
# default_protocol = "tcp"
|
||||||
|
```
|
||||||
|
|
||||||
|
### Example
|
||||||
|
|
||||||
|
```diff
|
||||||
|
- measurement,port=80 field=123 1560540094000000000
|
||||||
|
+ measurement,port=80,service=http field=123 1560540094000000000
|
||||||
|
```
|
|
@ -0,0 +1,174 @@
|
||||||
|
package portname
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bufio"
|
||||||
|
"io"
|
||||||
|
"os"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
"github.com/influxdata/telegraf"
|
||||||
|
"github.com/influxdata/telegraf/plugins/processors"
|
||||||
|
)
|
||||||
|
|
||||||
|
var sampleConfig = `
|
||||||
|
[[processors.port_name]]
|
||||||
|
## Name of tag holding the port number
|
||||||
|
# tag = "port"
|
||||||
|
|
||||||
|
## Name of output tag where service name will be added
|
||||||
|
# dest = "service"
|
||||||
|
|
||||||
|
## Default tcp or udp
|
||||||
|
# default_protocol = "tcp"
|
||||||
|
`
|
||||||
|
|
||||||
|
type sMap map[string]map[int]string // "https" == services["tcp"][443]
|
||||||
|
|
||||||
|
var services sMap
|
||||||
|
|
||||||
|
type PortName struct {
|
||||||
|
SourceTag string `toml:"tag"`
|
||||||
|
DestTag string `toml:"dest"`
|
||||||
|
DefaultProtocol string `toml:"default_protocol"`
|
||||||
|
|
||||||
|
Log telegraf.Logger `toml:"-"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *PortName) SampleConfig() string {
|
||||||
|
return sampleConfig
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *PortName) Description() string {
|
||||||
|
return "Given a tag of a TCP or UDP port number, add a tag of the service name looked up in the system services file"
|
||||||
|
}
|
||||||
|
|
||||||
|
func readServicesFile() {
|
||||||
|
file, err := os.Open(servicesPath())
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer file.Close()
|
||||||
|
|
||||||
|
services = readServices(file)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Read the services file into a map.
|
||||||
|
//
|
||||||
|
// This function takes a similar approach to parsing as the go
|
||||||
|
// standard library (see src/net/port_unix.go in golang source) but
|
||||||
|
// maps protocol and port number to service name, not protocol and
|
||||||
|
// service to port number.
|
||||||
|
func readServices(r io.Reader) sMap {
|
||||||
|
services = make(sMap)
|
||||||
|
scanner := bufio.NewScanner(r)
|
||||||
|
for scanner.Scan() {
|
||||||
|
line := scanner.Text()
|
||||||
|
// "http 80/tcp www www-http # World Wide Web HTTP"
|
||||||
|
if i := strings.Index(line, "#"); i >= 0 {
|
||||||
|
line = line[:i]
|
||||||
|
}
|
||||||
|
f := strings.Fields(line)
|
||||||
|
if len(f) < 2 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
service := f[0] // "http"
|
||||||
|
portProto := f[1] // "80/tcp"
|
||||||
|
portProtoSlice := strings.SplitN(portProto, "/", 2)
|
||||||
|
if len(portProtoSlice) < 2 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
port, err := strconv.Atoi(portProtoSlice[0]) // "80"
|
||||||
|
if err != nil || port <= 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
proto := portProtoSlice[1] // "tcp"
|
||||||
|
proto = strings.ToLower(proto)
|
||||||
|
|
||||||
|
protoMap, ok := services[proto]
|
||||||
|
if !ok {
|
||||||
|
protoMap = make(map[int]string)
|
||||||
|
services[proto] = protoMap
|
||||||
|
}
|
||||||
|
protoMap[port] = service
|
||||||
|
}
|
||||||
|
return services
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *PortName) Apply(metrics ...telegraf.Metric) []telegraf.Metric {
|
||||||
|
for _, m := range metrics {
|
||||||
|
portProto, ok := m.GetTag(d.SourceTag)
|
||||||
|
if !ok {
|
||||||
|
// Nonexistent tag
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
portProtoSlice := strings.SplitN(portProto, "/", 2)
|
||||||
|
l := len(portProtoSlice)
|
||||||
|
|
||||||
|
if l == 0 {
|
||||||
|
// Empty tag
|
||||||
|
d.Log.Errorf("empty port tag: %v", d.SourceTag)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
var port int
|
||||||
|
if l > 0 {
|
||||||
|
var err error
|
||||||
|
val := portProtoSlice[0]
|
||||||
|
port, err = strconv.Atoi(val)
|
||||||
|
if err != nil {
|
||||||
|
// Can't convert port to string
|
||||||
|
d.Log.Errorf("error converting port to integer: %v", val)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
proto := d.DefaultProtocol
|
||||||
|
if l > 1 && len(portProtoSlice[1]) > 0 {
|
||||||
|
proto = portProtoSlice[1]
|
||||||
|
}
|
||||||
|
proto = strings.ToLower(proto)
|
||||||
|
|
||||||
|
protoMap, ok := services[proto]
|
||||||
|
if !ok {
|
||||||
|
// Unknown protocol
|
||||||
|
//
|
||||||
|
// Protocol is normally tcp or udp. The services file
|
||||||
|
// normally has entries for both, so our map does too. If
|
||||||
|
// not, it's very likely the source tag or the services
|
||||||
|
// file doesn't make sense.
|
||||||
|
d.Log.Errorf("protocol not found in services map: %v", proto)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
service, ok := protoMap[port]
|
||||||
|
if !ok {
|
||||||
|
// Unknown port
|
||||||
|
//
|
||||||
|
// Not all ports are named so this isn't an error, but
|
||||||
|
// it's helpful to know when debugging.
|
||||||
|
d.Log.Debugf("port not found in services map: %v", port)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
m.AddTag(d.DestTag, service)
|
||||||
|
}
|
||||||
|
|
||||||
|
return metrics
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *PortName) Init() error {
|
||||||
|
services = make(sMap)
|
||||||
|
readServicesFile()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
processors.Add("port_name", func() telegraf.Processor {
|
||||||
|
return &PortName{
|
||||||
|
SourceTag: "port",
|
||||||
|
DestTag: "service",
|
||||||
|
DefaultProtocol: "tcp",
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
|
@ -0,0 +1,261 @@
|
||||||
|
package portname
|
||||||
|
|
||||||
|
import (
|
||||||
|
"strings"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/influxdata/telegraf"
|
||||||
|
"github.com/influxdata/telegraf/testutil"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
var fakeServices = `
|
||||||
|
http 80/tcp www # WorldWideWeb HTTP
|
||||||
|
https 443/tcp # http protocol over TLS/SSL
|
||||||
|
tftp 69/udp`
|
||||||
|
|
||||||
|
func TestReadServicesFile(t *testing.T) {
|
||||||
|
readServicesFile()
|
||||||
|
require.NotZero(t, len(services))
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestFakeServices(t *testing.T) {
|
||||||
|
r := strings.NewReader(fakeServices)
|
||||||
|
m := readServices(r)
|
||||||
|
require.Equal(t, sMap{"tcp": {80: "http", 443: "https"}, "udp": {69: "tftp"}}, m)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestTable(t *testing.T) {
|
||||||
|
var tests = []struct {
|
||||||
|
name string
|
||||||
|
tag string
|
||||||
|
dest string
|
||||||
|
prot string
|
||||||
|
input []telegraf.Metric
|
||||||
|
expected []telegraf.Metric
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "ordinary tcp default",
|
||||||
|
tag: "port",
|
||||||
|
dest: "service",
|
||||||
|
prot: "tcp",
|
||||||
|
input: []telegraf.Metric{
|
||||||
|
testutil.MustMetric(
|
||||||
|
"meas",
|
||||||
|
map[string]string{
|
||||||
|
"port": "443",
|
||||||
|
},
|
||||||
|
map[string]interface{}{},
|
||||||
|
time.Unix(0, 0),
|
||||||
|
),
|
||||||
|
},
|
||||||
|
expected: []telegraf.Metric{
|
||||||
|
testutil.MustMetric(
|
||||||
|
"meas",
|
||||||
|
map[string]string{
|
||||||
|
"port": "443",
|
||||||
|
"service": "https",
|
||||||
|
},
|
||||||
|
map[string]interface{}{},
|
||||||
|
time.Unix(0, 0),
|
||||||
|
),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "force udp default",
|
||||||
|
tag: "port",
|
||||||
|
dest: "service",
|
||||||
|
prot: "udp",
|
||||||
|
input: []telegraf.Metric{
|
||||||
|
testutil.MustMetric(
|
||||||
|
"meas",
|
||||||
|
map[string]string{
|
||||||
|
"port": "69",
|
||||||
|
},
|
||||||
|
map[string]interface{}{},
|
||||||
|
time.Unix(0, 0),
|
||||||
|
),
|
||||||
|
},
|
||||||
|
expected: []telegraf.Metric{
|
||||||
|
testutil.MustMetric(
|
||||||
|
"meas",
|
||||||
|
map[string]string{
|
||||||
|
"port": "69",
|
||||||
|
"service": "tftp",
|
||||||
|
},
|
||||||
|
map[string]interface{}{},
|
||||||
|
time.Unix(0, 0),
|
||||||
|
),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "override default protocol",
|
||||||
|
tag: "port",
|
||||||
|
dest: "service",
|
||||||
|
prot: "foobar",
|
||||||
|
input: []telegraf.Metric{
|
||||||
|
testutil.MustMetric(
|
||||||
|
"meas",
|
||||||
|
map[string]string{
|
||||||
|
"port": "80/tcp",
|
||||||
|
},
|
||||||
|
map[string]interface{}{},
|
||||||
|
time.Unix(0, 0),
|
||||||
|
),
|
||||||
|
},
|
||||||
|
expected: []telegraf.Metric{
|
||||||
|
testutil.MustMetric(
|
||||||
|
"meas",
|
||||||
|
map[string]string{
|
||||||
|
"port": "80/tcp",
|
||||||
|
"service": "http",
|
||||||
|
},
|
||||||
|
map[string]interface{}{},
|
||||||
|
time.Unix(0, 0),
|
||||||
|
),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "multiple metrics, multiple protocols",
|
||||||
|
tag: "port",
|
||||||
|
dest: "service",
|
||||||
|
prot: "tcp",
|
||||||
|
input: []telegraf.Metric{
|
||||||
|
testutil.MustMetric(
|
||||||
|
"meas",
|
||||||
|
map[string]string{
|
||||||
|
"port": "80",
|
||||||
|
},
|
||||||
|
map[string]interface{}{},
|
||||||
|
time.Unix(0, 0),
|
||||||
|
),
|
||||||
|
testutil.MustMetric(
|
||||||
|
"meas",
|
||||||
|
map[string]string{
|
||||||
|
"port": "69/udp",
|
||||||
|
},
|
||||||
|
map[string]interface{}{},
|
||||||
|
time.Unix(0, 0),
|
||||||
|
),
|
||||||
|
},
|
||||||
|
expected: []telegraf.Metric{
|
||||||
|
testutil.MustMetric(
|
||||||
|
"meas",
|
||||||
|
map[string]string{
|
||||||
|
"port": "80",
|
||||||
|
"service": "http",
|
||||||
|
},
|
||||||
|
map[string]interface{}{},
|
||||||
|
time.Unix(0, 0),
|
||||||
|
),
|
||||||
|
testutil.MustMetric(
|
||||||
|
"meas",
|
||||||
|
map[string]string{
|
||||||
|
"port": "69/udp",
|
||||||
|
"service": "tftp",
|
||||||
|
},
|
||||||
|
map[string]interface{}{},
|
||||||
|
time.Unix(0, 0),
|
||||||
|
),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "rename source and destination tags",
|
||||||
|
tag: "foo",
|
||||||
|
dest: "bar",
|
||||||
|
prot: "tcp",
|
||||||
|
input: []telegraf.Metric{
|
||||||
|
testutil.MustMetric(
|
||||||
|
"meas",
|
||||||
|
map[string]string{
|
||||||
|
"foo": "80",
|
||||||
|
},
|
||||||
|
map[string]interface{}{},
|
||||||
|
time.Unix(0, 0),
|
||||||
|
),
|
||||||
|
},
|
||||||
|
expected: []telegraf.Metric{
|
||||||
|
testutil.MustMetric(
|
||||||
|
"meas",
|
||||||
|
map[string]string{
|
||||||
|
"foo": "80",
|
||||||
|
"bar": "http",
|
||||||
|
},
|
||||||
|
map[string]interface{}{},
|
||||||
|
time.Unix(0, 0),
|
||||||
|
),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "unknown port",
|
||||||
|
tag: "port",
|
||||||
|
dest: "service",
|
||||||
|
prot: "tcp",
|
||||||
|
input: []telegraf.Metric{
|
||||||
|
testutil.MustMetric(
|
||||||
|
"meas",
|
||||||
|
map[string]string{
|
||||||
|
"port": "9999",
|
||||||
|
},
|
||||||
|
map[string]interface{}{},
|
||||||
|
time.Unix(0, 0),
|
||||||
|
),
|
||||||
|
},
|
||||||
|
expected: []telegraf.Metric{
|
||||||
|
testutil.MustMetric(
|
||||||
|
"meas",
|
||||||
|
map[string]string{
|
||||||
|
"port": "9999",
|
||||||
|
},
|
||||||
|
map[string]interface{}{},
|
||||||
|
time.Unix(0, 0),
|
||||||
|
),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "don't mix up protocols",
|
||||||
|
tag: "port",
|
||||||
|
dest: "service",
|
||||||
|
prot: "udp",
|
||||||
|
input: []telegraf.Metric{
|
||||||
|
testutil.MustMetric(
|
||||||
|
"meas",
|
||||||
|
map[string]string{
|
||||||
|
"port": "80",
|
||||||
|
},
|
||||||
|
map[string]interface{}{},
|
||||||
|
time.Unix(0, 0),
|
||||||
|
),
|
||||||
|
},
|
||||||
|
expected: []telegraf.Metric{
|
||||||
|
testutil.MustMetric(
|
||||||
|
"meas",
|
||||||
|
map[string]string{
|
||||||
|
"port": "80",
|
||||||
|
},
|
||||||
|
map[string]interface{}{},
|
||||||
|
time.Unix(0, 0),
|
||||||
|
),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
r := strings.NewReader(fakeServices)
|
||||||
|
services = readServices(r)
|
||||||
|
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
p := PortName{
|
||||||
|
SourceTag: tt.tag,
|
||||||
|
DestTag: tt.dest,
|
||||||
|
DefaultProtocol: tt.prot,
|
||||||
|
Log: testutil.Logger{},
|
||||||
|
}
|
||||||
|
|
||||||
|
actual := p.Apply(tt.input...)
|
||||||
|
|
||||||
|
testutil.RequireMetricsEqual(t, tt.expected, actual)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,12 @@
|
||||||
|
// +build windows
|
||||||
|
|
||||||
|
package portname
|
||||||
|
|
||||||
|
import (
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
)
|
||||||
|
|
||||||
|
func servicesPath() string {
|
||||||
|
return filepath.Join(os.Getenv("WINDIR"), `system32\drivers\etc\services`)
|
||||||
|
}
|
|
@ -0,0 +1,7 @@
|
||||||
|
// +build !windows
|
||||||
|
|
||||||
|
package portname
|
||||||
|
|
||||||
|
func servicesPath() string {
|
||||||
|
return "/etc/services"
|
||||||
|
}
|
Loading…
Reference in New Issue