Compare commits
5 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
2bc5594b44 | ||
|
|
99b53c8745 | ||
|
|
27b89dff48 | ||
|
|
b16eb6eae6 | ||
|
|
feaf76913b |
@@ -1,4 +1,4 @@
|
||||
## v1.3 [unreleased]
|
||||
## v1.3 [2017-05-15]
|
||||
|
||||
### Release Notes
|
||||
|
||||
@@ -81,6 +81,7 @@ be deprecated eventually.
|
||||
- [#2031](https://github.com/influxdata/telegraf/pull/2031): Add Kapacitor input plugin
|
||||
- [#2732](https://github.com/influxdata/telegraf/pull/2732): Use go 1.8.1
|
||||
- [#2712](https://github.com/influxdata/telegraf/issues/2712): Documentation for rabbitmq input plugin
|
||||
- [#2141](https://github.com/influxdata/telegraf/pull/2141): Logparser handles newly-created files.
|
||||
|
||||
### Bugfixes
|
||||
|
||||
|
||||
@@ -111,6 +111,7 @@ configuration options.
|
||||
* [couchbase](./plugins/inputs/couchbase)
|
||||
* [couchdb](./plugins/inputs/couchdb)
|
||||
* [disque](./plugins/inputs/disque)
|
||||
* [dmcache](./plugins/inputs/dmcache)
|
||||
* [dns query time](./plugins/inputs/dns_query)
|
||||
* [docker](./plugins/inputs/docker)
|
||||
* [dovecot](./plugins/inputs/dovecot)
|
||||
@@ -127,6 +128,7 @@ configuration options.
|
||||
* [ipmi_sensor](./plugins/inputs/ipmi_sensor)
|
||||
* [iptables](./plugins/inputs/iptables)
|
||||
* [jolokia](./plugins/inputs/jolokia)
|
||||
* [kapacitor](./plugins/inputs/kapacitor)
|
||||
* [kubernetes](./plugins/inputs/kubernetes)
|
||||
* [leofs](./plugins/inputs/leofs)
|
||||
* [lustre2](./plugins/inputs/lustre2)
|
||||
@@ -195,6 +197,7 @@ Telegraf can also collect metrics via the following service plugins:
|
||||
* [github](./plugins/inputs/webhooks/github)
|
||||
* [mandrill](./plugins/inputs/webhooks/mandrill)
|
||||
* [rollbar](./plugins/inputs/webhooks/rollbar)
|
||||
* [papertrail](./plugins/inputs/webhooks/papertrail)
|
||||
|
||||
Telegraf is able to parse the following input data formats into metrics, these
|
||||
formats may be used with input plugins supporting the `data_format` option:
|
||||
|
||||
@@ -25,6 +25,7 @@ type UDPConfig struct {
|
||||
PayloadSize int
|
||||
}
|
||||
|
||||
// NewUDP will return an instance of the telegraf UDP output plugin for influxdb
|
||||
func NewUDP(config UDPConfig) (Client, error) {
|
||||
p, err := url.Parse(config.URL)
|
||||
if err != nil {
|
||||
@@ -55,20 +56,22 @@ type udpClient struct {
|
||||
buffer []byte
|
||||
}
|
||||
|
||||
// Query will send the provided query command to the client, returning an error if any issues arise
|
||||
func (c *udpClient) Query(command string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Write will send the byte stream to the given UDP client endpoint
|
||||
func (c *udpClient) Write(b []byte) (int, error) {
|
||||
return c.WriteStream(bytes.NewReader(b), -1)
|
||||
}
|
||||
|
||||
// write params are ignored by the UDP client
|
||||
// WriteWithParams are ignored by the UDP client, will forward to WriteStream
|
||||
func (c *udpClient) WriteWithParams(b []byte, wp WriteParams) (int, error) {
|
||||
return c.WriteStream(bytes.NewReader(b), -1)
|
||||
}
|
||||
|
||||
// contentLength is ignored by the UDP client.
|
||||
// WriteStream will send the provided data through to the client, contentLength is ignored by the UDP client
|
||||
func (c *udpClient) WriteStream(r io.Reader, contentLength int) (int, error) {
|
||||
var totaln int
|
||||
for {
|
||||
@@ -88,12 +91,13 @@ func (c *udpClient) WriteStream(r io.Reader, contentLength int) (int, error) {
|
||||
return totaln, nil
|
||||
}
|
||||
|
||||
// contentLength is ignored by the UDP client.
|
||||
// WriteStreamWithParams will forward the stream to the client backend, contentLength is ignored by the UDP client
|
||||
// write params are ignored by the UDP client
|
||||
func (c *udpClient) WriteStreamWithParams(r io.Reader, contentLength int, wp WriteParams) (int, error) {
|
||||
return c.WriteStream(r, -1)
|
||||
}
|
||||
|
||||
// Close will terminate the provided client connection
|
||||
func (c *udpClient) Close() error {
|
||||
return c.conn.Close()
|
||||
}
|
||||
|
||||
@@ -2,6 +2,7 @@ package influxdb
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"math/rand"
|
||||
"strings"
|
||||
@@ -15,6 +16,7 @@ import (
|
||||
"github.com/influxdata/telegraf/plugins/outputs/influxdb/client"
|
||||
)
|
||||
|
||||
// InfluxDB struct is the primary data structure for the plugin
|
||||
type InfluxDB struct {
|
||||
// URL is only for backwards compatability
|
||||
URL string
|
||||
@@ -40,7 +42,8 @@ type InfluxDB struct {
|
||||
// Precision is only here for legacy support. It will be ignored.
|
||||
Precision string
|
||||
|
||||
clients []client.Client
|
||||
clients []client.Client
|
||||
splitPayload bool
|
||||
}
|
||||
|
||||
var sampleConfig = `
|
||||
@@ -79,11 +82,10 @@ var sampleConfig = `
|
||||
# insecure_skip_verify = false
|
||||
`
|
||||
|
||||
// Connect initiates the primary connection to the range of provided URLs
|
||||
func (i *InfluxDB) Connect() error {
|
||||
var urls []string
|
||||
for _, u := range i.URLs {
|
||||
urls = append(urls, u)
|
||||
}
|
||||
urls = append(urls, i.URLs...)
|
||||
|
||||
// Backward-compatability with single Influx URL config files
|
||||
// This could eventually be removed in favor of specifying the urls as a list
|
||||
@@ -109,6 +111,7 @@ func (i *InfluxDB) Connect() error {
|
||||
return fmt.Errorf("Error creating UDP Client [%s]: %s", u, err)
|
||||
}
|
||||
i.clients = append(i.clients, c)
|
||||
i.splitPayload = true
|
||||
default:
|
||||
// If URL doesn't start with "udp", assume HTTP client
|
||||
config := client.HTTPConfig{
|
||||
@@ -144,26 +147,41 @@ func (i *InfluxDB) Connect() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Close will terminate the session to the backend, returning error if an issue arises
|
||||
func (i *InfluxDB) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// SampleConfig returns the formatted sample configuration for the plugin
|
||||
func (i *InfluxDB) SampleConfig() string {
|
||||
return sampleConfig
|
||||
}
|
||||
|
||||
// Description returns the human-readable function definition of the plugin
|
||||
func (i *InfluxDB) Description() string {
|
||||
return "Configuration for influxdb server to send metrics to"
|
||||
}
|
||||
|
||||
// Choose a random server in the cluster to write to until a successful write
|
||||
func (i *InfluxDB) getReader(metrics []telegraf.Metric) io.Reader {
|
||||
if !i.splitPayload {
|
||||
return metric.NewReader(metrics)
|
||||
}
|
||||
|
||||
splitData := make([]telegraf.Metric, 0)
|
||||
for _, m := range metrics {
|
||||
splitData = append(splitData, m.Split(i.UDPPayload)...)
|
||||
}
|
||||
return metric.NewReader(splitData)
|
||||
}
|
||||
|
||||
// Write will choose a random server in the cluster to write to until a successful write
|
||||
// occurs, logging each unsuccessful. If all servers fail, return error.
|
||||
func (i *InfluxDB) Write(metrics []telegraf.Metric) error {
|
||||
bufsize := 0
|
||||
for _, m := range metrics {
|
||||
bufsize += m.Len()
|
||||
}
|
||||
r := metric.NewReader(metrics)
|
||||
r := i.getReader(metrics)
|
||||
|
||||
// This will get set to nil if a successful write occurs
|
||||
err := fmt.Errorf("Could not write to any InfluxDB server in cluster")
|
||||
|
||||
Reference in New Issue
Block a user