Compare commits


49 Commits

Author SHA1 Message Date
Todd Persen
120218f9c6 Telegraf should have its own directories. 2015-07-05 18:38:22 -07:00
Todd Persen
38ee6adcd2 Update CHANGELOG.md 2015-07-05 16:58:13 -07:00
Todd Persen
1d8e6473c6 Merge pull request #45 from jhofeditz/patch-1
skip disk tags with no value
2015-07-05 16:51:58 -07:00
Todd Persen
494704b479 Merge pull request #28 from brian-brazil/prometheus-plugin-only
Add support for Prometheus (plugin only)
2015-07-05 16:49:36 -07:00
Todd Persen
d634b08969 Merge pull request #47 from jipperinbham/rethinkdb-plugin
add RethinkDB plugin
2015-07-05 16:47:20 -07:00
Todd Persen
350f91601c Merge pull request #43 from marcosnils/mysql_fix
Fix mysql plugin due to test accumulator refactor
2015-07-05 16:44:13 -07:00
Todd Persen
659e1cfe85 Merge pull request #46 from zepouet/master
Update README.md
2015-07-05 16:43:45 -07:00
JP
1943d89147 add RethinkDB plugin 2015-07-04 15:09:33 -05:00
Nicolas
aa822756e7 Update README.md
Little syntax correction
2015-07-04 00:05:47 +02:00
Joe Hofeditz
073b1084b7 skip disk tags with no value
InfluxDB 0.9.1 now rejects writes with empty tag values. This patch skips tags with no value. A disk serial number does not exist for many devices, including md RAID arrays and VMs. Other plugins may also want to check for empty tags.
2015-07-03 08:11:52 -06:00
Marcos Lilljedahl
5cbe15b676 Return error when can't execute stats query 2015-07-03 09:25:18 -03:00
Marcos Lilljedahl
e2cff9febe Fix mysql plugin due to test accumulator refactor 2015-07-03 09:17:53 -03:00
Paul Dix
e9ad786578 Merge pull request #35 from EmilS/plugins/kafka
Adds Kafka Plugin
2015-07-02 15:44:17 -04:00
Emil Stolarsky
0692b4be61 Add Kafka Consumer Plugin
The Kafka consumer plugin polls a specified Kafka topic and adds messages to
InfluxDB. The plugin assumes messages follow the line protocol. Consumer Group
is used to talk to the Kafka cluster so multiple instances of telegraf can read
from the same topic in parallel.
2015-07-02 15:40:13 -04:00
Todd Persen
6550d4f634 Update CHANGELOG.md 2015-07-02 11:25:06 -07:00
Todd Persen
c523ae2c52 Update README.md 2015-07-01 12:37:44 -07:00
Todd Persen
5e1ba3fbb7 Update CHANGELOG.md 2015-07-01 12:36:52 -07:00
Evan Phoenix
6e8a298d21 Merge pull request #32 from tylernisonoff/master
fixed spelling mistake -- memoory -> memory
2015-07-01 11:33:25 -07:00
Tyler Nisonoff
815e9534b8 fixed spelling mistake -- memoory -> memory 2015-06-27 12:56:27 -07:00
Brian Brazil
5390a8ea71 Add Prometheus plugin.
This allows pulling Prometheus metrics from
any client library or exporter over HTTP.
2015-06-24 22:15:58 +01:00
Brian Brazil
e34c52402f Improve test infrastructure 2015-06-24 22:15:58 +01:00
Evan Phoenix
86a6f337f6 Cleanup the URL when one isn't specified 2015-06-23 14:51:55 -07:00
Evan Phoenix
a1f7d5549b Fix type error using URL as a string 2015-06-23 14:51:38 -07:00
Evan Phoenix
5fbd07b146 Add memcached to the all plugins package 2015-06-23 14:51:32 -07:00
Evan Phoenix
b8f3c68b89 Merge pull request #21 from fromYukki/memcached
Memcached plugin
2015-06-23 14:44:52 -07:00
Evan Phoenix
043b171028 Merge pull request #16 from jipperinbham/redis_auth
convert Redis to use URI, support Redis AUTH
2015-06-23 14:37:08 -07:00
Maksim Naumov
b86d789abe Explore "limit_maxbytes" and "bytes" individually 2015-06-23 09:44:39 +02:00
JP
e1c7dc80ae redis plugin accepts URI or string, support Redis AUTH 2015-06-22 20:49:11 -05:00
Evan Phoenix
1fe0791a74 Merge pull request #19 from sherifzain/master
Fixed: differentiate stats gathered from multiple servers/instances
2015-06-22 15:32:30 -07:00
Evan Phoenix
0d87eb4725 Merge pull request #20 from nkatsaros/master
protect accumulator values with a mutex
2015-06-22 15:32:08 -07:00
Sherif Zain
e2dac56a40 Added: server to tags 2015-06-22 10:40:30 +02:00
Maksim Naumov
039fc80ed7 Memcached plugin 2015-06-22 04:27:46 +02:00
Nicholas Katsaros
8e90a444c2 protect accumulator values with a mutex 2015-06-21 21:16:46 -04:00
Sherif Zain
2ccd828e81 Fixed: differentiate stats gathered from multiple redis servers/instances 2015-06-21 23:38:42 +02:00
Todd Persen
480f29bde7 Create a CHANGELOG. 2015-06-20 12:16:47 -07:00
Todd Persen
11a6db8268 Merge pull request #13 from influxdb/fix-packaging
Un-break the packaging script.
2015-06-20 12:10:16 -07:00
Todd Persen
6566cc51e3 Merge pull request #12 from influxdb/s3-cleanup
Clean up descriptions and stop pushing to both S3 buckets.
2015-06-20 12:10:00 -07:00
Gunnar
051cd03bbf Merge pull request #14 from voxxit/voxxit-linux-arm
Add linux/arm to list of built binaries
2015-06-19 23:17:13 -07:00
Joshua Delsman
0aa0a40d89 Add linux/arm to list of built binaries
This can help with Raspberry Pis, embedded devices, and other IoT applications.
2015-06-19 21:53:58 -07:00
Todd Persen
9dcbe750d1 Add Homebrew instructions to README.md 2015-06-19 18:53:35 -07:00
Todd Persen
10bf663a3b Un-break the packaging script. 2015-06-19 11:06:32 -07:00
Todd Persen
b71cfb7cfd Clean up descriptions and stop pushing to both S3 buckets. 2015-06-19 11:05:00 -07:00
Paul Dix
87fedcfa74 Fix typo 2015-06-19 17:43:24 +01:00
Paul Dix
3b9174a322 Add supported plugins 2015-06-19 17:42:29 +01:00
Paul Dix
851fdd439f Move plugins details into readme 2015-06-19 17:38:31 +01:00
Paul Dix
ab78e8efec Update README.md 2015-06-19 17:35:53 +01:00
Paul Dix
b829febe0d Update README.md 2015-06-19 17:34:27 +01:00
Evan Phoenix
39c90dd879 Add package.sh script
These 2 scripts are adapted from the influxd versions. Diffing them
against the influxd versions will show very minor changes.
2015-06-18 11:32:37 -07:00
Evan Phoenix
3a43042089 Add -pidfile and Commit variable 2015-06-18 11:31:16 -07:00
31 changed files with 2094 additions and 151 deletions

CHANGELOG.md

@@ -0,0 +1,30 @@
## v0.1.3 [2015-07-05]
### Features
- [#35](https://github.com/influxdb/influxdb/pull/35): Add Kafka plugin. Thanks @EmilS!
- [#47](https://github.com/influxdb/influxdb/pull/47): Add RethinkDB plugin. Thanks @jipperinbham!
### Bugfixes
- [#45](https://github.com/influxdb/influxdb/pull/45): Skip disk tags that don't have a value. Thanks @jhofeditz!
- [#43](https://github.com/influxdb/influxdb/pull/43): Fix bug in MySQL plugin. Thanks @marcosnils!
## v0.1.2 [2015-07-01]
### Features
- [#12](https://github.com/influxdb/influxdb/pull/12): Add Linux/ARM to the list of built binaries. Thanks @voxxit!
- [#14](https://github.com/influxdb/influxdb/pull/14): Clarify the S3 buckets that Telegraf is pushed to.
- [#16](https://github.com/influxdb/influxdb/pull/16): Convert Redis to use URI, support Redis AUTH. Thanks @jipperinbham!
- [#21](https://github.com/influxdb/influxdb/pull/21): Add memcached plugin. Thanks @Yukki!
### Bugfixes
- [#13](https://github.com/influxdb/influxdb/pull/13): Fix the packaging script.
- [#19](https://github.com/influxdb/influxdb/pull/19): Add host name to metric tags. Thanks @sherifzain!
- [#20](https://github.com/influxdb/influxdb/pull/20): Fix race condition with accumulator mutex. Thanks @nkatsaros!
- [#23](https://github.com/influxdb/influxdb/pull/23): Change name of folder for packages. Thanks @colinrymer!
- [#32](https://github.com/influxdb/influxdb/pull/32): Fix spelling of memoory -> memory. Thanks @tylernisonoff!
## v0.1.1 [2015-06-19]
### Release Notes
This is the initial release of Telegraf.


@@ -1,111 +0,0 @@
Telegraf is entirely plugin driven. This interface allows operators to
pick and choose what is gathered, and makes it easy for developers
to create new ways of generating metrics.
Plugin authorship is kept as simple as possible to encourage people to develop
and submit new plugins.
## Guidelines
* A plugin must conform to the `plugins.Plugin` interface.
* Telegraf promises to run each plugin's Gather function serially. This means
developers don't have to worry about thread safety within these functions.
* Each generated metric automatically has the name of the plugin that generated
it prepended. This is to keep plugins honest.
* Plugins should call `plugins.Add` in their `init` function to register themselves.
See below for a quick example.
* To be available within Telegraf itself, plugins must add themselves to the `github.com/influxdb/telegraf/plugins/all/all.go` file.
* The `SampleConfig` function should return valid toml that describes how the plugin can be configured. This is included in `telegraf -sample-config`.
* The `Description` function should say in one line what this plugin does.
### Plugin interface
```go
type Plugin interface {
SampleConfig() string
Description() string
Gather(Accumulator) error
}
type Accumulator interface {
Add(measurement string, value interface{}, tags map[string]string)
AddValuesWithTime(measurement string, values map[string]interface{}, tags map[string]string, timestamp time.Time)
}
```
### Accumulator
The way that a plugin emits metrics is by interacting with the Accumulator.
The `Add` function takes 3 arguments:
* **measurement**: A string description of the metric. For instance `bytes_read` or `faults`.
* **value**: A value for the metric. This accepts 5 different types of value:
* **int**: The most common type. All int types are accepted, but favor `int64`. Useful for counters, etc.
* **float**: Favor `float64`, useful for gauges, percentages, etc.
* **bool**: `true` or `false`, useful to indicate the presence of a state. `light_on`, etc.
* **string**: Typically used to indicate a message, or some kind of freeform information.
* **time.Time**: Useful for indicating when a state last occurred, for instance `light_on_since`.
* **tags**: This is a map of strings to strings to describe the where or who about the metric. For instance, the `net` plugin adds a tag named `"interface"` set to the name of the network interface, like `"eth0"`.
The `AddValuesWithTime` function allows multiple values for a point to be passed. The values
used are the same type profile as **value** above. The **timestamp** argument
allows a point to be registered as having occurred at an arbitrary time.
Let's say you've written a plugin that emits metrics about processes on the current host.
```go
type Process struct {
CPUTime float64
MemoryBytes int64
PID int
}
func Gather(acc plugins.Accumulator) error {
for _, process := range system.Processes() {
tags := map[string]string {
"pid": fmt.Sprintf("%d", process.Pid),
}
acc.Add("cpu", process.CPUTime, tags)
acc.Add("memoory", process.MemoryBytes, tags)
}
return nil
}
```
### Example
```go
// simple.go
import "github.com/influxdb/telegraf/plugins"
type Simple struct {
Ok bool
}
func (s *Simple) Description() string {
return "a demo plugin"
}
func (s *Simple) SampleConfig() string {
return "ok = true # indicate if everything is fine"
}
func (s *Simple) Gather(acc plugins.Accumulator) error {
if s.Ok {
acc.Add("state", "pretty good", nil)
} else {
acc.Add("state", "not great", nil)
}
return nil
}
func init() {
plugins.Add("simple", func() plugins.Plugin { &Simple{} })
}
```

README.md

@@ -1,13 +1,37 @@
# Telegraf - A native agent for InfluxDB
Telegraf is an agent written in Go for collecting metrics from the system it's running on or from other services and writing them into InfluxDB.
Design goals are to have a minimal memory footprint with a plugin system so that developers in the community can easily add support for collecting metrics from well-known services (like Hadoop, Postgres, or Redis) and third-party APIs (like Mailchimp, AWS CloudWatch, or Google Analytics).
We'll eagerly accept pull requests for new plugins and will manage the set of plugins that Telegraf supports. See the bottom of this doc for instructions on writing new plugins.
## Quickstart
* Build from source or download telegraf (binaries forthcoming)
* Build from source or download telegraf:
### Linux packages for Debian/Ubuntu and RHEL/CentOS:
```
http://get.influxdb.org/telegraf/telegraf_0.1.2_amd64.deb
http://get.influxdb.org/telegraf/telegraf-0.1.2-1.x86_64.rpm
```
### OSX via Homebrew:
```
brew update
brew install telegraf
```
### How to use it:
* Run `telegraf -sample-config > telegraf.toml` to create an initial configuration
* Edit the configuration to match your needs
* Run `telegraf -config telegraf.toml -test` to output one full measurement sample to STDOUT
* Run `telegraf -config telegraf.toml` to gather and send metrics to InfluxDB
## Telegraf Options
Telegraf has a few options you can configure under the `agent` section of the config. If you don't see an `agent` section, run `telegraf -sample-config > telegraf.toml` to create a valid initial configuration:
@@ -16,6 +40,19 @@ Telegraf has a few options you can configure under the `agent` section of the co
* **interval**: How often to gather metrics. Uses a simple number + unit parser, e.g. "10s" for 10 seconds or "5m" for 5 minutes; see the example after this list.
* **debug**: Set to true to gather and send metrics to STDOUT as well as InfluxDB.
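These interval strings parse like Go durations. A quick illustrative check (Telegraf's own parser may differ in edge cases):
```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// "10s" and "5m" are the forms the interval option accepts.
	for _, s := range []string{"10s", "5m"} {
		d, err := time.ParseDuration(s)
		if err != nil {
			panic(err)
		}
		fmt.Printf("%s = %v\n", s, d)
	}
}
```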
## Supported Plugins
Telegraf currently has support for collecting metrics from:
* System (memory, CPU, network, etc.)
* Docker
* MySQL
* Prometheus (client libraries and exporters)
* PostgreSQL
* Redis
We'll be adding support for many more over the coming months. Read on if you want to add support for another service or third-party API.
## Plugin Options
There are 3 configuration options that are configurable per plugin:
@@ -23,3 +60,118 @@ There are 3 configuration options that are configurable per plugin:
* **pass**: An array of strings used to filter metrics generated by the current plugin. Each string in the array is tested as a prefix against metric names, and if it matches, the metric is emitted; see the sketch after this list.
* **drop**: The inverse of pass, if a metric matches, it is not emitted.
* **interval**: How often to gather this metric. Normal plugins use a single global interval, but if one particular plugin should be run less or more often, you can configure that here.
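A minimal sketch of how prefix filtering along the lines of `pass`/`drop` could be applied; the function and field names here are illustrative, not Telegraf's actual implementation:
```go
package main

import (
	"fmt"
	"strings"
)

// passes reports whether a metric survives hypothetical pass/drop
// prefix filters; this is not Telegraf's actual filtering code.
func passes(metric string, pass, drop []string) bool {
	match := func(prefixes []string) bool {
		for _, p := range prefixes {
			if strings.HasPrefix(metric, p) {
				return true
			}
		}
		return false
	}
	if len(pass) > 0 && !match(pass) {
		return false // a pass list is set and nothing matched
	}
	return !match(drop) // drop wins over pass
}

func main() {
	fmt.Println(passes("mem_free", []string{"mem_"}, nil)) // true
	fmt.Println(passes("mem_free", nil, []string{"mem_"})) // false
}
```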
## Plugins
This section is for developers who want to create new collection plugins. Telegraf is entirely plugin driven. This interface allows operators to
pick and choose what is gathered, and makes it easy for developers
to create new ways of generating metrics.
Plugin authorship is kept as simple as possible to encourage people to develop
and submit new plugins.
## Guidelines
* A plugin must conform to the `plugins.Plugin` interface.
* Telegraf promises to run each plugin's Gather function serially. This means
developers don't have to worry about thread safety within these functions.
* Each generated metric automatically has the name of the plugin that generated
it prepended. This is to keep plugins honest.
* Plugins should call `plugins.Add` in their `init` function to register themselves.
See below for a quick example.
* To be available within Telegraf itself, plugins must add themselves to the `github.com/influxdb/telegraf/plugins/all/all.go` file.
* The `SampleConfig` function should return valid toml that describes how the plugin can be configured. This is included in `telegraf -sample-config`.
* The `Description` function should say in one line what this plugin does.
### Plugin interface
```go
type Plugin interface {
SampleConfig() string
Description() string
Gather(Accumulator) error
}
type Accumulator interface {
Add(measurement string, value interface{}, tags map[string]string)
AddValuesWithTime(measurement string, values map[string]interface{}, tags map[string]string, timestamp time.Time)
}
```
### Accumulator
The way that a plugin emits metrics is by interacting with the Accumulator.
The `Add` function takes 3 arguments:
* **measurement**: A string description of the metric. For instance `bytes_read` or `faults`.
* **value**: A value for the metric. This accepts 5 different types of value:
* **int**: The most common type. All int types are accepted, but favor `int64`. Useful for counters, etc.
* **float**: Favor `float64`, useful for gauges, percentages, etc.
* **bool**: `true` or `false`, useful to indicate the presence of a state. `light_on`, etc.
* **string**: Typically used to indicate a message, or some kind of freeform information.
* **time.Time**: Useful for indicating when a state last occurred, for instance `light_on_since`.
* **tags**: This is a map of strings to strings to describe the where or who about the metric. For instance, the `net` plugin adds a tag named `"interface"` set to the name of the network interface, like `"eth0"`.
The `AddValuesWithTime` function allows multiple values for a point to be passed. The values
used are the same type profile as **value** above. The **timestamp** argument
allows a point to be registered as having occurred at an arbitrary time.
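For example, a hypothetical helper that records several disk values as one point with an explicit timestamp (the measurement and value names are illustrative):
```go
package example

import (
	"time"

	"github.com/influxdb/telegraf/plugins"
)

// reportDiskIO emits two values in a single "disk_io" point, stamped
// with an explicit time rather than the collection time.
func reportDiskIO(acc plugins.Accumulator) {
	values := map[string]interface{}{
		"bytes_read":    int64(4096),
		"bytes_written": int64(1024),
	}
	tags := map[string]string{"device": "sda"}
	acc.AddValuesWithTime("disk_io", values, tags, time.Now())
}
```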
Let's say you've written a plugin that emits metrics about processes on the current host.
```go
type Process struct {
CPUTime float64
MemoryBytes int64
PID int
}
func Gather(acc plugins.Accumulator) error {
for _, process := range system.Processes() {
tags := map[string]string {
"pid": fmt.Sprintf("%d", process.Pid),
}
acc.Add("cpu", process.CPUTime, tags)
acc.Add("memory", process.MemoryBytes, tags)
}
return nil
}
```
### Example
```go
package simple
// simple.go
import "github.com/influxdb/telegraf/plugins"
type Simple struct {
Ok bool
}
func (s *Simple) Description() string {
return "a demo plugin"
}
func (s *Simple) SampleConfig() string {
return "ok = true # indicate if everything is fine"
}
func (s *Simple) Gather(acc plugins.Accumulator) error {
if s.Ok {
acc.Add("state", "pretty good", nil)
} else {
acc.Add("state", "not great", nil)
}
return nil
}
func init() {
plugins.Add("simple", func() plugins.Plugin { return &Simple{} })
}
```


@@ -4,12 +4,15 @@ import (
"fmt"
"sort"
"strings"
"sync"
"time"
"github.com/influxdb/influxdb/client"
)
type BatchPoints struct {
mu sync.Mutex
client.BatchPoints
Debug bool
@@ -20,6 +23,9 @@ type BatchPoints struct {
}
func (bp *BatchPoints) Add(measurement string, val interface{}, tags map[string]string) {
bp.mu.Lock()
defer bp.mu.Unlock()
measurement = bp.Prefix + measurement
if bp.Config != nil {
@@ -55,6 +61,9 @@ func (bp *BatchPoints) AddValuesWithTime(
tags map[string]string,
timestamp time.Time,
) {
bp.mu.Lock()
defer bp.mu.Unlock()
measurement = bp.Prefix + measurement
if bp.Config != nil {
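The hunks above wrap `Add` and `AddValuesWithTime` in a mutex so plugins gathering in parallel cannot race on shared state. A minimal sketch of the pattern, with illustrative names rather than the actual telegraf types:
```go
package example

import "sync"

// safeAcc guards a shared slice so concurrent gatherers can append
// points without a data race.
type safeAcc struct {
	mu     sync.Mutex
	points []string
}

func (a *safeAcc) Add(p string) {
	a.mu.Lock()
	defer a.mu.Unlock()
	a.points = append(a.points, p)
}
```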


@@ -17,8 +17,10 @@ var fTest = flag.Bool("test", false, "gather metrics, print them out, and exit")
var fConfig = flag.String("config", "", "configuration file to load")
var fVersion = flag.Bool("version", false, "display the version")
var fSampleConfig = flag.Bool("sample-config", false, "print out full sample configuration")
var fPidfile = flag.String("pidfile", "", "file to write our pid to")
var Version = "unreleased"
var Commit = ""
func main() {
flag.Parse()
@@ -104,5 +106,16 @@ func main() {
log.Printf("Tags enabled: %v", config.ListTags())
}
if *fPidfile != "" {
f, err := os.Create(*fPidfile)
if err != nil {
log.Fatalf("Unable to create pidfile: %s", err)
}
fmt.Fprintf(f, "%d\n", os.Getpid())
f.Close()
}
ag.Run(shutdown)
}

package.sh

@@ -0,0 +1,349 @@
#!/usr/bin/env bash
###########################################################################
# Packaging script which creates debian and RPM packages. It optionally
# tags the repo with the given version.
#
# Requirements: GOPATH must be set. 'fpm' must be on the path, and the AWS
# CLI tools must also be installed.
#
# https://github.com/jordansissel/fpm
# http://aws.amazon.com/cli/
#
# Packaging process: to package a build, simply execute:
#
# package.sh <version>
#
# where <version> is the desired version. If generation of a debian and RPM
# package is successful, the script will offer to tag the repo using the
# supplied version string.
#
# AWS upload: the script will also offer to upload the packages to S3. If
# this option is selected, the credentials should be present in the file
# ~/aws.conf. The contents should be of the form:
#
# [default]
# aws_access_key_id=<access ID>
# aws_secret_access_key=<secret key>
# region = us-east-1
#
# Trim the leading spaces when creating the file. The script will exit if
# S3 upload is requested, but this file does not exist.
AWS_FILE=~/aws.conf
INSTALL_ROOT_DIR=/opt/telegraf
TELEGRAF_LOG_DIR=/var/log/telegraf
CONFIG_ROOT_DIR=/etc/opt/telegraf
SAMPLE_CONFIGURATION=etc/config.sample.toml
INITD_SCRIPT=scripts/init.sh
TMP_WORK_DIR=`mktemp -d`
POST_INSTALL_PATH=`mktemp`
ARCH=`uname -i`
LICENSE=MIT
URL=influxdb.com
MAINTAINER=support@influxdb.com
VENDOR=InfluxDB
DESCRIPTION="InfluxDB Telegraf agent"
PKG_DEPS=(coreutils)
GO_VERSION="go1.4.2"
GOPATH_INSTALL=
BINS=(
telegraf
)
###########################################################################
# Helper functions.
# usage prints simple usage information.
usage() {
echo -e "$0 [<version>] [-h]\n"
cleanup_exit $1
}
# cleanup_exit removes all resources created during the process and exits with
# the supplied returned code.
cleanup_exit() {
rm -r $TMP_WORK_DIR
rm $POST_INSTALL_PATH
exit $1
}
# check_gopath sanity checks the value of the GOPATH env variable, and determines
# the path where build artifacts are installed. GOPATH may be a colon-delimited
# list of directories.
check_gopath() {
[ -z "$GOPATH" ] && echo "GOPATH is not set." && cleanup_exit 1
GOPATH_INSTALL=`echo $GOPATH | cut -d ':' -f 1`
[ ! -d "$GOPATH_INSTALL" ] && echo "GOPATH_INSTALL is not a directory." && cleanup_exit 1
echo "GOPATH ($GOPATH) looks sane, using $GOPATH_INSTALL for installation."
}
check_gvm() {
source $HOME/.gvm/scripts/gvm
which gvm
if [ $? -ne 0 ]; then
echo "gvm not found -- aborting."
cleanup_exit $1
fi
gvm use $GO_VERSION
if [ $? -ne 0 ]; then
echo "gvm cannot find Go version $GO_VERSION -- aborting."
cleanup_exit $1
fi
}
# check_clean_tree ensures that no source file is locally modified.
check_clean_tree() {
modified=$(git ls-files --modified | wc -l)
if [ $modified -ne 0 ]; then
echo "The source tree is not clean -- aborting."
cleanup_exit 1
fi
echo "Git tree is clean."
}
# update_tree ensures the tree is in-sync with the repo.
update_tree() {
git pull origin master
if [ $? -ne 0 ]; then
echo "Failed to pull latest code -- aborting."
cleanup_exit 1
fi
git fetch --tags
if [ $? -ne 0 ]; then
echo "Failed to fetch tags -- aborting."
cleanup_exit 1
fi
echo "Git tree updated successfully."
}
# check_tag_exists checks if the existing release already exists in the tags.
check_tag_exists () {
version=$1
git tag | grep -q "^v$version$"
if [ $? -eq 0 ]; then
echo "Proposed version $version already exists as a tag -- aborting."
cleanup_exit 1
fi
}
# make_dir_tree creates the directory structure within the packages.
make_dir_tree() {
work_dir=$1
version=$2
mkdir -p $work_dir/$INSTALL_ROOT_DIR/versions/$version/scripts
if [ $? -ne 0 ]; then
echo "Failed to create installation directory -- aborting."
cleanup_exit 1
fi
mkdir -p $work_dir/$CONFIG_ROOT_DIR
if [ $? -ne 0 ]; then
echo "Failed to create configuration directory -- aborting."
cleanup_exit 1
fi
}
# do_build builds the code. The version and commit must be passed in.
do_build() {
version=$1
commit=`git rev-parse HEAD`
if [ $? -ne 0 ]; then
echo "Unable to retrieve current commit -- aborting"
cleanup_exit 1
fi
for b in ${BINS[*]}; do
rm -f $GOPATH_INSTALL/bin/$b
done
go get -u -f ./...
if [ $? -ne 0 ]; then
echo "WARNING: failed to 'go get' packages."
fi
go install -a -ldflags="-X main.Version $version -X main.Commit $commit" ./...
if [ $? -ne 0 ]; then
echo "Build failed, unable to create package -- aborting"
cleanup_exit 1
fi
echo "Build completed successfully."
}
# generate_postinstall_script creates the post-install script for the
# package. It must be passed the version.
generate_postinstall_script() {
version=$1
cat <<EOF >$POST_INSTALL_PATH
rm -f $INSTALL_ROOT_DIR/telegraf
rm -f $INSTALL_ROOT_DIR/init.sh
ln -s $INSTALL_ROOT_DIR/versions/$version/telegraf $INSTALL_ROOT_DIR/telegraf
ln -s $INSTALL_ROOT_DIR/versions/$version/scripts/init.sh $INSTALL_ROOT_DIR/init.sh
rm -f /etc/init.d/telegraf
ln -sfn $INSTALL_ROOT_DIR/init.sh /etc/init.d/telegraf
chmod +x /etc/init.d/telegraf
if which update-rc.d > /dev/null 2>&1 ; then
update-rc.d -f telegraf remove
update-rc.d telegraf defaults
else
chkconfig --add telegraf
fi
if ! id telegraf >/dev/null 2>&1; then
useradd --system -U -M telegraf
fi
chown -R -L telegraf:telegraf $INSTALL_ROOT_DIR
chmod -R a+rX $INSTALL_ROOT_DIR
mkdir -p $TELEGRAF_LOG_DIR
chown -R -L telegraf:telegraf $TELEGRAF_LOG_DIR
EOF
echo "Post-install script created successfully at $POST_INSTALL_PATH"
}
###########################################################################
# Start the packaging process.
if [ $# -ne 1 ]; then
usage 1
elif [ $1 == "-h" ]; then
usage 0
else
VERSION=$1
fi
echo -e "\nStarting package process...\n"
check_gvm
check_gopath
check_clean_tree
update_tree
check_tag_exists $VERSION
do_build $VERSION
make_dir_tree $TMP_WORK_DIR $VERSION
###########################################################################
# Copy the assets to the installation directories.
for b in ${BINS[*]}; do
cp $GOPATH_INSTALL/bin/$b $TMP_WORK_DIR/$INSTALL_ROOT_DIR/versions/$VERSION
if [ $? -ne 0 ]; then
echo "Failed to copy binaries to packaging directory -- aborting."
cleanup_exit 1
fi
done
echo "${BINS[*]} copied to $TMP_WORK_DIR/$INSTALL_ROOT_DIR/versions/$VERSION"
cp $INITD_SCRIPT $TMP_WORK_DIR/$INSTALL_ROOT_DIR/versions/$VERSION/scripts
if [ $? -ne 0 ]; then
echo "Failed to copy init.d script to packaging directory -- aborting."
cleanup_exit 1
fi
echo "$INITD_SCRIPT copied to $TMP_WORK_DIR/$INSTALL_ROOT_DIR/versions/$VERSION/scripts"
cp $SAMPLE_CONFIGURATION $TMP_WORK_DIR/$CONFIG_ROOT_DIR/telegraf.conf
if [ $? -ne 0 ]; then
echo "Failed to copy $SAMPLE_CONFIGURATION to packaging directory -- aborting."
cleanup_exit 1
fi
generate_postinstall_script $VERSION
###########################################################################
# Create the actual packages.
echo -n "Commence creation of $ARCH packages, version $VERSION? [Y/n] "
read response
response=`echo $response | tr 'A-Z' 'a-z'`
if [ "x$response" == "xn" ]; then
echo "Packaging aborted."
cleanup_exit 1
fi
if [ $ARCH == "i386" ]; then
rpm_package=telegraf-$VERSION-1.i686.rpm
debian_package=telegraf_${VERSION}_i686.deb
deb_args="-a i686"
rpm_args="setarch i686"
elif [ $ARCH == "arm" ]; then
rpm_package=telegraf-$VERSION-1.armel.rpm
debian_package=telegraf_${VERSION}_armel.deb
else
rpm_package=telegraf-$VERSION-1.x86_64.rpm
debian_package=telegraf_${VERSION}_amd64.deb
fi
COMMON_FPM_ARGS="-C $TMP_WORK_DIR --vendor $VENDOR --url $URL --license $LICENSE --maintainer $MAINTAINER --after-install $POST_INSTALL_PATH --name telegraf --version $VERSION --config-files $CONFIG_ROOT_DIR ."
$rpm_args fpm -s dir -t rpm --description "$DESCRIPTION" $COMMON_FPM_ARGS
if [ $? -ne 0 ]; then
echo "Failed to create RPM package -- aborting."
cleanup_exit 1
fi
echo "RPM package created successfully."
fpm -s dir -t deb $deb_args --description "$DESCRIPTION" $COMMON_FPM_ARGS
if [ $? -ne 0 ]; then
echo "Failed to create Debian package -- aborting."
cleanup_exit 1
fi
echo "Debian package created successfully."
###########################################################################
# Offer to tag the repo.
echo -n "Tag source tree with v$VERSION and push to repo? [y/N] "
read response
response=`echo $response | tr 'A-Z' 'a-z'`
if [ "x$response" == "xy" ]; then
echo "Creating tag v$VERSION and pushing to repo"
git tag v$VERSION
if [ $? -ne 0 ]; then
echo "Failed to create tag v$VERSION -- aborting"
cleanup_exit 1
fi
git push origin v$VERSION
if [ $? -ne 0 ]; then
echo "Failed to push tag v$VERSION to repo -- aborting"
cleanup_exit 1
fi
else
echo "Not creating tag v$VERSION."
fi
###########################################################################
# Offer to publish the packages.
echo -n "Publish packages to S3? [y/N] "
read response
response=`echo $response | tr 'A-Z' 'a-z'`
if [ "x$response" == "xy" ]; then
echo "Publishing packages to S3."
if [ ! -e "$AWS_FILE" ]; then
echo "$AWS_FILE does not exist -- aborting."
cleanup_exit 1
fi
for filepath in `ls *.{deb,rpm}`; do
echo "Uploading $filepath to S3"
filename=`basename $filepath`
echo "Uploading $filename to s3://get.influxdb.org/telegraf/$filename"
AWS_CONFIG_FILE=$AWS_FILE aws s3 cp $filepath s3://get.influxdb.org/telegraf/$filename --acl public-read --region us-east-1
if [ $? -ne 0 ]; then
echo "Upload failed -- aborting".
cleanup_exit 1
fi
done
else
echo "Not publishing packages to S3."
fi
###########################################################################
# All done.
echo -e "\nPackaging process complete."
cleanup_exit 0


@@ -1,8 +1,11 @@
package all
import (
_ "github.com/influxdb/telegraf/plugins/kafka_consumer"
_ "github.com/influxdb/telegraf/plugins/memcached"
_ "github.com/influxdb/telegraf/plugins/mysql"
_ "github.com/influxdb/telegraf/plugins/postgresql"
_ "github.com/influxdb/telegraf/plugins/prometheus"
_ "github.com/influxdb/telegraf/plugins/redis"
_ "github.com/influxdb/telegraf/plugins/system"
)


@@ -0,0 +1,153 @@
package kafka_consumer
import (
"os"
"os/signal"
"time"
"github.com/influxdb/influxdb/tsdb"
"github.com/influxdb/telegraf/plugins"
"github.com/wvanbergen/kafka/consumergroup"
"gopkg.in/Shopify/sarama.v1"
)
type Kafka struct {
ConsumerGroupName string
Topic string
ZookeeperPeers []string
Consumer *consumergroup.ConsumerGroup
BatchSize int
}
var sampleConfig = `
# topic to consume
topic = "topic_with_metrics"
# the name of the consumer group
consumerGroupName = "telegraf_metrics_consumers"
# an array of Zookeeper connection strings
zookeeperPeers = ["localhost:2181"]
# Batch size of points sent to InfluxDB
batchSize = 1000`
func (k *Kafka) SampleConfig() string {
return sampleConfig
}
func (k *Kafka) Description() string {
return "read metrics from a Kafka topic"
}
type Metric struct {
Measurement string `json:"measurement"`
Values map[string]interface{} `json:"values"`
Tags map[string]string `json:"tags"`
Time time.Time `json:"time"`
}
func (k *Kafka) Gather(acc plugins.Accumulator) error {
var consumerErr error
metricQueue := make(chan []byte, 200)
if k.Consumer == nil {
k.Consumer, consumerErr = consumergroup.JoinConsumerGroup(
k.ConsumerGroupName,
[]string{k.Topic},
k.ZookeeperPeers,
nil,
)
if consumerErr != nil {
return consumerErr
}
c := make(chan os.Signal, 1)
halt := make(chan bool, 1)
signal.Notify(c, os.Interrupt)
go func() {
<-c
halt <- true
emitMetrics(k, acc, metricQueue)
k.Consumer.Close()
}()
go readFromKafka(k.Consumer.Messages(), metricQueue, k.BatchSize, k.Consumer.CommitUpto, halt)
}
return emitMetrics(k, acc, metricQueue)
}
func emitMetrics(k *Kafka, acc plugins.Accumulator, metricConsumer <-chan []byte) error {
timeout := time.After(1 * time.Second)
for {
select {
case batch := <-metricConsumer:
var points []tsdb.Point
var err error
if points, err = tsdb.ParsePoints(batch); err != nil {
return err
}
for _, point := range points {
acc.AddValuesWithTime(point.Name(), point.Fields(), point.Tags(), point.Time())
}
case <-timeout:
return nil
}
}
}
const millisecond = 1000000 * time.Nanosecond
type ack func(*sarama.ConsumerMessage) error
func readFromKafka(kafkaMsgs <-chan *sarama.ConsumerMessage, metricProducer chan<- []byte, maxBatchSize int, ackMsg ack, halt <-chan bool) {
batch := make([]byte, 0)
currentBatchSize := 0
timeout := time.After(500 * millisecond)
var msg *sarama.ConsumerMessage
for {
select {
case msg = <-kafkaMsgs:
if currentBatchSize != 0 {
batch = append(batch, '\n')
}
batch = append(batch, msg.Value...)
currentBatchSize++
if currentBatchSize == maxBatchSize {
metricProducer <- batch
currentBatchSize = 0
batch = make([]byte, 0)
ackMsg(msg)
}
case <-timeout:
if currentBatchSize != 0 {
metricProducer <- batch
currentBatchSize = 0
batch = make([]byte, 0)
ackMsg(msg)
}
timeout = time.After(500 * millisecond)
case <-halt:
if currentBatchSize != 0 {
metricProducer <- batch
ackMsg(msg)
}
return
}
}
}
func init() {
plugins.Add("kafka", func() plugins.Plugin {
return &Kafka{}
})
}


@@ -0,0 +1,62 @@
package kafka_consumer
import (
"fmt"
"os"
"strings"
"testing"
"time"
"github.com/Shopify/sarama"
"github.com/influxdb/telegraf/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestReadsMetricsFromKafka(t *testing.T) {
var zkPeers, brokerPeers []string
if len(os.Getenv("ZOOKEEPER_PEERS")) == 0 {
zkPeers = []string{"localhost:2181"}
} else {
zkPeers = strings.Split(os.Getenv("ZOOKEEPER_PEERS"), ",")
}
if len(os.Getenv("KAFKA_PEERS")) == 0 {
brokerPeers = []string{"localhost:9092"}
} else {
brokerPeers = strings.Split(os.Getenv("KAFKA_PEERS"), ",")
}
k := &Kafka{
ConsumerGroupName: "telegraf_test_consumers",
Topic: fmt.Sprintf("telegraf_test_topic_%d", time.Now().Unix()),
ZookeeperPeers: zkPeers,
}
msg := "cpu_load_short,direction=in,host=server01,region=us-west value=23422.0 1422568543702900257"
producer, err := sarama.NewSyncProducer(brokerPeers, nil)
require.NoError(t, err)
_, _, err = producer.SendMessage(&sarama.ProducerMessage{Topic: k.Topic, Value: sarama.StringEncoder(msg)})
require.NoError(t, err)
producer.Close()
var acc testutil.Accumulator
// Sanity check
assert.Equal(t, 0, len(acc.Points), "there should not be any points")
err = k.Gather(&acc)
require.NoError(t, err)
assert.Equal(t, 1, len(acc.Points), "there should be a single point")
point := acc.Points[0]
assert.Equal(t, "cpu_load_short", point.Measurement)
assert.Equal(t, map[string]interface{}{"value": 23422.0}, point.Values)
assert.Equal(t, map[string]string{
"host": "server01",
"direction": "in",
"region": "us-west",
}, point.Tags)
assert.Equal(t, time.Unix(0, 1422568543702900257), point.Time)
}


@@ -0,0 +1,95 @@
package kafka_consumer
import (
"strings"
"testing"
"time"
"github.com/influxdb/telegraf/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gopkg.in/Shopify/sarama.v1"
)
const testMsg = "cpu_load_short,direction=in,host=server01,region=us-west value=23422.0 1422568543702900257"
func TestReadFromKafkaBatchesMsgsOnBatchSize(t *testing.T) {
halt := make(chan bool, 1)
metricChan := make(chan []byte, 1)
kafkaChan := make(chan *sarama.ConsumerMessage, 10)
for i := 0; i < 10; i++ {
kafkaChan <- saramaMsg(testMsg)
}
expectedBatch := strings.Repeat(testMsg+"\n", 9) + testMsg
readFromKafka(kafkaChan, metricChan, 10, func(msg *sarama.ConsumerMessage) error {
batch := <-metricChan
assert.Equal(t, expectedBatch, string(batch))
halt <- true
return nil
}, halt)
}
func TestReadFromKafkaBatchesMsgsOnTimeout(t *testing.T) {
halt := make(chan bool, 1)
metricChan := make(chan []byte, 1)
kafkaChan := make(chan *sarama.ConsumerMessage, 10)
for i := 0; i < 3; i++ {
kafkaChan <- saramaMsg(testMsg)
}
expectedBatch := strings.Repeat(testMsg+"\n", 2) + testMsg
readFromKafka(kafkaChan, metricChan, 10, func(msg *sarama.ConsumerMessage) error {
batch := <-metricChan
assert.Equal(t, expectedBatch, string(batch))
halt <- true
return nil
}, halt)
}
func TestEmitMetricsSendMetricsToAcc(t *testing.T) {
k := &Kafka{}
var acc testutil.Accumulator
testChan := make(chan []byte, 1)
testChan <- []byte(testMsg)
err := emitMetrics(k, &acc, testChan)
require.NoError(t, err)
assert.Equal(t, 1, len(acc.Points), "there should be a single point")
point := acc.Points[0]
assert.Equal(t, "cpu_load_short", point.Measurement)
assert.Equal(t, map[string]interface{}{"value": 23422.0}, point.Values)
assert.Equal(t, map[string]string{
"host": "server01",
"direction": "in",
"region": "us-west",
}, point.Tags)
assert.Equal(t, time.Unix(0, 1422568543702900257), point.Time)
}
func TestEmitMetricsTimesOut(t *testing.T) {
k := &Kafka{}
var acc testutil.Accumulator
testChan := make(chan []byte)
err := emitMetrics(k, &acc, testChan)
require.NoError(t, err)
assert.Equal(t, 0, len(acc.Points), "there should not be any points")
}
func saramaMsg(val string) *sarama.ConsumerMessage {
return &sarama.ConsumerMessage{
Key: nil,
Value: []byte(val),
Offset: 0,
Partition: 0,
}
}


@@ -0,0 +1,134 @@
package memcached
import (
"bufio"
"bytes"
"fmt"
"net"
"strconv"
"time"
"github.com/influxdb/telegraf/plugins"
)
// Memcached is a memcached plugin
type Memcached struct {
Servers []string
}
var sampleConfig = `
# An array of addresses to gather stats about. Specify an ip or hostname
# with optional port, e.g. localhost, 10.0.0.1:11211, etc.
#
# If no servers are specified, then localhost is used as the host.
servers = ["localhost"]`
var defaultTimeout = 5 * time.Second
// The list of metrics that should be sent as-is
var sendAsIs = []string{
"get_hits",
"get_misses",
"evictions",
"limit_maxbytes",
"bytes",
}
// SampleConfig returns sample configuration message
func (m *Memcached) SampleConfig() string {
return sampleConfig
}
// Description returns description of Memcached plugin
func (m *Memcached) Description() string {
return "Read metrics from one or many memcached servers"
}
// Gather reads stats from all configured servers and accumulates stats
func (m *Memcached) Gather(acc plugins.Accumulator) error {
if len(m.Servers) == 0 {
return m.gatherServer(":11211", acc)
}
for _, serverAddress := range m.Servers {
if err := m.gatherServer(serverAddress, acc); err != nil {
return err
}
}
return nil
}
func (m *Memcached) gatherServer(address string, acc plugins.Accumulator) error {
_, _, err := net.SplitHostPort(address)
if err != nil {
address = address + ":11211"
}
// Connect
conn, err := net.DialTimeout("tcp", address, defaultTimeout)
if err != nil {
return err
}
defer conn.Close()
// Extend connection
conn.SetDeadline(time.Now().Add(defaultTimeout))
// Read and write buffer
rw := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn))
// Send command
if _, err = fmt.Fprint(rw, "stats\r\n"); err != nil {
return err
}
if err = rw.Flush(); err != nil {
return err
}
// Read response
values := make(map[string]string)
for {
// Read line
line, _, errRead := rw.Reader.ReadLine()
if errRead != nil {
return errRead
}
// Done
if bytes.Equal(line, []byte("END")) {
break
}
// Read values
var name, value string
n, errScan := fmt.Sscanf(string(line), "STAT %s %s\r\n", &name, &value)
if errScan != nil || n != 2 {
return fmt.Errorf("unexpected line in stats response: %q", line)
}
// Save values
values[name] = value
}
//
tags := map[string]string{"server": address}
// Process values
for _, key := range sendAsIs {
if value, ok := values[key]; ok {
// Most values are numeric
if iValue, errParse := strconv.ParseInt(value, 10, 64); errParse != nil {
acc.Add(key, value, tags)
} else {
acc.Add(key, iValue, tags)
}
}
}
return nil
}
func init() {
plugins.Add("memcached", func() plugins.Plugin {
return &Memcached{}
})
}
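Each line of the `stats` response parsed above has the form `STAT <name> <value>`, terminated by `END`. A stand-alone sketch of the per-line parse, mirroring the `Sscanf` call in `gatherServer` (illustrative only):
```go
package example

import "fmt"

// parseStatLine extracts the name/value pair from one "STAT ..." line.
func parseStatLine(line string) (name, value string, err error) {
	n, err := fmt.Sscanf(line, "STAT %s %s", &name, &value)
	if err != nil || n != 2 {
		return "", "", fmt.Errorf("unexpected line in stats response: %q", line)
	}
	return name, value, nil
}
```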


@@ -0,0 +1,26 @@
package memcached
import (
"testing"
"github.com/influxdb/telegraf/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestMemcachedGeneratesMetrics(t *testing.T) {
m := &Memcached{
Servers: []string{"localhost"},
}
var acc testutil.Accumulator
err := m.Gather(&acc)
require.NoError(t, err)
intMetrics := []string{"get_hits", "get_misses", "evictions", "limit_maxbytes", "bytes"}
for _, metric := range intMetrics {
assert.True(t, acc.HasIntValue(metric), metric)
}
}


@@ -91,7 +91,7 @@ func (m *Mysql) gatherServer(serv string, acc plugins.Accumulator) error {
rows, err := db.Query(`SHOW /*!50002 GLOBAL */ STATUS`)
if err != nil {
return nil
return err
}
for rows.Next() {


@@ -39,7 +39,7 @@ func TestMysqlGeneratesMetrics(t *testing.T) {
var count int
for _, p := range acc.Points {
if strings.HasPrefix(p.Name, prefix.prefix) {
if strings.HasPrefix(p.Measurement, prefix.prefix) {
count++
}
}


@@ -89,7 +89,7 @@ func (p *Postgresql) gatherServer(serv *Server, acc plugins.Accumulator) error {
defer rows.Close()
for rows.Next() {
err := p.accRow(rows, acc)
err := p.accRow(rows, acc, serv.Address)
if err != nil {
return err
}
@@ -100,7 +100,7 @@ func (p *Postgresql) gatherServer(serv *Server, acc plugins.Accumulator) error {
for _, name := range serv.Databases {
row := db.QueryRow(`SELECT * FROM pg_stat_database WHERE datname=$1`, name)
err := p.accRow(row, acc)
err := p.accRow(row, acc, serv.Address)
if err != nil {
return err
}
@@ -114,7 +114,7 @@ type scanner interface {
Scan(dest ...interface{}) error
}
func (p *Postgresql) accRow(row scanner, acc plugins.Accumulator) error {
func (p *Postgresql) accRow(row scanner, acc plugins.Accumulator, server string) error {
var ignore interface{}
var name string
var commit, rollback, read, hit int64
@@ -135,7 +135,7 @@ func (p *Postgresql) accRow(row scanner, acc plugins.Accumulator) error {
return err
}
tags := map[string]string{"db": name}
tags := map[string]string{"server": server, "db": name}
acc.Add("xact_commit", commit, tags)
acc.Add("xact_rollback", rollback, tags)


@@ -91,7 +91,7 @@ func TestPostgresqlDefaultsToAllDatabases(t *testing.T) {
var found bool
for _, pnt := range acc.Points {
if pnt.Name == "xact_commit" {
if pnt.Measurement == "xact_commit" {
if pnt.Tags["db"] == "postgres" {
found = true
break


@@ -0,0 +1,105 @@
package prometheus
import (
"errors"
"fmt"
"net/http"
"sync"
"time"
"github.com/influxdb/telegraf/plugins"
"github.com/prometheus/client_golang/extraction"
"github.com/prometheus/client_golang/model"
)
type Prometheus struct {
Urls []string
}
var sampleConfig = `
# An array of urls to scrape metrics from.
urls = ["http://localhost:9100/metrics"]`
func (r *Prometheus) SampleConfig() string {
return sampleConfig
}
func (r *Prometheus) Description() string {
return "Read metrics from one or many prometheus clients"
}
var ErrProtocolError = errors.New("prometheus protocol error")
// Reads stats from all configured servers and accumulates stats.
// Returns one of the errors encountered while gathering stats (if any).
func (g *Prometheus) Gather(acc plugins.Accumulator) error {
var wg sync.WaitGroup
var outerr error
for _, serv := range g.Urls {
wg.Add(1)
go func(serv string) {
defer wg.Done()
outerr = g.gatherURL(serv, acc)
}(serv)
}
wg.Wait()
return outerr
}
func (g *Prometheus) gatherURL(url string, acc plugins.Accumulator) error {
resp, err := http.Get(url)
if err != nil {
return fmt.Errorf("error making HTTP request to %s: %s", url, err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("%s returned HTTP status %s", url, resp.Status)
}
processor, err := extraction.ProcessorForRequestHeader(resp.Header)
if err != nil {
return fmt.Errorf("error getting extractor for %s: %s", url, err)
}
ingestor := &Ingester{
acc: acc,
}
options := &extraction.ProcessOptions{
Timestamp: model.TimestampFromTime(time.Now()),
}
err = processor.ProcessSingle(resp.Body, ingestor, options)
if err != nil {
return fmt.Errorf("error getting processing samples for %s: %s", url, err)
}
return nil
}
type Ingester struct {
acc plugins.Accumulator
}
// Ingest implements an extraction.Ingester.
func (i *Ingester) Ingest(samples model.Samples) error {
for _, sample := range samples {
tags := map[string]string{}
for key, value := range sample.Metric {
if key == model.MetricNameLabel {
continue
}
tags[string(key)] = string(value)
}
i.acc.Add(string(sample.Metric[model.MetricNameLabel]), float64(sample.Value), tags)
}
return nil
}
func init() {
plugins.Add("prometheus", func() plugins.Plugin {
return &Prometheus{}
})
}


@@ -0,0 +1,56 @@
package prometheus
import (
"fmt"
"net/http"
"net/http/httptest"
"testing"
"github.com/influxdb/telegraf/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
const sampleTextFormat = `# HELP go_gc_duration_seconds A summary of the GC invocation durations.
# TYPE go_gc_duration_seconds summary
go_gc_duration_seconds{quantile="0"} 0.00010425500000000001
go_gc_duration_seconds{quantile="0.25"} 0.000139108
go_gc_duration_seconds{quantile="0.5"} 0.00015749400000000002
go_gc_duration_seconds{quantile="0.75"} 0.000331463
go_gc_duration_seconds{quantile="1"} 0.000667154
go_gc_duration_seconds_sum 0.0018183950000000002
go_gc_duration_seconds_count 7
# HELP go_goroutines Number of goroutines that currently exist.
# TYPE go_goroutines gauge
go_goroutines 15
`
func TestPrometheusGeneratesMetrics(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintln(w, sampleTextFormat)
}))
defer ts.Close()
p := &Prometheus{
Urls: []string{ts.URL},
}
var acc testutil.Accumulator
err := p.Gather(&acc)
require.NoError(t, err)
expected := []struct {
name string
value float64
tags map[string]string
}{
{"go_gc_duration_seconds_count", 7, map[string]string{}},
{"go_goroutines", 15, map[string]string{}},
}
for _, e := range expected {
assert.NoError(t, acc.ValidateValue(e.name, e.value))
}
}


@@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"net"
"net/url"
"strconv"
"strings"
"sync"
@@ -20,8 +21,9 @@ type Redis struct {
}
var sampleConfig = `
# An array of address to gather stats about. Specify an ip on hostname
# with optional port. ie localhost, 10.10.3.33:18832, etc.
# An array of URIs to gather stats about. Specify an ip or hostname
# with optional port and password, e.g. redis://localhost, redis://10.10.3.33:18832,
# 10.0.0.1:10000, etc.
#
# If no servers are specified, then localhost is used as the host.
servers = ["localhost"]`
@@ -73,7 +75,10 @@ var ErrProtocolError = errors.New("redis protocol error")
// Returns one of the errors encountered while gather stats (if any).
func (g *Redis) Gather(acc plugins.Accumulator) error {
if len(g.Servers) == 0 {
g.gatherServer(":6379", acc)
url := &url.URL{
Host: ":6379",
}
g.gatherServer(url, acc)
return nil
}
@@ -82,10 +87,19 @@ func (g *Redis) Gather(acc plugins.Accumulator) error {
var outerr error
for _, serv := range g.Servers {
u, err := url.Parse(serv)
if err != nil {
return fmt.Errorf("Unable to parse to address '%s': %s", serv, err)
} else if u.Scheme == "" {
// fallback to simple string based address (i.e. "10.0.0.1:10000")
u.Scheme = "tcp"
u.Host = serv
u.Path = ""
}
wg.Add(1)
go func(serv string) {
defer wg.Done()
outerr = g.gatherServer(serv, acc)
outerr = g.gatherServer(u, acc)
}(serv)
}
@@ -96,17 +110,34 @@ func (g *Redis) Gather(acc plugins.Accumulator) error {
const defaultPort = "6379"
func (g *Redis) gatherServer(addr string, acc plugins.Accumulator) error {
func (g *Redis) gatherServer(addr *url.URL, acc plugins.Accumulator) error {
if g.c == nil {
_, _, err := net.SplitHostPort(addr)
_, _, err := net.SplitHostPort(addr.Host)
if err != nil {
addr = addr + ":" + defaultPort
addr.Host = addr.Host + ":" + defaultPort
}
c, err := net.Dial("tcp", addr)
c, err := net.Dial("tcp", addr.Host)
if err != nil {
return fmt.Errorf("Unable to connect to redis server '%s': %s", addr, err)
return fmt.Errorf("Unable to connect to redis server '%s': %s", addr.Host, err)
}
if addr.User != nil {
pwd, set := addr.User.Password()
if set && pwd != "" {
c.Write([]byte(fmt.Sprintf("AUTH %s\n", pwd)))
r := bufio.NewReader(c)
line, err := r.ReadString('\n')
if err != nil {
return err
}
if line[0] != '+' {
return fmt.Errorf("%s", strings.TrimSpace(line)[1:])
}
}
}
g.c = c
@@ -157,11 +188,12 @@ func (g *Redis) gatherServer(addr string, acc plugins.Accumulator) error {
continue
}
tags := map[string]string{"host": addr.String()}
val := strings.TrimSpace(parts[1])
ival, err := strconv.ParseUint(val, 10, 64)
if err == nil {
acc.Add(metric, ival, nil)
acc.Add(metric, ival, tags)
continue
}
@@ -170,7 +202,7 @@ func (g *Redis) gatherServer(addr string, acc plugins.Accumulator) error {
return err
}
acc.Add(metric, fval, nil)
acc.Add(metric, fval, tags)
}
return nil
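Taken together, these hunks mean all of the following server strings should be accepted; the password form triggers the AUTH exchange (values below are illustrative):
```go
package example

// Address forms the updated plugin accepts; the password in the second
// form is sent via an AUTH command on connect.
var servers = []string{
	"redis://localhost",                    // URI, default port 6379
	"redis://:mypassword@10.10.3.33:18832", // URI carrying an AUTH password
	"10.0.0.1:10000",                       // bare host:port fallback
}
```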


@@ -40,7 +40,7 @@ func TestRedisGeneratesMetrics(t *testing.T) {
}
}()
addr := l.Addr().String()
addr := fmt.Sprintf("redis://%s", l.Addr().String())
r := &Redis{
Servers: []string{addr},
@@ -131,7 +131,7 @@ func TestRedisCanPullStatsFromMultipleServers(t *testing.T) {
}
}()
addr := l.Addr().String()
addr := fmt.Sprintf("redis://%s", l.Addr().String())
r := &Redis{
Servers: []string{addr},


@@ -0,0 +1,92 @@
package rethinkdb
import (
"fmt"
"net/url"
"sync"
"github.com/influxdb/telegraf/plugins"
"gopkg.in/dancannon/gorethink.v1"
)
type RethinkDB struct {
Servers []string
}
var sampleConfig = `
# An array of URIs to gather stats about. Specify an ip or hostname
# with optional port and password, e.g. rethinkdb://user:auth_key@10.10.3.30:28105,
# rethinkdb://10.10.3.33:18832, 10.0.0.1:10000, etc.
#
# If no servers are specified, then 127.0.0.1 is used as the host and 28015 as the port.
servers = ["127.0.0.1:28015"]`
func (r *RethinkDB) SampleConfig() string {
return sampleConfig
}
func (r *RethinkDB) Description() string {
return "Read metrics from one or many RethinkDB servers"
}
var localhost = &Server{Url: &url.URL{Host: "127.0.0.1:28015"}}
// Reads stats from all configured servers and accumulates stats.
// Returns one of the errors encountered while gathering stats (if any).
func (r *RethinkDB) Gather(acc plugins.Accumulator) error {
if len(r.Servers) == 0 {
r.gatherServer(localhost, acc)
return nil
}
var wg sync.WaitGroup
var outerr error
for _, serv := range r.Servers {
u, err := url.Parse(serv)
if err != nil {
return fmt.Errorf("Unable to parse to address '%s': %s", serv, err)
} else if u.Scheme == "" {
// fallback to simple string based address (i.e. "10.0.0.1:10000")
u.Host = serv
}
wg.Add(1)
go func(serv string) {
defer wg.Done()
outerr = r.gatherServer(&Server{Url: u}, acc)
}(serv)
}
wg.Wait()
return outerr
}
func (r *RethinkDB) gatherServer(server *Server, acc plugins.Accumulator) error {
var err error
connectOpts := gorethink.ConnectOpts{
Address: server.Url.Host,
DiscoverHosts: false,
}
if server.Url.User != nil {
pwd, set := server.Url.User.Password()
if set && pwd != "" {
connectOpts.AuthKey = pwd
}
}
server.session, err = gorethink.Connect(connectOpts)
if err != nil {
return fmt.Errorf("Unable to connect to RethinkDB, %s\n", err.Error())
}
defer server.session.Close()
return server.gatherData(acc)
}
func init() {
plugins.Add("rethinkdb", func() plugins.Plugin {
return &RethinkDB{}
})
}


@@ -0,0 +1,110 @@
package rethinkdb
import (
"reflect"
"time"
"github.com/influxdb/telegraf/plugins"
)
type serverStatus struct {
Id string `gorethink:"id"`
Network struct {
Addresses []Address `gorethink:"canonical_addresses"`
Hostname string `gorethink:"hostname"`
DriverPort int `gorethink:"reql_port"`
} `gorethink:"network"`
Process struct {
Version string `gorethink:"version"`
RunningSince time.Time `gorethink:"time_started"`
} `gorethink:"process"`
}
type Address struct {
Host string `gorethink:"host"`
Port int `gorethink:"port"`
}
type stats struct {
Engine Engine `gorethink:"query_engine"`
}
type Engine struct {
ClientConns int64 `gorethink:"client_connections,omitempty"`
ClientActive int64 `gorethink:"clients_active,omitempty"`
QueriesPerSec int64 `gorethink:"queries_per_sec,omitempty"`
TotalQueries int64 `gorethink:"queries_total,omitempty"`
ReadsPerSec int64 `gorethink:"read_docs_per_sec,omitempty"`
TotalReads int64 `gorethink:"read_docs_total,omitempty"`
WritesPerSec int64 `gorethink:"written_docs_per_sec,omitempty"`
TotalWrites int64 `gorethink:"written_docs_total,omitempty"`
}
type tableStatus struct {
Id string `gorethink:"id"`
DB string `gorethink:"db"`
Name string `gorethink:"name"`
}
type tableStats struct {
Engine Engine `gorethink:"query_engine"`
Storage Storage `gorethink:"storage_engine"`
}
type Storage struct {
Cache Cache `gorethink:"cache"`
Disk Disk `gorethink:"disk"`
}
type Cache struct {
BytesInUse int64 `gorethink:"in_use_bytes"`
}
type Disk struct {
ReadBytesPerSec int64 `gorethink:"read_bytes_per_sec"`
ReadBytesTotal int64 `gorethink:"read_bytes_total"`
WriteBytesPerSec int64 `gorethink:"written_bytes_per_sec"`
WriteBytesTotal int64 `gorethink:"written_bytes_total"`
SpaceUsage SpaceUsage `gorethink:"space_usage"`
}
type SpaceUsage struct {
Data int64 `gorethink:"data_bytes"`
Garbage int64 `gorethink:"garbage_bytes"`
Metadata int64 `gorethink:"metadata_bytes"`
Prealloc int64 `gorethink:"preallocated_bytes"`
}
var engineStats = map[string]string{
"active_clients": "ClientActive",
"clients": "ClientConns",
"queries_per_sec": "QueriesPerSec",
"total_queries": "TotalQueries",
"read_docs_per_sec": "ReadsPerSec",
"total_reads": "TotalReads",
"written_docs_per_sec": "WritesPerSec",
"total_writes": "TotalWrites",
}
func (e *Engine) AddEngineStats(keys []string, acc plugins.Accumulator, tags map[string]string) {
engine := reflect.ValueOf(e).Elem()
for _, key := range keys {
acc.Add(
key,
engine.FieldByName(engineStats[key]).Interface(),
tags,
)
}
}
func (s *Storage) AddStats(acc plugins.Accumulator, tags map[string]string) {
acc.Add("cache_bytes_in_use", s.Cache.BytesInUse, tags)
acc.Add("disk_read_bytes_per_sec", s.Disk.ReadBytesPerSec, tags)
acc.Add("disk_read_bytes_total", s.Disk.ReadBytesTotal, tags)
acc.Add("disk_written_bytes_per_sec", s.Disk.WriteBytesPerSec, tags)
acc.Add("disk_written_bytes_total", s.Disk.WriteBytesTotal, tags)
acc.Add("disk_usage_data_bytes", s.Disk.SpaceUsage.Data, tags)
acc.Add("disk_usage_garbage_bytes", s.Disk.SpaceUsage.Garbage, tags)
acc.Add("disk_usage_metadata_bytes", s.Disk.SpaceUsage.Metadata, tags)
acc.Add("disk_usage_preallocated_bytes", s.Disk.SpaceUsage.Prealloc, tags)
}


@@ -0,0 +1,112 @@
package rethinkdb
import (
"testing"
"github.com/influxdb/telegraf/testutil"
"github.com/stretchr/testify/assert"
)
var tags = make(map[string]string)
func TestAddEngineStats(t *testing.T) {
engine := &Engine{
ClientConns: 0,
ClientActive: 0,
QueriesPerSec: 0,
TotalQueries: 0,
ReadsPerSec: 0,
TotalReads: 0,
WritesPerSec: 0,
TotalWrites: 0,
}
var acc testutil.Accumulator
keys := []string{
"active_clients",
"clients",
"queries_per_sec",
"total_queries",
"read_docs_per_sec",
"total_reads",
"written_docs_per_sec",
"total_writes",
}
engine.AddEngineStats(keys, &acc, tags)
for _, metric := range keys {
assert.True(t, acc.HasIntValue(metric))
}
}
func TestAddEngineStatsPartial(t *testing.T) {
engine := &Engine{
ClientConns: 0,
ClientActive: 0,
QueriesPerSec: 0,
ReadsPerSec: 0,
WritesPerSec: 0,
}
var acc testutil.Accumulator
keys := []string{
"active_clients",
"clients",
"queries_per_sec",
"read_docs_per_sec",
"written_docs_per_sec",
}
missing_keys := []string{
"total_queries",
"total_reads",
"total_writes",
}
engine.AddEngineStats(keys, &acc, tags)
for _, metric := range missing_keys {
assert.False(t, acc.HasIntValue(metric))
}
}
func TestAddStorageStats(t *testing.T) {
storage := &Storage{
Cache: Cache{
BytesInUse: 0,
},
Disk: Disk{
ReadBytesPerSec: 0,
ReadBytesTotal: 0,
WriteBytesPerSec: 0,
WriteBytesTotal: 0,
SpaceUsage: SpaceUsage{
Data: 0,
Garbage: 0,
Metadata: 0,
Prealloc: 0,
},
},
}
var acc testutil.Accumulator
keys := []string{
"cache_bytes_in_use",
"disk_read_bytes_per_sec",
"disk_read_bytes_total",
"disk_written_bytes_per_sec",
"disk_written_bytes_total",
"disk_usage_data_bytes",
"disk_usage_garbage_bytes",
"disk_usage_metadata_bytes",
"disk_usage_preallocated_bytes",
}
storage.AddStats(&acc, tags)
for _, metric := range keys {
assert.True(t, acc.HasIntValue(metric))
}
}


@@ -0,0 +1,193 @@
package rethinkdb
import (
"errors"
"fmt"
"net"
"net/url"
"regexp"
"strconv"
"strings"
"github.com/influxdb/telegraf/plugins"
"gopkg.in/dancannon/gorethink.v1"
)
type Server struct {
Url *url.URL
session *gorethink.Session
serverStatus serverStatus
}
func (s *Server) gatherData(acc plugins.Accumulator) error {
if err := s.getServerStatus(); err != nil {
return fmt.Errorf("Failed to get server_status, %s\n", err)
}
if err := s.validateVersion(); err != nil {
return fmt.Errorf("Failed version validation, %s\n", err.Error())
}
if err := s.addClusterStats(acc); err != nil {
fmt.Printf("error adding cluster stats, %s\n", err.Error())
return fmt.Errorf("Error adding cluster stats, %s\n", err.Error())
}
if err := s.addMemberStats(acc); err != nil {
return fmt.Errorf("Error adding member stats, %s\n", err.Error())
}
if err := s.addTableStats(acc); err != nil {
return fmt.Errorf("Error adding table stats, %s\n", err.Error())
}
return nil
}
func (s *Server) validateVersion() error {
if s.serverStatus.Process.Version == "" {
return errors.New("could not determine the RethinkDB server version: process.version key missing")
}
versionRegexp := regexp.MustCompile("\\d+\\.\\d+\\.\\d+")
versionString := versionRegexp.FindString(s.serverStatus.Process.Version)
if versionString == "" {
return fmt.Errorf("could not determine the RethinkDB server version: malformed version string (%v)", s.serverStatus.Process.Version)
}
majorVersion, err := strconv.Atoi(strings.Split(versionString, ".")[0])
if err != nil || majorVersion < 2 {
return fmt.Errorf("unsupported major version %s\n", versionString)
}
return nil
}
func (s *Server) getServerStatus() error {
cursor, err := gorethink.DB("rethinkdb").Table("server_status").Run(s.session)
if err != nil {
return err
}
if cursor.IsNil() {
return errors.New("could not determine the RethinkDB server version: no rows returned from the server_status table")
}
defer cursor.Close()
var serverStatuses []serverStatus
err = cursor.All(&serverStatuses)
if err != nil {
return errors.New("could not parse server_status results")
}
host, port, err := net.SplitHostPort(s.Url.Host)
if err != nil {
return fmt.Errorf("unable to determine provided hostname from %s\n", s.Url.Host)
}
driverPort, _ := strconv.Atoi(port)
for _, ss := range serverStatuses {
for _, address := range ss.Network.Addresses {
if address.Host == host && ss.Network.DriverPort == driverPort {
s.serverStatus = ss
return nil
}
}
}
return fmt.Errorf("unable to determine host id from server_status with %s", s.Url.Host)
}
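// getDefaultTags tags every point with the configured connection host and the
// hostname the server reports for itself.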
func (s *Server) getDefaultTags() map[string]string {
tags := make(map[string]string)
tags["host"] = s.Url.Host
tags["hostname"] = s.serverStatus.Network.Hostname
return tags
}
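// ClusterTracking lists the engine metrics available on the cluster-wide
// stats row: client counts and per-second rates, but no running totals.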
var ClusterTracking = []string{
"active_clients",
"clients",
"queries_per_sec",
"read_docs_per_sec",
"written_docs_per_sec",
}
func (s *Server) addClusterStats(acc plugins.Accumulator) error {
cursor, err := gorethink.DB("rethinkdb").Table("stats").Get([]string{"cluster"}).Run(s.session)
if err != nil {
return fmt.Errorf("cluster stats query error, %s\n", err.Error())
}
defer cursor.Close()
var clusterStats stats
if err := cursor.One(&clusterStats); err != nil {
return fmt.Errorf("failure to parse cluster stats, $s\n", err.Error())
}
tags := s.getDefaultTags()
tags["type"] = "cluster"
clusterStats.Engine.AddEngineStats(ClusterTracking, acc, tags)
return nil
}
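// MemberTracking extends the cluster metrics with the running totals that are
// reported on a server's own stats row.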
var MemberTracking = []string{
"active_clients",
"clients",
"queries_per_sec",
"total_queries",
"read_docs_per_sec",
"total_reads",
"written_docs_per_sec",
"total_writes",
}
func (s *Server) addMemberStats(acc plugins.Accumulator) error {
cursor, err := gorethink.DB("rethinkdb").Table("stats").Get([]string{"server", s.serverStatus.Id}).Run(s.session)
if err != nil {
return fmt.Errorf("member stats query error, %s\n", err.Error())
}
defer cursor.Close()
var memberStats stats
if err := cursor.One(&memberStats); err != nil {
return fmt.Errorf("failure to parse member stats, $s\n", err.Error())
}
tags := s.getDefaultTags()
tags["type"] = "member"
memberStats.Engine.AddEngineStats(MemberTracking, acc, tags)
return nil
}
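// TableTracking lists the read/write metrics reported for each table replica
// hosted on this server.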
var TableTracking = []string{
"read_docs_per_sec",
"total_reads",
"written_docs_per_sec",
"total_writes",
}
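// addTableStats walks table_status and gathers engine and storage statistics
// for every table, tagged with its database and table name.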
func (s *Server) addTableStats(acc plugins.Accumulator) error {
tablesCursor, err := gorethink.DB("rethinkdb").Table("table_status").Run(s.session)
if err != nil {
return fmt.Errorf("table_status query error, %s\n", err.Error())
}
defer tablesCursor.Close()
var tables []tableStatus
err = tablesCursor.All(&tables)
if err != nil {
return errors.New("could not parse table_status results")
}
for _, table := range tables {
cursor, err := gorethink.DB("rethinkdb").Table("stats").
Get([]string{"table_server", table.Id, s.serverStatus.Id}).
Run(s.session)
if err != nil {
return fmt.Errorf("table stats query error, %s\n", err.Error())
}
var ts tableStats
err = cursor.One(&ts)
// close each cursor inside the loop so open cursors don't pile up until the function returns
cursor.Close()
if err != nil {
return fmt.Errorf("failure to parse table stats, %s\n", err.Error())
}
tags := s.getDefaultTags()
tags["type"] = "data"
tags["ns"] = fmt.Sprintf("%s.%s", table.DB, table.Name)
ts.Engine.AddEngineStats(TableTracking, acc, tags)
ts.Storage.AddStats(acc, tags)
}
return nil
}

View File

@@ -0,0 +1,81 @@
// +build integration
package rethinkdb
import (
"testing"
"github.com/influxdb/telegraf/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestValidateVersion(t *testing.T) {
err := server.validateVersion()
require.NoError(t, err)
}
func TestGetDefaultTags(t *testing.T) {
var tagTests = []struct {
in string
out string
}{
{"host", server.Url.Host},
{"hostname", server.serverStatus.Network.Hostname},
}
defaultTags := server.getDefaultTags()
for _, tt := range tagTests {
if defaultTags[tt.in] != tt.out {
t.Errorf("expected %q, got %q", tt.out, defaultTags[tt.in])
}
}
}
func TestAddClusterStats(t *testing.T) {
var acc testutil.Accumulator
err := server.addClusterStats(&acc)
require.NoError(t, err)
for _, metric := range ClusterTracking {
assert.True(t, acc.HasIntValue(metric))
}
}
func TestAddMemberStats(t *testing.T) {
var acc testutil.Accumulator
err := server.addMemberStats(&acc)
require.NoError(t, err)
for _, metric := range MemberTracking {
assert.True(t, acc.HasIntValue(metric))
}
}
func TestAddTableStats(t *testing.T) {
var acc testutil.Accumulator
err := server.addTableStats(&acc)
require.NoError(t, err)
for _, metric := range TableTracking {
assert.True(t, acc.HasIntValue(metric))
}
keys := []string{
"cache_bytes_in_use",
"disk_read_bytes_per_sec",
"disk_read_bytes_total",
"disk_written_bytes_per_sec",
"disk_written_bytes_total",
"disk_usage_data_bytes",
"disk_usage_garbage_bytes",
"disk_usage_metadata_bytes",
"disk_usage_preallocated_bytes",
}
for _, metric := range keys {
assert.True(t, acc.HasIntValue(metric))
}
}

View File

@@ -0,0 +1,59 @@
// +build integration
package rethinkdb
import (
"log"
"math/rand"
"net/url"
"os"
"testing"
"time"
"gopkg.in/dancannon/gorethink.v1"
)
var connect_url, authKey string
var server *Server
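// These integration tests assume a running RethinkDB instance reachable at
// RETHINKDB_URL (default 127.0.0.1:28015); run them with the "integration"
// build tag enabled.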
func init() {
connect_url = os.Getenv("RETHINKDB_URL")
if connect_url == "" {
connect_url = "127.0.0.1:28015"
}
authKey = os.Getenv("RETHINKDB_AUTHKEY")
}
func testSetup(m *testing.M) {
var err error
server = &Server{Url: &url.URL{Host: connect_url}}
server.session, err = gorethink.Connect(gorethink.ConnectOpts{
Address: server.Url.Host,
AuthKey: authKey,
DiscoverHosts: false,
})
if err != nil {
log.Fatalln(err.Error())
}
err = server.getServerStatus()
if err != nil {
log.Fatalln(err.Error())
}
}
func testTeardown(m *testing.M) {
server.session.Close()
}
func TestMain(m *testing.M) {
// seed randomness for use with tests
rand.Seed(time.Now().UTC().UnixNano())
testSetup(m)
res := m.Run()
testTeardown(m)
os.Exit(res)
}

View File

@@ -55,9 +55,12 @@ func (s *DiskIOStats) Gather(acc plugins.Accumulator) error {
 }
 for _, io := range diskio {
-    tags := map[string]string{
-        "name":   io.Name,
-        "serial": io.SerialNumber,
+    tags := map[string]string{}
+    if len(io.Name) != 0 {
+        tags["name"] = io.Name
     }
+    if len(io.SerialNumber) != 0 {
+        tags["serial"] = io.SerialNumber
+    }
     acc.Add("reads", io.ReadCount, tags)

View File

@@ -272,7 +272,9 @@ func TestSystemStats_GenerateStats(t *testing.T) {
require.NoError(t, err)
 dockertags := map[string]string{
-    "id":   "blah",
     "name": "blah",
+    "id":      "",
+    "command": "",
 }
 assert.True(t, acc.CheckTaggedValue("user", 3.1, dockertags))

View File

@@ -15,4 +15,4 @@ build() {
build "darwin" "amd64"
build "linux" "amd64"
build "linux" "386"
build "linux" "arm"

186
scripts/init.sh Executable file
View File

@@ -0,0 +1,186 @@
#! /usr/bin/env bash
### BEGIN INIT INFO
# Provides: telegraf
# Required-Start: $all
# Required-Stop: $remote_fs $syslog
# Default-Start: 2 3 4 5
# Default-Stop: 0 1 6
# Short-Description: Start telegraf at boot time
### END INIT INFO
# this init script supports three different variations:
# 1. New lsb that define start-stop-daemon
# 2. Old lsb that don't have start-stop-daemon but define log_*, pidofproc and killproc
# 3. Centos installations without lsb-core installed
#
# In the third case we have to define our own functions which are very dumb
# and expect the args to be positioned correctly.
# Command-line options that can be set in /etc/default/telegraf. These will override
# any config file values.
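# For example, /etc/default/telegraf might contain (illustrative values; any
# variable used below can be overridden here):
#   STDOUT=/var/log/influxdb/telegraf.out
#   STDERR=/var/log/influxdb/telegraf.err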
TELEGRAF_OPTS=
USER=telegraf
GROUP=telegraf
if [ -r /lib/lsb/init-functions ]; then
source /lib/lsb/init-functions
fi
DEFAULT=/etc/default/telegraf
if [ -r $DEFAULT ]; then
source $DEFAULT
fi
if [ -z "$STDOUT" ]; then
STDOUT=/dev/null
fi
if [ ! -f "$STDOUT" ]; then
mkdir -p `dirname $STDOUT`
fi
if [ -z "$STDERR" ]; then
STDERR=/var/log/influxdb/telegraf.log
fi
if [ ! -f "$STDERR" ]; then
mkdir -p `dirname $STDERR`
fi
OPEN_FILE_LIMIT=65536
function pidofproc() {
if [ $# -ne 3 ]; then
echo "Expected three arguments, e.g. $0 -p pidfile daemon-name"
fi
pid=`pgrep -f $3`
local pidfile=`cat $2`
if [ "x$pidfile" == "x" ]; then
return 1
fi
if [ "x$pid" != "x" -a "$pidfile" == "$pid" ]; then
return 0
fi
return 1
}
function killproc() {
if [ $# -ne 3 ]; then
echo "Expected three arguments, e.g. $0 -p pidfile signal"
fi
pid=`cat $2`
kill -s $3 $pid
}
function log_failure_msg() {
echo "$@" "[ FAILED ]"
}
function log_success_msg() {
echo "$@" "[ OK ]"
}
# Process name (for display)
name=telegraf
# Path to the daemon executable
daemon=/opt/influxdb/telegraf
# pid file for the daemon
pidfile=/var/run/influxdb/telegraf.pid
piddir=`dirname $pidfile`
if [ ! -d "$piddir" ]; then
mkdir -p $piddir
chown $USER:$GROUP $piddir
fi
# Configuration file
config=/etc/opt/influxdb/telegraf.conf
# If the daemon is not there, then exit.
[ -x $daemon ] || exit 5
case $1 in
start)
# Check if the PID file exists and, if so, check the actual status of the process
if [ -e $pidfile ]; then
pidofproc -p $pidfile $daemon > /dev/null 2>&1 && status="0" || status="$?"
# If the status is SUCCESS then we don't need to start again.
if [ "x$status" = "x0" ]; then
log_failure_msg "$name process is running"
exit 0 # Exit
fi
fi
# Bump the file limits before launching the daemon. These will carry over to
# launched processes.
ulimit -n $OPEN_FILE_LIMIT
if [ $? -ne 0 ]; then
log_failure_msg "set open file limit to $OPEN_FILE_LIMIT"
fi
log_success_msg "Starting the process" "$name"
if which start-stop-daemon > /dev/null 2>&1; then
start-stop-daemon --chuid $USER:$GROUP --start --quiet --pidfile $pidfile --exec $daemon -- -pidfile $pidfile -config $config $TELEGRAF_OPTS >>$STDOUT 2>>$STDERR &
else
nohup $daemon -pidfile $pidfile -config $config $TELEGRAF_OPTS >>$STDOUT 2>>$STDERR &
fi
log_success_msg "$name process was started"
;;
stop)
# Stop the daemon.
if [ -e $pidfile ]; then
pidofproc -p $pidfile $daemon > /dev/null 2>&1 && status="0" || status="$?"
if [ "$status" = 0 ]; then
if killproc -p $pidfile SIGTERM && /bin/rm -rf $pidfile; then
log_success_msg "$name process was stopped"
else
log_failure_msg "$name failed to stop service"
fi
fi
else
log_failure_msg "$name process is not running"
fi
;;
restart)
# Restart the daemon.
$0 stop && sleep 2 && $0 start
;;
status)
# Check the status of the process.
if [ -e $pidfile ]; then
if pidofproc -p $pidfile $daemon > /dev/null; then
log_success_msg "$name Process is running"
exit 0
else
log_failure_msg "$name Process is not running"
exit 1
fi
else
log_failure_msg "$name Process is not running"
exit 3
fi
;;
version)
$daemon version
;;
*)
# For invalid arguments, print the usage message.
echo "Usage: $0 {start|stop|restart|status|version}"
exit 2
;;
esac

View File

@@ -2,6 +2,7 @@ package testutil
 import (
     "fmt"
+    "reflect"
     "time"
 )
@@ -18,6 +19,9 @@ type Accumulator struct {
 }
 func (a *Accumulator) Add(measurement string, value interface{}, tags map[string]string) {
+    if tags == nil {
+        tags = map[string]string{}
+    }
     a.Points = append(
         a.Points,
         &Point{
@@ -70,30 +74,23 @@ func (a *Accumulator) CheckTaggedValue(measurement string, val interface{}, tags
 }
 func (a *Accumulator) ValidateTaggedValue(measurement string, val interface{}, tags map[string]string) error {
+    if tags == nil {
+        tags = map[string]string{}
+    }
     for _, p := range a.Points {
-        var found bool
-        if p.Tags == nil && tags == nil {
-            found = true
-        } else {
-            for k, v := range p.Tags {
-                if tags[k] == v {
-                    found = true
-                    break
-                }
-            }
-        }
+        if !reflect.DeepEqual(tags, p.Tags) {
+            continue
+        }
-        if found && p.Measurement == measurement {
+        if p.Measurement == measurement {
             if p.Value != val {
                 return fmt.Errorf("%v (%T) != %v (%T)", p.Value, p.Value, val, val)
             }
             return nil
         }
     }
-    return fmt.Errorf("unknown value %s with tags %v", measurement, tags)
+    return fmt.Errorf("unknown measurement %s with tags %v", measurement, tags)
 }
func (a *Accumulator) ValidateValue(measurement string, val interface{}) error {