Compare commits
49 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
120218f9c6 | ||
|
|
38ee6adcd2 | ||
|
|
1d8e6473c6 | ||
|
|
494704b479 | ||
|
|
d634b08969 | ||
|
|
350f91601c | ||
|
|
659e1cfe85 | ||
|
|
1943d89147 | ||
|
|
aa822756e7 | ||
|
|
073b1084b7 | ||
|
|
5cbe15b676 | ||
|
|
e2cff9febe | ||
|
|
e9ad786578 | ||
|
|
0692b4be61 | ||
|
|
6550d4f634 | ||
|
|
c523ae2c52 | ||
|
|
5e1ba3fbb7 | ||
|
|
6e8a298d21 | ||
|
|
815e9534b8 | ||
|
|
5390a8ea71 | ||
|
|
e34c52402f | ||
|
|
86a6f337f6 | ||
|
|
a1f7d5549b | ||
|
|
5fbd07b146 | ||
|
|
b8f3c68b89 | ||
|
|
043b171028 | ||
|
|
b86d789abe | ||
|
|
e1c7dc80ae | ||
|
|
1fe0791a74 | ||
|
|
0d87eb4725 | ||
|
|
e2dac56a40 | ||
|
|
039fc80ed7 | ||
|
|
8e90a444c2 | ||
|
|
2ccd828e81 | ||
|
|
480f29bde7 | ||
|
|
11a6db8268 | ||
|
|
6566cc51e3 | ||
|
|
051cd03bbf | ||
|
|
0aa0a40d89 | ||
|
|
9dcbe750d1 | ||
|
|
10bf663a3b | ||
|
|
b71cfb7cfd | ||
|
|
87fedcfa74 | ||
|
|
3b9174a322 | ||
|
|
851fdd439f | ||
|
|
ab78e8efec | ||
|
|
b829febe0d | ||
|
|
39c90dd879 | ||
|
|
3a43042089 |
30
CHANGELOG.md
Normal file
30
CHANGELOG.md
Normal file
@@ -0,0 +1,30 @@
|
||||
## v0.1.3 [2015-07-05]
|
||||
|
||||
### Features
|
||||
- [#35](https://github.com/influxdb/influxdb/pull/35): Add Kafka plugin. Thanks @EmilS!
|
||||
- [#47](https://github.com/influxdb/influxdb/pull/47): Add RethinkDB plugin. Thanks @jipperinbham!
|
||||
|
||||
### Bugfixes
|
||||
- [#45](https://github.com/influxdb/influxdb/pull/45): Skip disk tags that don't have a value. Thanks @jhofeditz!
|
||||
- [#43](https://github.com/influxdb/influxdb/pull/43): Fix bug in MySQL plugin. Thanks @marcosnils!
|
||||
|
||||
## v0.1.2 [2015-07-01]
|
||||
|
||||
### Features
|
||||
- [#12](https://github.com/influxdb/influxdb/pull/12): Add Linux/ARM to the list of built binaries. Thanks @voxxit!
|
||||
- [#14](https://github.com/influxdb/influxdb/pull/14): Clarify the S3 buckets that Telegraf is pushed to.
|
||||
- [#16](https://github.com/influxdb/influxdb/pull/16): Convert Redis to use URI, support Redis AUTH. Thanks @jipperinbham!
|
||||
- [#21](https://github.com/influxdb/influxdb/pull/21): Add memcached plugin. Thanks @Yukki!
|
||||
|
||||
### Bugfixes
|
||||
- [#13](https://github.com/influxdb/influxdb/pull/13): Fix the packaging script.
|
||||
- [#19](https://github.com/influxdb/influxdb/pull/19): Add host name to metric tags. Thanks @sherifzain!
|
||||
- [#20](https://github.com/influxdb/influxdb/pull/20): Fix race condition with accumulator mutex. Thanks @nkatsaros!
|
||||
- [#23](https://github.com/influxdb/influxdb/pull/23): Change name of folder for packages. Thanks @colinrymer!
|
||||
- [#32](https://github.com/influxdb/influxdb/pull/32): Fix spelling of memoory -> memory. Thanks @tylernisonoff!
|
||||
|
||||
## v0.1.1 [2015-06-19]
|
||||
|
||||
### Release Notes
|
||||
|
||||
This is the initial release of Telegraf.
|
||||
111
PLUGINS.md
111
PLUGINS.md
@@ -1,111 +0,0 @@
|
||||
Telegraf is entirely plugin driven. This interface allows for operators to
|
||||
pick and chose what is gathered as well as makes it easy for developers
|
||||
to create new ways of generating metrics.
|
||||
|
||||
Plugin authorship is kept as simple as possible to promote people to develop
|
||||
and submit new plugins.
|
||||
|
||||
## Guidelines
|
||||
|
||||
* A plugin must conform to the `plugins.Plugin` interface.
|
||||
* Telegraf promises to run each plugin's Gather function serially. This means
|
||||
developers don't have to worry about thread safety within these functions.
|
||||
* Each generated metric automatically has the name of the plugin that generated
|
||||
it prepended. This is to keep plugins honest.
|
||||
* Plugins should call `plugins.Add` in their `init` function to register themselves.
|
||||
See below for a quick example.
|
||||
* To be available within Telegraf itself, plugins must add themselves to the `github.com/influxdb/telegraf/plugins/all/all.go` file.
|
||||
* The `SampleConfig` function should return valid toml that describes how the plugin can be configured. This is include in `telegraf -sample-config`.
|
||||
* The `Description` function should say in one line what this plugin does.
|
||||
|
||||
### Plugin interface
|
||||
|
||||
```go
|
||||
type Plugin interface {
|
||||
SampleConfig() string
|
||||
Description() string
|
||||
Gather(Accumulator) error
|
||||
}
|
||||
|
||||
type Accumulator interface {
|
||||
Add(measurement string, value interface{}, tags map[string]string)
|
||||
AddValuesWithTime(measurement string, values map[string]interface{}, tags map[string]string, timestamp time.Time)
|
||||
}
|
||||
```
|
||||
|
||||
### Accumulator
|
||||
|
||||
The way that a plugin emits metrics is by interacting with the Accumulator.
|
||||
|
||||
The `Add` function takes 3 arguments:
|
||||
* **measurement**: A string description of the metric. For instance `bytes_read` or `faults`.
|
||||
* **value**: A value for the metric. This accepts 5 different types of value:
|
||||
* **int**: The most common type. All int types are accepted but favor using `int64`
|
||||
Useful for counters, etc.
|
||||
* **float**: Favor `float64`, useful for gauges, percentages, etc.
|
||||
* **bool**: `true` or `false`, useful to indicate the presence of a state. `light_on`, etc.
|
||||
* **string**: Typically used to indicate a message, or some kind of freeform information.
|
||||
* **time.Time**: Useful for indicating when a state last occurred, for instance `light_on_since`.
|
||||
* **tags**: This is a map of strings to strings to describe the where or who about the metric. For instance, the `net` plugin adds a tag named `"interface"` set to the name of the network interface, like `"eth0"`.
|
||||
|
||||
The `AddValuesWithTime` allows multiple values for a point to be passed. The values
|
||||
used are the same type profile as **value** above. The **timestamp** argument
|
||||
allows a point to be registered as having occurred at an arbitrary time.
|
||||
|
||||
Let's say you've written a plugin that emits metrics about processes on the current host.
|
||||
|
||||
```go
|
||||
|
||||
type Process struct {
|
||||
CPUTime float64
|
||||
MemoryBytes int64
|
||||
PID int
|
||||
}
|
||||
|
||||
func Gather(acc plugins.Accumulator) error {
|
||||
for _, process := range system.Processes() {
|
||||
tags := map[string]string {
|
||||
"pid": fmt.Sprintf("%d", process.Pid),
|
||||
}
|
||||
|
||||
acc.Add("cpu", process.CPUTime, tags)
|
||||
acc.Add("memoory", process.MemoryBytes, tags)
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
### Example
|
||||
|
||||
```go
|
||||
|
||||
// simple.go
|
||||
|
||||
import "github.com/influxdb/telegraf/plugins"
|
||||
|
||||
type Simple struct {
|
||||
Ok bool
|
||||
}
|
||||
|
||||
func (s *Simple) Description() string {
|
||||
return "a demo plugin"
|
||||
}
|
||||
|
||||
func (s *Simple) SampleConfig() string {
|
||||
return "ok = true # indicate if everything is fine"
|
||||
}
|
||||
|
||||
func (s *Simple) Gather(acc plugins.Accumulator) error {
|
||||
if s.Ok {
|
||||
acc.Add("state", "pretty good", nil)
|
||||
} else {
|
||||
acc.Add("state", "not great", nil)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func init() {
|
||||
plugins.Add("simple", func() plugins.Plugin { &Simple{} })
|
||||
}
|
||||
```
|
||||
|
||||
154
README.md
154
README.md
@@ -1,13 +1,37 @@
|
||||
# Telegraf - A native agent for InfluxDB
|
||||
|
||||
Telegraf is an agent written in Go for collecting metrics from the system it's running on or from other services and writing them into InfluxDB.
|
||||
|
||||
Design goals are to have a minimal memory footprint with a plugin system so that developers in the community can easily add support for collecting metrics from well known services (like Hadoop, or Postgres, or Redis) and third party APIs (like Mailchimp, AWS CloudWatch, or Google Analytics).
|
||||
|
||||
We'll eagerly accept pull requests for new plugins and will manage the set of plugins that Telegraf supports. See the bottom of this doc for instructions on writing new plugins.
|
||||
|
||||
## Quickstart
|
||||
|
||||
* Build from source or download telegraf (binaries forthcoming)
|
||||
* Build from source or download telegraf:
|
||||
|
||||
### Linux packages for Debian/Ubuntu and RHEL/CentOS:
|
||||
|
||||
```
|
||||
http://get.influxdb.org/telegraf/telegraf_0.1.2_amd64.deb
|
||||
http://get.influxdb.org/telegraf/telegraf-0.1.2-1.x86_64.rpm
|
||||
```
|
||||
|
||||
### OSX via Homebrew:
|
||||
|
||||
```
|
||||
brew update
|
||||
brew install telegraf
|
||||
```
|
||||
|
||||
### How to use it:
|
||||
|
||||
* Run `telegraf -sample-config > telegraf.toml` to create an initial configuration
|
||||
* Edit the configuration to match your needs
|
||||
* Run `telegraf -config telegraf.toml -test` to output one full measurement sample to STDOUT
|
||||
* Run `telegraf -config telegraf.toml` to gather and send metrics to InfluxDB
|
||||
|
||||
|
||||
## Telegraf Options
|
||||
|
||||
Telegraf has a few options you can configure under the `agent` section of the config. If you don't see an `agent` section run `telegraf -sample-config > telegraf.toml` to create a valid initial configuration:
|
||||
@@ -16,6 +40,19 @@ Telegraf has a few options you can configure under the `agent` section of the co
|
||||
* **interval**: How ofter to gather metrics. Uses a simple number + unit parser, ie "10s" for 10 seconds or "5m" for 5 minutes.
|
||||
* **debug**: Set to true to gather and send metrics to STDOUT as well as InfluxDB.
|
||||
|
||||
## Supported Plugins
|
||||
|
||||
Telegraf currently has support for collecting metrics from:
|
||||
|
||||
* System (memory, CPU, network, etc.)
|
||||
* Docker
|
||||
* MySQL
|
||||
* Prometheus (client libraries and exporters)
|
||||
* PostgreSQL
|
||||
* Redis
|
||||
|
||||
We'll be adding support for many more over the coming months. Read on if you want to add support for another service or third-party API.
|
||||
|
||||
## Plugin Options
|
||||
|
||||
There are 3 configuration options that are configurable per plugin:
|
||||
@@ -23,3 +60,118 @@ There are 3 configuration options that are configurable per plugin:
|
||||
* **pass**: An array of strings that is used to filter metrics generated by the current plugin. Each string in the array is tested as a prefix against metrics and if it matches, the metric is emitted.
|
||||
* **drop**: The inverse of pass, if a metric matches, it is not emitted.
|
||||
* **interval**: How often to gather this metric. Normal plugins use a single global interval, but if one particular plugin should be run less or more often, you can configure that here.
|
||||
|
||||
## Plugins
|
||||
|
||||
This section is for developers that want to create new collection plugins. Telegraf is entirely plugin driven. This interface allows for operators to
|
||||
pick and chose what is gathered as well as makes it easy for developers
|
||||
to create new ways of generating metrics.
|
||||
|
||||
Plugin authorship is kept as simple as possible to promote people to develop
|
||||
and submit new plugins.
|
||||
|
||||
## Guidelines
|
||||
|
||||
* A plugin must conform to the `plugins.Plugin` interface.
|
||||
* Telegraf promises to run each plugin's Gather function serially. This means
|
||||
developers don't have to worry about thread safety within these functions.
|
||||
* Each generated metric automatically has the name of the plugin that generated
|
||||
it prepended. This is to keep plugins honest.
|
||||
* Plugins should call `plugins.Add` in their `init` function to register themselves.
|
||||
See below for a quick example.
|
||||
* To be available within Telegraf itself, plugins must add themselves to the `github.com/influxdb/telegraf/plugins/all/all.go` file.
|
||||
* The `SampleConfig` function should return valid toml that describes how the plugin can be configured. This is include in `telegraf -sample-config`.
|
||||
* The `Description` function should say in one line what this plugin does.
|
||||
|
||||
### Plugin interface
|
||||
|
||||
```go
|
||||
type Plugin interface {
|
||||
SampleConfig() string
|
||||
Description() string
|
||||
Gather(Accumulator) error
|
||||
}
|
||||
|
||||
type Accumulator interface {
|
||||
Add(measurement string, value interface{}, tags map[string]string)
|
||||
AddValuesWithTime(measurement string, values map[string]interface{}, tags map[string]string, timestamp time.Time)
|
||||
}
|
||||
```
|
||||
|
||||
### Accumulator
|
||||
|
||||
The way that a plugin emits metrics is by interacting with the Accumulator.
|
||||
|
||||
The `Add` function takes 3 arguments:
|
||||
* **measurement**: A string description of the metric. For instance `bytes_read` or `faults`.
|
||||
* **value**: A value for the metric. This accepts 5 different types of value:
|
||||
* **int**: The most common type. All int types are accepted but favor using `int64`
|
||||
Useful for counters, etc.
|
||||
* **float**: Favor `float64`, useful for gauges, percentages, etc.
|
||||
* **bool**: `true` or `false`, useful to indicate the presence of a state. `light_on`, etc.
|
||||
* **string**: Typically used to indicate a message, or some kind of freeform information.
|
||||
* **time.Time**: Useful for indicating when a state last occurred, for instance `light_on_since`.
|
||||
* **tags**: This is a map of strings to strings to describe the where or who about the metric. For instance, the `net` plugin adds a tag named `"interface"` set to the name of the network interface, like `"eth0"`.
|
||||
|
||||
The `AddValuesWithTime` allows multiple values for a point to be passed. The values
|
||||
used are the same type profile as **value** above. The **timestamp** argument
|
||||
allows a point to be registered as having occurred at an arbitrary time.
|
||||
|
||||
Let's say you've written a plugin that emits metrics about processes on the current host.
|
||||
|
||||
```go
|
||||
|
||||
type Process struct {
|
||||
CPUTime float64
|
||||
MemoryBytes int64
|
||||
PID int
|
||||
}
|
||||
|
||||
func Gather(acc plugins.Accumulator) error {
|
||||
for _, process := range system.Processes() {
|
||||
tags := map[string]string {
|
||||
"pid": fmt.Sprintf("%d", process.Pid),
|
||||
}
|
||||
|
||||
acc.Add("cpu", process.CPUTime, tags)
|
||||
acc.Add("memory", process.MemoryBytes, tags)
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
### Example
|
||||
|
||||
```go
|
||||
package simple
|
||||
|
||||
// simple.go
|
||||
|
||||
import "github.com/influxdb/telegraf/plugins"
|
||||
|
||||
type Simple struct {
|
||||
Ok bool
|
||||
}
|
||||
|
||||
func (s *Simple) Description() string {
|
||||
return "a demo plugin"
|
||||
}
|
||||
|
||||
func (s *Simple) SampleConfig() string {
|
||||
return "ok = true # indicate if everything is fine"
|
||||
}
|
||||
|
||||
func (s *Simple) Gather(acc plugins.Accumulator) error {
|
||||
if s.Ok {
|
||||
acc.Add("state", "pretty good", nil)
|
||||
} else {
|
||||
acc.Add("state", "not great", nil)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func init() {
|
||||
plugins.Add("simple", func() plugins.Plugin { return &Simple{} })
|
||||
}
|
||||
```
|
||||
|
||||
|
||||
@@ -4,12 +4,15 @@ import (
|
||||
"fmt"
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/influxdb/influxdb/client"
|
||||
)
|
||||
|
||||
type BatchPoints struct {
|
||||
mu sync.Mutex
|
||||
|
||||
client.BatchPoints
|
||||
|
||||
Debug bool
|
||||
@@ -20,6 +23,9 @@ type BatchPoints struct {
|
||||
}
|
||||
|
||||
func (bp *BatchPoints) Add(measurement string, val interface{}, tags map[string]string) {
|
||||
bp.mu.Lock()
|
||||
defer bp.mu.Unlock()
|
||||
|
||||
measurement = bp.Prefix + measurement
|
||||
|
||||
if bp.Config != nil {
|
||||
@@ -55,6 +61,9 @@ func (bp *BatchPoints) AddValuesWithTime(
|
||||
tags map[string]string,
|
||||
timestamp time.Time,
|
||||
) {
|
||||
bp.mu.Lock()
|
||||
defer bp.mu.Unlock()
|
||||
|
||||
measurement = bp.Prefix + measurement
|
||||
|
||||
if bp.Config != nil {
|
||||
|
||||
@@ -17,8 +17,10 @@ var fTest = flag.Bool("test", false, "gather metrics, print them out, and exit")
|
||||
var fConfig = flag.String("config", "", "configuration file to load")
|
||||
var fVersion = flag.Bool("version", false, "display the version")
|
||||
var fSampleConfig = flag.Bool("sample-config", false, "print out full sample configuration")
|
||||
var fPidfile = flag.String("pidfile", "", "file to write our pid to")
|
||||
|
||||
var Version = "unreleased"
|
||||
var Commit = ""
|
||||
|
||||
func main() {
|
||||
flag.Parse()
|
||||
@@ -104,5 +106,16 @@ func main() {
|
||||
log.Printf("Tags enabled: %v", config.ListTags())
|
||||
}
|
||||
|
||||
if *fPidfile != "" {
|
||||
f, err := os.Create(*fPidfile)
|
||||
if err != nil {
|
||||
log.Fatalf("Unable to create pidfile: %s", err)
|
||||
}
|
||||
|
||||
fmt.Fprintf(f, "%d\n", os.Getpid())
|
||||
|
||||
f.Close()
|
||||
}
|
||||
|
||||
ag.Run(shutdown)
|
||||
}
|
||||
|
||||
349
package.sh
Executable file
349
package.sh
Executable file
@@ -0,0 +1,349 @@
|
||||
#!/usr/bin/env bash
|
||||
|
||||
###########################################################################
|
||||
# Packaging script which creates debian and RPM packages. It optionally
|
||||
# tags the repo with the given version.
|
||||
#
|
||||
# Requirements: GOPATH must be set. 'fpm' must be on the path, and the AWS
|
||||
# CLI tools must also be installed.
|
||||
#
|
||||
# https://github.com/jordansissel/fpm
|
||||
# http://aws.amazon.com/cli/
|
||||
#
|
||||
# Packaging process: to package a build, simple execute:
|
||||
#
|
||||
# package.sh <version>
|
||||
#
|
||||
# where <version> is the desired version. If generation of a debian and RPM
|
||||
# package is successful, the script will offer to tag the repo using the
|
||||
# supplied version string.
|
||||
#
|
||||
# AWS upload: the script will also offer to upload the packages to S3. If
|
||||
# this option is selected, the credentials should be present in the file
|
||||
# ~/aws.conf. The contents should be of the form:
|
||||
#
|
||||
# [default]
|
||||
# aws_access_key_id=<access ID>
|
||||
# aws_secret_access_key=<secret key>
|
||||
# region = us-east-1
|
||||
#
|
||||
# Trim the leading spaces when creating the file. The script will exit if
|
||||
# S3 upload is requested, but this file does not exist.
|
||||
|
||||
AWS_FILE=~/aws.conf
|
||||
|
||||
INSTALL_ROOT_DIR=/opt/telegraf
|
||||
TELEGRAF_LOG_DIR=/var/log/telegraf
|
||||
CONFIG_ROOT_DIR=/etc/opt/telegraf
|
||||
|
||||
SAMPLE_CONFIGURATION=etc/config.sample.toml
|
||||
INITD_SCRIPT=scripts/init.sh
|
||||
|
||||
TMP_WORK_DIR=`mktemp -d`
|
||||
POST_INSTALL_PATH=`mktemp`
|
||||
ARCH=`uname -i`
|
||||
LICENSE=MIT
|
||||
URL=influxdb.com
|
||||
MAINTAINER=support@influxdb.com
|
||||
VENDOR=InfluxDB
|
||||
DESCRIPTION="InfluxDB Telegraf agent"
|
||||
PKG_DEPS=(coreutils)
|
||||
GO_VERSION="go1.4.2"
|
||||
GOPATH_INSTALL=
|
||||
BINS=(
|
||||
telegraf
|
||||
)
|
||||
|
||||
###########################################################################
|
||||
# Helper functions.
|
||||
|
||||
# usage prints simple usage information.
|
||||
usage() {
|
||||
echo -e "$0 [<version>] [-h]\n"
|
||||
cleanup_exit $1
|
||||
}
|
||||
|
||||
# cleanup_exit removes all resources created during the process and exits with
|
||||
# the supplied returned code.
|
||||
cleanup_exit() {
|
||||
rm -r $TMP_WORK_DIR
|
||||
rm $POST_INSTALL_PATH
|
||||
exit $1
|
||||
}
|
||||
|
||||
# check_gopath sanity checks the value of the GOPATH env variable, and determines
|
||||
# the path where build artifacts are installed. GOPATH may be a colon-delimited
|
||||
# list of directories.
|
||||
check_gopath() {
|
||||
[ -z "$GOPATH" ] && echo "GOPATH is not set." && cleanup_exit 1
|
||||
GOPATH_INSTALL=`echo $GOPATH | cut -d ':' -f 1`
|
||||
[ ! -d "$GOPATH_INSTALL" ] && echo "GOPATH_INSTALL is not a directory." && cleanup_exit 1
|
||||
echo "GOPATH ($GOPATH) looks sane, using $GOPATH_INSTALL for installation."
|
||||
}
|
||||
|
||||
check_gvm() {
|
||||
source $HOME/.gvm/scripts/gvm
|
||||
which gvm
|
||||
if [ $? -ne 0 ]; then
|
||||
echo "gvm not found -- aborting."
|
||||
cleanup_exit $1
|
||||
fi
|
||||
gvm use $GO_VERSION
|
||||
if [ $? -ne 0 ]; then
|
||||
echo "gvm cannot find Go version $GO_VERSION -- aborting."
|
||||
cleanup_exit $1
|
||||
fi
|
||||
}
|
||||
|
||||
# check_clean_tree ensures that no source file is locally modified.
|
||||
check_clean_tree() {
|
||||
modified=$(git ls-files --modified | wc -l)
|
||||
if [ $modified -ne 0 ]; then
|
||||
echo "The source tree is not clean -- aborting."
|
||||
cleanup_exit 1
|
||||
fi
|
||||
echo "Git tree is clean."
|
||||
}
|
||||
|
||||
# update_tree ensures the tree is in-sync with the repo.
|
||||
update_tree() {
|
||||
git pull origin master
|
||||
if [ $? -ne 0 ]; then
|
||||
echo "Failed to pull latest code -- aborting."
|
||||
cleanup_exit 1
|
||||
fi
|
||||
git fetch --tags
|
||||
if [ $? -ne 0 ]; then
|
||||
echo "Failed to fetch tags -- aborting."
|
||||
cleanup_exit 1
|
||||
fi
|
||||
echo "Git tree updated successfully."
|
||||
}
|
||||
|
||||
# check_tag_exists checks if the existing release already exists in the tags.
|
||||
check_tag_exists () {
|
||||
version=$1
|
||||
git tag | grep -q "^v$version$"
|
||||
if [ $? -eq 0 ]; then
|
||||
echo "Proposed version $version already exists as a tag -- aborting."
|
||||
cleanup_exit 1
|
||||
fi
|
||||
}
|
||||
|
||||
# make_dir_tree creates the directory structure within the packages.
|
||||
make_dir_tree() {
|
||||
work_dir=$1
|
||||
version=$2
|
||||
mkdir -p $work_dir/$INSTALL_ROOT_DIR/versions/$version/scripts
|
||||
if [ $? -ne 0 ]; then
|
||||
echo "Failed to create installation directory -- aborting."
|
||||
cleanup_exit 1
|
||||
fi
|
||||
mkdir -p $work_dir/$CONFIG_ROOT_DIR
|
||||
if [ $? -ne 0 ]; then
|
||||
echo "Failed to create configuration directory -- aborting."
|
||||
cleanup_exit 1
|
||||
fi
|
||||
}
|
||||
|
||||
|
||||
# do_build builds the code. The version and commit must be passed in.
|
||||
do_build() {
|
||||
version=$1
|
||||
commit=`git rev-parse HEAD`
|
||||
if [ $? -ne 0 ]; then
|
||||
echo "Unable to retrieve current commit -- aborting"
|
||||
cleanup_exit 1
|
||||
fi
|
||||
|
||||
for b in ${BINS[*]}; do
|
||||
rm -f $GOPATH_INSTALL/bin/$b
|
||||
done
|
||||
go get -u -f ./...
|
||||
if [ $? -ne 0 ]; then
|
||||
echo "WARNING: failed to 'go get' packages."
|
||||
fi
|
||||
go install -a -ldflags="-X main.Version $version -X main.Commit $commit" ./...
|
||||
if [ $? -ne 0 ]; then
|
||||
echo "Build failed, unable to create package -- aborting"
|
||||
cleanup_exit 1
|
||||
fi
|
||||
echo "Build completed successfully."
|
||||
}
|
||||
|
||||
# generate_postinstall_script creates the post-install script for the
|
||||
# package. It must be passed the version.
|
||||
generate_postinstall_script() {
|
||||
version=$1
|
||||
cat <<EOF >$POST_INSTALL_PATH
|
||||
rm -f $INSTALL_ROOT_DIR/telegraf
|
||||
rm -f $INSTALL_ROOT_DIR/init.sh
|
||||
ln -s $INSTALL_ROOT_DIR/versions/$version/telegraf $INSTALL_ROOT_DIR/telegraf
|
||||
ln -s $INSTALL_ROOT_DIR/versions/$version/scripts/init.sh $INSTALL_ROOT_DIR/init.sh
|
||||
|
||||
rm -f /etc/init.d/telegraf
|
||||
ln -sfn $INSTALL_ROOT_DIR/init.sh /etc/init.d/telegraf
|
||||
chmod +x /etc/init.d/telegraf
|
||||
if which update-rc.d > /dev/null 2>&1 ; then
|
||||
update-rc.d -f telegraf remove
|
||||
update-rc.d telegraf defaults
|
||||
else
|
||||
chkconfig --add telegraf
|
||||
fi
|
||||
|
||||
if ! id telegraf >/dev/null 2>&1; then
|
||||
useradd --system -U -M telegraf
|
||||
fi
|
||||
chown -R -L telegraf:telegraf $INSTALL_ROOT_DIR
|
||||
chmod -R a+rX $INSTALL_ROOT_DIR
|
||||
|
||||
mkdir -p $TELEGRAF_LOG_DIR
|
||||
chown -R -L telegraf:telegraf $TELEGRAF_LOG_DIR
|
||||
EOF
|
||||
echo "Post-install script created successfully at $POST_INSTALL_PATH"
|
||||
}
|
||||
|
||||
###########################################################################
|
||||
# Start the packaging process.
|
||||
|
||||
if [ $# -ne 1 ]; then
|
||||
usage 1
|
||||
elif [ $1 == "-h" ]; then
|
||||
usage 0
|
||||
else
|
||||
VERSION=$1
|
||||
fi
|
||||
|
||||
echo -e "\nStarting package process...\n"
|
||||
|
||||
check_gvm
|
||||
check_gopath
|
||||
check_clean_tree
|
||||
update_tree
|
||||
check_tag_exists $VERSION
|
||||
do_build $VERSION
|
||||
make_dir_tree $TMP_WORK_DIR $VERSION
|
||||
|
||||
###########################################################################
|
||||
# Copy the assets to the installation directories.
|
||||
|
||||
for b in ${BINS[*]}; do
|
||||
cp $GOPATH_INSTALL/bin/$b $TMP_WORK_DIR/$INSTALL_ROOT_DIR/versions/$VERSION
|
||||
if [ $? -ne 0 ]; then
|
||||
echo "Failed to copy binaries to packaging directory -- aborting."
|
||||
cleanup_exit 1
|
||||
fi
|
||||
done
|
||||
|
||||
echo "${BINS[*]} copied to $TMP_WORK_DIR/$INSTALL_ROOT_DIR/versions/$VERSION"
|
||||
|
||||
cp $INITD_SCRIPT $TMP_WORK_DIR/$INSTALL_ROOT_DIR/versions/$VERSION/scripts
|
||||
|
||||
if [ $? -ne 0 ]; then
|
||||
echo "Failed to copy init.d script to packaging directory -- aborting."
|
||||
cleanup_exit 1
|
||||
fi
|
||||
echo "$INITD_SCRIPT copied to $TMP_WORK_DIR/$INSTALL_ROOT_DIR/versions/$VERSION/scripts"
|
||||
|
||||
cp $SAMPLE_CONFIGURATION $TMP_WORK_DIR/$CONFIG_ROOT_DIR/telegraf.conf
|
||||
if [ $? -ne 0 ]; then
|
||||
echo "Failed to copy $SAMPLE_CONFIGURATION to packaging directory -- aborting."
|
||||
cleanup_exit 1
|
||||
fi
|
||||
|
||||
generate_postinstall_script $VERSION
|
||||
|
||||
###########################################################################
|
||||
# Create the actual packages.
|
||||
|
||||
echo -n "Commence creation of $ARCH packages, version $VERSION? [Y/n] "
|
||||
read response
|
||||
response=`echo $response | tr 'A-Z' 'a-z'`
|
||||
if [ "x$response" == "xn" ]; then
|
||||
echo "Packaging aborted."
|
||||
cleanup_exit 1
|
||||
fi
|
||||
|
||||
if [ $ARCH == "i386" ]; then
|
||||
rpm_package=telegraf-$VERSION-1.i686.rpm
|
||||
debian_package=telegraf_${VERSION}_i686.deb
|
||||
deb_args="-a i686"
|
||||
rpm_args="setarch i686"
|
||||
elif [ $ARCH == "arm" ]; then
|
||||
rpm_package=telegraf-$VERSION-1.armel.rpm
|
||||
debian_package=telegraf_${VERSION}_armel.deb
|
||||
else
|
||||
rpm_package=telegraf-$VERSION-1.x86_64.rpm
|
||||
debian_package=telegraf_${VERSION}_amd64.deb
|
||||
fi
|
||||
|
||||
COMMON_FPM_ARGS="-C $TMP_WORK_DIR --vendor $VENDOR --url $URL --license $LICENSE --maintainer $MAINTAINER --after-install $POST_INSTALL_PATH --name telegraf --version $VERSION --config-files $CONFIG_ROOT_DIR ."
|
||||
$rpm_args fpm -s dir -t rpm --description "$DESCRIPTION" $COMMON_FPM_ARGS
|
||||
if [ $? -ne 0 ]; then
|
||||
echo "Failed to create RPM package -- aborting."
|
||||
cleanup_exit 1
|
||||
fi
|
||||
echo "RPM package created successfully."
|
||||
|
||||
fpm -s dir -t deb $deb_args --description "$DESCRIPTION" $COMMON_FPM_ARGS
|
||||
if [ $? -ne 0 ]; then
|
||||
echo "Failed to create Debian package -- aborting."
|
||||
cleanup_exit 1
|
||||
fi
|
||||
echo "Debian package created successfully."
|
||||
|
||||
###########################################################################
|
||||
# Offer to tag the repo.
|
||||
|
||||
echo -n "Tag source tree with v$VERSION and push to repo? [y/N] "
|
||||
read response
|
||||
response=`echo $response | tr 'A-Z' 'a-z'`
|
||||
if [ "x$response" == "xy" ]; then
|
||||
echo "Creating tag v$VERSION and pushing to repo"
|
||||
git tag v$VERSION
|
||||
if [ $? -ne 0 ]; then
|
||||
echo "Failed to create tag v$VERSION -- aborting"
|
||||
cleanup_exit 1
|
||||
fi
|
||||
git push origin v$VERSION
|
||||
if [ $? -ne 0 ]; then
|
||||
echo "Failed to push tag v$VERSION to repo -- aborting"
|
||||
cleanup_exit 1
|
||||
fi
|
||||
else
|
||||
echo "Not creating tag v$VERSION."
|
||||
fi
|
||||
|
||||
|
||||
###########################################################################
|
||||
# Offer to publish the packages.
|
||||
|
||||
echo -n "Publish packages to S3? [y/N] "
|
||||
read response
|
||||
response=`echo $response | tr 'A-Z' 'a-z'`
|
||||
if [ "x$response" == "xy" ]; then
|
||||
echo "Publishing packages to S3."
|
||||
if [ ! -e "$AWS_FILE" ]; then
|
||||
echo "$AWS_FILE does not exist -- aborting."
|
||||
cleanup_exit 1
|
||||
fi
|
||||
|
||||
for filepath in `ls *.{deb,rpm}`; do
|
||||
echo "Uploading $filepath to S3"
|
||||
filename=`basename $filepath`
|
||||
echo "Uploading $filename to s3://get.influxdb.org/telegraf/$filename"
|
||||
AWS_CONFIG_FILE=$AWS_FILE aws s3 cp $filepath s3://get.influxdb.org/telegraf/$filename --acl public-read --region us-east-1
|
||||
if [ $? -ne 0 ]; then
|
||||
echo "Upload failed -- aborting".
|
||||
cleanup_exit 1
|
||||
fi
|
||||
done
|
||||
else
|
||||
echo "Not publishing packages to S3."
|
||||
fi
|
||||
|
||||
###########################################################################
|
||||
# All done.
|
||||
|
||||
echo -e "\nPackaging process complete."
|
||||
cleanup_exit 0
|
||||
@@ -1,8 +1,11 @@
|
||||
package all
|
||||
|
||||
import (
|
||||
_ "github.com/influxdb/telegraf/plugins/kafka_consumer"
|
||||
_ "github.com/influxdb/telegraf/plugins/memcached"
|
||||
_ "github.com/influxdb/telegraf/plugins/mysql"
|
||||
_ "github.com/influxdb/telegraf/plugins/postgresql"
|
||||
_ "github.com/influxdb/telegraf/plugins/prometheus"
|
||||
_ "github.com/influxdb/telegraf/plugins/redis"
|
||||
_ "github.com/influxdb/telegraf/plugins/system"
|
||||
)
|
||||
|
||||
153
plugins/kafka_consumer/kafka_consumer.go
Normal file
153
plugins/kafka_consumer/kafka_consumer.go
Normal file
@@ -0,0 +1,153 @@
|
||||
package kafka_consumer
|
||||
|
||||
import (
|
||||
"os"
|
||||
"os/signal"
|
||||
"time"
|
||||
|
||||
"github.com/influxdb/influxdb/tsdb"
|
||||
"github.com/influxdb/telegraf/plugins"
|
||||
"github.com/wvanbergen/kafka/consumergroup"
|
||||
"gopkg.in/Shopify/sarama.v1"
|
||||
)
|
||||
|
||||
type Kafka struct {
|
||||
ConsumerGroupName string
|
||||
Topic string
|
||||
ZookeeperPeers []string
|
||||
Consumer *consumergroup.ConsumerGroup
|
||||
BatchSize int
|
||||
}
|
||||
|
||||
var sampleConfig = `
|
||||
# topic to consume
|
||||
topic = "topic_with_metrics"
|
||||
|
||||
# the name of the consumer group
|
||||
consumerGroupName = "telegraf_metrics_consumers"
|
||||
|
||||
# an array of Zookeeper connection strings
|
||||
zookeeperPeers = ["localhost:2181"]
|
||||
|
||||
# Batch size of points sent to InfluxDB
|
||||
batchSize = 1000`
|
||||
|
||||
func (k *Kafka) SampleConfig() string {
|
||||
return sampleConfig
|
||||
}
|
||||
|
||||
func (k *Kafka) Description() string {
|
||||
return "read metrics from a Kafka topic"
|
||||
}
|
||||
|
||||
type Metric struct {
|
||||
Measurement string `json:"measurement"`
|
||||
Values map[string]interface{} `json:"values"`
|
||||
Tags map[string]string `json:"tags"`
|
||||
Time time.Time `json:"time"`
|
||||
}
|
||||
|
||||
func (k *Kafka) Gather(acc plugins.Accumulator) error {
|
||||
var consumerErr error
|
||||
metricQueue := make(chan []byte, 200)
|
||||
|
||||
if k.Consumer == nil {
|
||||
k.Consumer, consumerErr = consumergroup.JoinConsumerGroup(
|
||||
k.ConsumerGroupName,
|
||||
[]string{k.Topic},
|
||||
k.ZookeeperPeers,
|
||||
nil,
|
||||
)
|
||||
|
||||
if consumerErr != nil {
|
||||
return consumerErr
|
||||
}
|
||||
|
||||
c := make(chan os.Signal, 1)
|
||||
halt := make(chan bool, 1)
|
||||
signal.Notify(c, os.Interrupt)
|
||||
go func() {
|
||||
<-c
|
||||
halt <- true
|
||||
emitMetrics(k, acc, metricQueue)
|
||||
k.Consumer.Close()
|
||||
}()
|
||||
|
||||
go readFromKafka(k.Consumer.Messages(), metricQueue, k.BatchSize, k.Consumer.CommitUpto, halt)
|
||||
}
|
||||
|
||||
return emitMetrics(k, acc, metricQueue)
|
||||
}
|
||||
|
||||
func emitMetrics(k *Kafka, acc plugins.Accumulator, metricConsumer <-chan []byte) error {
|
||||
timeout := time.After(1 * time.Second)
|
||||
|
||||
for {
|
||||
select {
|
||||
case batch := <-metricConsumer:
|
||||
var points []tsdb.Point
|
||||
var err error
|
||||
if points, err = tsdb.ParsePoints(batch); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, point := range points {
|
||||
acc.AddValuesWithTime(point.Name(), point.Fields(), point.Tags(), point.Time())
|
||||
}
|
||||
case <-timeout:
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const millisecond = 1000000 * time.Nanosecond
|
||||
|
||||
type ack func(*sarama.ConsumerMessage) error
|
||||
|
||||
func readFromKafka(kafkaMsgs <-chan *sarama.ConsumerMessage, metricProducer chan<- []byte, maxBatchSize int, ackMsg ack, halt <-chan bool) {
|
||||
batch := make([]byte, 0)
|
||||
currentBatchSize := 0
|
||||
timeout := time.After(500 * millisecond)
|
||||
var msg *sarama.ConsumerMessage
|
||||
|
||||
for {
|
||||
select {
|
||||
case msg = <-kafkaMsgs:
|
||||
if currentBatchSize != 0 {
|
||||
batch = append(batch, '\n')
|
||||
}
|
||||
|
||||
batch = append(batch, msg.Value...)
|
||||
currentBatchSize++
|
||||
|
||||
if currentBatchSize == maxBatchSize {
|
||||
metricProducer <- batch
|
||||
currentBatchSize = 0
|
||||
batch = make([]byte, 0)
|
||||
ackMsg(msg)
|
||||
}
|
||||
case <-timeout:
|
||||
if currentBatchSize != 0 {
|
||||
metricProducer <- batch
|
||||
currentBatchSize = 0
|
||||
batch = make([]byte, 0)
|
||||
ackMsg(msg)
|
||||
}
|
||||
|
||||
timeout = time.After(500 * millisecond)
|
||||
case <-halt:
|
||||
if currentBatchSize != 0 {
|
||||
metricProducer <- batch
|
||||
ackMsg(msg)
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func init() {
|
||||
plugins.Add("kafka", func() plugins.Plugin {
|
||||
return &Kafka{}
|
||||
})
|
||||
}
|
||||
62
plugins/kafka_consumer/kafka_consumer_integration_test.go
Normal file
62
plugins/kafka_consumer/kafka_consumer_integration_test.go
Normal file
@@ -0,0 +1,62 @@
|
||||
package kafka_consumer
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/Shopify/sarama"
|
||||
"github.com/influxdb/telegraf/testutil"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestReadsMetricsFromKafka(t *testing.T) {
|
||||
var zkPeers, brokerPeers []string
|
||||
|
||||
if len(os.Getenv("ZOOKEEPER_PEERS")) == 0 {
|
||||
zkPeers = []string{"localhost:2181"}
|
||||
} else {
|
||||
zkPeers = strings.Split(os.Getenv("ZOOKEEPER_PEERS"), ",")
|
||||
}
|
||||
|
||||
if len(os.Getenv("KAFKA_PEERS")) == 0 {
|
||||
brokerPeers = []string{"localhost:9092"}
|
||||
} else {
|
||||
brokerPeers = strings.Split(os.Getenv("KAFKA_PEERS"), ",")
|
||||
}
|
||||
|
||||
k := &Kafka{
|
||||
ConsumerGroupName: "telegraf_test_consumers",
|
||||
Topic: fmt.Sprintf("telegraf_test_topic_%d", time.Now().Unix()),
|
||||
ZookeeperPeers: zkPeers,
|
||||
}
|
||||
|
||||
msg := "cpu_load_short,direction=in,host=server01,region=us-west value=23422.0 1422568543702900257"
|
||||
producer, err := sarama.NewSyncProducer(brokerPeers, nil)
|
||||
require.NoError(t, err)
|
||||
_, _, err = producer.SendMessage(&sarama.ProducerMessage{Topic: k.Topic, Value: sarama.StringEncoder(msg)})
|
||||
producer.Close()
|
||||
|
||||
var acc testutil.Accumulator
|
||||
|
||||
// Sanity check
|
||||
assert.Equal(t, 0, len(acc.Points), "there should not be any points")
|
||||
|
||||
err = k.Gather(&acc)
|
||||
require.NoError(t, err)
|
||||
|
||||
assert.Equal(t, 1, len(acc.Points), "there should be a single point")
|
||||
|
||||
point := acc.Points[0]
|
||||
assert.Equal(t, "cpu_load_short", point.Measurement)
|
||||
assert.Equal(t, map[string]interface{}{"value": 23422.0}, point.Values)
|
||||
assert.Equal(t, map[string]string{
|
||||
"host": "server01",
|
||||
"direction": "in",
|
||||
"region": "us-west",
|
||||
}, point.Tags)
|
||||
assert.Equal(t, time.Unix(0, 1422568543702900257), point.Time)
|
||||
}
|
||||
95
plugins/kafka_consumer/kafka_consumer_test.go
Normal file
95
plugins/kafka_consumer/kafka_consumer_test.go
Normal file
@@ -0,0 +1,95 @@
|
||||
package kafka_consumer
|
||||
|
||||
import (
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/influxdb/telegraf/testutil"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"gopkg.in/Shopify/sarama.v1"
|
||||
)
|
||||
|
||||
const testMsg = "cpu_load_short,direction=in,host=server01,region=us-west value=23422.0 1422568543702900257"
|
||||
|
||||
func TestReadFromKafkaBatchesMsgsOnBatchSize(t *testing.T) {
|
||||
halt := make(chan bool, 1)
|
||||
metricChan := make(chan []byte, 1)
|
||||
kafkaChan := make(chan *sarama.ConsumerMessage, 10)
|
||||
for i := 0; i < 10; i++ {
|
||||
kafkaChan <- saramaMsg(testMsg)
|
||||
}
|
||||
|
||||
expectedBatch := strings.Repeat(testMsg+"\n", 9) + testMsg
|
||||
readFromKafka(kafkaChan, metricChan, 10, func(msg *sarama.ConsumerMessage) error {
|
||||
batch := <-metricChan
|
||||
assert.Equal(t, expectedBatch, string(batch))
|
||||
|
||||
halt <- true
|
||||
|
||||
return nil
|
||||
}, halt)
|
||||
}
|
||||
|
||||
func TestReadFromKafkaBatchesMsgsOnTimeout(t *testing.T) {
|
||||
halt := make(chan bool, 1)
|
||||
metricChan := make(chan []byte, 1)
|
||||
kafkaChan := make(chan *sarama.ConsumerMessage, 10)
|
||||
for i := 0; i < 3; i++ {
|
||||
kafkaChan <- saramaMsg(testMsg)
|
||||
}
|
||||
|
||||
expectedBatch := strings.Repeat(testMsg+"\n", 2) + testMsg
|
||||
readFromKafka(kafkaChan, metricChan, 10, func(msg *sarama.ConsumerMessage) error {
|
||||
batch := <-metricChan
|
||||
assert.Equal(t, expectedBatch, string(batch))
|
||||
|
||||
halt <- true
|
||||
|
||||
return nil
|
||||
}, halt)
|
||||
}
|
||||
|
||||
func TestEmitMetricsSendMetricsToAcc(t *testing.T) {
|
||||
k := &Kafka{}
|
||||
var acc testutil.Accumulator
|
||||
testChan := make(chan []byte, 1)
|
||||
testChan <- []byte(testMsg)
|
||||
|
||||
err := emitMetrics(k, &acc, testChan)
|
||||
require.NoError(t, err)
|
||||
|
||||
assert.Equal(t, 1, len(acc.Points), "there should be a single point")
|
||||
|
||||
point := acc.Points[0]
|
||||
assert.Equal(t, "cpu_load_short", point.Measurement)
|
||||
assert.Equal(t, map[string]interface{}{"value": 23422.0}, point.Values)
|
||||
assert.Equal(t, map[string]string{
|
||||
"host": "server01",
|
||||
"direction": "in",
|
||||
"region": "us-west",
|
||||
}, point.Tags)
|
||||
|
||||
assert.Equal(t, time.Unix(0, 1422568543702900257), point.Time)
|
||||
}
|
||||
|
||||
func TestEmitMetricsTimesOut(t *testing.T) {
|
||||
k := &Kafka{}
|
||||
var acc testutil.Accumulator
|
||||
testChan := make(chan []byte)
|
||||
|
||||
err := emitMetrics(k, &acc, testChan)
|
||||
require.NoError(t, err)
|
||||
|
||||
assert.Equal(t, 0, len(acc.Points), "there should not be a any points")
|
||||
}
|
||||
|
||||
func saramaMsg(val string) *sarama.ConsumerMessage {
|
||||
return &sarama.ConsumerMessage{
|
||||
Key: nil,
|
||||
Value: []byte(val),
|
||||
Offset: 0,
|
||||
Partition: 0,
|
||||
}
|
||||
}
|
||||
134
plugins/memcached/memcached.go
Normal file
134
plugins/memcached/memcached.go
Normal file
@@ -0,0 +1,134 @@
|
||||
package memcached
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"fmt"
|
||||
"net"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/influxdb/telegraf/plugins"
|
||||
)
|
||||
|
||||
// Memcached is a memcached plugin
|
||||
type Memcached struct {
|
||||
Servers []string
|
||||
}
|
||||
|
||||
var sampleConfig = `
|
||||
# An array of address to gather stats about. Specify an ip on hostname
|
||||
# with optional port. ie localhost, 10.0.0.1:11211, etc.
|
||||
#
|
||||
# If no servers are specified, then localhost is used as the host.
|
||||
servers = ["localhost"]`
|
||||
|
||||
var defaultTimeout = 5 * time.Second
|
||||
|
||||
// The list of metrics tha should be calculated
|
||||
var sendAsIs = []string{
|
||||
"get_hits",
|
||||
"get_misses",
|
||||
"evictions",
|
||||
"limit_maxbytes",
|
||||
"bytes",
|
||||
}
|
||||
|
||||
// SampleConfig returns sample configuration message
|
||||
func (m *Memcached) SampleConfig() string {
|
||||
return sampleConfig
|
||||
}
|
||||
|
||||
// Description returns description of Memcached plugin
|
||||
func (m *Memcached) Description() string {
|
||||
return "Read metrics from one or many memcached servers"
|
||||
}
|
||||
|
||||
// Gather reads stats from all configured servers accumulates stats
|
||||
func (m *Memcached) Gather(acc plugins.Accumulator) error {
|
||||
if len(m.Servers) == 0 {
|
||||
return m.gatherServer(":11211", acc)
|
||||
}
|
||||
|
||||
for _, serverAddress := range m.Servers {
|
||||
if err := m.gatherServer(serverAddress, acc); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *Memcached) gatherServer(address string, acc plugins.Accumulator) error {
|
||||
_, _, err := net.SplitHostPort(address)
|
||||
if err != nil {
|
||||
address = address + ":11211"
|
||||
}
|
||||
|
||||
// Connect
|
||||
conn, err := net.DialTimeout("tcp", address, defaultTimeout)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer conn.Close()
|
||||
|
||||
// Extend connection
|
||||
conn.SetDeadline(time.Now().Add(defaultTimeout))
|
||||
|
||||
// Read and write buffer
|
||||
rw := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn))
|
||||
|
||||
// Send command
|
||||
if _, err = fmt.Fprint(rw, "stats\r\n"); err != nil {
|
||||
return err
|
||||
}
|
||||
if err = rw.Flush(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Read response
|
||||
values := make(map[string]string)
|
||||
|
||||
for {
|
||||
// Read line
|
||||
line, _, errRead := rw.Reader.ReadLine()
|
||||
if errRead != nil {
|
||||
return errRead
|
||||
}
|
||||
// Done
|
||||
if bytes.Equal(line, []byte("END")) {
|
||||
break
|
||||
}
|
||||
// Read values
|
||||
var name, value string
|
||||
n, errScan := fmt.Sscanf(string(line), "STAT %s %s\r\n", &name, &value)
|
||||
if errScan != nil || n != 2 {
|
||||
return fmt.Errorf("unexpected line in stats response: %q", line)
|
||||
}
|
||||
|
||||
// Save values
|
||||
values[name] = value
|
||||
}
|
||||
|
||||
//
|
||||
tags := map[string]string{"server": address}
|
||||
|
||||
// Process values
|
||||
for _, key := range sendAsIs {
|
||||
if value, ok := values[key]; ok {
|
||||
// Mostly it is the number
|
||||
if iValue, errParse := strconv.ParseInt(value, 10, 64); errParse != nil {
|
||||
acc.Add(key, value, tags)
|
||||
} else {
|
||||
acc.Add(key, iValue, tags)
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func init() {
|
||||
plugins.Add("memcached", func() plugins.Plugin {
|
||||
return &Memcached{}
|
||||
})
|
||||
}
|
||||
26
plugins/memcached/memcached_test.go
Normal file
26
plugins/memcached/memcached_test.go
Normal file
@@ -0,0 +1,26 @@
|
||||
package memcached
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/influxdb/telegraf/testutil"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestMemcachedGeneratesMetrics(t *testing.T) {
|
||||
m := &Memcached{
|
||||
Servers: []string{"localhost"},
|
||||
}
|
||||
|
||||
var acc testutil.Accumulator
|
||||
|
||||
err := m.Gather(&acc)
|
||||
require.NoError(t, err)
|
||||
|
||||
intMetrics := []string{"get_hits", "get_misses", "evictions", "limit_maxbytes", "bytes"}
|
||||
|
||||
for _, metric := range intMetrics {
|
||||
assert.True(t, acc.HasIntValue(metric), metric)
|
||||
}
|
||||
}
|
||||
@@ -91,7 +91,7 @@ func (m *Mysql) gatherServer(serv string, acc plugins.Accumulator) error {
|
||||
|
||||
rows, err := db.Query(`SHOW /*!50002 GLOBAL */ STATUS`)
|
||||
if err != nil {
|
||||
return nil
|
||||
return err
|
||||
}
|
||||
|
||||
for rows.Next() {
|
||||
|
||||
@@ -39,7 +39,7 @@ func TestMysqlGeneratesMetrics(t *testing.T) {
|
||||
var count int
|
||||
|
||||
for _, p := range acc.Points {
|
||||
if strings.HasPrefix(p.Name, prefix.prefix) {
|
||||
if strings.HasPrefix(p.Measurement, prefix.prefix) {
|
||||
count++
|
||||
}
|
||||
}
|
||||
|
||||
@@ -89,7 +89,7 @@ func (p *Postgresql) gatherServer(serv *Server, acc plugins.Accumulator) error {
|
||||
defer rows.Close()
|
||||
|
||||
for rows.Next() {
|
||||
err := p.accRow(rows, acc)
|
||||
err := p.accRow(rows, acc, serv.Address)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -100,7 +100,7 @@ func (p *Postgresql) gatherServer(serv *Server, acc plugins.Accumulator) error {
|
||||
for _, name := range serv.Databases {
|
||||
row := db.QueryRow(`SELECT * FROM pg_stat_database WHERE datname=$1`, name)
|
||||
|
||||
err := p.accRow(row, acc)
|
||||
err := p.accRow(row, acc, serv.Address)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -114,7 +114,7 @@ type scanner interface {
|
||||
Scan(dest ...interface{}) error
|
||||
}
|
||||
|
||||
func (p *Postgresql) accRow(row scanner, acc plugins.Accumulator) error {
|
||||
func (p *Postgresql) accRow(row scanner, acc plugins.Accumulator, server string) error {
|
||||
var ignore interface{}
|
||||
var name string
|
||||
var commit, rollback, read, hit int64
|
||||
@@ -135,7 +135,7 @@ func (p *Postgresql) accRow(row scanner, acc plugins.Accumulator) error {
|
||||
return err
|
||||
}
|
||||
|
||||
tags := map[string]string{"db": name}
|
||||
tags := map[string]string{"server": server, "db": name}
|
||||
|
||||
acc.Add("xact_commit", commit, tags)
|
||||
acc.Add("xact_rollback", rollback, tags)
|
||||
|
||||
@@ -91,7 +91,7 @@ func TestPostgresqlDefaultsToAllDatabases(t *testing.T) {
|
||||
var found bool
|
||||
|
||||
for _, pnt := range acc.Points {
|
||||
if pnt.Name == "xact_commit" {
|
||||
if pnt.Measurement == "xact_commit" {
|
||||
if pnt.Tags["db"] == "postgres" {
|
||||
found = true
|
||||
break
|
||||
|
||||
105
plugins/prometheus/prometheus.go
Normal file
105
plugins/prometheus/prometheus.go
Normal file
@@ -0,0 +1,105 @@
|
||||
package prometheus
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/influxdb/telegraf/plugins"
|
||||
"github.com/prometheus/client_golang/extraction"
|
||||
"github.com/prometheus/client_golang/model"
|
||||
)
|
||||
|
||||
type Prometheus struct {
|
||||
Urls []string
|
||||
}
|
||||
|
||||
var sampleConfig = `
|
||||
# An array of urls to scrape metrics from.
|
||||
urls = ["http://localhost:9100/metrics"]`
|
||||
|
||||
func (r *Prometheus) SampleConfig() string {
|
||||
return sampleConfig
|
||||
}
|
||||
|
||||
func (r *Prometheus) Description() string {
|
||||
return "Read metrics from one or many prometheus clients"
|
||||
}
|
||||
|
||||
var ErrProtocolError = errors.New("prometheus protocol error")
|
||||
|
||||
// Reads stats from all configured servers accumulates stats.
|
||||
// Returns one of the errors encountered while gather stats (if any).
|
||||
func (g *Prometheus) Gather(acc plugins.Accumulator) error {
|
||||
var wg sync.WaitGroup
|
||||
|
||||
var outerr error
|
||||
|
||||
for _, serv := range g.Urls {
|
||||
wg.Add(1)
|
||||
go func(serv string) {
|
||||
defer wg.Done()
|
||||
outerr = g.gatherURL(serv, acc)
|
||||
}(serv)
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
|
||||
return outerr
|
||||
}
|
||||
|
||||
func (g *Prometheus) gatherURL(url string, acc plugins.Accumulator) error {
|
||||
resp, err := http.Get(url)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error making HTTP request to %s: %s", url, err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return fmt.Errorf("%s returned HTTP status %s", url, resp.Status)
|
||||
}
|
||||
processor, err := extraction.ProcessorForRequestHeader(resp.Header)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error getting extractor for %s: %s", url, err)
|
||||
}
|
||||
|
||||
ingestor := &Ingester{
|
||||
acc: acc,
|
||||
}
|
||||
|
||||
options := &extraction.ProcessOptions{
|
||||
Timestamp: model.TimestampFromTime(time.Now()),
|
||||
}
|
||||
|
||||
err = processor.ProcessSingle(resp.Body, ingestor, options)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error getting processing samples for %s: %s", url, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type Ingester struct {
|
||||
acc plugins.Accumulator
|
||||
}
|
||||
|
||||
// Ingest implements an extraction.Ingester.
|
||||
func (i *Ingester) Ingest(samples model.Samples) error {
|
||||
for _, sample := range samples {
|
||||
tags := map[string]string{}
|
||||
for key, value := range sample.Metric {
|
||||
if key == model.MetricNameLabel {
|
||||
continue
|
||||
}
|
||||
tags[string(key)] = string(value)
|
||||
}
|
||||
i.acc.Add(string(sample.Metric[model.MetricNameLabel]), float64(sample.Value), tags)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func init() {
|
||||
plugins.Add("prometheus", func() plugins.Plugin {
|
||||
return &Prometheus{}
|
||||
})
|
||||
}
|
||||
56
plugins/prometheus/prometheus_test.go
Normal file
56
plugins/prometheus/prometheus_test.go
Normal file
@@ -0,0 +1,56 @@
|
||||
package prometheus
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
|
||||
"github.com/influxdb/telegraf/testutil"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
|
||||
const sampleTextFormat = `# HELP go_gc_duration_seconds A summary of the GC invocation durations.
|
||||
# TYPE go_gc_duration_seconds summary
|
||||
go_gc_duration_seconds{quantile="0"} 0.00010425500000000001
|
||||
go_gc_duration_seconds{quantile="0.25"} 0.000139108
|
||||
go_gc_duration_seconds{quantile="0.5"} 0.00015749400000000002
|
||||
go_gc_duration_seconds{quantile="0.75"} 0.000331463
|
||||
go_gc_duration_seconds{quantile="1"} 0.000667154
|
||||
go_gc_duration_seconds_sum 0.0018183950000000002
|
||||
go_gc_duration_seconds_count 7
|
||||
# HELP go_goroutines Number of goroutines that currently exist.
|
||||
# TYPE go_goroutines gauge
|
||||
go_goroutines 15
|
||||
`
|
||||
|
||||
func TestPrometheusGeneratesMetrics(t *testing.T) {
|
||||
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
fmt.Fprintln(w, sampleTextFormat)
|
||||
}))
|
||||
defer ts.Close()
|
||||
|
||||
p := &Prometheus{
|
||||
Urls: []string{ts.URL},
|
||||
}
|
||||
|
||||
var acc testutil.Accumulator
|
||||
|
||||
err := p.Gather(&acc)
|
||||
require.NoError(t, err)
|
||||
|
||||
expected := []struct {
|
||||
name string
|
||||
value float64
|
||||
tags map[string]string
|
||||
}{
|
||||
{"go_gc_duration_seconds_count", 7, map[string]string{}},
|
||||
{"go_goroutines", 15, map[string]string{}},
|
||||
}
|
||||
|
||||
for _, e := range expected {
|
||||
assert.NoError(t, acc.ValidateValue(e.name, e.value))
|
||||
}
|
||||
}
|
||||
@@ -5,6 +5,7 @@ import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"net"
|
||||
"net/url"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
@@ -20,8 +21,9 @@ type Redis struct {
|
||||
}
|
||||
|
||||
var sampleConfig = `
|
||||
# An array of address to gather stats about. Specify an ip on hostname
|
||||
# with optional port. ie localhost, 10.10.3.33:18832, etc.
|
||||
# An array of URI to gather stats about. Specify an ip or hostname
|
||||
# with optional port add password. ie redis://localhost, redis://10.10.3.33:18832,
|
||||
# 10.0.0.1:10000, etc.
|
||||
#
|
||||
# If no servers are specified, then localhost is used as the host.
|
||||
servers = ["localhost"]`
|
||||
@@ -73,7 +75,10 @@ var ErrProtocolError = errors.New("redis protocol error")
|
||||
// Returns one of the errors encountered while gather stats (if any).
|
||||
func (g *Redis) Gather(acc plugins.Accumulator) error {
|
||||
if len(g.Servers) == 0 {
|
||||
g.gatherServer(":6379", acc)
|
||||
url := &url.URL{
|
||||
Host: ":6379",
|
||||
}
|
||||
g.gatherServer(url, acc)
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -82,10 +87,19 @@ func (g *Redis) Gather(acc plugins.Accumulator) error {
|
||||
var outerr error
|
||||
|
||||
for _, serv := range g.Servers {
|
||||
u, err := url.Parse(serv)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Unable to parse to address '%s': %s", serv, err)
|
||||
} else if u.Scheme == "" {
|
||||
// fallback to simple string based address (i.e. "10.0.0.1:10000")
|
||||
u.Scheme = "tcp"
|
||||
u.Host = serv
|
||||
u.Path = ""
|
||||
}
|
||||
wg.Add(1)
|
||||
go func(serv string) {
|
||||
defer wg.Done()
|
||||
outerr = g.gatherServer(serv, acc)
|
||||
outerr = g.gatherServer(u, acc)
|
||||
}(serv)
|
||||
}
|
||||
|
||||
@@ -96,17 +110,34 @@ func (g *Redis) Gather(acc plugins.Accumulator) error {
|
||||
|
||||
const defaultPort = "6379"
|
||||
|
||||
func (g *Redis) gatherServer(addr string, acc plugins.Accumulator) error {
|
||||
func (g *Redis) gatherServer(addr *url.URL, acc plugins.Accumulator) error {
|
||||
if g.c == nil {
|
||||
|
||||
_, _, err := net.SplitHostPort(addr)
|
||||
_, _, err := net.SplitHostPort(addr.Host)
|
||||
if err != nil {
|
||||
addr = addr + ":" + defaultPort
|
||||
addr.Host = addr.Host + ":" + defaultPort
|
||||
}
|
||||
|
||||
c, err := net.Dial("tcp", addr)
|
||||
c, err := net.Dial("tcp", addr.Host)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Unable to connect to redis server '%s': %s", addr, err)
|
||||
return fmt.Errorf("Unable to connect to redis server '%s': %s", addr.Host, err)
|
||||
}
|
||||
|
||||
if addr.User != nil {
|
||||
pwd, set := addr.User.Password()
|
||||
if set && pwd != "" {
|
||||
c.Write([]byte(fmt.Sprintf("AUTH %s\n", pwd)))
|
||||
|
||||
r := bufio.NewReader(c)
|
||||
|
||||
line, err := r.ReadString('\n')
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if line[0] != '+' {
|
||||
return fmt.Errorf("%s", strings.TrimSpace(line)[1:])
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
g.c = c
|
||||
@@ -157,11 +188,12 @@ func (g *Redis) gatherServer(addr string, acc plugins.Accumulator) error {
|
||||
continue
|
||||
}
|
||||
|
||||
tags := map[string]string{"host": addr.String()}
|
||||
val := strings.TrimSpace(parts[1])
|
||||
|
||||
ival, err := strconv.ParseUint(val, 10, 64)
|
||||
if err == nil {
|
||||
acc.Add(metric, ival, nil)
|
||||
acc.Add(metric, ival, tags)
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -170,7 +202,7 @@ func (g *Redis) gatherServer(addr string, acc plugins.Accumulator) error {
|
||||
return err
|
||||
}
|
||||
|
||||
acc.Add(metric, fval, nil)
|
||||
acc.Add(metric, fval, tags)
|
||||
}
|
||||
|
||||
return nil
|
||||
|
||||
@@ -40,7 +40,7 @@ func TestRedisGeneratesMetrics(t *testing.T) {
|
||||
}
|
||||
}()
|
||||
|
||||
addr := l.Addr().String()
|
||||
addr := fmt.Sprintf("redis://%s", l.Addr().String())
|
||||
|
||||
r := &Redis{
|
||||
Servers: []string{addr},
|
||||
@@ -131,7 +131,7 @@ func TestRedisCanPullStatsFromMultipleServers(t *testing.T) {
|
||||
}
|
||||
}()
|
||||
|
||||
addr := l.Addr().String()
|
||||
addr := fmt.Sprintf("redis://%s", l.Addr().String())
|
||||
|
||||
r := &Redis{
|
||||
Servers: []string{addr},
|
||||
|
||||
92
plugins/rethinkdb/rethinkdb.go
Normal file
92
plugins/rethinkdb/rethinkdb.go
Normal file
@@ -0,0 +1,92 @@
|
||||
package rethinkdb
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/url"
|
||||
"sync"
|
||||
|
||||
"github.com/influxdb/telegraf/plugins"
|
||||
|
||||
"gopkg.in/dancannon/gorethink.v1"
|
||||
)
|
||||
|
||||
type RethinkDB struct {
|
||||
Servers []string
|
||||
}
|
||||
|
||||
var sampleConfig = `
|
||||
# An array of URI to gather stats about. Specify an ip or hostname
|
||||
# with optional port add password. ie rethinkdb://user:auth_key@10.10.3.30:28105,
|
||||
# rethinkdb://10.10.3.33:18832, 10.0.0.1:10000, etc.
|
||||
#
|
||||
# If no servers are specified, then 127.0.0.1 is used as the host and 28015 as the port.
|
||||
servers = ["127.0.0.1:28015"]`
|
||||
|
||||
func (r *RethinkDB) SampleConfig() string {
|
||||
return sampleConfig
|
||||
}
|
||||
|
||||
func (r *RethinkDB) Description() string {
|
||||
return "Read metrics from one or many RethinkDB servers"
|
||||
}
|
||||
|
||||
var localhost = &Server{Url: &url.URL{Host: "127.0.0.1:28015"}}
|
||||
|
||||
// Reads stats from all configured servers accumulates stats.
|
||||
// Returns one of the errors encountered while gather stats (if any).
|
||||
func (r *RethinkDB) Gather(acc plugins.Accumulator) error {
|
||||
if len(r.Servers) == 0 {
|
||||
r.gatherServer(localhost, acc)
|
||||
return nil
|
||||
}
|
||||
|
||||
var wg sync.WaitGroup
|
||||
|
||||
var outerr error
|
||||
|
||||
for _, serv := range r.Servers {
|
||||
u, err := url.Parse(serv)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Unable to parse to address '%s': %s", serv, err)
|
||||
} else if u.Scheme == "" {
|
||||
// fallback to simple string based address (i.e. "10.0.0.1:10000")
|
||||
u.Host = serv
|
||||
}
|
||||
wg.Add(1)
|
||||
go func(serv string) {
|
||||
defer wg.Done()
|
||||
outerr = r.gatherServer(&Server{Url: u}, acc)
|
||||
}(serv)
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
|
||||
return outerr
|
||||
}
|
||||
|
||||
func (r *RethinkDB) gatherServer(server *Server, acc plugins.Accumulator) error {
|
||||
var err error
|
||||
connectOpts := gorethink.ConnectOpts{
|
||||
Address: server.Url.Host,
|
||||
DiscoverHosts: false,
|
||||
}
|
||||
if server.Url.User != nil {
|
||||
pwd, set := server.Url.User.Password()
|
||||
if set && pwd != "" {
|
||||
connectOpts.AuthKey = pwd
|
||||
}
|
||||
}
|
||||
server.session, err = gorethink.Connect(connectOpts)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Unable to connect to RethinkDB, %s\n", err.Error())
|
||||
}
|
||||
defer server.session.Close()
|
||||
|
||||
return server.gatherData(acc)
|
||||
}
|
||||
|
||||
func init() {
|
||||
plugins.Add("rethinkdb", func() plugins.Plugin {
|
||||
return &RethinkDB{}
|
||||
})
|
||||
}
|
||||
110
plugins/rethinkdb/rethinkdb_data.go
Normal file
110
plugins/rethinkdb/rethinkdb_data.go
Normal file
@@ -0,0 +1,110 @@
|
||||
package rethinkdb
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"time"
|
||||
|
||||
"github.com/influxdb/telegraf/plugins"
|
||||
)
|
||||
|
||||
type serverStatus struct {
|
||||
Id string `gorethink:"id"`
|
||||
Network struct {
|
||||
Addresses []Address `gorethink:"canonical_addresses"`
|
||||
Hostname string `gorethink:"hostname"`
|
||||
DriverPort int `gorethink:"reql_port"`
|
||||
} `gorethink:"network"`
|
||||
Process struct {
|
||||
Version string `gorethink:"version"`
|
||||
RunningSince time.Time `gorethink:"time_started"`
|
||||
} `gorethink:"process"`
|
||||
}
|
||||
|
||||
type Address struct {
|
||||
Host string `gorethink:"host"`
|
||||
Port int `gorethink:"port"`
|
||||
}
|
||||
|
||||
type stats struct {
|
||||
Engine Engine `gorethink:"query_engine"`
|
||||
}
|
||||
|
||||
type Engine struct {
|
||||
ClientConns int64 `gorethink:"client_connections,omitempty"`
|
||||
ClientActive int64 `gorethink:"clients_active,omitempty"`
|
||||
QueriesPerSec int64 `gorethink:"queries_per_sec,omitempty"`
|
||||
TotalQueries int64 `gorethink:"queries_total,omitempty"`
|
||||
ReadsPerSec int64 `gorethink:"read_docs_per_sec,omitempty"`
|
||||
TotalReads int64 `gorethink:"read_docs_total,omitempty"`
|
||||
WritesPerSec int64 `gorethink:"written_docs_per_sec,omitempty"`
|
||||
TotalWrites int64 `gorethink:"written_docs_total,omitempty"`
|
||||
}
|
||||
|
||||
type tableStatus struct {
|
||||
Id string `gorethink:"id"`
|
||||
DB string `gorethink:"db"`
|
||||
Name string `gorethink:"name"`
|
||||
}
|
||||
|
||||
type tableStats struct {
|
||||
Engine Engine `gorethink:"query_engine"`
|
||||
Storage Storage `gorethink:"storage_engine"`
|
||||
}
|
||||
|
||||
type Storage struct {
|
||||
Cache Cache `gorethink:"cache"`
|
||||
Disk Disk `gorethink:"disk"`
|
||||
}
|
||||
|
||||
type Cache struct {
|
||||
BytesInUse int64 `gorethink:"in_use_bytes"`
|
||||
}
|
||||
|
||||
type Disk struct {
|
||||
ReadBytesPerSec int64 `gorethink:"read_bytes_per_sec"`
|
||||
ReadBytesTotal int64 `gorethink:"read_bytes_total"`
|
||||
WriteBytesPerSec int64 `gorethik:"written_bytes_per_sec"`
|
||||
WriteBytesTotal int64 `gorethink:"written_bytes_total"`
|
||||
SpaceUsage SpaceUsage `gorethink:"space_usage"`
|
||||
}
|
||||
|
||||
type SpaceUsage struct {
|
||||
Data int64 `gorethink:"data_bytes"`
|
||||
Garbage int64 `gorethink:"garbage_bytes"`
|
||||
Metadata int64 `gorethink:"metadata_bytes"`
|
||||
Prealloc int64 `gorethink:"preallocated_bytes"`
|
||||
}
|
||||
|
||||
var engineStats = map[string]string{
|
||||
"active_clients": "ClientActive",
|
||||
"clients": "ClientConns",
|
||||
"queries_per_sec": "QueriesPerSec",
|
||||
"total_queries": "TotalQueries",
|
||||
"read_docs_per_sec": "ReadsPerSec",
|
||||
"total_reads": "TotalReads",
|
||||
"written_docs_per_sec": "WritesPerSec",
|
||||
"total_writes": "TotalWrites",
|
||||
}
|
||||
|
||||
func (e *Engine) AddEngineStats(keys []string, acc plugins.Accumulator, tags map[string]string) {
|
||||
engine := reflect.ValueOf(e).Elem()
|
||||
for _, key := range keys {
|
||||
acc.Add(
|
||||
key,
|
||||
engine.FieldByName(engineStats[key]).Interface(),
|
||||
tags,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Storage) AddStats(acc plugins.Accumulator, tags map[string]string) {
|
||||
acc.Add("cache_bytes_in_use", s.Cache.BytesInUse, tags)
|
||||
acc.Add("disk_read_bytes_per_sec", s.Disk.ReadBytesPerSec, tags)
|
||||
acc.Add("disk_read_bytes_total", s.Disk.ReadBytesTotal, tags)
|
||||
acc.Add("disk_written_bytes_per_sec", s.Disk.WriteBytesPerSec, tags)
|
||||
acc.Add("disk_written_bytes_total", s.Disk.WriteBytesTotal, tags)
|
||||
acc.Add("disk_usage_data_bytes", s.Disk.SpaceUsage.Data, tags)
|
||||
acc.Add("disk_usage_garbage_bytes", s.Disk.SpaceUsage.Garbage, tags)
|
||||
acc.Add("disk_usage_metadata_bytes", s.Disk.SpaceUsage.Metadata, tags)
|
||||
acc.Add("disk_usage_preallocated_bytes", s.Disk.SpaceUsage.Prealloc, tags)
|
||||
}
|
||||
112
plugins/rethinkdb/rethinkdb_data_test.go
Normal file
112
plugins/rethinkdb/rethinkdb_data_test.go
Normal file
@@ -0,0 +1,112 @@
|
||||
package rethinkdb
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/influxdb/telegraf/testutil"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
var tags = make(map[string]string)
|
||||
|
||||
func TestAddEngineStats(t *testing.T) {
|
||||
engine := &Engine{
|
||||
ClientConns: 0,
|
||||
ClientActive: 0,
|
||||
QueriesPerSec: 0,
|
||||
TotalQueries: 0,
|
||||
ReadsPerSec: 0,
|
||||
TotalReads: 0,
|
||||
WritesPerSec: 0,
|
||||
TotalWrites: 0,
|
||||
}
|
||||
|
||||
var acc testutil.Accumulator
|
||||
|
||||
keys := []string{
|
||||
"active_clients",
|
||||
"clients",
|
||||
"queries_per_sec",
|
||||
"total_queries",
|
||||
"read_docs_per_sec",
|
||||
"total_reads",
|
||||
"written_docs_per_sec",
|
||||
"total_writes",
|
||||
}
|
||||
engine.AddEngineStats(keys, &acc, tags)
|
||||
|
||||
for _, metric := range keys {
|
||||
assert.True(t, acc.HasIntValue(metric))
|
||||
}
|
||||
}
|
||||
|
||||
func TestAddEngineStatsPartial(t *testing.T) {
|
||||
engine := &Engine{
|
||||
ClientConns: 0,
|
||||
ClientActive: 0,
|
||||
QueriesPerSec: 0,
|
||||
ReadsPerSec: 0,
|
||||
WritesPerSec: 0,
|
||||
}
|
||||
|
||||
var acc testutil.Accumulator
|
||||
|
||||
keys := []string{
|
||||
"active_clients",
|
||||
"clients",
|
||||
"queries_per_sec",
|
||||
"read_docs_per_sec",
|
||||
"written_docs_per_sec",
|
||||
}
|
||||
|
||||
missing_keys := []string{
|
||||
"total_queries",
|
||||
"total_reads",
|
||||
"total_writes",
|
||||
}
|
||||
engine.AddEngineStats(keys, &acc, tags)
|
||||
|
||||
for _, metric := range missing_keys {
|
||||
assert.False(t, acc.HasIntValue(metric))
|
||||
}
|
||||
}
|
||||
|
||||
func TestAddStorageStats(t *testing.T) {
|
||||
storage := &Storage{
|
||||
Cache: Cache{
|
||||
BytesInUse: 0,
|
||||
},
|
||||
Disk: Disk{
|
||||
ReadBytesPerSec: 0,
|
||||
ReadBytesTotal: 0,
|
||||
WriteBytesPerSec: 0,
|
||||
WriteBytesTotal: 0,
|
||||
SpaceUsage: SpaceUsage{
|
||||
Data: 0,
|
||||
Garbage: 0,
|
||||
Metadata: 0,
|
||||
Prealloc: 0,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
var acc testutil.Accumulator
|
||||
|
||||
keys := []string{
|
||||
"cache_bytes_in_use",
|
||||
"disk_read_bytes_per_sec",
|
||||
"disk_read_bytes_total",
|
||||
"disk_written_bytes_per_sec",
|
||||
"disk_written_bytes_total",
|
||||
"disk_usage_data_bytes",
|
||||
"disk_usage_garbage_bytes",
|
||||
"disk_usage_metadata_bytes",
|
||||
"disk_usage_preallocated_bytes",
|
||||
}
|
||||
|
||||
storage.AddStats(&acc, tags)
|
||||
|
||||
for _, metric := range keys {
|
||||
assert.True(t, acc.HasIntValue(metric))
|
||||
}
|
||||
}
|
||||
193
plugins/rethinkdb/rethinkdb_server.go
Normal file
193
plugins/rethinkdb/rethinkdb_server.go
Normal file
@@ -0,0 +1,193 @@
|
||||
package rethinkdb
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"net"
|
||||
"net/url"
|
||||
"regexp"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
"github.com/influxdb/telegraf/plugins"
|
||||
|
||||
"gopkg.in/dancannon/gorethink.v1"
|
||||
)
|
||||
|
||||
type Server struct {
|
||||
Url *url.URL
|
||||
session *gorethink.Session
|
||||
serverStatus serverStatus
|
||||
}
|
||||
|
||||
func (s *Server) gatherData(acc plugins.Accumulator) error {
|
||||
if err := s.getServerStatus(); err != nil {
|
||||
return fmt.Errorf("Failed to get server_status, %s\n", err)
|
||||
}
|
||||
|
||||
if err := s.validateVersion(); err != nil {
|
||||
return fmt.Errorf("Failed version validation, %s\n", err.Error())
|
||||
}
|
||||
|
||||
if err := s.addClusterStats(acc); err != nil {
|
||||
fmt.Printf("error adding cluster stats, %s\n", err.Error())
|
||||
return fmt.Errorf("Error adding cluster stats, %s\n", err.Error())
|
||||
}
|
||||
|
||||
if err := s.addMemberStats(acc); err != nil {
|
||||
return fmt.Errorf("Error adding member stats, %s\n", err.Error())
|
||||
}
|
||||
|
||||
if err := s.addTableStats(acc); err != nil {
|
||||
return fmt.Errorf("Error adding table stats, %s\n", err.Error())
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Server) validateVersion() error {
|
||||
if s.serverStatus.Process.Version == "" {
|
||||
return errors.New("could not determine the RethinkDB server version: process.version key missing")
|
||||
}
|
||||
|
||||
versionRegexp := regexp.MustCompile("\\d.\\d.\\d")
|
||||
versionString := versionRegexp.FindString(s.serverStatus.Process.Version)
|
||||
if versionString == "" {
|
||||
return fmt.Errorf("could not determine the RethinkDB server version: malformed version string (%v)", s.serverStatus.Process.Version)
|
||||
}
|
||||
|
||||
majorVersion, err := strconv.Atoi(strings.Split(versionString, "")[0])
|
||||
if err != nil || majorVersion < 2 {
|
||||
return fmt.Errorf("unsupported major version %s\n", versionString)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Server) getServerStatus() error {
|
||||
cursor, err := gorethink.DB("rethinkdb").Table("server_status").Run(s.session)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if cursor.IsNil() {
|
||||
return errors.New("could not determine the RethinkDB server version: no rows returned from the server_status table")
|
||||
}
|
||||
defer cursor.Close()
|
||||
var serverStatuses []serverStatus
|
||||
err = cursor.All(&serverStatuses)
|
||||
if err != nil {
|
||||
return errors.New("could not parse server_status results")
|
||||
}
|
||||
host, port, err := net.SplitHostPort(s.Url.Host)
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to determine provided hostname from %s\n", s.Url.Host)
|
||||
}
|
||||
driverPort, _ := strconv.Atoi(port)
|
||||
for _, ss := range serverStatuses {
|
||||
for _, address := range ss.Network.Addresses {
|
||||
if address.Host == host && ss.Network.DriverPort == driverPort {
|
||||
s.serverStatus = ss
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return fmt.Errorf("unable to determine host id from server_status with %s", s.Url.Host)
|
||||
}
|
||||
|
||||
func (s *Server) getDefaultTags() map[string]string {
|
||||
tags := make(map[string]string)
|
||||
tags["host"] = s.Url.Host
|
||||
tags["hostname"] = s.serverStatus.Network.Hostname
|
||||
return tags
|
||||
}
|
||||
|
||||
var ClusterTracking = []string{
|
||||
"active_clients",
|
||||
"clients",
|
||||
"queries_per_sec",
|
||||
"read_docs_per_sec",
|
||||
"written_docs_per_sec",
|
||||
}
|
||||
|
||||
func (s *Server) addClusterStats(acc plugins.Accumulator) error {
|
||||
cursor, err := gorethink.DB("rethinkdb").Table("stats").Get([]string{"cluster"}).Run(s.session)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cluster stats query error, %s\n", err.Error())
|
||||
}
|
||||
defer cursor.Close()
|
||||
var clusterStats stats
|
||||
if err := cursor.One(&clusterStats); err != nil {
|
||||
return fmt.Errorf("failure to parse cluster stats, $s\n", err.Error())
|
||||
}
|
||||
|
||||
tags := s.getDefaultTags()
|
||||
tags["type"] = "cluster"
|
||||
clusterStats.Engine.AddEngineStats(ClusterTracking, acc, tags)
|
||||
return nil
|
||||
}
|
||||
|
||||
var MemberTracking = []string{
|
||||
"active_clients",
|
||||
"clients",
|
||||
"queries_per_sec",
|
||||
"total_queries",
|
||||
"read_docs_per_sec",
|
||||
"total_reads",
|
||||
"written_docs_per_sec",
|
||||
"total_writes",
|
||||
}
|
||||
|
||||
func (s *Server) addMemberStats(acc plugins.Accumulator) error {
|
||||
cursor, err := gorethink.DB("rethinkdb").Table("stats").Get([]string{"server", s.serverStatus.Id}).Run(s.session)
|
||||
if err != nil {
|
||||
return fmt.Errorf("member stats query error, %s\n", err.Error())
|
||||
}
|
||||
defer cursor.Close()
|
||||
var memberStats stats
|
||||
if err := cursor.One(&memberStats); err != nil {
|
||||
return fmt.Errorf("failure to parse member stats, $s\n", err.Error())
|
||||
}
|
||||
|
||||
tags := s.getDefaultTags()
|
||||
tags["type"] = "member"
|
||||
memberStats.Engine.AddEngineStats(MemberTracking, acc, tags)
|
||||
return nil
|
||||
}
|
||||
|
||||
var TableTracking = []string{
|
||||
"read_docs_per_sec",
|
||||
"total_reads",
|
||||
"written_docs_per_sec",
|
||||
"total_writes",
|
||||
}
|
||||
|
||||
func (s *Server) addTableStats(acc plugins.Accumulator) error {
|
||||
tablesCursor, err := gorethink.DB("rethinkdb").Table("table_status").Run(s.session)
|
||||
defer tablesCursor.Close()
|
||||
var tables []tableStatus
|
||||
err = tablesCursor.All(&tables)
|
||||
if err != nil {
|
||||
return errors.New("could not parse table_status results")
|
||||
}
|
||||
for _, table := range tables {
|
||||
cursor, err := gorethink.DB("rethinkdb").Table("stats").
|
||||
Get([]string{"table_server", table.Id, s.serverStatus.Id}).
|
||||
Run(s.session)
|
||||
if err != nil {
|
||||
return fmt.Errorf("table stats query error, %s\n", err.Error())
|
||||
}
|
||||
defer cursor.Close()
|
||||
var ts tableStats
|
||||
if err := cursor.One(&ts); err != nil {
|
||||
return fmt.Errorf("failure to parse table stats, %s\n", err.Error())
|
||||
}
|
||||
|
||||
tags := s.getDefaultTags()
|
||||
tags["type"] = "data"
|
||||
tags["ns"] = fmt.Sprintf("%s.%s", table.DB, table.Name)
|
||||
ts.Engine.AddEngineStats(TableTracking, acc, tags)
|
||||
ts.Storage.AddStats(acc, tags)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
81
plugins/rethinkdb/rethinkdb_server_test.go
Normal file
81
plugins/rethinkdb/rethinkdb_server_test.go
Normal file
@@ -0,0 +1,81 @@
|
||||
// +build integration
|
||||
|
||||
package rethinkdb
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/influxdb/telegraf/testutil"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestValidateVersion(t *testing.T) {
|
||||
err := server.validateVersion()
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
func TestGetDefaultTags(t *testing.T) {
|
||||
var tagTests = []struct {
|
||||
in string
|
||||
out string
|
||||
}{
|
||||
{"host", server.Url.Host},
|
||||
{"hostname", server.serverStatus.Network.Hostname},
|
||||
}
|
||||
defaultTags := server.getDefaultTags()
|
||||
for _, tt := range tagTests {
|
||||
if defaultTags[tt.in] != tt.out {
|
||||
t.Errorf("expected %q, got %q", tt.out, defaultTags[tt.in])
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestAddClusterStats(t *testing.T) {
|
||||
var acc testutil.Accumulator
|
||||
|
||||
err := server.addClusterStats(&acc)
|
||||
require.NoError(t, err)
|
||||
|
||||
for _, metric := range ClusterTracking {
|
||||
assert.True(t, acc.HasIntValue(metric))
|
||||
}
|
||||
}
|
||||
|
||||
func TestAddMemberStats(t *testing.T) {
|
||||
var acc testutil.Accumulator
|
||||
|
||||
err := server.addMemberStats(&acc)
|
||||
require.NoError(t, err)
|
||||
|
||||
for _, metric := range MemberTracking {
|
||||
assert.True(t, acc.HasIntValue(metric))
|
||||
}
|
||||
}
|
||||
|
||||
func TestAddTableStats(t *testing.T) {
|
||||
var acc testutil.Accumulator
|
||||
|
||||
err := server.addTableStats(&acc)
|
||||
require.NoError(t, err)
|
||||
|
||||
for _, metric := range TableTracking {
|
||||
assert.True(t, acc.HasIntValue(metric))
|
||||
}
|
||||
|
||||
keys := []string{
|
||||
"cache_bytes_in_use",
|
||||
"disk_read_bytes_per_sec",
|
||||
"disk_read_bytes_total",
|
||||
"disk_written_bytes_per_sec",
|
||||
"disk_written_bytes_total",
|
||||
"disk_usage_data_bytes",
|
||||
"disk_usage_garbage_bytes",
|
||||
"disk_usage_metadata_bytes",
|
||||
"disk_usage_preallocated_bytes",
|
||||
}
|
||||
|
||||
for _, metric := range keys {
|
||||
assert.True(t, acc.HasIntValue(metric))
|
||||
}
|
||||
}
|
||||
59
plugins/rethinkdb/rethinkdb_test.go
Normal file
59
plugins/rethinkdb/rethinkdb_test.go
Normal file
@@ -0,0 +1,59 @@
|
||||
// +build integration
|
||||
|
||||
package rethinkdb
|
||||
|
||||
import (
|
||||
"log"
|
||||
"math/rand"
|
||||
"net/url"
|
||||
"os"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"gopkg.in/dancannon/gorethink.v1"
|
||||
)
|
||||
|
||||
var connect_url, authKey string
|
||||
var server *Server
|
||||
|
||||
func init() {
|
||||
connect_url = os.Getenv("RETHINKDB_URL")
|
||||
if connect_url == "" {
|
||||
connect_url = "127.0.0.1:28015"
|
||||
}
|
||||
authKey = os.Getenv("RETHINKDB_AUTHKEY")
|
||||
|
||||
}
|
||||
|
||||
func testSetup(m *testing.M) {
|
||||
var err error
|
||||
server = &Server{Url: &url.URL{Host: connect_url}}
|
||||
server.session, _ = gorethink.Connect(gorethink.ConnectOpts{
|
||||
Address: server.Url.Host,
|
||||
AuthKey: authKey,
|
||||
DiscoverHosts: false,
|
||||
})
|
||||
if err != nil {
|
||||
log.Fatalln(err.Error())
|
||||
}
|
||||
|
||||
err = server.getServerStatus()
|
||||
if err != nil {
|
||||
log.Fatalln(err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
func testTeardown(m *testing.M) {
|
||||
server.session.Close()
|
||||
}
|
||||
|
||||
func TestMain(m *testing.M) {
|
||||
// seed randomness for use with tests
|
||||
rand.Seed(time.Now().UTC().UnixNano())
|
||||
|
||||
testSetup(m)
|
||||
res := m.Run()
|
||||
testTeardown(m)
|
||||
|
||||
os.Exit(res)
|
||||
}
|
||||
@@ -55,9 +55,12 @@ func (s *DiskIOStats) Gather(acc plugins.Accumulator) error {
|
||||
}
|
||||
|
||||
for _, io := range diskio {
|
||||
tags := map[string]string{
|
||||
"name": io.Name,
|
||||
"serial": io.SerialNumber,
|
||||
tags := map[string]string{}
|
||||
if len(io.Name) != 0 {
|
||||
tags["name"] = io.Name
|
||||
}
|
||||
if len(io.SerialNumber) != 0 {
|
||||
tags["serial"] = io.SerialNumber
|
||||
}
|
||||
|
||||
acc.Add("reads", io.ReadCount, tags)
|
||||
|
||||
@@ -272,7 +272,9 @@ func TestSystemStats_GenerateStats(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
|
||||
dockertags := map[string]string{
|
||||
"id": "blah",
|
||||
"name": "blah",
|
||||
"id": "",
|
||||
"command": "",
|
||||
}
|
||||
|
||||
assert.True(t, acc.CheckTaggedValue("user", 3.1, dockertags))
|
||||
|
||||
@@ -15,4 +15,4 @@ build() {
|
||||
build "darwin" "amd64"
|
||||
build "linux" "amd64"
|
||||
build "linux" "386"
|
||||
|
||||
build "linux" "arm"
|
||||
|
||||
186
scripts/init.sh
Executable file
186
scripts/init.sh
Executable file
@@ -0,0 +1,186 @@
|
||||
#! /usr/bin/env bash
|
||||
|
||||
### BEGIN INIT INFO
|
||||
# Provides: telegraf
|
||||
# Required-Start: $all
|
||||
# Required-Stop: $remote_fs $syslog
|
||||
# Default-Start: 2 3 4 5
|
||||
# Default-Stop: 0 1 6
|
||||
# Short-Description: Start telegraf at boot time
|
||||
### END INIT INFO
|
||||
|
||||
# this init script supports three different variations:
|
||||
# 1. New lsb that define start-stop-daemon
|
||||
# 2. Old lsb that don't have start-stop-daemon but define, log, pidofproc and killproc
|
||||
# 3. Centos installations without lsb-core installed
|
||||
#
|
||||
# In the third case we have to define our own functions which are very dumb
|
||||
# and expect the args to be positioned correctly.
|
||||
|
||||
# Command-line options that can be set in /etc/default/telegraf. These will override
|
||||
# any config file values.
|
||||
TELEGRAF_OPTS=
|
||||
|
||||
USER=telegraf
|
||||
GROUP=telegraf
|
||||
|
||||
if [ -r /lib/lsb/init-functions ]; then
|
||||
source /lib/lsb/init-functions
|
||||
fi
|
||||
|
||||
DEFAULT=/etc/default/telegraf
|
||||
|
||||
if [ -r $DEFAULT ]; then
|
||||
source $DEFAULT
|
||||
fi
|
||||
|
||||
if [ -z "$STDOUT" ]; then
|
||||
STDOUT=/dev/null
|
||||
fi
|
||||
if [ ! -f "$STDOUT" ]; then
|
||||
mkdir -p `dirname $STDOUT`
|
||||
fi
|
||||
|
||||
if [ -z "$STDERR" ]; then
|
||||
STDERR=/var/log/influxdb/telegraf.log
|
||||
fi
|
||||
if [ ! -f "$STDERR" ]; then
|
||||
mkdir -p `dirname $STDERR`
|
||||
fi
|
||||
|
||||
|
||||
OPEN_FILE_LIMIT=65536
|
||||
|
||||
function pidofproc() {
|
||||
if [ $# -ne 3 ]; then
|
||||
echo "Expected three arguments, e.g. $0 -p pidfile daemon-name"
|
||||
fi
|
||||
|
||||
pid=`pgrep -f $3`
|
||||
local pidfile=`cat $2`
|
||||
|
||||
if [ "x$pidfile" == "x" ]; then
|
||||
return 1
|
||||
fi
|
||||
|
||||
if [ "x$pid" != "x" -a "$pidfile" == "$pid" ]; then
|
||||
return 0
|
||||
fi
|
||||
|
||||
return 1
|
||||
}
|
||||
|
||||
function killproc() {
|
||||
if [ $# -ne 3 ]; then
|
||||
echo "Expected three arguments, e.g. $0 -p pidfile signal"
|
||||
fi
|
||||
|
||||
pid=`cat $2`
|
||||
|
||||
kill -s $3 $pid
|
||||
}
|
||||
|
||||
function log_failure_msg() {
|
||||
echo "$@" "[ FAILED ]"
|
||||
}
|
||||
|
||||
function log_success_msg() {
|
||||
echo "$@" "[ OK ]"
|
||||
}
|
||||
|
||||
# Process name ( For display )
|
||||
name=telegraf
|
||||
|
||||
# Daemon name, where is the actual executable
|
||||
daemon=/opt/influxdb/telegraf
|
||||
|
||||
# pid file for the daemon
|
||||
pidfile=/var/run/influxdb/telegraf.pid
|
||||
piddir=`dirname $pidfile`
|
||||
|
||||
if [ ! -d "$piddir" ]; then
|
||||
mkdir -p $piddir
|
||||
chown $GROUP:$USER $piddir
|
||||
fi
|
||||
|
||||
# Configuration file
|
||||
config=/etc/opt/influxdb/telegraf.conf
|
||||
|
||||
# If the daemon is not there, then exit.
|
||||
[ -x $daemon ] || exit 5
|
||||
|
||||
case $1 in
|
||||
start)
|
||||
# Checked the PID file exists and check the actual status of process
|
||||
if [ -e $pidfile ]; then
|
||||
pidofproc -p $pidfile $daemon > /dev/null 2>&1 && status="0" || status="$?"
|
||||
# If the status is SUCCESS then don't need to start again.
|
||||
if [ "x$status" = "x0" ]; then
|
||||
log_failure_msg "$name process is running"
|
||||
exit 0 # Exit
|
||||
fi
|
||||
fi
|
||||
|
||||
# Bump the file limits, before launching the daemon. These will carry over to
|
||||
# launched processes.
|
||||
ulimit -n $OPEN_FILE_LIMIT
|
||||
if [ $? -ne 0 ]; then
|
||||
log_failure_msg "set open file limit to $OPEN_FILE_LIMIT"
|
||||
fi
|
||||
|
||||
log_success_msg "Starting the process" "$name"
|
||||
if which start-stop-daemon > /dev/null 2>&1; then
|
||||
start-stop-daemon --chuid $GROUP:$USER --start --quiet --pidfile $pidfile --exec $daemon -- -pidfile $pidfile -config $config $TELEGRAF_OPTS >>$STDOUT 2>>$STDERR &
|
||||
else
|
||||
nohup $daemon -pidfile $pidfile -config $config $TELEGRAF_OPTS >>$STDOUT 2>>$STDERR &
|
||||
fi
|
||||
log_success_msg "$name process was started"
|
||||
;;
|
||||
|
||||
stop)
|
||||
# Stop the daemon.
|
||||
if [ -e $pidfile ]; then
|
||||
pidofproc -p $pidfile $daemon > /dev/null 2>&1 && status="0" || status="$?"
|
||||
if [ "$status" = 0 ]; then
|
||||
if killproc -p $pidfile SIGTERM && /bin/rm -rf $pidfile; then
|
||||
log_success_msg "$name process was stopped"
|
||||
else
|
||||
log_failure_msg "$name failed to stop service"
|
||||
fi
|
||||
fi
|
||||
else
|
||||
log_failure_msg "$name process is not running"
|
||||
fi
|
||||
;;
|
||||
|
||||
restart)
|
||||
# Restart the daemon.
|
||||
$0 stop && sleep 2 && $0 start
|
||||
;;
|
||||
|
||||
status)
|
||||
# Check the status of the process.
|
||||
if [ -e $pidfile ]; then
|
||||
if pidofproc -p $pidfile $daemon > /dev/null; then
|
||||
log_success_msg "$name Process is running"
|
||||
exit 0
|
||||
else
|
||||
log_failure_msg "$name Process is not running"
|
||||
exit 1
|
||||
fi
|
||||
else
|
||||
log_failure_msg "$name Process is not running"
|
||||
exit 3
|
||||
fi
|
||||
;;
|
||||
|
||||
version)
|
||||
$daemon version
|
||||
;;
|
||||
|
||||
*)
|
||||
# For invalid arguments, print the usage message.
|
||||
echo "Usage: $0 {start|stop|restart|status|version}"
|
||||
exit 2
|
||||
;;
|
||||
esac
|
||||
@@ -2,6 +2,7 @@ package testutil
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"reflect"
|
||||
"time"
|
||||
)
|
||||
|
||||
@@ -18,6 +19,9 @@ type Accumulator struct {
|
||||
}
|
||||
|
||||
func (a *Accumulator) Add(measurement string, value interface{}, tags map[string]string) {
|
||||
if tags == nil {
|
||||
tags = map[string]string{}
|
||||
}
|
||||
a.Points = append(
|
||||
a.Points,
|
||||
&Point{
|
||||
@@ -70,30 +74,23 @@ func (a *Accumulator) CheckTaggedValue(measurement string, val interface{}, tags
|
||||
}
|
||||
|
||||
func (a *Accumulator) ValidateTaggedValue(measurement string, val interface{}, tags map[string]string) error {
|
||||
if tags == nil {
|
||||
tags = map[string]string{}
|
||||
}
|
||||
for _, p := range a.Points {
|
||||
var found bool
|
||||
|
||||
if p.Tags == nil && tags == nil {
|
||||
found = true
|
||||
} else {
|
||||
for k, v := range p.Tags {
|
||||
if tags[k] == v {
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !reflect.DeepEqual(tags, p.Tags) {
|
||||
continue
|
||||
}
|
||||
|
||||
if found && p.Measurement == measurement {
|
||||
if p.Measurement == measurement {
|
||||
if p.Value != val {
|
||||
return fmt.Errorf("%v (%T) != %v (%T)", p.Value, p.Value, val, val)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
return fmt.Errorf("unknown value %s with tags %v", measurement, tags)
|
||||
return fmt.Errorf("unknown measurement %s with tags %v", measurement, tags)
|
||||
}
|
||||
|
||||
func (a *Accumulator) ValidateValue(measurement string, val interface{}) error {
|
||||
|
||||
Reference in New Issue
Block a user