Compare commits
26 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
86a6f337f6 | ||
|
|
a1f7d5549b | ||
|
|
5fbd07b146 | ||
|
|
b8f3c68b89 | ||
|
|
043b171028 | ||
|
|
b86d789abe | ||
|
|
e1c7dc80ae | ||
|
|
1fe0791a74 | ||
|
|
0d87eb4725 | ||
|
|
e2dac56a40 | ||
|
|
039fc80ed7 | ||
|
|
8e90a444c2 | ||
|
|
2ccd828e81 | ||
|
|
480f29bde7 | ||
|
|
11a6db8268 | ||
|
|
6566cc51e3 | ||
|
|
051cd03bbf | ||
|
|
0aa0a40d89 | ||
|
|
9dcbe750d1 | ||
|
|
10bf663a3b | ||
|
|
b71cfb7cfd | ||
|
|
87fedcfa74 | ||
|
|
3b9174a322 | ||
|
|
851fdd439f | ||
|
|
ab78e8efec | ||
|
|
b829febe0d |
14
CHANGELOG.md
Normal file
14
CHANGELOG.md
Normal file
@@ -0,0 +1,14 @@
|
||||
## v0.1.2 [unreleased]
|
||||
|
||||
### Features
|
||||
- [#12](https://github.com/influxdb/influxdb/pull/12): Add Linux/ARM to the list of built binaries. Thanks @voxxit!
|
||||
- [#14](https://github.com/influxdb/influxdb/pull/14): Clarify the S3 buckets that Telegraf is pushed to.
|
||||
|
||||
### Bugfixes
|
||||
- [#13](https://github.com/influxdb/influxdb/pull/13): Fix the packaging script.
|
||||
|
||||
## v0.1.1 [2015-06-19]
|
||||
|
||||
### Release Notes
|
||||
|
||||
This is the initial release of Telegraf.
|
||||
111
PLUGINS.md
111
PLUGINS.md
@@ -1,111 +0,0 @@
|
||||
Telegraf is entirely plugin driven. This interface allows for operators to
|
||||
pick and chose what is gathered as well as makes it easy for developers
|
||||
to create new ways of generating metrics.
|
||||
|
||||
Plugin authorship is kept as simple as possible to promote people to develop
|
||||
and submit new plugins.
|
||||
|
||||
## Guidelines
|
||||
|
||||
* A plugin must conform to the `plugins.Plugin` interface.
|
||||
* Telegraf promises to run each plugin's Gather function serially. This means
|
||||
developers don't have to worry about thread safety within these functions.
|
||||
* Each generated metric automatically has the name of the plugin that generated
|
||||
it prepended. This is to keep plugins honest.
|
||||
* Plugins should call `plugins.Add` in their `init` function to register themselves.
|
||||
See below for a quick example.
|
||||
* To be available within Telegraf itself, plugins must add themselves to the `github.com/influxdb/telegraf/plugins/all/all.go` file.
|
||||
* The `SampleConfig` function should return valid toml that describes how the plugin can be configured. This is include in `telegraf -sample-config`.
|
||||
* The `Description` function should say in one line what this plugin does.
|
||||
|
||||
### Plugin interface
|
||||
|
||||
```go
|
||||
type Plugin interface {
|
||||
SampleConfig() string
|
||||
Description() string
|
||||
Gather(Accumulator) error
|
||||
}
|
||||
|
||||
type Accumulator interface {
|
||||
Add(measurement string, value interface{}, tags map[string]string)
|
||||
AddValuesWithTime(measurement string, values map[string]interface{}, tags map[string]string, timestamp time.Time)
|
||||
}
|
||||
```
|
||||
|
||||
### Accumulator
|
||||
|
||||
The way that a plugin emits metrics is by interacting with the Accumulator.
|
||||
|
||||
The `Add` function takes 3 arguments:
|
||||
* **measurement**: A string description of the metric. For instance `bytes_read` or `faults`.
|
||||
* **value**: A value for the metric. This accepts 5 different types of value:
|
||||
* **int**: The most common type. All int types are accepted but favor using `int64`
|
||||
Useful for counters, etc.
|
||||
* **float**: Favor `float64`, useful for gauges, percentages, etc.
|
||||
* **bool**: `true` or `false`, useful to indicate the presence of a state. `light_on`, etc.
|
||||
* **string**: Typically used to indicate a message, or some kind of freeform information.
|
||||
* **time.Time**: Useful for indicating when a state last occurred, for instance `light_on_since`.
|
||||
* **tags**: This is a map of strings to strings to describe the where or who about the metric. For instance, the `net` plugin adds a tag named `"interface"` set to the name of the network interface, like `"eth0"`.
|
||||
|
||||
The `AddValuesWithTime` allows multiple values for a point to be passed. The values
|
||||
used are the same type profile as **value** above. The **timestamp** argument
|
||||
allows a point to be registered as having occurred at an arbitrary time.
|
||||
|
||||
Let's say you've written a plugin that emits metrics about processes on the current host.
|
||||
|
||||
```go
|
||||
|
||||
type Process struct {
|
||||
CPUTime float64
|
||||
MemoryBytes int64
|
||||
PID int
|
||||
}
|
||||
|
||||
func Gather(acc plugins.Accumulator) error {
|
||||
for _, process := range system.Processes() {
|
||||
tags := map[string]string {
|
||||
"pid": fmt.Sprintf("%d", process.Pid),
|
||||
}
|
||||
|
||||
acc.Add("cpu", process.CPUTime, tags)
|
||||
acc.Add("memoory", process.MemoryBytes, tags)
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
### Example
|
||||
|
||||
```go
|
||||
|
||||
// simple.go
|
||||
|
||||
import "github.com/influxdb/telegraf/plugins"
|
||||
|
||||
type Simple struct {
|
||||
Ok bool
|
||||
}
|
||||
|
||||
func (s *Simple) Description() string {
|
||||
return "a demo plugin"
|
||||
}
|
||||
|
||||
func (s *Simple) SampleConfig() string {
|
||||
return "ok = true # indicate if everything is fine"
|
||||
}
|
||||
|
||||
func (s *Simple) Gather(acc plugins.Accumulator) error {
|
||||
if s.Ok {
|
||||
acc.Add("state", "pretty good", nil)
|
||||
} else {
|
||||
acc.Add("state", "not great", nil)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func init() {
|
||||
plugins.Add("simple", func() plugins.Plugin { &Simple{} })
|
||||
}
|
||||
```
|
||||
|
||||
152
README.md
152
README.md
@@ -1,13 +1,37 @@
|
||||
# Telegraf - A native agent for InfluxDB
|
||||
|
||||
Telegraf is an agent written in Go for collecting metrics from the system it's running on or from other services and writing them into InfluxDB.
|
||||
|
||||
Design goals are to have a minimal memory footprint with a plugin system so that developers in the community can easily add support for collecting metrics from well known services (like Hadoop, or Postgres, or Redis) and third party APIs (like Mailchimp, AWS CloudWatch, or Google Analytics).
|
||||
|
||||
We'll eagerly accept pull requests for new plugins and will manage the set of plugins that Telegraf supports. See the bottom of this doc for instructions on writing new plugins.
|
||||
|
||||
## Quickstart
|
||||
|
||||
* Build from source or download telegraf (binaries forthcoming)
|
||||
* Build from source or download telegraf:
|
||||
|
||||
### Linux packages for Debian/Ubuntu and RHEL/CentOS:
|
||||
|
||||
```
|
||||
http://get.influxdb.org/telegraf/telegraf_0.1.1_amd64.deb
|
||||
http://get.influxdb.org/telegraf/telegraf-0.1.1-1.x86_64.rpm
|
||||
```
|
||||
|
||||
### OSX via Homebrew:
|
||||
|
||||
```
|
||||
brew update
|
||||
brew install telegraf
|
||||
```
|
||||
|
||||
### How to use it:
|
||||
|
||||
* Run `telegraf -sample-config > telegraf.toml` to create an initial configuration
|
||||
* Edit the configuration to match your needs
|
||||
* Run `telegraf -config telegraf.toml -test` to output one full measurement sample to STDOUT
|
||||
* Run `telegraf -config telegraf.toml` to gather and send metrics to InfluxDB
|
||||
|
||||
|
||||
## Telegraf Options
|
||||
|
||||
Telegraf has a few options you can configure under the `agent` section of the config. If you don't see an `agent` section run `telegraf -sample-config > telegraf.toml` to create a valid initial configuration:
|
||||
@@ -16,6 +40,18 @@ Telegraf has a few options you can configure under the `agent` section of the co
|
||||
* **interval**: How ofter to gather metrics. Uses a simple number + unit parser, ie "10s" for 10 seconds or "5m" for 5 minutes.
|
||||
* **debug**: Set to true to gather and send metrics to STDOUT as well as InfluxDB.
|
||||
|
||||
## Supported Plugins
|
||||
|
||||
Telegraf currently has support for collecting metrics from:
|
||||
|
||||
* System (memory, CPU, network, etc.)
|
||||
* Docker
|
||||
* MySQL
|
||||
* PostgreSQL
|
||||
* Redis
|
||||
|
||||
We'll be adding support for many more over the coming months. Read on if you want to add support for another service or third-party API.
|
||||
|
||||
## Plugin Options
|
||||
|
||||
There are 3 configuration options that are configurable per plugin:
|
||||
@@ -23,3 +59,117 @@ There are 3 configuration options that are configurable per plugin:
|
||||
* **pass**: An array of strings that is used to filter metrics generated by the current plugin. Each string in the array is tested as a prefix against metrics and if it matches, the metric is emitted.
|
||||
* **drop**: The inverse of pass, if a metric matches, it is not emitted.
|
||||
* **interval**: How often to gather this metric. Normal plugins use a single global interval, but if one particular plugin should be run less or more often, you can configure that here.
|
||||
|
||||
## Plugins
|
||||
|
||||
This section is for developers that want to create new collection plugins. Telegraf is entirely plugin driven. This interface allows for operators to
|
||||
pick and chose what is gathered as well as makes it easy for developers
|
||||
to create new ways of generating metrics.
|
||||
|
||||
Plugin authorship is kept as simple as possible to promote people to develop
|
||||
and submit new plugins.
|
||||
|
||||
## Guidelines
|
||||
|
||||
* A plugin must conform to the `plugins.Plugin` interface.
|
||||
* Telegraf promises to run each plugin's Gather function serially. This means
|
||||
developers don't have to worry about thread safety within these functions.
|
||||
* Each generated metric automatically has the name of the plugin that generated
|
||||
it prepended. This is to keep plugins honest.
|
||||
* Plugins should call `plugins.Add` in their `init` function to register themselves.
|
||||
See below for a quick example.
|
||||
* To be available within Telegraf itself, plugins must add themselves to the `github.com/influxdb/telegraf/plugins/all/all.go` file.
|
||||
* The `SampleConfig` function should return valid toml that describes how the plugin can be configured. This is include in `telegraf -sample-config`.
|
||||
* The `Description` function should say in one line what this plugin does.
|
||||
|
||||
### Plugin interface
|
||||
|
||||
```go
|
||||
type Plugin interface {
|
||||
SampleConfig() string
|
||||
Description() string
|
||||
Gather(Accumulator) error
|
||||
}
|
||||
|
||||
type Accumulator interface {
|
||||
Add(measurement string, value interface{}, tags map[string]string)
|
||||
AddValuesWithTime(measurement string, values map[string]interface{}, tags map[string]string, timestamp time.Time)
|
||||
}
|
||||
```
|
||||
|
||||
### Accumulator
|
||||
|
||||
The way that a plugin emits metrics is by interacting with the Accumulator.
|
||||
|
||||
The `Add` function takes 3 arguments:
|
||||
* **measurement**: A string description of the metric. For instance `bytes_read` or `faults`.
|
||||
* **value**: A value for the metric. This accepts 5 different types of value:
|
||||
* **int**: The most common type. All int types are accepted but favor using `int64`
|
||||
Useful for counters, etc.
|
||||
* **float**: Favor `float64`, useful for gauges, percentages, etc.
|
||||
* **bool**: `true` or `false`, useful to indicate the presence of a state. `light_on`, etc.
|
||||
* **string**: Typically used to indicate a message, or some kind of freeform information.
|
||||
* **time.Time**: Useful for indicating when a state last occurred, for instance `light_on_since`.
|
||||
* **tags**: This is a map of strings to strings to describe the where or who about the metric. For instance, the `net` plugin adds a tag named `"interface"` set to the name of the network interface, like `"eth0"`.
|
||||
|
||||
The `AddValuesWithTime` allows multiple values for a point to be passed. The values
|
||||
used are the same type profile as **value** above. The **timestamp** argument
|
||||
allows a point to be registered as having occurred at an arbitrary time.
|
||||
|
||||
Let's say you've written a plugin that emits metrics about processes on the current host.
|
||||
|
||||
```go
|
||||
|
||||
type Process struct {
|
||||
CPUTime float64
|
||||
MemoryBytes int64
|
||||
PID int
|
||||
}
|
||||
|
||||
func Gather(acc plugins.Accumulator) error {
|
||||
for _, process := range system.Processes() {
|
||||
tags := map[string]string {
|
||||
"pid": fmt.Sprintf("%d", process.Pid),
|
||||
}
|
||||
|
||||
acc.Add("cpu", process.CPUTime, tags)
|
||||
acc.Add("memoory", process.MemoryBytes, tags)
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
### Example
|
||||
|
||||
```go
|
||||
|
||||
// simple.go
|
||||
|
||||
import "github.com/influxdb/telegraf/plugins"
|
||||
|
||||
type Simple struct {
|
||||
Ok bool
|
||||
}
|
||||
|
||||
func (s *Simple) Description() string {
|
||||
return "a demo plugin"
|
||||
}
|
||||
|
||||
func (s *Simple) SampleConfig() string {
|
||||
return "ok = true # indicate if everything is fine"
|
||||
}
|
||||
|
||||
func (s *Simple) Gather(acc plugins.Accumulator) error {
|
||||
if s.Ok {
|
||||
acc.Add("state", "pretty good", nil)
|
||||
} else {
|
||||
acc.Add("state", "not great", nil)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func init() {
|
||||
plugins.Add("simple", func() plugins.Plugin { &Simple{} })
|
||||
}
|
||||
```
|
||||
|
||||
|
||||
@@ -4,12 +4,15 @@ import (
|
||||
"fmt"
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/influxdb/influxdb/client"
|
||||
)
|
||||
|
||||
type BatchPoints struct {
|
||||
mu sync.Mutex
|
||||
|
||||
client.BatchPoints
|
||||
|
||||
Debug bool
|
||||
@@ -20,6 +23,9 @@ type BatchPoints struct {
|
||||
}
|
||||
|
||||
func (bp *BatchPoints) Add(measurement string, val interface{}, tags map[string]string) {
|
||||
bp.mu.Lock()
|
||||
defer bp.mu.Unlock()
|
||||
|
||||
measurement = bp.Prefix + measurement
|
||||
|
||||
if bp.Config != nil {
|
||||
@@ -55,6 +61,9 @@ func (bp *BatchPoints) AddValuesWithTime(
|
||||
tags map[string]string,
|
||||
timestamp time.Time,
|
||||
) {
|
||||
bp.mu.Lock()
|
||||
defer bp.mu.Unlock()
|
||||
|
||||
measurement = bp.Prefix + measurement
|
||||
|
||||
if bp.Config != nil {
|
||||
|
||||
28
package.sh
28
package.sh
@@ -45,8 +45,8 @@ ARCH=`uname -i`
|
||||
LICENSE=MIT
|
||||
URL=influxdb.com
|
||||
MAINTAINER=support@influxdb.com
|
||||
VENDOR=Influxdb
|
||||
DESCRIPTION="InfluxDB agent"
|
||||
VENDOR=InfluxDB
|
||||
DESCRIPTION="InfluxDB Telegraf agent"
|
||||
PKG_DEPS=(coreutils)
|
||||
GO_VERSION="go1.4.2"
|
||||
GOPATH_INSTALL=
|
||||
@@ -171,15 +171,6 @@ do_build() {
|
||||
echo "Build completed successfully."
|
||||
}
|
||||
|
||||
# return a list of package dependencies for fpm
|
||||
get_fpm_pkg_deps() {
|
||||
local deps=" "
|
||||
for d in ${PKG_DEPS[@]}; do
|
||||
deps="${deps} -d '${d}'"
|
||||
done
|
||||
echo -n ${deps}
|
||||
}
|
||||
|
||||
# generate_postinstall_script creates the post-install script for the
|
||||
# package. It must be passed the version.
|
||||
generate_postinstall_script() {
|
||||
@@ -287,14 +278,14 @@ else
|
||||
fi
|
||||
|
||||
COMMON_FPM_ARGS="-C $TMP_WORK_DIR --vendor $VENDOR --url $URL --license $LICENSE --maintainer $MAINTAINER --after-install $POST_INSTALL_PATH --name telegraf --version $VERSION --config-files $CONFIG_ROOT_DIR ."
|
||||
$rpm_args fpm -s dir -t rpm $(get_fpm_pkg_deps) --description "$DESCRIPTION" $COMMON_FPM_ARGS
|
||||
$rpm_args fpm -s dir -t rpm --description "$DESCRIPTION" $COMMON_FPM_ARGS
|
||||
if [ $? -ne 0 ]; then
|
||||
echo "Failed to create RPM package -- aborting."
|
||||
cleanup_exit 1
|
||||
fi
|
||||
echo "RPM package created successfully."
|
||||
|
||||
fpm -s dir -t deb $deb_args $(get_fpm_pkg_deps) --description "$DESCRIPTION" $COMMON_FPM_ARGS
|
||||
fpm -s dir -t deb $deb_args --description "$DESCRIPTION" $COMMON_FPM_ARGS
|
||||
if [ $? -ne 0 ]; then
|
||||
echo "Failed to create Debian package -- aborting."
|
||||
cleanup_exit 1
|
||||
@@ -340,15 +331,8 @@ if [ "x$response" == "xy" ]; then
|
||||
for filepath in `ls *.{deb,rpm}`; do
|
||||
echo "Uploading $filepath to S3"
|
||||
filename=`basename $filepath`
|
||||
bucket=influxdb
|
||||
echo "Uploading $filename to s3://influxdb/$filename"
|
||||
AWS_CONFIG_FILE=$AWS_FILE aws s3 cp $filepath s3://influxdb/$filename --acl public-read --region us-east-1
|
||||
if [ $? -ne 0 ]; then
|
||||
echo "Upload failed -- aborting".
|
||||
cleanup_exit 1
|
||||
fi
|
||||
echo "Uploading $filename to s3://get.influxdb.org/$filename"
|
||||
AWS_CONFIG_FILE=$AWS_FILE aws s3 cp $filepath s3://get.influxdb.org/$filename --acl public-read --region us-east-1
|
||||
echo "Uploading $filename to s3://get.influxdb.org/telegraf/$filename"
|
||||
AWS_CONFIG_FILE=$AWS_FILE aws s3 cp $filepath s3://get.influxdb.org/telegraf/$filename --acl public-read --region us-east-1
|
||||
if [ $? -ne 0 ]; then
|
||||
echo "Upload failed -- aborting".
|
||||
cleanup_exit 1
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package all
|
||||
|
||||
import (
|
||||
_ "github.com/influxdb/telegraf/plugins/memcached"
|
||||
_ "github.com/influxdb/telegraf/plugins/mysql"
|
||||
_ "github.com/influxdb/telegraf/plugins/postgresql"
|
||||
_ "github.com/influxdb/telegraf/plugins/redis"
|
||||
|
||||
134
plugins/memcached/memcached.go
Normal file
134
plugins/memcached/memcached.go
Normal file
@@ -0,0 +1,134 @@
|
||||
package memcached
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"fmt"
|
||||
"net"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/influxdb/telegraf/plugins"
|
||||
)
|
||||
|
||||
// Memcached is a memcached plugin
|
||||
type Memcached struct {
|
||||
Servers []string
|
||||
}
|
||||
|
||||
var sampleConfig = `
|
||||
# An array of address to gather stats about. Specify an ip on hostname
|
||||
# with optional port. ie localhost, 10.0.0.1:11211, etc.
|
||||
#
|
||||
# If no servers are specified, then localhost is used as the host.
|
||||
servers = ["localhost"]`
|
||||
|
||||
var defaultTimeout = 5 * time.Second
|
||||
|
||||
// The list of metrics tha should be calculated
|
||||
var sendAsIs = []string{
|
||||
"get_hits",
|
||||
"get_misses",
|
||||
"evictions",
|
||||
"limit_maxbytes",
|
||||
"bytes",
|
||||
}
|
||||
|
||||
// SampleConfig returns sample configuration message
|
||||
func (m *Memcached) SampleConfig() string {
|
||||
return sampleConfig
|
||||
}
|
||||
|
||||
// Description returns description of Memcached plugin
|
||||
func (m *Memcached) Description() string {
|
||||
return "Read metrics from one or many memcached servers"
|
||||
}
|
||||
|
||||
// Gather reads stats from all configured servers accumulates stats
|
||||
func (m *Memcached) Gather(acc plugins.Accumulator) error {
|
||||
if len(m.Servers) == 0 {
|
||||
return m.gatherServer(":11211", acc)
|
||||
}
|
||||
|
||||
for _, serverAddress := range m.Servers {
|
||||
if err := m.gatherServer(serverAddress, acc); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *Memcached) gatherServer(address string, acc plugins.Accumulator) error {
|
||||
_, _, err := net.SplitHostPort(address)
|
||||
if err != nil {
|
||||
address = address + ":11211"
|
||||
}
|
||||
|
||||
// Connect
|
||||
conn, err := net.DialTimeout("tcp", address, defaultTimeout)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer conn.Close()
|
||||
|
||||
// Extend connection
|
||||
conn.SetDeadline(time.Now().Add(defaultTimeout))
|
||||
|
||||
// Read and write buffer
|
||||
rw := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn))
|
||||
|
||||
// Send command
|
||||
if _, err = fmt.Fprint(rw, "stats\r\n"); err != nil {
|
||||
return err
|
||||
}
|
||||
if err = rw.Flush(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Read response
|
||||
values := make(map[string]string)
|
||||
|
||||
for {
|
||||
// Read line
|
||||
line, _, errRead := rw.Reader.ReadLine()
|
||||
if errRead != nil {
|
||||
return errRead
|
||||
}
|
||||
// Done
|
||||
if bytes.Equal(line, []byte("END")) {
|
||||
break
|
||||
}
|
||||
// Read values
|
||||
var name, value string
|
||||
n, errScan := fmt.Sscanf(string(line), "STAT %s %s\r\n", &name, &value)
|
||||
if errScan != nil || n != 2 {
|
||||
return fmt.Errorf("unexpected line in stats response: %q", line)
|
||||
}
|
||||
|
||||
// Save values
|
||||
values[name] = value
|
||||
}
|
||||
|
||||
//
|
||||
tags := map[string]string{"server": address}
|
||||
|
||||
// Process values
|
||||
for _, key := range sendAsIs {
|
||||
if value, ok := values[key]; ok {
|
||||
// Mostly it is the number
|
||||
if iValue, errParse := strconv.ParseInt(value, 10, 64); errParse != nil {
|
||||
acc.Add(key, value, tags)
|
||||
} else {
|
||||
acc.Add(key, iValue, tags)
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func init() {
|
||||
plugins.Add("memcached", func() plugins.Plugin {
|
||||
return &Memcached{}
|
||||
})
|
||||
}
|
||||
26
plugins/memcached/memcached_test.go
Normal file
26
plugins/memcached/memcached_test.go
Normal file
@@ -0,0 +1,26 @@
|
||||
package memcached
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/influxdb/telegraf/testutil"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestMemcachedGeneratesMetrics(t *testing.T) {
|
||||
m := &Memcached{
|
||||
Servers: []string{"localhost"},
|
||||
}
|
||||
|
||||
var acc testutil.Accumulator
|
||||
|
||||
err := m.Gather(&acc)
|
||||
require.NoError(t, err)
|
||||
|
||||
intMetrics := []string{"get_hits", "get_misses", "evictions", "limit_maxbytes", "bytes"}
|
||||
|
||||
for _, metric := range intMetrics {
|
||||
assert.True(t, acc.HasIntValue(metric), metric)
|
||||
}
|
||||
}
|
||||
@@ -89,7 +89,7 @@ func (p *Postgresql) gatherServer(serv *Server, acc plugins.Accumulator) error {
|
||||
defer rows.Close()
|
||||
|
||||
for rows.Next() {
|
||||
err := p.accRow(rows, acc)
|
||||
err := p.accRow(rows, acc, serv.Address)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -100,7 +100,7 @@ func (p *Postgresql) gatherServer(serv *Server, acc plugins.Accumulator) error {
|
||||
for _, name := range serv.Databases {
|
||||
row := db.QueryRow(`SELECT * FROM pg_stat_database WHERE datname=$1`, name)
|
||||
|
||||
err := p.accRow(row, acc)
|
||||
err := p.accRow(row, acc, serv.Address)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -114,7 +114,7 @@ type scanner interface {
|
||||
Scan(dest ...interface{}) error
|
||||
}
|
||||
|
||||
func (p *Postgresql) accRow(row scanner, acc plugins.Accumulator) error {
|
||||
func (p *Postgresql) accRow(row scanner, acc plugins.Accumulator, server string) error {
|
||||
var ignore interface{}
|
||||
var name string
|
||||
var commit, rollback, read, hit int64
|
||||
@@ -135,7 +135,7 @@ func (p *Postgresql) accRow(row scanner, acc plugins.Accumulator) error {
|
||||
return err
|
||||
}
|
||||
|
||||
tags := map[string]string{"db": name}
|
||||
tags := map[string]string{"server": server, "db": name}
|
||||
|
||||
acc.Add("xact_commit", commit, tags)
|
||||
acc.Add("xact_rollback", rollback, tags)
|
||||
|
||||
@@ -5,6 +5,7 @@ import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"net"
|
||||
"net/url"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
@@ -20,8 +21,9 @@ type Redis struct {
|
||||
}
|
||||
|
||||
var sampleConfig = `
|
||||
# An array of address to gather stats about. Specify an ip on hostname
|
||||
# with optional port. ie localhost, 10.10.3.33:18832, etc.
|
||||
# An array of URI to gather stats about. Specify an ip or hostname
|
||||
# with optional port add password. ie redis://localhost, redis://10.10.3.33:18832,
|
||||
# 10.0.0.1:10000, etc.
|
||||
#
|
||||
# If no servers are specified, then localhost is used as the host.
|
||||
servers = ["localhost"]`
|
||||
@@ -73,7 +75,10 @@ var ErrProtocolError = errors.New("redis protocol error")
|
||||
// Returns one of the errors encountered while gather stats (if any).
|
||||
func (g *Redis) Gather(acc plugins.Accumulator) error {
|
||||
if len(g.Servers) == 0 {
|
||||
g.gatherServer(":6379", acc)
|
||||
url := &url.URL{
|
||||
Host: ":6379",
|
||||
}
|
||||
g.gatherServer(url, acc)
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -82,10 +87,19 @@ func (g *Redis) Gather(acc plugins.Accumulator) error {
|
||||
var outerr error
|
||||
|
||||
for _, serv := range g.Servers {
|
||||
u, err := url.Parse(serv)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Unable to parse to address '%s': %s", serv, err)
|
||||
} else if u.Scheme == "" {
|
||||
// fallback to simple string based address (i.e. "10.0.0.1:10000")
|
||||
u.Scheme = "tcp"
|
||||
u.Host = serv
|
||||
u.Path = ""
|
||||
}
|
||||
wg.Add(1)
|
||||
go func(serv string) {
|
||||
defer wg.Done()
|
||||
outerr = g.gatherServer(serv, acc)
|
||||
outerr = g.gatherServer(u, acc)
|
||||
}(serv)
|
||||
}
|
||||
|
||||
@@ -96,17 +110,34 @@ func (g *Redis) Gather(acc plugins.Accumulator) error {
|
||||
|
||||
const defaultPort = "6379"
|
||||
|
||||
func (g *Redis) gatherServer(addr string, acc plugins.Accumulator) error {
|
||||
func (g *Redis) gatherServer(addr *url.URL, acc plugins.Accumulator) error {
|
||||
if g.c == nil {
|
||||
|
||||
_, _, err := net.SplitHostPort(addr)
|
||||
_, _, err := net.SplitHostPort(addr.Host)
|
||||
if err != nil {
|
||||
addr = addr + ":" + defaultPort
|
||||
addr.Host = addr.Host + ":" + defaultPort
|
||||
}
|
||||
|
||||
c, err := net.Dial("tcp", addr)
|
||||
c, err := net.Dial("tcp", addr.Host)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Unable to connect to redis server '%s': %s", addr, err)
|
||||
return fmt.Errorf("Unable to connect to redis server '%s': %s", addr.Host, err)
|
||||
}
|
||||
|
||||
if addr.User != nil {
|
||||
pwd, set := addr.User.Password()
|
||||
if set && pwd != "" {
|
||||
c.Write([]byte(fmt.Sprintf("AUTH %s\n", pwd)))
|
||||
|
||||
r := bufio.NewReader(c)
|
||||
|
||||
line, err := r.ReadString('\n')
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if line[0] != '+' {
|
||||
return fmt.Errorf("%s", strings.TrimSpace(line)[1:])
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
g.c = c
|
||||
@@ -157,11 +188,12 @@ func (g *Redis) gatherServer(addr string, acc plugins.Accumulator) error {
|
||||
continue
|
||||
}
|
||||
|
||||
tags := map[string]string{"host": addr.String()}
|
||||
val := strings.TrimSpace(parts[1])
|
||||
|
||||
ival, err := strconv.ParseUint(val, 10, 64)
|
||||
if err == nil {
|
||||
acc.Add(metric, ival, nil)
|
||||
acc.Add(metric, ival, tags)
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -170,7 +202,7 @@ func (g *Redis) gatherServer(addr string, acc plugins.Accumulator) error {
|
||||
return err
|
||||
}
|
||||
|
||||
acc.Add(metric, fval, nil)
|
||||
acc.Add(metric, fval, tags)
|
||||
}
|
||||
|
||||
return nil
|
||||
|
||||
@@ -40,7 +40,7 @@ func TestRedisGeneratesMetrics(t *testing.T) {
|
||||
}
|
||||
}()
|
||||
|
||||
addr := l.Addr().String()
|
||||
addr := fmt.Sprintf("redis://%s", l.Addr().String())
|
||||
|
||||
r := &Redis{
|
||||
Servers: []string{addr},
|
||||
@@ -131,7 +131,7 @@ func TestRedisCanPullStatsFromMultipleServers(t *testing.T) {
|
||||
}
|
||||
}()
|
||||
|
||||
addr := l.Addr().String()
|
||||
addr := fmt.Sprintf("redis://%s", l.Addr().String())
|
||||
|
||||
r := &Redis{
|
||||
Servers: []string{addr},
|
||||
|
||||
@@ -15,4 +15,4 @@ build() {
|
||||
build "darwin" "amd64"
|
||||
build "linux" "amd64"
|
||||
build "linux" "386"
|
||||
|
||||
build "linux" "arm"
|
||||
|
||||
Reference in New Issue
Block a user