Refactor the architecture of encoding parsers.
This commit is contained in:
parent
93709e0209
commit
88f62a4cc5
|
@ -158,9 +158,8 @@ Currently implemented sources:
|
|||
* disque
|
||||
* docker
|
||||
* elasticsearch
|
||||
* exec (generic line-protocol-emitting executable plugin, support JSON, influx and graphite)
|
||||
* socket (generic line protocol listen input service, support influx and graphite)
|
||||
* tail (Plugin to tail the files to process line protocol contents, support influx and graphite)
|
||||
* exec (generic executable plugin, support JSON, influx and graphite)
|
||||
* tail (generic plugin to tail the files to process line protocol contents, support influx and graphite)
|
||||
* haproxy
|
||||
* httpjson (generic JSON-emitting http service plugin)
|
||||
* influxdb
|
||||
|
|
|
@ -0,0 +1,31 @@
|
|||
package encoding
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
)
|
||||
|
||||
type Parser interface {
|
||||
InitConfig(configs map[string]interface{}) error
|
||||
Parse(buf []byte) ([]telegraf.Metric, error)
|
||||
ParseLine(line string) (telegraf.Metric, error)
|
||||
}
|
||||
|
||||
type Creator func() Parser
|
||||
|
||||
var Parsers = map[string]Creator{}
|
||||
|
||||
func Add(name string, creator Creator) {
|
||||
Parsers[name] = creator
|
||||
}
|
||||
|
||||
func NewParser(dataFormat string, configs map[string]interface{}) (parser Parser, err error) {
|
||||
creator := Parsers[dataFormat]
|
||||
if creator == nil {
|
||||
return nil, fmt.Errorf("Unsupported data format: %s. ", dataFormat)
|
||||
}
|
||||
parser = creator()
|
||||
err = parser.InitConfig(configs)
|
||||
return parser, err
|
||||
}
|
|
@ -14,6 +14,7 @@ import (
|
|||
|
||||
"github.com/influxdata/influxdb/models"
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/internal/encoding"
|
||||
)
|
||||
|
||||
// Minimum and maximum supported dates for timestamps.
|
||||
|
@ -22,31 +23,31 @@ var (
|
|||
MaxDate = time.Date(2038, 1, 19, 0, 0, 0, 0, time.UTC)
|
||||
)
|
||||
|
||||
var defaultTemplate *template
|
||||
|
||||
func init() {
|
||||
var err error
|
||||
defaultTemplate, err = NewTemplate("measurement*", nil, DefaultSeparator)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
||||
// Parser encapsulates a Graphite Parser.
|
||||
type Parser struct {
|
||||
matcher *matcher
|
||||
}
|
||||
|
||||
// Options are configurable values that can be provided to a Parser
|
||||
type Options struct {
|
||||
Separator string
|
||||
Templates []string
|
||||
}
|
||||
|
||||
// NewParserWithOptions returns a graphite parser using the given options
|
||||
func NewParserWithOptions(options Options) (*Parser, error) {
|
||||
// Parser encapsulates a Graphite Parser.
|
||||
type GraphiteParser struct {
|
||||
matcher *matcher
|
||||
}
|
||||
|
||||
func NewParser() *GraphiteParser {
|
||||
return &GraphiteParser{}
|
||||
}
|
||||
|
||||
func (p *GraphiteParser) InitConfig(configs map[string]interface{}) error {
|
||||
|
||||
var err error
|
||||
options := Options{
|
||||
Templates: configs["Templates"].([]string),
|
||||
Separator: configs["Separator"].(string)}
|
||||
|
||||
matcher := newMatcher()
|
||||
p.matcher = matcher
|
||||
defaultTemplate, _ := NewTemplate("measurement*", nil, DefaultSeparator)
|
||||
matcher.AddDefaultTemplate(defaultTemplate)
|
||||
|
||||
for _, pattern := range options.Templates {
|
||||
|
@ -76,25 +77,29 @@ func NewParserWithOptions(options Options) (*Parser, error) {
|
|||
}
|
||||
}
|
||||
|
||||
tmpl, err := NewTemplate(template, tags, options.Separator)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
tmpl, err1 := NewTemplate(template, tags, options.Separator)
|
||||
if err1 != nil {
|
||||
err = err1
|
||||
break
|
||||
}
|
||||
matcher.Add(filter, tmpl)
|
||||
}
|
||||
return &Parser{matcher: matcher}, nil
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("exec input parser config is error: %s ", err.Error())
|
||||
} else {
|
||||
return nil
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// NewParser returns a GraphiteParser instance.
|
||||
func NewParser(templates []string) (*Parser, error) {
|
||||
return NewParserWithOptions(
|
||||
Options{
|
||||
Templates: templates,
|
||||
Separator: DefaultSeparator,
|
||||
})
|
||||
func init() {
|
||||
encoding.Add("graphite", func() encoding.Parser {
|
||||
return NewParser()
|
||||
})
|
||||
}
|
||||
|
||||
func (p *Parser) ParseMetrics(buf []byte) ([]telegraf.Metric, error) {
|
||||
func (p *GraphiteParser) Parse(buf []byte) ([]telegraf.Metric, error) {
|
||||
// parse even if the buffer begins with a newline
|
||||
buf = bytes.TrimPrefix(buf, []byte("\n"))
|
||||
|
||||
|
@ -114,7 +119,7 @@ func (p *Parser) ParseMetrics(buf []byte) ([]telegraf.Metric, error) {
|
|||
|
||||
// Trim the buffer, even though there should be no padding
|
||||
line := strings.TrimSpace(string(buf))
|
||||
if metric, err := p.Parse(line); err == nil {
|
||||
if metric, err := p.ParseLine(line); err == nil {
|
||||
metrics = append(metrics, metric)
|
||||
}
|
||||
}
|
||||
|
@ -122,7 +127,7 @@ func (p *Parser) ParseMetrics(buf []byte) ([]telegraf.Metric, error) {
|
|||
}
|
||||
|
||||
// Parse performs Graphite parsing of a single line.
|
||||
func (p *Parser) Parse(line string) (telegraf.Metric, error) {
|
||||
func (p *GraphiteParser) ParseLine(line string) (telegraf.Metric, error) {
|
||||
// Break into 3 fields (name, value, timestamp).
|
||||
fields := strings.Fields(line)
|
||||
if len(fields) != 2 && len(fields) != 3 {
|
||||
|
@ -184,7 +189,7 @@ func (p *Parser) Parse(line string) (telegraf.Metric, error) {
|
|||
|
||||
// ApplyTemplate extracts the template fields from the given line and
|
||||
// returns the measurement name and tags.
|
||||
func (p *Parser) ApplyTemplate(line string) (string, map[string]string, string, error) {
|
||||
func (p *GraphiteParser) ApplyTemplate(line string) (string, map[string]string, string, error) {
|
||||
// Break line into fields (name, value, timestamp), only name is used
|
||||
fields := strings.Fields(line)
|
||||
if len(fields) == 0 {
|
||||
|
|
|
@ -0,0 +1,48 @@
|
|||
package influx
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/internal/encoding"
|
||||
)
|
||||
|
||||
type InfluxParser struct {
|
||||
}
|
||||
|
||||
func (p *InfluxParser) Parse(buf []byte) ([]telegraf.Metric, error) {
|
||||
metrics, err := telegraf.ParseMetrics(buf)
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return metrics, nil
|
||||
}
|
||||
|
||||
func (p *InfluxParser) ParseLine(line string) (telegraf.Metric, error) {
|
||||
metrics, err := p.Parse([]byte(line + "\n"))
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if len(metrics) < 1 {
|
||||
return nil, fmt.Errorf("Can not parse the line: %s, for data format: influx ", line)
|
||||
}
|
||||
|
||||
return metrics[0], nil
|
||||
}
|
||||
|
||||
func NewParser() *InfluxParser {
|
||||
return &InfluxParser{}
|
||||
}
|
||||
|
||||
func (p *InfluxParser) InitConfig(configs map[string]interface{}) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func init() {
|
||||
encoding.Add("influx", func() encoding.Parser {
|
||||
return NewParser()
|
||||
})
|
||||
}
|
|
@ -0,0 +1,68 @@
|
|||
package json
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
|
||||
"github.com/influxdata/telegraf/internal"
|
||||
"github.com/influxdata/telegraf/internal/encoding"
|
||||
)
|
||||
|
||||
type JsonParser struct {
|
||||
}
|
||||
|
||||
func (p *JsonParser) Parse(buf []byte) ([]telegraf.Metric, error) {
|
||||
|
||||
metrics := make([]telegraf.Metric, 0)
|
||||
|
||||
var jsonOut interface{}
|
||||
err := json.Unmarshal(buf, &jsonOut)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("unable to parse out as JSON, %s", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
f := internal.JSONFlattener{}
|
||||
err = f.FlattenJSON("", jsonOut)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
metric, err := telegraf.NewMetric("exec", nil, f.Fields, time.Now().UTC())
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return append(metrics, metric), nil
|
||||
}
|
||||
|
||||
func (p *JsonParser) ParseLine(line string) (telegraf.Metric, error) {
|
||||
metrics, err := p.Parse([]byte(line + "\n"))
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if len(metrics) < 1 {
|
||||
return nil, fmt.Errorf("Can not parse the line: %s, for data format: influx ", line)
|
||||
}
|
||||
|
||||
return metrics[0], nil
|
||||
}
|
||||
|
||||
func NewParser() *JsonParser {
|
||||
return &JsonParser{}
|
||||
}
|
||||
|
||||
func (p *JsonParser) InitConfig(configs map[string]interface{}) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func init() {
|
||||
encoding.Add("json", func() encoding.Parser {
|
||||
return NewParser()
|
||||
})
|
||||
}
|
|
@ -1,90 +0,0 @@
|
|||
package encoding
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/internal"
|
||||
"github.com/influxdata/telegraf/internal/encoding/graphite"
|
||||
)
|
||||
|
||||
type Parser struct {
|
||||
graphiteParser *graphite.Parser
|
||||
}
|
||||
|
||||
func NewParser(parser *graphite.Parser) *Parser {
|
||||
return &Parser{graphiteParser: parser}
|
||||
}
|
||||
|
||||
func (p *Parser) Parse(dataFormat string, out []byte, acc telegraf.Accumulator) error {
|
||||
var err error
|
||||
var metrics []telegraf.Metric
|
||||
var metric telegraf.Metric
|
||||
|
||||
switch dataFormat {
|
||||
case "", "json":
|
||||
var jsonOut interface{}
|
||||
err = json.Unmarshal(out, &jsonOut)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("unable to parse out as JSON, %s", err)
|
||||
break
|
||||
}
|
||||
|
||||
f := internal.JSONFlattener{}
|
||||
err = f.FlattenJSON("", jsonOut)
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
acc.AddFields("exec", f.Fields, nil)
|
||||
case "influx":
|
||||
now := time.Now()
|
||||
metrics, err = telegraf.ParseMetrics(out)
|
||||
for _, metric = range metrics {
|
||||
acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), now)
|
||||
}
|
||||
case "graphite":
|
||||
metrics, err = p.graphiteParser.ParseMetrics(out)
|
||||
for _, metric = range metrics {
|
||||
acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), metric.Time())
|
||||
}
|
||||
default:
|
||||
err = fmt.Errorf("Unsupported data format: %s. Must be either json, influx or graphite ", dataFormat)
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (p *Parser) ParseSocketLines(dataFormat string, buf []byte) ([]telegraf.Metric, error) {
|
||||
var err error
|
||||
var metrics []telegraf.Metric
|
||||
|
||||
switch dataFormat {
|
||||
case "", "graphite":
|
||||
metrics, err = p.graphiteParser.ParseMetrics(buf)
|
||||
case "influx":
|
||||
metrics, err = telegraf.ParseMetrics(buf)
|
||||
default:
|
||||
err = fmt.Errorf("Unsupported data format: %s. Must be either influx or graphite ", dataFormat)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return metrics, nil
|
||||
}
|
||||
|
||||
func (p *Parser) ParseSocketLine(dataFormat, line string) (telegraf.Metric, error) {
|
||||
metrics, err := p.ParseSocketLines(dataFormat, []byte(line+"\n"))
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if len(metrics) < 1 {
|
||||
return nil, fmt.Errorf("Can not parse the line: %s, for data format: %s ", line, dataFormat)
|
||||
}
|
||||
|
||||
return metrics[0], nil
|
||||
}
|
|
@ -1,162 +0,0 @@
|
|||
# The Socket Service Input
|
||||
|
||||
## A note on UDP/IP OS Buffer sizes
|
||||
|
||||
If you're using UDP input and running Linux or FreeBSD, please adjust your UDP buffer
|
||||
size limit, [see here for more details.](../udp/README.md#a-note-on-udpip-os-buffer-sizes)
|
||||
|
||||
## Configuration
|
||||
|
||||
Each Socket input allows the binding address, and protocol to be set.
|
||||
Currently socket service input only support two line protocol formats of metric: influx and graphite.
|
||||
Each input allows to use data_format to choose which format of contents will be sent to this socket service.
|
||||
|
||||
## Parsing Metrics
|
||||
|
||||
The Socket service input plugin allows measurements to be saved using the influx or graphite line protocol. By default, enabling the socket plugin will allow you to collect influx or graphite metrics and store them using the metric name as the measurement. If you send a graphite metric named `servers.localhost.cpu.loadavg.10`, it will store the full metric name as the measurement with no extracted tags.
|
||||
|
||||
While this default setup works, it is not the ideal way to store measurements in InfluxDB since it does not take advantage of tags. It also will not perform optimally with a large dataset sizes since queries will be forced to use regexes which is known to not scale well.
|
||||
|
||||
To extract tags from metrics, one or more templates must be configured to parse metrics into tags and measurements.
|
||||
|
||||
## Templates
|
||||
|
||||
Templates allow matching parts of a metric name to be used as tag keys in the stored metric. They have a similar format to graphite metric names. The values in between the separators are used as the tag keys. The location of the tag key that matches the same position as the graphite metric section is used as the value. If there is no value, the graphite portion is skipped.
|
||||
|
||||
The special value _measurement_ is used to define the measurement name. It can have a trailing `*` to indicate that the remainder of the metric should be used. If a _measurement_ is not specified, the full metric name is used.
|
||||
|
||||
### Basic Matching
|
||||
|
||||
`servers.localhost.cpu.loadavg.10`
|
||||
* Template: `.host.resource.measurement*`
|
||||
* Output: _measurement_ =`loadavg.10` _tags_ =`host=localhost resource=cpu`
|
||||
|
||||
### Multiple Measurement Matching
|
||||
|
||||
The _measurement_ can be specified multiple times in a template to provide more control over the measurement name. Multiple values
|
||||
will be joined together using the _Separator_ config variable. By default, this value is `.`.
|
||||
|
||||
`servers.localhost.cpu.cpu0.user`
|
||||
* Template: `.host.measurement.cpu.measurement`
|
||||
* Output: _measurement_ = `cpu.user` _tags_ = `host=localhost cpu=cpu0`
|
||||
|
||||
Since '.' requires queries on measurements to be double-quoted, you may want to set this to `_` to simplify querying parsed metrics.
|
||||
|
||||
`servers.localhost.cpu.cpu0.user`
|
||||
* Separator: `_`
|
||||
* Template: `.host.measurement.cpu.measurement`
|
||||
* Output: _measurement_ = `cpu_user` _tags_ = `host=localhost cpu=cpu0`
|
||||
|
||||
### Adding Tags
|
||||
|
||||
Additional tags can be added to a metric that don't exist on the received metric. You can add additional tags by specifying them after the pattern. Tags have the same format as the line protocol. Multiple tags are separated by commas.
|
||||
|
||||
`servers.localhost.cpu.loadavg.10`
|
||||
* Template: `.host.resource.measurement* region=us-west,zone=1a`
|
||||
* Output: _measurement_ = `loadavg.10` _tags_ = `host=localhost resource=cpu region=us-west zone=1a`
|
||||
|
||||
### Fields
|
||||
|
||||
A field key can be specified by using the keyword _field_. By default if no _field_ keyword is specified then the metric will be written to a field named _value_.
|
||||
|
||||
When using the current default engine _BZ1_, it's recommended to use a single field per value for performance reasons.
|
||||
|
||||
When using the _TSM1_ engine it's possible to amend measurement metrics with additional fields, e.g:
|
||||
|
||||
Input:
|
||||
```
|
||||
sensu.metric.net.server0.eth0.rx_packets 461295119435 1444234982
|
||||
sensu.metric.net.server0.eth0.tx_bytes 1093086493388480 1444234982
|
||||
sensu.metric.net.server0.eth0.rx_bytes 1015633926034834 1444234982
|
||||
sensu.metric.net.server0.eth0.tx_errors 0 1444234982
|
||||
sensu.metric.net.server0.eth0.rx_errors 0 1444234982
|
||||
sensu.metric.net.server0.eth0.tx_dropped 0 1444234982
|
||||
sensu.metric.net.server0.eth0.rx_dropped 0 1444234982
|
||||
```
|
||||
|
||||
With template:
|
||||
```
|
||||
sensu.metric.* ..measurement.host.interface.field
|
||||
```
|
||||
|
||||
Becomes database entry:
|
||||
```
|
||||
> select * from net
|
||||
name: net
|
||||
---------
|
||||
time host interface rx_bytes rx_dropped rx_errors rx_packets tx_bytes tx_dropped tx_errors
|
||||
1444234982000000000 server0 eth0 1.015633926034834e+15 0 0 4.61295119435e+11 1.09308649338848e+15 0 0
|
||||
```
|
||||
|
||||
## Multiple Templates
|
||||
|
||||
One template may not match all metrics. For example, using multiple plugins with diamond will produce metrics in different formats. If you need to use multiple templates, you'll need to define a prefix filter that must match before the template can be applied.
|
||||
|
||||
### Filters
|
||||
|
||||
Filters have a similar format to templates but work more like wildcard expressions. When multiple filters would match a metric, the more specific one is chosen. Filters are configured by adding them before the template.
|
||||
|
||||
For example,
|
||||
|
||||
```
|
||||
servers.localhost.cpu.loadavg.10
|
||||
servers.host123.elasticsearch.cache_hits 100
|
||||
servers.host456.mysql.tx_count 10
|
||||
servers.host789.prod.mysql.tx_count 10
|
||||
```
|
||||
* `servers.*` would match all values
|
||||
* `servers.*.mysql` would match `servers.host456.mysql.tx_count 10`
|
||||
* `servers.localhost.*` would match `servers.localhost.cpu.loadavg`
|
||||
* `servers.*.*.mysql` would match `servers.host789.prod.mysql.tx_count 10`
|
||||
|
||||
## Default Templates
|
||||
|
||||
If no template filters are defined or you want to just have one basic template, you can define a default template. This template will apply to any metric that has not already matched a filter.
|
||||
|
||||
```
|
||||
dev.http.requests.200
|
||||
prod.myapp.errors.count
|
||||
dev.db.queries.count
|
||||
```
|
||||
|
||||
* `env.app.measurement*` would create
|
||||
* _measurement_=`requests.200` _tags_=`env=dev,app=http`
|
||||
* _measurement_= `errors.count` _tags_=`env=prod,app=myapp`
|
||||
* _measurement_=`queries.count` _tags_=`env=dev,app=db`
|
||||
|
||||
## Global Tags
|
||||
|
||||
If you need to add the same set of tags to all metrics, you can define them globally at the plugin level and not within each template description.
|
||||
|
||||
## Minimal Config
|
||||
```
|
||||
[[inputs.socket]]
|
||||
bind_address = ":3003" # the bind address
|
||||
protocol = "tcp" # or "udp" protocol to read via
|
||||
udp_read_buffer = 8388608 # (8*1024*1024) UDP read buffer size
|
||||
|
||||
# Data format to consume. This can be "influx" or "graphite" (line-protocol)
|
||||
# NOTE json only reads numerical measurements, strings and booleans are ignored.
|
||||
data_format = "graphite"
|
||||
|
||||
### If matching multiple measurement files, this string will be used to join the matched values.
|
||||
separator = "."
|
||||
|
||||
### Each template line requires a template pattern. It can have an optional
|
||||
### filter before the template and separated by spaces. It can also have optional extra
|
||||
### tags following the template. Multiple tags should be separated by commas and no spaces
|
||||
### similar to the line protocol format. The can be only one default template.
|
||||
### Templates support below format:
|
||||
### 1. filter + template
|
||||
### 2. filter + template + extra tag
|
||||
### 3. filter + template with field key
|
||||
### 4. default template
|
||||
templates = [
|
||||
"*.app env.service.resource.measurement",
|
||||
"stats.* .host.measurement* region=us-west,agent=sensu",
|
||||
"stats2.* .host.measurement.field",
|
||||
"measurement*"
|
||||
]
|
||||
```
|
||||
|
||||
|
|
@ -1,58 +0,0 @@
|
|||
package socket
|
||||
|
||||
import "github.com/influxdata/telegraf/internal/encoding/graphite"
|
||||
|
||||
const (
|
||||
// DefaultBindAddress is the default binding interface if none is specified.
|
||||
DefaultBindAddress = ":2003"
|
||||
|
||||
// DefaultProtocol is the default IP protocol used by the Graphite input.
|
||||
DefaultProtocol = "tcp"
|
||||
|
||||
// DefaultUDPReadBuffer is the default buffer size for the UDP listener.
|
||||
// Sets the size of the operating system's receive buffer associated with
|
||||
// the UDP traffic. Keep in mind that the OS must be able
|
||||
// to handle the number set here or the UDP listener will error and exit.
|
||||
//
|
||||
// DefaultReadBuffer = 0 means to use the OS default, which is usually too
|
||||
// small for high UDP performance.
|
||||
//
|
||||
// Increasing OS buffer limits:
|
||||
// Linux: sudo sysctl -w net.core.rmem_max=<read-buffer>
|
||||
// BSD/Darwin: sudo sysctl -w kern.ipc.maxsockbuf=<read-buffer>
|
||||
DefaultUdpReadBuffer = 0
|
||||
)
|
||||
|
||||
// Config represents the configuration for Graphite endpoints.
|
||||
type Config struct {
|
||||
BindAddress string
|
||||
Protocol string
|
||||
UdpReadBuffer int
|
||||
|
||||
graphite.Config
|
||||
}
|
||||
|
||||
// New Config instance.
|
||||
func NewConfig(bindAddress, protocol string, udpReadBuffer int, separator string, templates []string) *Config {
|
||||
c := &Config{}
|
||||
if bindAddress == "" {
|
||||
bindAddress = DefaultBindAddress
|
||||
}
|
||||
if protocol == "" {
|
||||
protocol = DefaultProtocol
|
||||
}
|
||||
if udpReadBuffer < 0 {
|
||||
udpReadBuffer = DefaultUdpReadBuffer
|
||||
}
|
||||
if separator == "" {
|
||||
separator = graphite.DefaultSeparator
|
||||
}
|
||||
|
||||
c.BindAddress = bindAddress
|
||||
c.Protocol = protocol
|
||||
c.UdpReadBuffer = udpReadBuffer
|
||||
c.Separator = separator
|
||||
c.Templates = templates
|
||||
|
||||
return c
|
||||
}
|
|
@ -1,313 +0,0 @@
|
|||
package socket
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"fmt"
|
||||
"log"
|
||||
"math"
|
||||
"net"
|
||||
"os"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
|
||||
"github.com/influxdata/telegraf/internal/encoding"
|
||||
"github.com/influxdata/telegraf/internal/encoding/graphite"
|
||||
"github.com/influxdata/telegraf/plugins/inputs"
|
||||
)
|
||||
|
||||
const (
|
||||
udpBufferSize = 65536
|
||||
)
|
||||
|
||||
type tcpConnection struct {
|
||||
conn net.Conn
|
||||
connectTime time.Time
|
||||
}
|
||||
|
||||
func (c *tcpConnection) Close() {
|
||||
c.conn.Close()
|
||||
}
|
||||
|
||||
// Socket represents a Socket listening service.
|
||||
type Socket struct {
|
||||
BindAddress string
|
||||
Protocol string
|
||||
UdpReadBuffer int
|
||||
|
||||
DataFormat string
|
||||
|
||||
Separator string
|
||||
Templates []string
|
||||
|
||||
mu sync.Mutex
|
||||
|
||||
encodingParser *encoding.Parser
|
||||
|
||||
logger *log.Logger
|
||||
config *Config
|
||||
|
||||
tcpConnectionsMu sync.Mutex
|
||||
tcpConnections map[string]*tcpConnection
|
||||
|
||||
ln net.Listener
|
||||
addr net.Addr
|
||||
udpConn *net.UDPConn
|
||||
|
||||
wg sync.WaitGroup
|
||||
done chan struct{}
|
||||
|
||||
// channel for all incoming parsed points
|
||||
metricC chan telegraf.Metric
|
||||
}
|
||||
|
||||
var sampleConfig = `
|
||||
bind_address = ":2003" # the bind address
|
||||
protocol = "tcp" # or "udp" protocol to read via
|
||||
udp_read_buffer = 8388608 # (8*1024*1024) UDP read buffer size
|
||||
|
||||
# Data format to consume. This can be "influx" or "graphite" (line-protocol)
|
||||
# NOTE json only reads numerical measurements, strings and booleans are ignored.
|
||||
data_format = "graphite"
|
||||
|
||||
### If matching multiple measurement files, this string will be used to join the matched values.
|
||||
separator = "."
|
||||
|
||||
### Each template line requires a template pattern. It can have an optional
|
||||
### filter before the template and separated by spaces. It can also have optional extra
|
||||
### tags following the template. Multiple tags should be separated by commas and no spaces
|
||||
### similar to the line protocol format. The can be only one default template.
|
||||
### Templates support below format:
|
||||
### 1. filter + template
|
||||
### 2. filter + template + extra tag
|
||||
### 3. filter + template with field key
|
||||
### 4. default template
|
||||
templates = [
|
||||
"*.app env.service.resource.measurement",
|
||||
"stats.* .host.measurement* region=us-west,agent=sensu",
|
||||
"stats2.* .host.measurement.field",
|
||||
"measurement*"
|
||||
]
|
||||
`
|
||||
|
||||
func (s *Socket) SampleConfig() string {
|
||||
return sampleConfig
|
||||
}
|
||||
|
||||
func (s *Socket) Description() string {
|
||||
return "Socket read influx or graphite line-protocol metrics from tcp/udp socket"
|
||||
}
|
||||
|
||||
// Open starts the Socket input processing data.
|
||||
func (s *Socket) Start() error {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
c := NewConfig(s.BindAddress, s.Protocol, s.UdpReadBuffer, s.Separator, s.Templates)
|
||||
|
||||
if err := c.Validate(); err != nil {
|
||||
return fmt.Errorf("Socket input configuration is error: %s ", err.Error())
|
||||
}
|
||||
s.config = c
|
||||
|
||||
graphiteParser, err := graphite.NewParserWithOptions(graphite.Options{
|
||||
Templates: s.config.Templates,
|
||||
Separator: s.config.Separator})
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("Socket input parser config is error: %s ", err.Error())
|
||||
}
|
||||
|
||||
s.encodingParser = encoding.NewParser(graphiteParser)
|
||||
|
||||
s.tcpConnections = make(map[string]*tcpConnection)
|
||||
s.done = make(chan struct{})
|
||||
s.metricC = make(chan telegraf.Metric, 50000)
|
||||
|
||||
if strings.ToLower(s.config.Protocol) == "tcp" {
|
||||
s.addr, err = s.openTCPServer()
|
||||
} else if strings.ToLower(s.config.Protocol) == "udp" {
|
||||
s.addr, err = s.openUDPServer()
|
||||
} else {
|
||||
return fmt.Errorf("unrecognized Socket input protocol %s", s.config.Protocol)
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
s.logger.Printf("Socket Plugin Listening on %s: %s", strings.ToUpper(s.config.Protocol), s.config.BindAddress)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Socket) closeAllConnections() {
|
||||
s.tcpConnectionsMu.Lock()
|
||||
defer s.tcpConnectionsMu.Unlock()
|
||||
for _, c := range s.tcpConnections {
|
||||
c.Close()
|
||||
}
|
||||
}
|
||||
|
||||
// Close stops all data processing on the Socket input.
|
||||
func (s *Socket) Stop() {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
s.closeAllConnections()
|
||||
|
||||
if s.ln != nil {
|
||||
s.ln.Close()
|
||||
}
|
||||
if s.udpConn != nil {
|
||||
s.udpConn.Close()
|
||||
}
|
||||
|
||||
close(s.done)
|
||||
s.wg.Wait()
|
||||
s.done = nil
|
||||
}
|
||||
|
||||
// openTCPServer opens the Socket input in TCP mode and starts processing data.
|
||||
func (s *Socket) openTCPServer() (net.Addr, error) {
|
||||
ln, err := net.Listen("tcp", s.config.BindAddress)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
s.ln = ln
|
||||
|
||||
s.wg.Add(1)
|
||||
go func() {
|
||||
defer s.wg.Done()
|
||||
for {
|
||||
conn, err := s.ln.Accept()
|
||||
if opErr, ok := err.(*net.OpError); ok && !opErr.Temporary() {
|
||||
s.logger.Println("Socket TCP listener closed")
|
||||
return
|
||||
}
|
||||
if err != nil {
|
||||
s.logger.Println("error accepting TCP connection", err.Error())
|
||||
continue
|
||||
}
|
||||
|
||||
s.wg.Add(1)
|
||||
go s.handleTCPConnection(conn)
|
||||
}
|
||||
}()
|
||||
return ln.Addr(), nil
|
||||
}
|
||||
|
||||
// handleTCPConnection services an individual TCP connection for the Socket input.
|
||||
func (s *Socket) handleTCPConnection(conn net.Conn) {
|
||||
defer s.wg.Done()
|
||||
defer conn.Close()
|
||||
defer s.untrackConnection(conn)
|
||||
|
||||
s.trackConnection(conn)
|
||||
reader := bufio.NewReader(conn)
|
||||
|
||||
for {
|
||||
// Read up to the next newline.
|
||||
buf, err := reader.ReadBytes('\n')
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
// Trim the buffer, even though there should be no padding
|
||||
// line := strings.TrimSpace(string(buf))
|
||||
s.handleLines(buf)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Socket) trackConnection(c net.Conn) {
|
||||
s.tcpConnectionsMu.Lock()
|
||||
defer s.tcpConnectionsMu.Unlock()
|
||||
s.tcpConnections[c.RemoteAddr().String()] = &tcpConnection{
|
||||
conn: c,
|
||||
connectTime: time.Now().UTC(),
|
||||
}
|
||||
}
|
||||
func (s *Socket) untrackConnection(c net.Conn) {
|
||||
s.tcpConnectionsMu.Lock()
|
||||
defer s.tcpConnectionsMu.Unlock()
|
||||
delete(s.tcpConnections, c.RemoteAddr().String())
|
||||
}
|
||||
|
||||
// openUDPServer opens the Socket input in UDP mode and starts processing incoming data.
|
||||
func (s *Socket) openUDPServer() (net.Addr, error) {
|
||||
addr, err := net.ResolveUDPAddr("udp", s.config.BindAddress)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
s.udpConn, err = net.ListenUDP("udp", addr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if s.config.UdpReadBuffer != 0 {
|
||||
err = s.udpConn.SetReadBuffer(s.config.UdpReadBuffer)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("unable to set UDP read buffer to %d: %s",
|
||||
s.config.UdpReadBuffer, err)
|
||||
}
|
||||
}
|
||||
|
||||
buf := make([]byte, udpBufferSize)
|
||||
s.wg.Add(1)
|
||||
go func() {
|
||||
defer s.wg.Done()
|
||||
for {
|
||||
n, _, err := s.udpConn.ReadFromUDP(buf)
|
||||
if err != nil {
|
||||
s.udpConn.Close()
|
||||
return
|
||||
}
|
||||
|
||||
s.handleLines(buf[:n])
|
||||
}
|
||||
}()
|
||||
return s.udpConn.LocalAddr(), nil
|
||||
}
|
||||
|
||||
func (s *Socket) handleLines(buf []byte) {
|
||||
if buf == nil || len(buf) < 1 {
|
||||
return
|
||||
}
|
||||
|
||||
// Parse it.
|
||||
metrics, err := s.encodingParser.ParseSocketLines(s.DataFormat, buf)
|
||||
if err != nil {
|
||||
switch err := err.(type) {
|
||||
case *graphite.UnsupposedValueError:
|
||||
// Socket ignores NaN values with no error.
|
||||
if math.IsNaN(err.Value) {
|
||||
return
|
||||
}
|
||||
}
|
||||
s.logger.Printf("unable to parse lines: %s: %s", buf, err)
|
||||
return
|
||||
}
|
||||
|
||||
for _, metric := range metrics {
|
||||
s.metricC <- metric
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func (s *Socket) Gather(acc telegraf.Accumulator) error {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
npoints := len(s.metricC)
|
||||
for i := 0; i < npoints; i++ {
|
||||
metric := <-s.metricC
|
||||
acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), metric.Time())
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func init() {
|
||||
inputs.Add("socket", func() telegraf.Input {
|
||||
return &Socket{logger: log.New(os.Stderr, "[socket] ", log.LstdFlags)}
|
||||
})
|
||||
}
|
|
@ -1,24 +0,0 @@
|
|||
package tail
|
||||
|
||||
import "github.com/influxdata/telegraf/internal/encoding/graphite"
|
||||
|
||||
// Config represents the configuration for Graphite endpoints.
|
||||
type Config struct {
|
||||
Files []string
|
||||
|
||||
graphite.Config
|
||||
}
|
||||
|
||||
// New Config instance.
|
||||
func NewConfig(files []string, separator string, templates []string) *Config {
|
||||
c := &Config{}
|
||||
if separator == "" {
|
||||
separator = graphite.DefaultSeparator
|
||||
}
|
||||
|
||||
c.Files = files
|
||||
c.Templates = templates
|
||||
c.Separator = separator
|
||||
|
||||
return c
|
||||
}
|
|
@ -13,6 +13,9 @@ import (
|
|||
"github.com/influxdata/telegraf/internal/encoding"
|
||||
"github.com/influxdata/telegraf/internal/encoding/graphite"
|
||||
"github.com/influxdata/telegraf/plugins/inputs"
|
||||
|
||||
_ "github.com/influxdata/telegraf/internal/encoding/graphite"
|
||||
_ "github.com/influxdata/telegraf/internal/encoding/influx"
|
||||
)
|
||||
|
||||
// Tail represents a tail service to
|
||||
|
@ -27,10 +30,10 @@ type Tail struct {
|
|||
|
||||
mu sync.Mutex
|
||||
|
||||
encodingParser *encoding.Parser
|
||||
encodingParser encoding.Parser
|
||||
|
||||
logger *log.Logger
|
||||
|
||||
logger *log.Logger
|
||||
config *Config
|
||||
tailPointers []*tail.Tail
|
||||
|
||||
wg sync.WaitGroup
|
||||
|
@ -81,22 +84,17 @@ func (t *Tail) Start() error {
|
|||
t.mu.Lock()
|
||||
defer t.mu.Unlock()
|
||||
|
||||
c := NewConfig(t.Files, t.Separator, t.Templates)
|
||||
if err := c.Validate(); err != nil {
|
||||
return fmt.Errorf("Tail input configuration is error: %s ", err.Error())
|
||||
}
|
||||
t.config = c
|
||||
configs := make(map[string]interface{})
|
||||
configs["Separator"] = t.Separator
|
||||
configs["Templates"] = t.Templates
|
||||
|
||||
graphiteParser, err := graphite.NewParserWithOptions(graphite.Options{
|
||||
Templates: t.config.Templates,
|
||||
Separator: t.config.Separator})
|
||||
var err error
|
||||
t.encodingParser, err = encoding.NewParser(t.DataFormat, configs)
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("Tail input parser config is error: %s ", err.Error())
|
||||
return fmt.Errorf("Tail input configuration is error: %s ", err.Error())
|
||||
}
|
||||
|
||||
t.encodingParser = encoding.NewParser(graphiteParser)
|
||||
|
||||
t.done = make(chan struct{})
|
||||
t.metricC = make(chan telegraf.Metric, 50000)
|
||||
t.tailPointers = make([]*tail.Tail, len(t.Files))
|
||||
|
@ -138,7 +136,7 @@ func (t *Tail) handleLine(line string) {
|
|||
}
|
||||
|
||||
// Parse it.
|
||||
metric, err := t.encodingParser.ParseSocketLine(t.DataFormat, line)
|
||||
metric, err := t.encodingParser.ParseLine(line)
|
||||
if err != nil {
|
||||
switch err := err.(type) {
|
||||
case *graphite.UnsupposedValueError:
|
||||
|
|
Loading…
Reference in New Issue