Unit test modified.

This commit is contained in:
amandahla 2016-10-27 09:33:56 -02:00
commit 338f778df3
23 changed files with 555 additions and 278 deletions

View File

@ -30,7 +30,7 @@ continue sending logs to /var/log/telegraf/telegraf.log.
- [#1542](https://github.com/influxdata/telegraf/pull/1542): Add filestack webhook plugin.
- [#1599](https://github.com/influxdata/telegraf/pull/1599): Add server hostname for each docker measurements.
- [#1697](https://github.com/influxdata/telegraf/pull/1697): Add NATS output plugin.
- [#1407](https://github.com/influxdata/telegraf/pull/1407): HTTP service listener input plugin.
- [#1407](https://github.com/influxdata/telegraf/pull/1407) & [#1915](https://github.com/influxdata/telegraf/pull/1915): HTTP service listener input plugin.
- [#1699](https://github.com/influxdata/telegraf/pull/1699): Add database blacklist option for Postgresql
- [#1791](https://github.com/influxdata/telegraf/pull/1791): Add Docker container state metrics to Docker input plugin output
- [#1755](https://github.com/influxdata/telegraf/issues/1755): Add support to SNMP for IP & MAC address conversion.
@ -43,6 +43,9 @@ continue sending logs to /var/log/telegraf/telegraf.log.
### Bugfixes
- [#1955](https://github.com/influxdata/telegraf/issues/1955): Fix NATS plug-ins reconnection logic.
- [#1936](https://github.com/influxdata/telegraf/issues/1936): Set required default values in udp_listener & tcp_listener.
- [#1926](https://github.com/influxdata/telegraf/issues/1926): Fix toml unmarshal panic in Duration objects.
- [#1746](https://github.com/influxdata/telegraf/issues/1746): Fix handling of non-string values for JSON keys listed in tag_keys.
- [#1628](https://github.com/influxdata/telegraf/issues/1628): Fix mongodb input panic on version 2.2.
- [#1733](https://github.com/influxdata/telegraf/issues/1733): Fix statsd scientific notation parsing

View File

@ -85,45 +85,42 @@ if you don't have it already. You also must build with golang version 1.5+.
## How to use it:
```console
$ telegraf --help
Telegraf, The plugin-driven server agent for collecting and reporting metrics.
See usage with:
Usage:
telegraf [commands|flags]
The commands & flags are:
config print out full sample configuration to stdout
version print the version to stdout
--config <file> configuration file to load
--test gather metrics once, print them to stdout, and exit
--config-directory directory containing additional *.conf files
--input-filter filter the input plugins to enable, separator is :
--output-filter filter the output plugins to enable, separator is :
--usage print usage for a plugin, ie, 'telegraf -usage mysql'
--debug print metrics as they're generated to stdout
--quiet run in quiet mode
Examples:
# generate a telegraf config file:
telegraf config > telegraf.conf
# generate config with only cpu input & influxdb output plugins defined
telegraf config -input-filter cpu -output-filter influxdb
# run a single telegraf collection, outputing metrics to stdout
telegraf --config telegraf.conf -test
# run telegraf with all plugins defined in config file
telegraf --config telegraf.conf
# run telegraf, enabling the cpu & memory input, and influxdb output plugins
telegraf --config telegraf.conf -input-filter cpu:mem -output-filter influxdb
```
telegraf --help
```
### Generate a telegraf config file:
```
telegraf config > telegraf.conf
```
### Generate config with only cpu input & influxdb output plugins defined
```
telegraf --input-filter cpu --output-filter influxdb config
```
### Run a single telegraf collection, outputing metrics to stdout
```
telegraf --config telegraf.conf -test
```
### Run telegraf with all plugins defined in config file
```
telegraf --config telegraf.conf
```
### Run telegraf, enabling the cpu & memory input, and influxdb output plugins
```
telegraf --config telegraf.conf -input-filter cpu:mem -output-filter influxdb
```
## Configuration

View File

@ -4,17 +4,14 @@ machine:
post:
- sudo service zookeeper stop
- go version
- go version | grep 1.7.1 || sudo rm -rf /usr/local/go
- wget https://storage.googleapis.com/golang/go1.7.1.linux-amd64.tar.gz
- sudo tar -C /usr/local -xzf go1.7.1.linux-amd64.tar.gz
- go version | grep 1.7.3 || sudo rm -rf /usr/local/go
- wget https://storage.googleapis.com/golang/go1.7.3.linux-amd64.tar.gz
- sudo tar -C /usr/local -xzf go1.7.3.linux-amd64.tar.gz
- go version
dependencies:
override:
- docker info
post:
- gem install fpm
- sudo apt-get install -y rpm python-boto
test:
override:

View File

@ -95,7 +95,7 @@ Examples:
telegraf config > telegraf.conf
# generate config with only cpu input & influxdb output plugins defined
telegraf config -input-filter cpu -output-filter influxdb
telegraf --input-filter cpu --output-filter influxdb config
# run a single telegraf collection, outputing metrics to stdout
telegraf --config telegraf.conf -test

View File

@ -12,10 +12,10 @@ telegraf config > telegraf.conf
```
To generate a file with specific inputs and outputs, you can use the
-input-filter and -output-filter flags:
--input-filter and --output-filter flags:
```
telegraf config -input-filter cpu:mem:net:swap -output-filter influxdb:kafka
telegraf --input-filter cpu:mem:net:swap --output-filter influxdb:kafka config
```
## Environment Variables

View File

@ -1,6 +1,8 @@
package buffer
import (
"sync"
"github.com/influxdata/telegraf"
)
@ -11,6 +13,8 @@ type Buffer struct {
drops int
// total metrics added
total int
mu sync.Mutex
}
// NewBuffer returns a Buffer
@ -61,11 +65,13 @@ func (b *Buffer) Add(metrics ...telegraf.Metric) {
// the batch will be of maximum length batchSize. It can be less than batchSize,
// if the length of Buffer is less than batchSize.
func (b *Buffer) Batch(batchSize int) []telegraf.Metric {
b.mu.Lock()
n := min(len(b.buf), batchSize)
out := make([]telegraf.Metric, n)
for i := 0; i < n; i++ {
out[i] = <-b.buf
}
b.mu.Unlock()
return out
}

View File

@ -35,12 +35,21 @@ type Duration struct {
// UnmarshalTOML parses the duration from the TOML config file
func (d *Duration) UnmarshalTOML(b []byte) error {
var err error
// Parse string duration, ie, "1s"
d.Duration, err = time.ParseDuration(string(b[1 : len(b)-1]))
// see if we can straight convert it
d.Duration, err = time.ParseDuration(string(b))
if err == nil {
return nil
}
// Parse string duration, ie, "1s"
if uq, err := strconv.Unquote(string(b)); err == nil && len(uq) > 0 {
d.Duration, err = time.ParseDuration(uq)
if err == nil {
return nil
}
}
// First try parsing as integer seconds
sI, err := strconv.ParseInt(string(b), 10, 64)
if err == nil {

View File

@ -131,3 +131,22 @@ func TestRandomSleep(t *testing.T) {
elapsed = time.Since(s)
assert.True(t, elapsed < time.Millisecond*150)
}
func TestDuration(t *testing.T) {
var d Duration
d.UnmarshalTOML([]byte(`"1s"`))
assert.Equal(t, time.Second, d.Duration)
d = Duration{}
d.UnmarshalTOML([]byte(`1s`))
assert.Equal(t, time.Second, d.Duration)
d = Duration{}
d.UnmarshalTOML([]byte(`10`))
assert.Equal(t, 10*time.Second, d.Duration)
d = Duration{}
d.UnmarshalTOML([]byte(`1.5`))
assert.Equal(t, time.Second, d.Duration)
}

View File

@ -7,7 +7,7 @@
#### Description
The Cassandra plugin collects Cassandra/JVM metrics exposed as MBean's attributes through jolokia REST endpoint. All metrics are collected for each server configured.
The Cassandra plugin collects Cassandra 3 / JVM metrics exposed as MBean's attributes through jolokia REST endpoint. All metrics are collected for each server configured.
See: https://jolokia.org/ and [Cassandra Documentation](http://docs.datastax.com/en/cassandra/3.x/cassandra/operations/monitoringCassandraTOC.html)
@ -38,9 +38,9 @@ Here is a list of metrics that might be useful to monitor your cassandra cluster
####measurement = javaGarbageCollector
- /java.lang:type=GarbageCollector,name=ConcurrentMarkSweep/CollectionTime
- /java.lang:type=GarbageCollector,name=ConcurrentMarkSweep/CollectionTime
- /java.lang:type=GarbageCollector,name=ConcurrentMarkSweep/CollectionCount
- /java.lang:type=GarbageCollector,name=ParNew/CollectionTime
- /java.lang:type=GarbageCollector,name=ParNew/CollectionTime
- /java.lang:type=GarbageCollector,name=ParNew/CollectionCount
####measurement = javaMemory
@ -50,13 +50,13 @@ Here is a list of metrics that might be useful to monitor your cassandra cluster
####measurement = cassandraCache
- /org.apache.cassandra.metrics:type=Cache,scope=KeyCache,name=Hit
- /org.apache.cassandra.metrics:type=Cache,scope=KeyCache,name=Hits
- /org.apache.cassandra.metrics:type=Cache,scope=KeyCache,name=Requests
- /org.apache.cassandra.metrics:type=Cache,scope=KeyCache,name=Entries
- /org.apache.cassandra.metrics:type=Cache,scope=KeyCache,name=Size
- /org.apache.cassandra.metrics:type=Cache,scope=KeyCache,name=Capacity
- /org.apache.cassandra.metrics:type=Cache,scope=RowCache,name=Hit
- /org.apache.cassandra.metrics:type=Cache,scope=RowCache,name=Requests
- /org.apache.cassandra.metrics:type=Cache,scope=KeyCache,name=Size
- /org.apache.cassandra.metrics:type=Cache,scope=KeyCache,name=Capacity
- /org.apache.cassandra.metrics:type=Cache,scope=RowCache,name=Hits
- /org.apache.cassandra.metrics:type=Cache,scope=RowCache,name=Requests
- /org.apache.cassandra.metrics:type=Cache,scope=RowCache,name=Entries
- /org.apache.cassandra.metrics:type=Cache,scope=RowCache,name=Size
- /org.apache.cassandra.metrics:type=Cache,scope=RowCache,name=Capacity
@ -67,33 +67,33 @@ Here is a list of metrics that might be useful to monitor your cassandra cluster
####measurement = cassandraClientRequest
- /org.apache.cassandra.metrics:type=ClientRequest,scope=Read,name=TotalLatency
- /org.apache.cassandra.metrics:type=ClientRequest,scope=Write,name=TotalLatency
- /org.apache.cassandra.metrics:type=ClientRequest,scope=Read,name=Latency
- /org.apache.cassandra.metrics:type=ClientRequest,scope=Write,name=Latency
- /org.apache.cassandra.metrics:type=ClientRequest,scope=Read,name=Timeouts
- /org.apache.cassandra.metrics:type=ClientRequest,scope=Read,name=TotalLatency
- /org.apache.cassandra.metrics:type=ClientRequest,scope=Write,name=TotalLatency
- /org.apache.cassandra.metrics:type=ClientRequest,scope=Read,name=Latency
- /org.apache.cassandra.metrics:type=ClientRequest,scope=Write,name=Latency
- /org.apache.cassandra.metrics:type=ClientRequest,scope=Read,name=Timeouts
- /org.apache.cassandra.metrics:type=ClientRequest,scope=Write,name=Timeouts
- /org.apache.cassandra.metrics:type=ClientRequest,scope=Read,name=Unavailables
- /org.apache.cassandra.metrics:type=ClientRequest,scope=Write,name=Unavailables
- /org.apache.cassandra.metrics:type=ClientRequest,scope=Read,name=Failures
- /org.apache.cassandra.metrics:type=ClientRequest,scope=Write,name=Failures
- /org.apache.cassandra.metrics:type=ClientRequest,scope=Read,name=Unavailables
- /org.apache.cassandra.metrics:type=ClientRequest,scope=Write,name=Unavailables
- /org.apache.cassandra.metrics:type=ClientRequest,scope=Read,name=Failures
- /org.apache.cassandra.metrics:type=ClientRequest,scope=Write,name=Failures
####measurement = cassandraCommitLog
- /org.apache.cassandra.metrics:type=CommitLog,name=PendingTasks
- /org.apache.cassandra.metrics:type=CommitLog,name=PendingTasks
- /org.apache.cassandra.metrics:type=CommitLog,name=TotalCommitLogSize
####measurement = cassandraCompaction
- /org.apache.cassandra.metrics:type=Compaction,name=CompletedTask
- /org.apache.cassandra.metrics:type=Compaction,name=PendingTasks
- /org.apache.cassandra.metrics:type=Compaction,name=CompletedTasks
- /org.apache.cassandra.metrics:type=Compaction,name=PendingTasks
- /org.apache.cassandra.metrics:type=Compaction,name=TotalCompactionsCompleted
- /org.apache.cassandra.metrics:type=Compaction,name=BytesCompacted
####measurement = cassandraStorage
- /org.apache.cassandra.metrics:type=Storage,name=Load
- /org.apache.cassandra.metrics:type=Storage,name=Exceptions
- /org.apache.cassandra.metrics:type=Storage,name=Exceptions
####measurement = cassandraTable
Using wildcards for "keyspace" and "scope" can create a lot of series as metrics will be reported for every table and keyspace including internal system tables. Specify a keyspace name and/or a table name to limit them.
@ -101,25 +101,25 @@ Using wildcards for "keyspace" and "scope" can create a lot of series as metrics
- /org.apache.cassandra.metrics:type=Table,keyspace=\*,scope=\*,name=LiveDiskSpaceUsed
- /org.apache.cassandra.metrics:type=Table,keyspace=\*,scope=\*,name=TotalDiskSpaceUsed
- /org.apache.cassandra.metrics:type=Table,keyspace=\*,scope=\*,name=ReadLatency
- /org.apache.cassandra.metrics:type=Table,keyspace=\*,scope=\*,name=CoordinatorReadLatency
- /org.apache.cassandra.metrics:type=Table,keyspace=\*,scope=\*,name=WriteLatency
- /org.apache.cassandra.metrics:type=Table,keyspace=\*,scope=\*,name=ReadTotalLatency
- /org.apache.cassandra.metrics:type=Table,keyspace=\*,scope=\*,name=WriteTotalLatency
- /org.apache.cassandra.metrics:type=Table,keyspace=\*,scope=\*,name=CoordinatorReadLatency
- /org.apache.cassandra.metrics:type=Table,keyspace=\*,scope=\*,name=WriteLatency
- /org.apache.cassandra.metrics:type=Table,keyspace=\*,scope=\*,name=ReadTotalLatency
- /org.apache.cassandra.metrics:type=Table,keyspace=\*,scope=\*,name=WriteTotalLatency
####measurement = cassandraThreadPools
- /org.apache.cassandra.metrics:type=ThreadPools,path=internal,scope=CompactionExecutor,name=ActiveTasks
- /org.apache.cassandra.metrics:type=ThreadPools,path=internal,scope=AntiEntropyStage,name=ActiveTasks
- /org.apache.cassandra.metrics:type=ThreadPools,path=internal,scope=CompactionExecutor,name=ActiveTasks
- /org.apache.cassandra.metrics:type=ThreadPools,path=internal,scope=AntiEntropyStage,name=ActiveTasks
- /org.apache.cassandra.metrics:type=ThreadPools,path=request,scope=CounterMutationStage,name=PendingTasks
- /org.apache.cassandra.metrics:type=ThreadPools,path=request,scope=CounterMutationStage,name=CurrentlyBlockedTasks
- /org.apache.cassandra.metrics:type=ThreadPools,path=request,scope=MutationStage,name=PendingTasks
- /org.apache.cassandra.metrics:type=ThreadPools,path=request,scope=MutationStage,name=CurrentlyBlockedTasks
- /org.apache.cassandra.metrics:type=ThreadPools,path=request,scope=MutationStage,name=CurrentlyBlockedTasks
- /org.apache.cassandra.metrics:type=ThreadPools,path=request,scope=ReadRepairStage,name=PendingTasks
- /org.apache.cassandra.metrics:type=ThreadPools,path=request,scope=ReadRepairStage,name=CurrentlyBlockedTasks
- /org.apache.cassandra.metrics:type=ThreadPools,path=request,scope=ReadRepairStage,name=CurrentlyBlockedTasks
- /org.apache.cassandra.metrics:type=ThreadPools,path=request,scope=ReadStage,name=PendingTasks
- /org.apache.cassandra.metrics:type=ThreadPools,path=request,scope=ReadStage,name=CurrentlyBlockedTasks
- /org.apache.cassandra.metrics:type=ThreadPools,path=request,scope=RequestResponseStage,name=PendingTasks
- /org.apache.cassandra.metrics:type=ThreadPools,path=request,scope=RequestResponseStage,name=CurrentlyBlockedTasks
- /org.apache.cassandra.metrics:type=ThreadPools,path=request,scope=ReadStage,name=CurrentlyBlockedTasks
- /org.apache.cassandra.metrics:type=ThreadPools,path=request,scope=RequestResponseStage,name=PendingTasks
- /org.apache.cassandra.metrics:type=ThreadPools,path=request,scope=RequestResponseStage,name=CurrentlyBlockedTasks

View File

@ -0,0 +1,43 @@
package http_listener
import (
"sync/atomic"
)
type pool struct {
buffers chan []byte
size int
created int64
}
// NewPool returns a new pool object.
// n is the number of buffers
// bufSize is the size (in bytes) of each buffer
func NewPool(n, bufSize int) *pool {
return &pool{
buffers: make(chan []byte, n),
size: bufSize,
}
}
func (p *pool) get() []byte {
select {
case b := <-p.buffers:
return b
default:
atomic.AddInt64(&p.created, 1)
return make([]byte, p.size)
}
}
func (p *pool) put(b []byte) {
select {
case p.buffers <- b:
default:
}
}
func (p *pool) ncreated() int64 {
return atomic.LoadInt64(&p.created)
}

View File

@ -1,9 +1,9 @@
package http_listener
import (
"bufio"
"bytes"
"fmt"
"compress/gzip"
"io"
"log"
"net"
"net/http"
@ -13,135 +13,137 @@ import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/inputs/http_listener/stoppableListener"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/plugins/parsers/influx"
)
type HttpListener struct {
const (
// DEFAULT_MAX_BODY_SIZE is the default maximum request body size, in bytes.
// if the request body is over this size, we will return an HTTP 413 error.
// 500 MB
DEFAULT_MAX_BODY_SIZE = 500 * 1024 * 1024
// MAX_LINE_SIZE is the maximum size, in bytes, that can be allocated for
// a single InfluxDB point.
// 64 KB
DEFAULT_MAX_LINE_SIZE = 64 * 1024
)
type HTTPListener struct {
ServiceAddress string
ReadTimeout internal.Duration
WriteTimeout internal.Duration
MaxBodySize int64
MaxLineSize int
sync.Mutex
mu sync.Mutex
wg sync.WaitGroup
listener *stoppableListener.StoppableListener
listener net.Listener
parser parsers.Parser
parser influx.InfluxParser
acc telegraf.Accumulator
pool *pool
}
const sampleConfig = `
## Address and port to host HTTP listener on
service_address = ":8186"
## timeouts
## maximum duration before timing out read of the request
read_timeout = "10s"
## maximum duration before timing out write of the response
write_timeout = "10s"
## Maximum allowed http request body size in bytes.
## 0 means to use the default of 536,870,912 bytes (500 mebibytes)
max_body_size = 0
## Maximum line size allowed to be sent in bytes.
## 0 means to use the default of 65536 bytes (64 kibibytes)
max_line_size = 0
`
func (t *HttpListener) SampleConfig() string {
func (h *HTTPListener) SampleConfig() string {
return sampleConfig
}
func (t *HttpListener) Description() string {
func (h *HTTPListener) Description() string {
return "Influx HTTP write listener"
}
func (t *HttpListener) Gather(_ telegraf.Accumulator) error {
func (h *HTTPListener) Gather(_ telegraf.Accumulator) error {
log.Printf("D! The http_listener has created %d buffers", h.pool.ncreated())
return nil
}
func (t *HttpListener) SetParser(parser parsers.Parser) {
t.parser = parser
}
// Start starts the http listener service.
func (t *HttpListener) Start(acc telegraf.Accumulator) error {
t.Lock()
defer t.Unlock()
func (h *HTTPListener) Start(acc telegraf.Accumulator) error {
h.mu.Lock()
defer h.mu.Unlock()
t.acc = acc
if h.MaxBodySize == 0 {
h.MaxBodySize = DEFAULT_MAX_BODY_SIZE
}
if h.MaxLineSize == 0 {
h.MaxLineSize = DEFAULT_MAX_LINE_SIZE
}
var rawListener, err = net.Listen("tcp", t.ServiceAddress)
if err != nil {
return err
}
t.listener, err = stoppableListener.New(rawListener)
h.acc = acc
h.pool = NewPool(200, h.MaxLineSize)
var listener, err = net.Listen("tcp", h.ServiceAddress)
if err != nil {
return err
}
h.listener = listener
go t.httpListen()
h.wg.Add(1)
go func() {
defer h.wg.Done()
h.httpListen()
}()
log.Printf("I! Started HTTP listener service on %s\n", t.ServiceAddress)
log.Printf("I! Started HTTP listener service on %s\n", h.ServiceAddress)
return nil
}
// Stop cleans up all resources
func (t *HttpListener) Stop() {
t.Lock()
defer t.Unlock()
func (h *HTTPListener) Stop() {
h.mu.Lock()
defer h.mu.Unlock()
t.listener.Stop()
t.listener.Close()
h.listener.Close()
h.wg.Wait()
t.wg.Wait()
log.Println("I! Stopped HTTP listener service on ", t.ServiceAddress)
log.Println("I! Stopped HTTP listener service on ", h.ServiceAddress)
}
// httpListen listens for HTTP requests.
func (t *HttpListener) httpListen() error {
if t.ReadTimeout.Duration < time.Second {
t.ReadTimeout.Duration = time.Second * 10
// httpListen sets up an http.Server and calls server.Serve.
// like server.Serve, httpListen will always return a non-nil error, for this
// reason, the error returned should probably be ignored.
// see https://golang.org/pkg/net/http/#Server.Serve
func (h *HTTPListener) httpListen() error {
if h.ReadTimeout.Duration < time.Second {
h.ReadTimeout.Duration = time.Second * 10
}
if t.WriteTimeout.Duration < time.Second {
t.WriteTimeout.Duration = time.Second * 10
if h.WriteTimeout.Duration < time.Second {
h.WriteTimeout.Duration = time.Second * 10
}
var server = http.Server{
Handler: t,
ReadTimeout: t.ReadTimeout.Duration,
WriteTimeout: t.WriteTimeout.Duration,
Handler: h,
ReadTimeout: h.ReadTimeout.Duration,
WriteTimeout: h.WriteTimeout.Duration,
}
return server.Serve(t.listener)
return server.Serve(h.listener)
}
func (t *HttpListener) ServeHTTP(res http.ResponseWriter, req *http.Request) {
t.wg.Add(1)
defer t.wg.Done()
func (h *HTTPListener) ServeHTTP(res http.ResponseWriter, req *http.Request) {
switch req.URL.Path {
case "/write":
var http400msg bytes.Buffer
var partial string
scanner := bufio.NewScanner(req.Body)
scanner.Buffer([]byte(""), 128*1024)
for scanner.Scan() {
metrics, err := t.parser.Parse(scanner.Bytes())
if err == nil {
for _, m := range metrics {
t.acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time())
}
partial = "partial write: "
} else {
http400msg.WriteString(err.Error() + " ")
}
}
if err := scanner.Err(); err != nil {
http.Error(res, "Internal server error: "+err.Error(), http.StatusInternalServerError)
} else if http400msg.Len() > 0 {
res.Header().Set("Content-Type", "application/json")
res.Header().Set("X-Influxdb-Version", "1.0")
res.WriteHeader(http.StatusBadRequest)
res.Write([]byte(fmt.Sprintf(`{"error":"%s%s"}`, partial, http400msg.String())))
} else {
res.WriteHeader(http.StatusNoContent)
}
h.serveWrite(res, req)
case "/query":
// Deliver a dummy response to the query endpoint, as some InfluxDB
// clients test endpoint availability with a query
@ -158,8 +160,135 @@ func (t *HttpListener) ServeHTTP(res http.ResponseWriter, req *http.Request) {
}
}
func (h *HTTPListener) serveWrite(res http.ResponseWriter, req *http.Request) {
// Check that the content length is not too large for us to handle.
if req.ContentLength > h.MaxBodySize {
tooLarge(res)
return
}
now := time.Now()
// Handle gzip request bodies
body := req.Body
var err error
if req.Header.Get("Content-Encoding") == "gzip" {
body, err = gzip.NewReader(req.Body)
defer body.Close()
if err != nil {
log.Println("E! " + err.Error())
badRequest(res)
return
}
}
body = http.MaxBytesReader(res, body, h.MaxBodySize)
var return400 bool
var hangingBytes bool
buf := h.pool.get()
defer h.pool.put(buf)
bufStart := 0
for {
n, err := io.ReadFull(body, buf[bufStart:])
if err != nil && err != io.ErrUnexpectedEOF && err != io.EOF {
log.Println("E! " + err.Error())
// problem reading the request body
badRequest(res)
return
}
if err == io.EOF {
if return400 {
badRequest(res)
} else {
res.WriteHeader(http.StatusNoContent)
}
return
}
if hangingBytes {
i := bytes.IndexByte(buf, '\n')
if i == -1 {
// still didn't find a newline, keep scanning
continue
}
// rotate the bit remaining after the first newline to the front of the buffer
i++ // start copying after the newline
bufStart = len(buf) - i
if bufStart > 0 {
copy(buf, buf[i:])
}
hangingBytes = false
continue
}
if err == io.ErrUnexpectedEOF {
// finished reading the request body
if err := h.parse(buf[:n+bufStart], now); err != nil {
log.Println("E! " + err.Error())
return400 = true
}
if return400 {
badRequest(res)
} else {
res.WriteHeader(http.StatusNoContent)
}
return
}
// if we got down here it means that we filled our buffer, and there
// are still bytes remaining to be read. So we will parse up until the
// final newline, then push the rest of the bytes into the next buffer.
i := bytes.LastIndexByte(buf, '\n')
if i == -1 {
// drop any line longer than the max buffer size
log.Printf("E! http_listener received a single line longer than the maximum of %d bytes",
len(buf))
hangingBytes = true
return400 = true
bufStart = 0
continue
}
if err := h.parse(buf[:i], now); err != nil {
log.Println("E! " + err.Error())
return400 = true
}
// rotate the bit remaining after the last newline to the front of the buffer
i++ // start copying after the newline
bufStart = len(buf) - i
if bufStart > 0 {
copy(buf, buf[i:])
}
}
}
func (h *HTTPListener) parse(b []byte, t time.Time) error {
metrics, err := h.parser.ParseWithDefaultTime(b, t)
for _, m := range metrics {
h.acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time())
}
return err
}
func tooLarge(res http.ResponseWriter) {
res.Header().Set("Content-Type", "application/json")
res.Header().Set("X-Influxdb-Version", "1.0")
res.WriteHeader(http.StatusRequestEntityTooLarge)
res.Write([]byte(`{"error":"http: request body too large"}`))
}
func badRequest(res http.ResponseWriter) {
res.Header().Set("Content-Type", "application/json")
res.Header().Set("X-Influxdb-Version", "1.0")
res.WriteHeader(http.StatusBadRequest)
res.Write([]byte(`{"error":"http: bad request"}`))
}
func init() {
inputs.Add("http_listener", func() telegraf.Input {
return &HttpListener{}
return &HTTPListener{
ServiceAddress: ":8186",
}
})
}

View File

@ -1,16 +1,16 @@
package http_listener
import (
"bytes"
"io/ioutil"
"net/http"
"sync"
"testing"
"time"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/testutil"
"bytes"
"github.com/stretchr/testify/require"
"net/http"
)
const (
@ -27,17 +27,15 @@ cpu_load_short,host=server06 value=12.0 1422568543702900257
emptyMsg = ""
)
func newTestHttpListener() *HttpListener {
listener := &HttpListener{
func newTestHTTPListener() *HTTPListener {
listener := &HTTPListener{
ServiceAddress: ":8186",
}
return listener
}
func TestWriteHTTP(t *testing.T) {
listener := newTestHttpListener()
parser, _ := parsers.NewInfluxParser()
listener.SetParser(parser)
listener := newTestHTTPListener()
acc := &testutil.Accumulator{}
require.NoError(t, listener.Start(acc))
@ -71,10 +69,10 @@ func TestWriteHTTP(t *testing.T) {
)
}
// Post a gigantic metric to the listener:
// Post a gigantic metric to the listener and verify that an error is returned:
resp, err = http.Post("http://localhost:8186/write?db=mydb", "", bytes.NewBuffer([]byte(hugeMetric)))
require.NoError(t, err)
require.EqualValues(t, 204, resp.StatusCode)
require.EqualValues(t, 400, resp.StatusCode)
time.Sleep(time.Millisecond * 15)
acc.AssertContainsTaggedFields(t, "cpu_load_short",
@ -83,11 +81,133 @@ func TestWriteHTTP(t *testing.T) {
)
}
func TestWriteHTTPMaxLineSizeIncrease(t *testing.T) {
listener := &HTTPListener{
ServiceAddress: ":8296",
MaxLineSize: 128 * 1000,
}
acc := &testutil.Accumulator{}
require.NoError(t, listener.Start(acc))
defer listener.Stop()
time.Sleep(time.Millisecond * 25)
// Post a gigantic metric to the listener and verify that it writes OK this time:
resp, err := http.Post("http://localhost:8296/write?db=mydb", "", bytes.NewBuffer([]byte(hugeMetric)))
require.NoError(t, err)
require.EqualValues(t, 204, resp.StatusCode)
}
func TestWriteHTTPVerySmallMaxBody(t *testing.T) {
listener := &HTTPListener{
ServiceAddress: ":8297",
MaxBodySize: 4096,
}
acc := &testutil.Accumulator{}
require.NoError(t, listener.Start(acc))
defer listener.Stop()
time.Sleep(time.Millisecond * 25)
resp, err := http.Post("http://localhost:8297/write", "", bytes.NewBuffer([]byte(hugeMetric)))
require.NoError(t, err)
require.EqualValues(t, 413, resp.StatusCode)
}
func TestWriteHTTPVerySmallMaxLineSize(t *testing.T) {
listener := &HTTPListener{
ServiceAddress: ":8298",
MaxLineSize: 70,
}
acc := &testutil.Accumulator{}
require.NoError(t, listener.Start(acc))
defer listener.Stop()
time.Sleep(time.Millisecond * 25)
resp, err := http.Post("http://localhost:8298/write", "", bytes.NewBuffer([]byte(testMsgs)))
require.NoError(t, err)
require.EqualValues(t, 204, resp.StatusCode)
time.Sleep(time.Millisecond * 15)
hostTags := []string{"server02", "server03",
"server04", "server05", "server06"}
for _, hostTag := range hostTags {
acc.AssertContainsTaggedFields(t, "cpu_load_short",
map[string]interface{}{"value": float64(12)},
map[string]string{"host": hostTag},
)
}
}
func TestWriteHTTPLargeLinesSkipped(t *testing.T) {
listener := &HTTPListener{
ServiceAddress: ":8300",
MaxLineSize: 100,
}
acc := &testutil.Accumulator{}
require.NoError(t, listener.Start(acc))
defer listener.Stop()
time.Sleep(time.Millisecond * 25)
resp, err := http.Post("http://localhost:8300/write", "", bytes.NewBuffer([]byte(hugeMetric+testMsgs)))
require.NoError(t, err)
require.EqualValues(t, 400, resp.StatusCode)
time.Sleep(time.Millisecond * 15)
hostTags := []string{"server02", "server03",
"server04", "server05", "server06"}
for _, hostTag := range hostTags {
acc.AssertContainsTaggedFields(t, "cpu_load_short",
map[string]interface{}{"value": float64(12)},
map[string]string{"host": hostTag},
)
}
}
// test that writing gzipped data works
func TestWriteHTTPGzippedData(t *testing.T) {
listener := &HTTPListener{
ServiceAddress: ":8299",
}
acc := &testutil.Accumulator{}
require.NoError(t, listener.Start(acc))
defer listener.Stop()
time.Sleep(time.Millisecond * 25)
data, err := ioutil.ReadFile("./testdata/testmsgs.gz")
require.NoError(t, err)
req, err := http.NewRequest("POST", "http://localhost:8299/write", bytes.NewBuffer(data))
require.NoError(t, err)
req.Header.Set("Content-Encoding", "gzip")
client := &http.Client{}
resp, err := client.Do(req)
require.NoError(t, err)
require.EqualValues(t, 204, resp.StatusCode)
time.Sleep(time.Millisecond * 50)
hostTags := []string{"server02", "server03",
"server04", "server05", "server06"}
for _, hostTag := range hostTags {
acc.AssertContainsTaggedFields(t, "cpu_load_short",
map[string]interface{}{"value": float64(12)},
map[string]string{"host": hostTag},
)
}
}
// writes 25,000 metrics to the listener with 10 different writers
func TestWriteHTTPHighTraffic(t *testing.T) {
listener := &HttpListener{ServiceAddress: ":8286"}
parser, _ := parsers.NewInfluxParser()
listener.SetParser(parser)
listener := &HTTPListener{ServiceAddress: ":8286"}
acc := &testutil.Accumulator{}
require.NoError(t, listener.Start(acc))
@ -110,15 +230,14 @@ func TestWriteHTTPHighTraffic(t *testing.T) {
}
wg.Wait()
time.Sleep(time.Millisecond * 50)
time.Sleep(time.Millisecond * 250)
listener.Gather(acc)
require.Equal(t, int64(25000), int64(acc.NMetrics()))
}
func TestReceive404ForInvalidEndpoint(t *testing.T) {
listener := newTestHttpListener()
listener.parser, _ = parsers.NewInfluxParser()
listener := newTestHTTPListener()
acc := &testutil.Accumulator{}
require.NoError(t, listener.Start(acc))
@ -135,8 +254,7 @@ func TestReceive404ForInvalidEndpoint(t *testing.T) {
func TestWriteHTTPInvalid(t *testing.T) {
time.Sleep(time.Millisecond * 250)
listener := newTestHttpListener()
listener.parser, _ = parsers.NewInfluxParser()
listener := newTestHTTPListener()
acc := &testutil.Accumulator{}
require.NoError(t, listener.Start(acc))
@ -153,8 +271,7 @@ func TestWriteHTTPInvalid(t *testing.T) {
func TestWriteHTTPEmpty(t *testing.T) {
time.Sleep(time.Millisecond * 250)
listener := newTestHttpListener()
listener.parser, _ = parsers.NewInfluxParser()
listener := newTestHTTPListener()
acc := &testutil.Accumulator{}
require.NoError(t, listener.Start(acc))
@ -171,8 +288,7 @@ func TestWriteHTTPEmpty(t *testing.T) {
func TestQueryAndPingHTTP(t *testing.T) {
time.Sleep(time.Millisecond * 250)
listener := newTestHttpListener()
listener.parser, _ = parsers.NewInfluxParser()
listener := newTestHTTPListener()
acc := &testutil.Accumulator{}
require.NoError(t, listener.Start(acc))

View File

@ -1,10 +0,0 @@
Copyright (c) 2014, Eric Urban
All rights reserved.
Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met:
1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

View File

@ -1,62 +0,0 @@
package stoppableListener
import (
"errors"
"net"
"time"
)
type StoppableListener struct {
*net.TCPListener //Wrapped listener
stop chan int //Channel used only to indicate listener should shutdown
}
func New(l net.Listener) (*StoppableListener, error) {
tcpL, ok := l.(*net.TCPListener)
if !ok {
return nil, errors.New("Cannot wrap listener")
}
retval := &StoppableListener{}
retval.TCPListener = tcpL
retval.stop = make(chan int)
return retval, nil
}
var StoppedError = errors.New("Listener stopped")
func (sl *StoppableListener) Accept() (net.Conn, error) {
for {
//Wait up to one second for a new connection
sl.SetDeadline(time.Now().Add(time.Second))
newConn, err := sl.TCPListener.Accept()
//Check for the channel being closed
select {
case <-sl.stop:
return nil, StoppedError
default:
//If the channel is still open, continue as normal
}
if err != nil {
netErr, ok := err.(net.Error)
//If this is a timeout, then continue to wait for
//new connections
if ok && netErr.Timeout() && netErr.Temporary() {
continue
}
}
return newConn, err
}
}
func (sl *StoppableListener) Stop() {
close(sl.stop)
}

Binary file not shown.

View File

@ -4,7 +4,6 @@ import (
"encoding/json"
"fmt"
"io/ioutil"
"net"
"net/http"
"net/url"
"sync"
@ -31,6 +30,8 @@ type Kubernetes struct {
SSLKey string `toml:"ssl_key"`
// Use SSL but skip chain & host verification
InsecureSkipVerify bool
RoundTripper http.RoundTripper
}
var sampleConfig = `
@ -101,15 +102,12 @@ func (k *Kubernetes) gatherSummary(baseURL string, acc telegraf.Accumulator) err
return err
}
var rt http.RoundTripper = &http.Transport{
Dial: (&net.Dialer{
Timeout: 5 * time.Second,
KeepAlive: 30 * time.Second,
}).Dial,
TLSHandshakeTimeout: 5 * time.Second,
TLSClientConfig: tlsCfg,
ResponseHeaderTimeout: time.Duration(3 * time.Second),
DisableKeepAlives: false,
if k.RoundTripper == nil {
k.RoundTripper = &http.Transport{
TLSHandshakeTimeout: 5 * time.Second,
TLSClientConfig: tlsCfg,
ResponseHeaderTimeout: time.Duration(3 * time.Second),
}
}
if k.BearerToken != "" {
@ -120,7 +118,7 @@ func (k *Kubernetes) gatherSummary(baseURL string, acc telegraf.Accumulator) err
req.Header.Set("Authorization", "Bearer "+string(token))
}
resp, err = rt.RoundTrip(req)
resp, err = k.RoundTripper.RoundTrip(req)
if err != nil {
return fmt.Errorf("error making HTTP request to %s: %s", url, err)
}

View File

@ -91,8 +91,15 @@ func (n *natsConsumer) Start(acc telegraf.Accumulator) error {
var connectErr error
// set default NATS connection options
opts := nats.DefaultOptions
// override max reconnection tries
opts.MaxReconnect = -1
// override servers if any were specified
opts.Servers = n.Servers
opts.Secure = n.Secure
if n.Conn == nil || n.Conn.IsClosed() {

View File

@ -10,7 +10,7 @@ The plugin will tag processes by their PID and their process name.
Processes can be specified either by pid file, by executable name, by command
line pattern matching, or by username (in this order or priority. Procstat
plugin will use `pgrep` when executable name is provided to obtain the pid.
Proctstas plugin will transmit IO, memory, cpu, file descriptor related
Procstat plugin will transmit IO, memory, cpu, file descriptor related
measurements for every process specified. A prefix can be set to isolate
individual process specific measurements.

View File

@ -52,14 +52,14 @@ var malformedwarn = "E! tcp_listener has received %d malformed packets" +
const sampleConfig = `
## Address and port to host TCP listener on
service_address = ":8094"
# service_address = ":8094"
## Number of TCP messages allowed to queue up. Once filled, the
## TCP listener will start dropping packets.
allowed_pending_messages = 10000
# allowed_pending_messages = 10000
## Maximum number of concurrent TCP connections to allow
max_tcp_connections = 250
# max_tcp_connections = 250
## Data format to consume.
## Each data format has it's own unique set of configuration options, read
@ -276,6 +276,10 @@ func (t *TcpListener) remember(id string, conn *net.TCPConn) {
func init() {
inputs.Add("tcp_listener", func() telegraf.Input {
return &TcpListener{}
return &TcpListener{
ServiceAddress: ":8094",
AllowedPendingMessages: 10000,
MaxTCPConnections: 250,
}
})
}

View File

@ -51,11 +51,11 @@ var malformedwarn = "E! udp_listener has received %d malformed packets" +
const sampleConfig = `
## Address and port to host UDP listener on
service_address = ":8092"
# service_address = ":8092"
## Number of UDP messages allowed to queue up. Once filled, the
## UDP listener will start dropping packets.
allowed_pending_messages = 10000
# allowed_pending_messages = 10000
## Data format to consume.
## Each data format has it's own unique set of configuration options, read
@ -178,6 +178,9 @@ func (u *UdpListener) udpParser() error {
func init() {
inputs.Add("udp_listener", func() telegraf.Input {
return &UdpListener{}
return &UdpListener{
ServiceAddress: ":8092",
AllowedPendingMessages: 10000,
}
})
}

View File

@ -62,14 +62,23 @@ func (n *NATS) SetSerializer(serializer serializers.Serializer) {
func (n *NATS) Connect() error {
var err error
// set NATS connection options
// set default NATS connection options
opts := nats_client.DefaultOptions
// override max reconnection tries
opts.MaxReconnect = -1
// override servers, if any were specified
opts.Servers = n.Servers
// override authentication, if any was specified
if n.Username != "" {
opts.User = n.Username
opts.Password = n.Password
}
// override TLS, if it was specified
tlsConfig, err := internal.GetTLSConfig(
n.SSLCert, n.SSLKey, n.SSLCA, n.InsecureSkipVerify)
if err != nil {

View File

@ -3,6 +3,7 @@ package influx
import (
"bytes"
"fmt"
"time"
"github.com/influxdata/telegraf"
@ -15,15 +16,10 @@ type InfluxParser struct {
DefaultTags map[string]string
}
// Parse returns a slice of Metrics from a text representation of a
// metric (in line-protocol format)
// with each metric separated by newlines. If any metrics fail to parse,
// a non-nil error will be returned in addition to the metrics that parsed
// successfully.
func (p *InfluxParser) Parse(buf []byte) ([]telegraf.Metric, error) {
func (p *InfluxParser) ParseWithDefaultTime(buf []byte, t time.Time) ([]telegraf.Metric, error) {
// parse even if the buffer begins with a newline
buf = bytes.TrimPrefix(buf, []byte("\n"))
points, err := models.ParsePoints(buf)
points, err := models.ParsePointsWithPrecision(buf, t, "n")
metrics := make([]telegraf.Metric, len(points))
for i, point := range points {
for k, v := range p.DefaultTags {
@ -39,6 +35,15 @@ func (p *InfluxParser) Parse(buf []byte) ([]telegraf.Metric, error) {
return metrics, err
}
// Parse returns a slice of Metrics from a text representation of a
// metric (in line-protocol format)
// with each metric separated by newlines. If any metrics fail to parse,
// a non-nil error will be returned in addition to the metrics that parsed
// successfully.
func (p *InfluxParser) Parse(buf []byte) ([]telegraf.Metric, error) {
return p.ParseWithDefaultTime(buf, time.Now())
}
func (p *InfluxParser) ParseLine(line string) (telegraf.Metric, error) {
metrics, err := p.Parse([]byte(line + "\n"))

View File

@ -75,6 +75,10 @@ cat telegraf-race | gzip > $CIRCLE_ARTIFACTS/telegraf-race.gz
eval "git describe --exact-match HEAD"
if [ $? -eq 0 ]; then
# install fpm (packaging dependency)
exit_if_fail gem install fpm
# install boto & rpm (packaging & AWS dependencies)
exit_if_fail sudo apt-get install -y rpm python-boto
unset GOGC
tag=$(git describe --exact-match HEAD)
echo $tag