Compare commits

...

14 Commits

Author SHA1 Message Date

Daniel Nelson 27b89dff48 Only split metrics if there is an udp output (#2799) 2017-05-12 15:34:31 -07:00
Sebastian Borza b16eb6eae6 split metrics based on UDPPayload size (#2795) 2017-05-12 14:42:18 -07:00
Daniel Nelson feaf76913b Add missing plugins to README 2017-05-09 13:51:26 -07:00
Daniel Nelson ff704fbe0d Add SLES11 support to rpm package (#2768) 2017-05-05 14:30:31 -07:00
Sébastien ebef47f56a fix systemd path in order to add compatibility with SuSe (#2499) 2017-05-05 14:30:24 -07:00
Daniel Nelson 18fd2d987d Return an error if no valid patterns. (#2753) 2017-05-02 14:55:16 -07:00
Alexander Blagoev 5e70cb3e44 Improve redis input documentation (#2708) 2017-05-02 14:12:09 -07:00
Patrick Hemmer ce203dc687 fix close on closed socket_writer (#2748) 2017-05-02 11:07:58 -07:00
Daniel Nelson b0a2e8e1bd Add initial documentation for rabbitmq input. (#2745) 2017-05-01 18:57:19 -07:00
Daniel Nelson 499495f844 Don't log error creating database on connect (#2740, closes #2739) 2017-04-28 15:59:28 -07:00
Daniel Nelson 20ab8fb2c3 Update telegraf.conf 2017-04-28 13:49:09 -07:00
Daniel Nelson bc474d3a53 Clarify retention policy option for influxdb output (closes #2696) 2017-04-28 13:48:24 -07:00
Daniel Nelson 547be87d79 Clarify retention policy option for influxdb output (closes #2696) 2017-04-28 13:43:00 -07:00
Daniel Nelson 619d4d5d29 Use go 1.8.1 for CI and Release builds (#2732) 2017-04-27 16:22:41 -07:00
19 changed files with 338 additions and 94 deletions

View File

@@ -79,6 +79,8 @@ be deprecated eventually.
 - [#2705](https://github.com/influxdata/telegraf/pull/2705): Kinesis output: add use_random_partitionkey option
 - [#2635](https://github.com/influxdata/telegraf/issues/2635): add tcp keep-alive to socket_listener & socket_writer
 - [#2031](https://github.com/influxdata/telegraf/pull/2031): Add Kapacitor input plugin
+- [#2732](https://github.com/influxdata/telegraf/pull/2732): Use go 1.8.1
+- [#2712](https://github.com/influxdata/telegraf/issues/2712): Documentation for rabbitmq input plugin

 ### Bugfixes
@@ -118,6 +120,7 @@ be deprecated eventually.
 - [#1911](https://github.com/influxdata/telegraf/issues/1911): Sysstat plugin needs LANG=C or similar locale
 - [#2528](https://github.com/influxdata/telegraf/issues/2528): File output closes standard streams on reload.
 - [#2603](https://github.com/influxdata/telegraf/issues/2603): AMQP output disconnect blocks all outputs
+- [#2706](https://github.com/influxdata/telegraf/issues/2706): Improve documentation for redis input plugin

 ## v1.2.1 [2017-02-01]

View File

@@ -111,6 +111,7 @@ configuration options.
 * [couchbase](./plugins/inputs/couchbase)
 * [couchdb](./plugins/inputs/couchdb)
 * [disque](./plugins/inputs/disque)
+* [dmcache](./plugins/inputs/dmcache)
 * [dns query time](./plugins/inputs/dns_query)
 * [docker](./plugins/inputs/docker)
 * [dovecot](./plugins/inputs/dovecot)
@@ -127,6 +128,7 @@ configuration options.
 * [ipmi_sensor](./plugins/inputs/ipmi_sensor)
 * [iptables](./plugins/inputs/iptables)
 * [jolokia](./plugins/inputs/jolokia)
+* [kapacitor](./plugins/inputs/kapacitor)
 * [kubernetes](./plugins/inputs/kubernetes)
 * [leofs](./plugins/inputs/leofs)
 * [lustre2](./plugins/inputs/lustre2)
@@ -195,6 +197,7 @@ Telegraf can also collect metrics via the following service plugins:
 * [github](./plugins/inputs/webhooks/github)
 * [mandrill](./plugins/inputs/webhooks/mandrill)
 * [rollbar](./plugins/inputs/webhooks/rollbar)
+* [papertrail](./plugins/inputs/webhooks/papertrail)

 Telegraf is able to parse the following input data formats into metrics, these
 formats may be used with input plugins supporting the `data_format` option:

View File

@@ -1,13 +1,11 @@
 machine:
+  go:
+    version: 1.8.1
   services:
     - docker
-  post:
-    - sudo service zookeeper stop
-    - go version
-    - sudo rm -rf /usr/local/go
-    - wget https://storage.googleapis.com/golang/go1.8.linux-amd64.tar.gz
-    - sudo tar -C /usr/local -xzf go1.8.linux-amd64.tar.gz
-    - go version
+    - memcached
+    - redis
+    - rabbitmq-server

 dependencies:
   override:

View File

@@ -95,7 +95,8 @@
   ## The target database for metrics (telegraf will create it if not exists).
   database = "telegraf" # required
-  ## Retention policy to write to. Empty string writes to the default rp.
+  ## Name of existing retention policy to write to. Empty string writes to
+  ## the default retention policy.
   retention_policy = ""
   ## Write consistency (clusters only), can be: "any", "one", "quorum", "all"
   write_consistency = "any"
@@ -141,7 +142,7 @@
 #   ## described here: https://www.rabbitmq.com/plugins.html
 #   # auth_method = "PLAIN"
 #   ## Telegraf tag to use as a routing key
-#   ##  ie, if this tag exists, it's value will be used as the routing key
+#   ##  ie, if this tag exists, its value will be used as the routing key
 #   routing_tag = "host"
 #
 #   ## InfluxDB retention policy
@@ -335,6 +336,10 @@
 #   ## Use SSL but skip chain & host verification
 #   # insecure_skip_verify = false
 #
+#   ## Optional SASL Config
+#   # sasl_username = "kafka"
+#   # sasl_password = "secret"
+#
 #   ## Data format to output.
 #   ## Each data format has its own unique set of configuration options, read
 #   ## more about them here:
@@ -1325,6 +1330,18 @@
 #   attribute = "LoadedClassCount,UnloadedClassCount,TotalLoadedClassCount"

+# # Read Kapacitor-formatted JSON metrics from one or more HTTP endpoints
+# [[inputs.kapacitor]]
+#   ## Multiple URLs from which to read Kapacitor-formatted JSON
+#   ## Default is "http://localhost:9092/kapacitor/v1/debug/vars".
+#   urls = [
+#     "http://localhost:9092/kapacitor/v1/debug/vars"
+#   ]
+#
+#   ## Time limit for http requests
+#   timeout = "5s"

 # # Get kernel statistics from /proc/vmstat
 # [[inputs.kernel_vmstat]]
 #   # no configuration

View File

@@ -118,11 +118,18 @@ func (p *Parser) Compile() error {
 	// Give Patterns fake names so that they can be treated as named
 	// "custom patterns"
-	p.namedPatterns = make([]string, len(p.Patterns))
+	p.namedPatterns = make([]string, 0, len(p.Patterns))
 	for i, pattern := range p.Patterns {
+		if pattern == "" {
+			continue
+		}
 		name := fmt.Sprintf("GROK_INTERNAL_PATTERN_%d", i)
 		p.CustomPatterns += "\n" + name + " " + pattern + "\n"
-		p.namedPatterns[i] = "%{" + name + "}"
+		p.namedPatterns = append(p.namedPatterns, "%{"+name+"}")
+	}
+
+	if len(p.namedPatterns) == 0 {
+		return fmt.Errorf("pattern required")
 	}

 	// Combine user-supplied CustomPatterns with DEFAULT_PATTERNS and parse
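The guard introduced in 18fd2d987d is small enough to exercise standalone. A minimal sketch of the same behavior, using a hypothetical `compilePatterns` helper rather than the plugin's real API: blank patterns are skipped instead of being compiled into empty internal patterns, and an error comes back when nothing valid remains.

```go
package main

import "fmt"

// compilePatterns mirrors the fix: empty entries are skipped rather than
// registered as empty internal patterns, and an error is returned when no
// valid pattern remains after filtering.
func compilePatterns(patterns []string) ([]string, error) {
	named := make([]string, 0, len(patterns))
	for i, pattern := range patterns {
		if pattern == "" {
			continue
		}
		// Indexing by the original position keeps internal names stable
		// even when earlier entries were skipped.
		name := fmt.Sprintf("GROK_INTERNAL_PATTERN_%d", i)
		named = append(named, "%{"+name+"}")
	}
	if len(named) == 0 {
		return nil, fmt.Errorf("pattern required")
	}
	return named, nil
}

func main() {
	named, err := compilePatterns([]string{"", "%{COMBINED_LOG_FORMAT}"})
	fmt.Println(named, err)

	_, err = compilePatterns([]string{"", ""})
	fmt.Println(err)
}
```

Note how appending (instead of assigning by index into a full-length slice) is what keeps the skipped entries from leaving empty `%{GROK_INTERNAL_PATTERN_n}` holes behind.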

View File

@@ -117,16 +117,11 @@ func (l *LogParserPlugin) Start(acc telegraf.Accumulator) error {
 	}

 	// compile log parser patterns:
-	var haveError bool
 	for _, parser := range l.parsers {
 		if err := parser.Compile(); err != nil {
-			acc.AddError(err)
-			haveError = true
+			return err
 		}
 	}
-	if haveError {
-		return nil
-	}

 	l.wg.Add(1)
 	go l.parser()

View File

@@ -38,12 +38,8 @@ func TestGrokParseLogFilesNonExistPattern(t *testing.T) {
 	}

 	acc := testutil.Accumulator{}
-	logparser.Start(&acc)
-	if assert.NotEmpty(t, acc.Errors) {
-		assert.Error(t, acc.Errors[0])
-	}
-	logparser.Stop()
+	err := logparser.Start(&acc)
+	assert.Error(t, err)
 }

 func TestGrokParseLogFiles(t *testing.T) {

View File

@@ -0,0 +1,121 @@
# RabbitMQ Input Plugin
Reads metrics from RabbitMQ servers via the [Management Plugin](https://www.rabbitmq.com/management.html).
For additional details reference the [RabbitMQ Management HTTP Stats](https://cdn.rawgit.com/rabbitmq/rabbitmq-management/master/priv/www/doc/stats.html).
### Configuration:
```toml
[[inputs.rabbitmq]]
## Management Plugin url. (default: http://localhost:15672)
# url = "http://localhost:15672"
## Tag added to rabbitmq_overview series; deprecated: use tags
# name = "rmq-server-1"
## Credentials
# username = "guest"
# password = "guest"
## Optional SSL Config
# ssl_ca = "/etc/telegraf/ca.pem"
# ssl_cert = "/etc/telegraf/cert.pem"
# ssl_key = "/etc/telegraf/key.pem"
## Use SSL but skip chain & host verification
# insecure_skip_verify = false
## Optional request timeouts
##
## ResponseHeaderTimeout, if non-zero, specifies the amount of time to wait
## for a server's response headers after fully writing the request.
# header_timeout = "3s"
##
## client_timeout specifies a time limit for requests made by this client.
## Includes connection time, any redirects, and reading the response body.
# client_timeout = "4s"
## A list of nodes to pull metrics about. If not specified, metrics for
## all nodes are gathered.
# nodes = ["rabbit@node1", "rabbit@node2"]
```
### Measurements & Fields:

- rabbitmq_overview
  - channels (int, channels)
  - connections (int, connections)
  - consumers (int, consumers)
  - exchanges (int, exchanges)
  - messages (int, messages)
  - messages_acked (int, messages)
  - messages_delivered (int, messages)
  - messages_published (int, messages)
  - messages_ready (int, messages)
  - messages_unacked (int, messages)
  - queues (int, queues)
- rabbitmq_node
  - disk_free (int, bytes)
  - disk_free_limit (int, bytes)
  - fd_total (int, file descriptors)
  - fd_used (int, file descriptors)
  - mem_limit (int, bytes)
  - mem_used (int, bytes)
  - proc_total (int, erlang processes)
  - proc_used (int, erlang processes)
  - run_queue (int, erlang processes)
  - sockets_total (int, sockets)
  - sockets_used (int, sockets)
- rabbitmq_queue
  - consumer_utilisation (float, percent)
  - consumers (int, count)
  - idle_since (string, time - e.g., "2006-01-02 15:04:05")
  - memory (int, bytes)
  - message_bytes (int, bytes)
  - message_bytes_persist (int, bytes)
  - message_bytes_ram (int, bytes)
  - message_bytes_ready (int, bytes)
  - message_bytes_unacked (int, bytes)
  - messages (int, count)
  - messages_ack (int, count)
  - messages_ack_rate (float, messages per second)
  - messages_deliver (int, count)
  - messages_deliver_rate (float, messages per second)
  - messages_deliver_get (int, count)
  - messages_deliver_get_rate (float, messages per second)
  - messages_publish (int, count)
  - messages_publish_rate (float, messages per second)
  - messages_ready (int, count)
  - messages_redeliver (int, count)
  - messages_redeliver_rate (float, messages per second)
  - messages_unack (int, count)

### Tags:

- All measurements have the following tags:
  - url
- rabbitmq_overview
  - name
- rabbitmq_node
  - node
- rabbitmq_queue
  - url
  - queue
  - vhost
  - node
  - durable
  - auto_delete

### Sample Queries:

### Example Output:
```
rabbitmq_queue,url=http://amqp.example.org:15672,queue=telegraf,vhost=influxdb,node=rabbit@amqp.example.org,durable=true,auto_delete=false,host=amqp.example.org messages_deliver_get=0i,messages_publish=329i,messages_publish_rate=0.2,messages_redeliver_rate=0,message_bytes_ready=0i,message_bytes_unacked=0i,messages_deliver=329i,messages_unack=0i,consumers=1i,idle_since="",messages=0i,messages_deliver_rate=0.2,messages_deliver_get_rate=0.2,messages_redeliver=0i,memory=43032i,message_bytes_ram=0i,messages_ack=329i,messages_ready=0i,messages_ack_rate=0.2,consumer_utilisation=1,message_bytes=0i,message_bytes_persist=0i 1493684035000000000
rabbitmq_overview,url=http://amqp.example.org:15672,host=amqp.example.org channels=2i,consumers=1i,exchanges=17i,messages_acked=329i,messages=0i,messages_ready=0i,messages_unacked=0i,connections=2i,queues=1i,messages_delivered=329i,messages_published=329i 1493684035000000000
rabbitmq_node,url=http://amqp.example.org:15672,node=rabbit@amqp.example.org,host=amqp.example.org fd_total=1024i,fd_used=32i,mem_limit=8363329126i,sockets_total=829i,disk_free=8175935488i,disk_free_limit=50000000i,mem_used=58771080i,proc_total=1048576i,proc_used=267i,run_queue=0i,sockets_used=2i 149368403500000000
```

View File

@@ -140,8 +140,11 @@ type gatherFunc func(r *RabbitMQ, acc telegraf.Accumulator)
 var gatherFunctions = []gatherFunc{gatherOverview, gatherNodes, gatherQueues}

 var sampleConfig = `
+  ## Management Plugin url. (default: http://localhost:15672)
   # url = "http://localhost:15672"
-  # name = "rmq-server-1" # optional tag
+  ## Tag added to rabbitmq_overview series; deprecated: use tags
+  # name = "rmq-server-1"
+  ## Credentials
   # username = "guest"
   # password = "guest"
@@ -174,7 +177,7 @@ func (r *RabbitMQ) SampleConfig() string {

 // Description ...
 func (r *RabbitMQ) Description() string {
-	return "Read metrics from one or many RabbitMQ servers via the management API"
+	return "Reads metrics from RabbitMQ servers via the Management Plugin"
 }

 // Gather ...
// Gather ... // Gather ...

View File

@@ -18,47 +18,103 @@
 ### Measurements & Fields:

-- Measurement
-- uptime_in_seconds
-- connected_clients
-- used_memory
-- used_memory_rss
-- used_memory_peak
-- used_memory_lua
-- rdb_changes_since_last_save
-- total_connections_received
-- total_commands_processed
-- instantaneous_ops_per_sec
-- instantaneous_input_kbps
-- instantaneous_output_kbps
-- sync_full
-- sync_partial_ok
-- sync_partial_err
-- expired_keys
-- evicted_keys
-- keyspace_hits
-- keyspace_misses
-- pubsub_channels
-- pubsub_patterns
-- latest_fork_usec
-- connected_slaves
-- master_repl_offset
-- master_last_io_seconds_ago
-- repl_backlog_active
-- repl_backlog_size
-- repl_backlog_histlen
-- mem_fragmentation_ratio
-- used_cpu_sys
-- used_cpu_user
-- used_cpu_sys_children
-- used_cpu_user_children
+The plugin gathers the results of the [INFO](https://redis.io/commands/info) redis command.
+There are two separate measurements: _redis_ and _redis\_keyspace_, the latter is used for gathering database related statistics.
+
+Additionally the plugin also calculates the hit/miss ratio (keyspace\_hitrate) and the elapsed time since the last rdb save (rdb\_last\_save\_time\_elapsed).
+
+- redis
+    - keyspace_hitrate(float, number)
+    - rdb_last_save_time_elapsed(int, seconds)
+
+**Server**
+- uptime(int, seconds)
+- lru_clock(int, number)
+
+**Clients**
+- clients(int, number)
+- client_longest_output_list(int, number)
+- client_biggest_input_buf(int, number)
+- blocked_clients(int, number)
+
+**Memory**
+- used_memory(int, bytes)
+- used_memory_rss(int, bytes)
+- used_memory_peak(int, bytes)
+- total_system_memory(int, bytes)
+- used_memory_lua(int, bytes)
+- maxmemory(int, bytes)
+- maxmemory_policy(string)
+- mem_fragmentation_ratio(float, number)
+
+**Persistence**
+- loading(int, flag)
+- rdb_changes_since_last_save(int, number)
+- rdb_bgsave_in_progress(int, flag)
+- rdb_last_save_time(int, seconds)
+- rdb_last_bgsave_status(string)
+- rdb_last_bgsave_time_sec(int, seconds)
+- rdb_current_bgsave_time_sec(int, seconds)
+- aof_enabled(int, flag)
+- aof_rewrite_in_progress(int, flag)
+- aof_rewrite_scheduled(int, flag)
+- aof_last_rewrite_time_sec(int, seconds)
+- aof_current_rewrite_time_sec(int, seconds)
+- aof_last_bgrewrite_status(string)
+- aof_last_write_status(string)
+
+**Stats**
+- total_connections_received(int, number)
+- total_commands_processed(int, number)
+- instantaneous_ops_per_sec(int, number)
+- total_net_input_bytes(int, bytes)
+- total_net_output_bytes(int, bytes)
+- instantaneous_input_kbps(float, bytes)
+- instantaneous_output_kbps(float, bytes)
+- rejected_connections(int, number)
+- sync_full(int, number)
+- sync_partial_ok(int, number)
+- sync_partial_err(int, number)
+- expired_keys(int, number)
+- evicted_keys(int, number)
+- keyspace_hits(int, number)
+- keyspace_misses(int, number)
+- pubsub_channels(int, number)
+- pubsub_patterns(int, number)
+- latest_fork_usec(int, microseconds)
+- migrate_cached_sockets(int, number)
+
+**Replication**
+- connected_slaves(int, number)
+- master_repl_offset(int, number)
+- repl_backlog_active(int, number)
+- repl_backlog_size(int, bytes)
+- repl_backlog_first_byte_offset(int, number)
+- repl_backlog_histlen(int, bytes)
+
+**CPU**
+- used_cpu_sys(float, number)
+- used_cpu_user(float, number)
+- used_cpu_sys_children(float, number)
+- used_cpu_user_children(float, number)
+
+**Cluster**
+- cluster_enabled(int, flag)
+
+- redis_keyspace
+    - keys(int, number)
+    - expires(int, number)
+    - avg_ttl(int, number)

 ### Tags:

 - All measurements have the following tags:
     - port
     - server
-    - replication role
+    - replication_role
+
+- The redis_keyspace measurement has an additional database tag:
+    - database

 ### Example Output:
@@ -84,5 +140,10 @@ When run with:

 It produces:
 ```
 * Plugin: redis, Collection 1
-> redis,port=6379,server=localhost clients=1i,connected_slaves=0i,evicted_keys=0i,expired_keys=0i,instantaneous_ops_per_sec=0i,keyspace_hitrate=0,keyspace_hits=0i,keyspace_misses=2i,latest_fork_usec=0i,master_repl_offset=0i,mem_fragmentation_ratio=3.58,pubsub_channels=0i,pubsub_patterns=0i,rdb_changes_since_last_save=0i,repl_backlog_active=0i,repl_backlog_histlen=0i,repl_backlog_size=1048576i,sync_full=0i,sync_partial_err=0i,sync_partial_ok=0i,total_commands_processed=4i,total_connections_received=2i,uptime=869i,used_cpu_sys=0.07,used_cpu_sys_children=0,used_cpu_user=0.1,used_cpu_user_children=0,used_memory=502048i,used_memory_lua=33792i,used_memory_peak=501128i,used_memory_rss=1798144i 1457052084987848383
+> redis,server=localhost,port=6379,replication_role=master,host=host keyspace_hitrate=1,clients=2i,blocked_clients=0i,instantaneous_input_kbps=0,sync_full=0i,pubsub_channels=0i,pubsub_patterns=0i,total_net_output_bytes=6659253i,used_memory=842448i,total_system_memory=8351916032i,aof_current_rewrite_time_sec=-1i,rdb_changes_since_last_save=0i,sync_partial_err=0i,latest_fork_usec=508i,instantaneous_output_kbps=0,expired_keys=0i,used_memory_peak=843416i,aof_rewrite_in_progress=0i,aof_last_bgrewrite_status="ok",migrate_cached_sockets=0i,connected_slaves=0i,maxmemory_policy="noeviction",aof_rewrite_scheduled=0i,total_net_input_bytes=3125i,used_memory_rss=9564160i,repl_backlog_histlen=0i,rdb_last_bgsave_status="ok",aof_last_rewrite_time_sec=-1i,keyspace_misses=0i,client_biggest_input_buf=5i,used_cpu_user=1.33,maxmemory=0i,rdb_current_bgsave_time_sec=-1i,total_commands_processed=271i,repl_backlog_size=1048576i,used_cpu_sys=3,uptime=2822i,lru_clock=16706281i,used_memory_lua=37888i,rejected_connections=0i,sync_partial_ok=0i,evicted_keys=0i,rdb_last_save_time_elapsed=1922i,rdb_last_save_time=1493099368i,instantaneous_ops_per_sec=0i,used_cpu_user_children=0,client_longest_output_list=0i,master_repl_offset=0i,repl_backlog_active=0i,keyspace_hits=2i,used_cpu_sys_children=0,cluster_enabled=0i,rdb_last_bgsave_time_sec=0i,aof_last_write_status="ok",total_connections_received=263i,aof_enabled=0i,repl_backlog_first_byte_offset=0i,mem_fragmentation_ratio=11.35,loading=0i,rdb_bgsave_in_progress=0i 1493101290000000000
 ```
+
+redis_keyspace:
+```
+> redis_keyspace,database=db1,host=host,server=localhost,port=6379,replication_role=master keys=1i,expires=0i,avg_ttl=0i 1493101350000000000
+```
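The derived keyspace_hitrate field added by this commit is, as the name implies, the ratio of keyspace_hits to total lookups. A minimal sketch of that calculation (the `hitrate` helper is hypothetical; it assumes the plugin computes hits / (hits + misses) from the INFO counters, which matches the sample output above where 2 hits and 0 misses yield keyspace_hitrate=1):

```go
package main

import "fmt"

// hitrate computes hits / (hits + misses); 0 when there is no traffic yet,
// avoiding a division by zero on a freshly started server.
func hitrate(hits, misses int64) float64 {
	total := hits + misses
	if total == 0 {
		return 0
	}
	return float64(hits) / float64(total)
}

func main() {
	fmt.Println(hitrate(2, 0)) // every lookup hit
	fmt.Println(hitrate(0, 2)) // every lookup missed
	fmt.Println(hitrate(0, 0)) // no traffic yet
}
```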

View File

@@ -18,7 +18,8 @@ This plugin writes to [InfluxDB](https://www.influxdb.com) via HTTP or UDP.
   ## The target database for metrics (telegraf will create it if not exists).
   database = "telegraf" # required
-  ## Retention policy to write to. Empty string writes to the default rp.
+  ## Name of existing retention policy to write to. Empty string writes to
+  ## the default retention policy.
   retention_policy = ""
   ## Write consistency (clusters only), can be: "any", "one", "quorum", "all"
   write_consistency = "any"
@@ -52,7 +53,7 @@ to write to. Each URL should start with either `http://` or `udp://`

 ### Optional parameters:

 * `write_consistency`: Write consistency (clusters only), can be: "any", "one", "quorum", "all".
-* `retention_policy`: Retention policy to write to.
+* `retention_policy`: Name of existing retention policy to write to. Empty string writes to the default retention policy.
 * `timeout`: Write timeout (for the InfluxDB client), formatted as a string. If not provided, will default to 5s. 0s means no timeout (not recommended).
 * `username`: Username for influxdb
 * `password`: Password for influxdb

View File

@@ -25,6 +25,7 @@ type UDPConfig struct {
 	PayloadSize int
 }

+// NewUDP will return an instance of the telegraf UDP output plugin for influxdb
 func NewUDP(config UDPConfig) (Client, error) {
 	p, err := url.Parse(config.URL)
 	if err != nil {
@@ -55,20 +56,22 @@ type udpClient struct {
 	buffer []byte
 }

+// Query will send the provided query command to the client, returning an error if any issues arise
 func (c *udpClient) Query(command string) error {
 	return nil
 }

+// Write will send the byte stream to the given UDP client endpoint
 func (c *udpClient) Write(b []byte) (int, error) {
 	return c.WriteStream(bytes.NewReader(b), -1)
 }

-// write params are ignored by the UDP client
+// WriteWithParams are ignored by the UDP client, will forward to WriteStream
 func (c *udpClient) WriteWithParams(b []byte, wp WriteParams) (int, error) {
 	return c.WriteStream(bytes.NewReader(b), -1)
 }

-// contentLength is ignored by the UDP client.
+// WriteStream will send the provided data through to the client, contentLength is ignored by the UDP client
 func (c *udpClient) WriteStream(r io.Reader, contentLength int) (int, error) {
 	var totaln int
 	for {
@@ -88,12 +91,13 @@ func (c *udpClient) WriteStream(r io.Reader, contentLength int) (int, error) {
 	return totaln, nil
 }

-// contentLength is ignored by the UDP client.
+// WriteStreamWithParams will forward the stream to the client backend, contentLength is ignored by the UDP client
 // write params are ignored by the UDP client
 func (c *udpClient) WriteStreamWithParams(r io.Reader, contentLength int, wp WriteParams) (int, error) {
 	return c.WriteStream(r, -1)
 }

+// Close will terminate the provided client connection
 func (c *udpClient) Close() error {
 	return c.conn.Close()
 }

View File

@@ -2,6 +2,7 @@ package influxdb

 import (
 	"fmt"
+	"io"
 	"log"
 	"math/rand"
 	"strings"
@@ -15,6 +16,7 @@ import (
 	"github.com/influxdata/telegraf/plugins/outputs/influxdb/client"
 )

+// InfluxDB struct is the primary data structure for the plugin
 type InfluxDB struct {
 	// URL is only for backwards compatability
 	URL string
@@ -40,7 +42,8 @@ type InfluxDB struct {
 	// Precision is only here for legacy support. It will be ignored.
 	Precision string

 	clients []client.Client
+	splitPayload bool
 }

 var sampleConfig = `
@@ -55,7 +58,8 @@ var sampleConfig = `
   ## The target database for metrics (telegraf will create it if not exists).
   database = "telegraf" # required
-  ## Retention policy to write to. Empty string writes to the default rp.
+  ## Name of existing retention policy to write to. Empty string writes to
+  ## the default retention policy.
   retention_policy = ""
   ## Write consistency (clusters only), can be: "any", "one", "quorum", "all"
   write_consistency = "any"
@@ -78,11 +82,10 @@ var sampleConfig = `
   # insecure_skip_verify = false
 `

+// Connect initiates the primary connection to the range of provided URLs
 func (i *InfluxDB) Connect() error {
 	var urls []string
-	for _, u := range i.URLs {
-		urls = append(urls, u)
-	}
+	urls = append(urls, i.URLs...)

 	// Backward-compatability with single Influx URL config files
 	// This could eventually be removed in favor of specifying the urls as a list
@@ -108,6 +111,7 @@ func (i *InfluxDB) Connect() error {
 				return fmt.Errorf("Error creating UDP Client [%s]: %s", u, err)
 			}
 			i.clients = append(i.clients, c)
+			i.splitPayload = true
 		default:
 			// If URL doesn't start with "udp", assume HTTP client
 			config := client.HTTPConfig{
@@ -131,7 +135,9 @@ func (i *InfluxDB) Connect() error {
 			err = c.Query("CREATE DATABASE " + i.Database)
 			if err != nil {
-				log.Println("E! Database creation failed: " + err.Error())
+				if !strings.Contains(err.Error(), "Status Code [403]") {
+					log.Println("I! Database creation failed: " + err.Error())
+				}
 				continue
 			}
 		}
@@ -141,26 +147,41 @@ func (i *InfluxDB) Connect() error {
 	return nil
 }

+// Close will terminate the session to the backend, returning error if an issue arises
 func (i *InfluxDB) Close() error {
 	return nil
 }

+// SampleConfig returns the formatted sample configuration for the plugin
 func (i *InfluxDB) SampleConfig() string {
 	return sampleConfig
 }

+// Description returns the human-readable function definition of the plugin
 func (i *InfluxDB) Description() string {
 	return "Configuration for influxdb server to send metrics to"
 }

-// Choose a random server in the cluster to write to until a successful write
+func (i *InfluxDB) getReader(metrics []telegraf.Metric) io.Reader {
+	if !i.splitPayload {
+		return metric.NewReader(metrics)
+	}
+
+	splitData := make([]telegraf.Metric, 0)
+	for _, m := range metrics {
+		splitData = append(splitData, m.Split(i.UDPPayload)...)
+	}
+
+	return metric.NewReader(splitData)
+}
+
+// Write will choose a random server in the cluster to write to until a successful write
 // occurs, logging each unsuccessful. If all servers fail, return error.
 func (i *InfluxDB) Write(metrics []telegraf.Metric) error {
 	bufsize := 0
 	for _, m := range metrics {
 		bufsize += m.Len()
 	}
-	r := metric.NewReader(metrics)
+	r := i.getReader(metrics)

 	// This will get set to nil if a successful write occurs
 	err := fmt.Errorf("Could not write to any InfluxDB server in cluster")
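Commit b16eb6eae6 splits metrics so each UDP datagram stays within the configured UDPPayload size, and 27b89dff48 gates that on a UDP output actually being configured (the splitPayload flag above). The real code splits telegraf.Metric values with m.Split; the underlying packing idea can be sketched with a hypothetical splitLines helper that groups pre-serialized line-protocol lines into payloads no larger than a byte budget:

```go
package main

import "fmt"

// splitLines packs serialized metric lines into payloads of at most max bytes
// (counting the newline separators). A single line longer than max is emitted
// on its own rather than dropped, mirroring a best-effort split.
func splitLines(lines []string, max int) []string {
	var payloads []string
	current := ""
	for _, line := range lines {
		candidate := current
		if candidate != "" {
			candidate += "\n"
		}
		candidate += line
		if len(candidate) > max && current != "" {
			// Adding this line would overflow the budget: flush what we
			// have and start a new payload with the current line.
			payloads = append(payloads, current)
			current = line
		} else {
			current = candidate
		}
	}
	if current != "" {
		payloads = append(payloads, current)
	}
	return payloads
}

func main() {
	lines := []string{"cpu usage=1", "cpu usage=2", "cpu usage=3"}
	for _, p := range splitLines(lines, 25) {
		fmt.Printf("%q\n", p)
	}
}
```

Gating the split on splitPayload matters because HTTP writes have no datagram limit, so splitting there would only add overhead.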

View File

@@ -124,6 +124,16 @@ func (sw *SocketWriter) Write(metrics []telegraf.Metric) error {
 	return nil
 }

+// Close closes the connection. Noop if already closed.
+func (sw *SocketWriter) Close() error {
+	if sw.Conn == nil {
+		return nil
+	}
+	err := sw.Conn.Close()
+	sw.Conn = nil
+	return err
+}
+
 func newSocketWriter() *SocketWriter {
 	s, _ := serializers.NewInfluxSerializer()
 	return &SocketWriter{

View File

@@ -143,7 +143,7 @@ func TestSocketWriter_Write_err(t *testing.T) {
 	// close the socket to generate an error
 	lconn.Close()
-	sw.Close()
+	sw.Conn.Close()
 	err = sw.Write(metrics)
 	require.Error(t, err)
 	assert.Nil(t, sw.Conn)

View File

@@ -636,6 +636,10 @@ def package(build_output, pkg_name, version, nightly=False, iteration=1, static=
 elif package_type not in ['zip', 'tar'] and static or "static_" in arch:
     logging.info("Skipping package type '{}' for static builds.".format(package_type))
 else:
+    if package_type == 'rpm' and release and '~' in package_version:
+        package_version, suffix = package_version.split('~', 1)
+        # The ~ indicates that this is a prerelease so we give it a leading 0.
+        package_iteration = "0.%s" % suffix
     fpm_command = "fpm {} --name {} -a {} -t {} --version {} --iteration {} -C {} -p {} ".format(
         fpm_common_args,
         name,
@@ -664,9 +668,6 @@ def package(build_output, pkg_name, version, nightly=False, iteration=1, static=
     if package_type == 'rpm':
         # rpm's convert any dashes to underscores
         package_version = package_version.replace("-", "_")
-        new_outfile = outfile.replace("{}-{}".format(package_version, package_iteration), package_version)
-        os.rename(outfile, new_outfile)
-        outfile = new_outfile
     outfiles.append(os.path.join(os.getcwd(), outfile))
 logging.debug("Produced package files: {}".format(outfiles))
 return outfiles
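The rpm branch added above encodes a small versioning rule: a `~` in the version marks a prerelease, and the suffix after it becomes the rpm iteration with a leading `0.` so prereleases sort before the final release. A sketch of that rule as a standalone helper (illustrative only, not build.py's actual API):

```python
def rpm_version_iteration(package_version, iteration):
    """Sketch of the rpm prerelease rule from the diff above: split on
    '~' and turn the suffix into a '0.'-prefixed iteration; versions
    without '~' keep the normal iteration."""
    if '~' in package_version:
        package_version, suffix = package_version.split('~', 1)
        return package_version, "0.%s" % suffix
    return package_version, str(iteration)


print(rpm_version_iteration("1.3.0~rc1", 1))  # → ('1.3.0', '0.rc1')
print(rpm_version_iteration("1.3.0", 1))      # → ('1.3.0', '1')
```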


@@ -135,7 +135,9 @@ case $1 in
 	fi
 	log_success_msg "Starting the process" "$name"
-	if which start-stop-daemon > /dev/null 2>&1; then
+	if command -v startproc >/dev/null; then
+		startproc -u "$USER" -g "$GROUP" -p "$pidfile" -q -- "$daemon" -pidfile "$pidfile" -config "$config" -config-directory "$confdir" $TELEGRAF_OPTS
+	elif which start-stop-daemon > /dev/null 2>&1; then
 		start-stop-daemon --chuid $USER:$GROUP --start --quiet --pidfile $pidfile --exec $daemon -- -pidfile $pidfile -config $config -config-directory $confdir $TELEGRAF_OPTS >>$STDOUT 2>>$STDERR &
 	else
 		su -s /bin/sh -c "nohup $daemon -pidfile $pidfile -config $config -config-directory $confdir $TELEGRAF_OPTS >>$STDOUT 2>>$STDERR &" $USER
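The init-script hunk above establishes a launcher fallback order: prefer SuSE's `startproc`, then `start-stop-daemon`, and fall back to plain `su` when neither exists. A sketch of that selection logic (hypothetical helper; the real logic lives in the shell script):

```python
def pick_launcher(available_tools):
    """Sketch of the fallback order from the init-script diff above:
    startproc first (SuSE), then start-stop-daemon, else plain su."""
    for tool in ("startproc", "start-stop-daemon"):
        if tool in available_tools:
            return tool
    return "su"


print(pick_launcher({"startproc", "start-stop-daemon"}))  # → startproc
print(pick_launcher({"start-stop-daemon"}))               # → start-stop-daemon
print(pick_launcher(set()))                               # → su
```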


@@ -11,7 +11,7 @@ function install_init {
 }
 
 function install_systemd {
-    cp -f $SCRIPT_DIR/telegraf.service /lib/systemd/system/telegraf.service
+    cp -f $SCRIPT_DIR/telegraf.service $1
     systemctl enable telegraf || true
     systemctl daemon-reload || true
 }
@@ -24,12 +24,12 @@ function install_chkconfig {
     chkconfig --add telegraf
 }
 
+if ! grep "^telegraf:" /etc/group &>/dev/null; then
+    groupadd -r telegraf
+fi
+
 if ! id telegraf &>/dev/null; then
-    if ! grep "^telegraf:" /etc/group &>/dev/null; then
-        useradd -r -K USERGROUPS_ENAB=yes -M telegraf -s /bin/false -d /etc/telegraf
-    else
-        useradd -r -K USERGROUPS_ENAB=yes -M telegraf -s /bin/false -d /etc/telegraf -g telegraf
-    fi
+    useradd -r -M telegraf -s /bin/false -d /etc/telegraf -g telegraf
 fi
 
 test -d $LOG_DIR || mkdir -p $LOG_DIR
@@ -56,10 +56,10 @@ if [[ ! -d /etc/telegraf/telegraf.d ]]; then
 fi
 
 # Distribution-specific logic
-if [[ -f /etc/redhat-release ]]; then
+if [[ -f /etc/redhat-release ]] || [[ -f /etc/SuSE-release ]]; then
     # RHEL-variant logic
     if [[ "$(readlink /proc/1/exe)" == */systemd ]]; then
-        install_systemd
+        install_systemd /usr/lib/systemd/system/telegraf.service
     else
         # Assuming SysVinit
         install_init
@@ -73,10 +73,10 @@ if [[ -f /etc/redhat-release ]]; then
 elif [[ -f /etc/debian_version ]]; then
     # Debian/Ubuntu logic
     if [[ "$(readlink /proc/1/exe)" == */systemd ]]; then
-        install_systemd
+        install_systemd /lib/systemd/system/telegraf.service
         systemctl restart telegraf || echo "WARNING: systemd not running."
     else
         # Assuming SysVinit
         install_init
         # Run update-rc.d or fallback to chkconfig if not available
         if which update-rc.d &>/dev/null; then
@@ -89,7 +89,7 @@ elif [[ -f /etc/debian_version ]]; then
 elif [[ -f /etc/os-release ]]; then
     source /etc/os-release
     if [[ $ID = "amzn" ]]; then
         # Amazon Linux logic
         install_init
         # Run update-rc.d or fallback to chkconfig if not available
         if which update-rc.d &>/dev/null; then
@@ -97,5 +97,6 @@ elif [[ -f /etc/os-release ]]; then
     else
         install_chkconfig
     fi
+    /etc/init.d/telegraf restart
 fi
 fi
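The post-install hunks above thread a distro-specific unit path through `install_systemd`: RHEL/SuSE variants keep unit files under `/usr/lib/systemd/system`, while Debian/Ubuntu use `/lib/systemd/system`. That convention can be sketched as a tiny helper (illustrative only, not part of the actual script):

```python
def telegraf_unit_path(redhat_like):
    """Sketch of the path rule from the post-install diff above:
    True for RHEL/SuSE variants (/usr/lib), False for Debian/Ubuntu (/lib)."""
    if redhat_like:
        return "/usr/lib/systemd/system/telegraf.service"
    return "/lib/systemd/system/telegraf.service"


print(telegraf_unit_path(True))   # → /usr/lib/systemd/system/telegraf.service
print(telegraf_unit_path(False))  # → /lib/systemd/system/telegraf.service
```

The post-remove script below passes the same per-distro path to `disable_systemd`, so install and uninstall stay symmetric.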


@@ -2,7 +2,7 @@
 function disable_systemd {
     systemctl disable telegraf
-    rm -f /lib/systemd/system/telegraf.service
+    rm -f $1
 }
 
 function disable_update_rcd {
@@ -15,14 +15,14 @@ function disable_chkconfig {
     rm -f /etc/init.d/telegraf
 }
 
-if [[ -f /etc/redhat-release ]]; then
+if [[ -f /etc/redhat-release ]] || [[ -f /etc/SuSE-release ]]; then
     # RHEL-variant logic
     if [[ "$1" = "0" ]]; then
         # InfluxDB is no longer installed, remove from init system
         rm -f /etc/default/telegraf
         if [[ "$(readlink /proc/1/exe)" == */systemd ]]; then
-            disable_systemd
+            disable_systemd /usr/lib/systemd/system/telegraf.service
         else
             # Assuming sysv
             disable_chkconfig
@@ -35,7 +35,7 @@ elif [[ -f /etc/debian_version ]]; then
     rm -f /etc/default/telegraf
     if [[ "$(readlink /proc/1/exe)" == */systemd ]]; then
-        disable_systemd
+        disable_systemd /lib/systemd/system/telegraf.service
     else
         # Assuming sysv
         # Run update-rc.d or fallback to chkconfig if not available