From 375710488df06ce5f2b6af4d234a65a64585fae8 Mon Sep 17 00:00:00 2001 From: Matt Jones Date: Tue, 19 Jul 2016 05:24:06 -0400 Subject: [PATCH 1/9] Add support for self-signed certs to RabbitMQ input plugin (#1503) * add initial support to allow self-signed certs When using self-signed the metrics collection will fail, this will allow the user to specify in the input configuration file if they want to skip certificate verification. This is functionally identical to `curl -k` At some point this functionality should be moved to the agent as it is already implemented identically in several different input plugins. * Add initial comment strings to remove noise These should be properly fleshed out at some point to ensure code completeness * refactor to use generic helper function * fix import statement against fork * update changelog --- CHANGELOG.md | 1 + plugins/inputs/rabbitmq/rabbitmq.go | 57 ++++++++++++++++++++++++++--- 2 files changed, 53 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5aa149a89..517abea96 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -36,6 +36,7 @@ should now look like: ### Features +- [#1503](https://github.com/influxdata/telegraf/pull/1503): Add tls support for certs to RabbitMQ input plugin - [#1289](https://github.com/influxdata/telegraf/pull/1289): webhooks input plugin. Thanks @francois2metz and @cduez! - [#1247](https://github.com/influxdata/telegraf/pull/1247): rollbar webhook plugin. - [#1408](https://github.com/influxdata/telegraf/pull/1408): mandrill webhook plugin. diff --git a/plugins/inputs/rabbitmq/rabbitmq.go b/plugins/inputs/rabbitmq/rabbitmq.go index 18d666a08..8a879d179 100644 --- a/plugins/inputs/rabbitmq/rabbitmq.go +++ b/plugins/inputs/rabbitmq/rabbitmq.go @@ -9,35 +9,59 @@ import ( "time" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/internal/errchan" "github.com/influxdata/telegraf/plugins/inputs" ) +// DefaultUsername will set a default value that corrasponds to the default +// value used by Rabbitmq const DefaultUsername = "guest" + +// DefaultPassword will set a default value that corrasponds to the default +// value used by Rabbitmq const DefaultPassword = "guest" + +// DefaultURL will set a default value that corrasponds to the default value +// used by Rabbitmq const DefaultURL = "http://localhost:15672" +// RabbitMQ defines the configuration necessary for gathering metrics, +// see the sample config for further details type RabbitMQ struct { URL string Name string Username string Password string - Nodes []string - Queues []string + // Path to CA file + SSLCA string `toml:"ssl_ca"` + // Path to host cert file + SSLCert string `toml:"ssl_cert"` + // Path to cert key file + SSLKey string `toml:"ssl_key"` + // Use SSL but skip chain & host verification + InsecureSkipVerify bool + + // InsecureSkipVerify bool + Nodes []string + Queues []string Client *http.Client } +// OverviewResponse ... type OverviewResponse struct { MessageStats *MessageStats `json:"message_stats"` ObjectTotals *ObjectTotals `json:"object_totals"` QueueTotals *QueueTotals `json:"queue_totals"` } +// Details ... type Details struct { Rate float64 } +// MessageStats ... type MessageStats struct { Ack int64 AckDetails Details `json:"ack_details"` @@ -51,6 +75,7 @@ type MessageStats struct { RedeliverDetails Details `json:"redeliver_details"` } +// ObjectTotals ... 
type ObjectTotals struct { Channels int64 Connections int64 @@ -59,6 +84,7 @@ type ObjectTotals struct { Queues int64 } +// QueueTotals ... type QueueTotals struct { Messages int64 MessagesReady int64 `json:"messages_ready"` @@ -66,10 +92,11 @@ type QueueTotals struct { MessageBytes int64 `json:"message_bytes"` MessageBytesReady int64 `json:"message_bytes_ready"` MessageBytesUnacknowledged int64 `json:"message_bytes_unacknowledged"` - MessageRam int64 `json:"message_bytes_ram"` + MessageRAM int64 `json:"message_bytes_ram"` MessagePersistent int64 `json:"message_bytes_persistent"` } +// Queue ... type Queue struct { QueueTotals // just to not repeat the same code MessageStats `json:"message_stats"` @@ -83,6 +110,7 @@ type Queue struct { AutoDelete bool `json:"auto_delete"` } +// Node ... type Node struct { Name string @@ -99,6 +127,7 @@ type Node struct { SocketsUsed int64 `json:"sockets_used"` } +// gatherFunc ... type gatherFunc func(r *RabbitMQ, acc telegraf.Accumulator, errChan chan error) var gatherFunctions = []gatherFunc{gatherOverview, gatherNodes, gatherQueues} @@ -109,22 +138,40 @@ var sampleConfig = ` # username = "guest" # password = "guest" + ## Optional SSL Config + # ssl_ca = "/etc/telegraf/ca.pem" + # ssl_cert = "/etc/telegraf/cert.pem" + # ssl_key = "/etc/telegraf/key.pem" + ## Use SSL but skip chain & host verification + # insecure_skip_verify = false + ## A list of nodes to pull metrics about. If not specified, metrics for ## all nodes are gathered. # nodes = ["rabbit@node1", "rabbit@node2"] ` +// SampleConfig ... func (r *RabbitMQ) SampleConfig() string { return sampleConfig } +// Description ... func (r *RabbitMQ) Description() string { return "Read metrics from one or many RabbitMQ servers via the management API" } +// Gather ... func (r *RabbitMQ) Gather(acc telegraf.Accumulator) error { if r.Client == nil { - tr := &http.Transport{ResponseHeaderTimeout: time.Duration(3 * time.Second)} + tlsCfg, err := internal.GetTLSConfig( + r.SSLCert, r.SSLKey, r.SSLCA, r.InsecureSkipVerify) + if err != nil { + return err + } + tr := &http.Transport{ + ResponseHeaderTimeout: time.Duration(3 * time.Second), + TLSClientConfig: tlsCfg, + } r.Client = &http.Client{ Transport: tr, Timeout: time.Duration(4 * time.Second), @@ -286,7 +333,7 @@ func gatherQueues(r *RabbitMQ, acc telegraf.Accumulator, errChan chan error) { "message_bytes": queue.MessageBytes, "message_bytes_ready": queue.MessageBytesReady, "message_bytes_unacked": queue.MessageBytesUnacknowledged, - "message_bytes_ram": queue.MessageRam, + "message_bytes_ram": queue.MessageRAM, "message_bytes_persist": queue.MessagePersistent, "messages": queue.Messages, "messages_ready": queue.MessagesReady, From 0be69b8a44aa56fa012b7a24d384de411ad8c962 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Timoth=C3=A9e=20GERMAIN?= Date: Fri, 15 Jul 2016 13:35:32 +0000 Subject: [PATCH 2/9] Make the user able to specify full path for HAproxy stats closes #1499 closes #1019 Do no try to guess HAproxy stats url, just add ";csv" at the end of the url if not present. 
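A standalone sketch of that normalization (it mirrors the suffix check this patch adds to `gatherServer` below; the sample addresses are hypothetical):

```go
package main

import (
	"fmt"
	"strings"
)

// ensureCSVSuffix leaves an address alone if it already points at the CSV
// endpoint, otherwise appends "/;csv" — the behaviour described above.
func ensureCSVSuffix(addr string) string {
	if !strings.HasSuffix(addr, ";csv") {
		addr += "/;csv"
	}
	return addr
}

func main() {
	fmt.Println(ensureCSVSuffix("http://10.10.3.33:1936/haproxy?stats"))     // suffix added
	fmt.Println(ensureCSVSuffix("http://10.10.3.33:1936/haproxy?stats;csv")) // left untouched
}
```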
Signed-off-by: tgermain --- CHANGELOG.md | 1 + plugins/inputs/haproxy/haproxy.go | 18 ++++++++++++------ plugins/inputs/haproxy/haproxy_test.go | 2 +- 3 files changed, 14 insertions(+), 7 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 517abea96..60949047f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -68,6 +68,7 @@ should now look like: - [#1463](https://github.com/influxdata/telegraf/issues/1463): Shared WaitGroup in Exec plugin - [#1436](https://github.com/influxdata/telegraf/issues/1436): logparser: honor modifiers in "pattern" config. - [#1418](https://github.com/influxdata/telegraf/issues/1418): logparser: error and exit on file permissions/missing errors. +- [#1499](https://github.com/influxdata/telegraf/pull/1499): Make the user able to specify full path for HAproxy stats ## v1.0 beta 2 [2016-06-21] diff --git a/plugins/inputs/haproxy/haproxy.go b/plugins/inputs/haproxy/haproxy.go index 0a0b3da82..9529bad3f 100644 --- a/plugins/inputs/haproxy/haproxy.go +++ b/plugins/inputs/haproxy/haproxy.go @@ -92,9 +92,11 @@ type haproxy struct { var sampleConfig = ` ## An array of address to gather stats about. Specify an ip on hostname ## with optional port. ie localhost, 10.10.3.33:1936, etc. - - ## If no servers are specified, then default to 127.0.0.1:1936 - servers = ["http://myhaproxy.com:1936", "http://anotherhaproxy.com:1936"] + ## Make sure you specify the complete path to the stats endpoint + ## ie 10.10.3.33:1936/haproxy?stats + # + ## If no servers are specified, then default to 127.0.0.1:1936/haproxy?stats + servers = ["http://myhaproxy.com:1936/haproxy?stats"] ## Or you can also use local socket ## servers = ["socket:/run/haproxy/admin.sock"] ` @@ -111,7 +113,7 @@ func (r *haproxy) Description() string { // Returns one of the errors encountered while gather stats (if any). 
func (g *haproxy) Gather(acc telegraf.Accumulator) error { if len(g.Servers) == 0 { - return g.gatherServer("http://127.0.0.1:1936", acc) + return g.gatherServer("http://127.0.0.1:1936/haproxy?stats", acc) } var wg sync.WaitGroup @@ -167,12 +169,16 @@ func (g *haproxy) gatherServer(addr string, acc telegraf.Accumulator) error { g.client = client } + if !strings.HasSuffix(addr, ";csv") { + addr += "/;csv" + } + u, err := url.Parse(addr) if err != nil { return fmt.Errorf("Unable parse server address '%s': %s", addr, err) } - req, err := http.NewRequest("GET", fmt.Sprintf("%s://%s%s/;csv", u.Scheme, u.Host, u.Path), nil) + req, err := http.NewRequest("GET", addr, nil) if u.User != nil { p, _ := u.User.Password() req.SetBasicAuth(u.User.Username(), p) @@ -184,7 +190,7 @@ func (g *haproxy) gatherServer(addr string, acc telegraf.Accumulator) error { } if res.StatusCode != 200 { - return fmt.Errorf("Unable to get valid stat result from '%s': %s", addr, err) + return fmt.Errorf("Unable to get valid stat result from '%s', http response code : %d", addr, res.StatusCode) } return importCsvResult(res.Body, acc, u.Host) diff --git a/plugins/inputs/haproxy/haproxy_test.go b/plugins/inputs/haproxy/haproxy_test.go index f9057e0cd..befcabd97 100644 --- a/plugins/inputs/haproxy/haproxy_test.go +++ b/plugins/inputs/haproxy/haproxy_test.go @@ -243,7 +243,7 @@ func TestHaproxyDefaultGetFromLocalhost(t *testing.T) { err := r.Gather(&acc) require.Error(t, err) - assert.Contains(t, err.Error(), "127.0.0.1:1936/;csv") + assert.Contains(t, err.Error(), "127.0.0.1:1936/haproxy?stats/;csv") } const csvOutputSample = ` From 5f14ad9fa1e0b375552b6412d3079d5743e756e9 Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Tue, 19 Jul 2016 11:15:09 +0100 Subject: [PATCH 3/9] clean up and finish aerospike refactor & readme --- plugins/inputs/aerospike/README.md | 280 ++++---------------------- plugins/inputs/aerospike/aerospike.go | 31 ++- plugins/inputs/ceph/README.md | 14 +- 3 files changed, 63 insertions(+), 262 deletions(-) diff --git a/plugins/inputs/aerospike/README.md b/plugins/inputs/aerospike/README.md index 6fb6bb189..60c470cd3 100644 --- a/plugins/inputs/aerospike/README.md +++ b/plugins/inputs/aerospike/README.md @@ -1,265 +1,55 @@ -## Telegraf Plugin: Aerospike +# Aerospike Input Plugin -#### Plugin arguments: -- **servers** string array: List of aerospike servers to query (def: 127.0.0.1:3000) - -#### Description - -The aerospike plugin queries aerospike server(s) and get node statistics. It also collects stats for +The aerospike plugin queries aerospike server(s) and get node statistics & stats for all the configured namespaces. For what the measurements mean, please consult the [Aerospike Metrics Reference Docs](http://www.aerospike.com/docs/reference/metrics). The metric names, to make it less complicated in querying, have replaced all `-` with `_` as Aerospike metrics come in both forms (no idea why). -# Measurements: -#### Aerospike Statistics [values]: +All metrics are attempted to be cast to integers, then booleans, then strings. 
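That fallback order (integer, then boolean, then the raw string) is the same logic as the `parseValue` helper added to `aerospike.go` further down in this patch; a minimal standalone sketch, fed a few values taken from the example output below:

```go
package main

import (
	"fmt"
	"strconv"
)

// parseValue tries an int64 first, then a bool, and finally falls back
// to returning the raw string unchanged.
func parseValue(v string) interface{} {
	if parsed, err := strconv.ParseInt(v, 10, 64); err == nil {
		return parsed
	}
	if parsed, err := strconv.ParseBool(v); err == nil {
		return parsed
	}
	return v
}

func main() {
	for _, v := range []string{"1457", "true", "BB9020011AC4202"} {
		fmt.Printf("%q -> %T\n", v, parseValue(v)) // int64, bool, string
	}
}
```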
-Meta: -- units: Integer +### Measurements: -Measurement names: -- batch_index_queue -- batch_index_unused_buffers -- batch_queue -- batch_tree_count -- client_connections -- data_used_bytes_memory -- index_used_bytes_memory -- info_queue -- migrate_progress_recv -- migrate_progress_send -- migrate_rx_objs -- migrate_tx_objs -- objects -- ongoing_write_reqs -- partition_absent -- partition_actual -- partition_desync -- partition_object_count -- partition_ref_count -- partition_replica -- proxy_in_progress -- query_agg_avg_rec_count -- query_avg_rec_count -- query_lookup_avg_rec_count -- queue -- record_locks -- record_refs -- sindex_used_bytes_memory -- sindex_gc_garbage_cleaned -- system_free_mem_pct -- total_bytes_disk -- total_bytes_memory -- tree_count -- scans_active -- uptime -- used_bytes_disk -- used_bytes_memory -- cluster_size -- waiting_transactions +The aerospike metrics are under two measurement names: -#### Aerospike Statistics [cumulative]: +***aerospike_node***: These are the aerospike **node** measurements, which are +available from the aerospike `statistics` command. -Meta: -- units: Integer + ie, + ``` + telnet localhost 3003 + statistics + ... + ``` -Measurement names: -- batch_errors -- batch_index_complete -- batch_index_errors -- batch_index_initiate -- batch_index_timeout -- batch_initiate -- batch_timeout -- err_duplicate_proxy_request -- err_out_of_space -- err_replica_non_null_node -- err_replica_null_node -- err_rw_cant_put_unique -- err_rw_pending_limit -- err_rw_request_not_found -- err_storage_queue_full -- err_sync_copy_null_master -- err_sync_copy_null_node -- err_tsvc_requests -- err_write_fail_bin_exists -- err_write_fail_generation -- err_write_fail_generation_xdr -- err_write_fail_incompatible_type -- err_write_fail_key_exists -- err_write_fail_key_mismatch -- err_write_fail_not_found -- err_write_fail_noxdr -- err_write_fail_parameter -- err_write_fail_prole_delete -- err_write_fail_prole_generation -- err_write_fail_prole_unknown -- err_write_fail_unknown -- fabric_msgs_rcvd -- fabric_msgs_sent -- heartbeat_received_foreign -- heartbeat_received_self -- migrate_msgs_recv -- migrate_msgs_sent -- migrate_num_incoming_accepted -- migrate_num_incoming_refused -- proxy_action -- proxy_initiate -- proxy_retry -- proxy_retry_new_dest -- proxy_retry_q_full -- proxy_retry_same_dest -- proxy_unproxy -- query_abort -- query_agg -- query_agg_abort -- query_agg_err -- query_agg_success -- query_bad_records -- query_fail -- query_long_queue_full -- query_long_running -- query_lookup_abort -- query_lookup_err -- query_lookups -- query_lookup_success -- query_reqs -- query_short_queue_full -- query_short_running -- query_success -- query_tracked -- read_dup_prole -- reaped_fds -- rw_err_ack_badnode -- rw_err_ack_internal -- rw_err_ack_nomatch -- rw_err_dup_cluster_key -- rw_err_dup_internal -- rw_err_dup_send -- rw_err_write_cluster_key -- rw_err_write_internal -- rw_err_write_send -- sindex_ucgarbage_found -- sindex_gc_locktimedout -- sindex_gc_inactivity_dur -- sindex_gc_activity_dur -- sindex_gc_list_creation_time -- sindex_gc_list_deletion_time -- sindex_gc_objects_validated -- sindex_gc_garbage_found -- stat_cluster_key_err_ack_dup_trans_reenqueue -- stat_cluster_key_err_ack_rw_trans_reenqueue -- stat_cluster_key_prole_retry -- stat_cluster_key_regular_processed -- stat_cluster_key_trans_to_proxy_retry -- stat_deleted_set_object -- stat_delete_success -- stat_duplicate_operation -- stat_evicted_objects -- stat_evicted_objects_time -- stat_evicted_set_objects -- 
stat_expired_objects -- stat_nsup_deletes_not_shipped -- stat_proxy_errs -- stat_proxy_reqs -- stat_proxy_reqs_xdr -- stat_proxy_success -- stat_read_errs_notfound -- stat_read_errs_other -- stat_read_reqs -- stat_read_reqs_xdr -- stat_read_success -- stat_rw_timeout -- stat_slow_trans_queue_batch_pop -- stat_slow_trans_queue_pop -- stat_slow_trans_queue_push -- stat_write_errs -- stat_write_errs_notfound -- stat_write_errs_other -- stat_write_reqs -- stat_write_reqs_xdr -- stat_write_success -- stat_xdr_pipe_miss -- stat_xdr_pipe_writes -- stat_zero_bin_records -- storage_defrag_corrupt_record -- storage_defrag_wait -- transactions -- basic_scans_succeeded -- basic_scans_failed -- aggr_scans_succeeded -- aggr_scans_failed -- udf_bg_scans_succeeded -- udf_bg_scans_failed -- udf_delete_err_others -- udf_delete_reqs -- udf_delete_success -- udf_lua_errs -- udf_query_rec_reqs -- udf_read_errs_other -- udf_read_reqs -- udf_read_success -- udf_replica_writes -- udf_scan_rec_reqs -- udf_write_err_others -- udf_write_reqs -- udf_write_success -- write_master -- write_prole +***aerospike_namespace***: These are aerospike namespace measurements, which +are available from the aerospike `namespace/` command. -#### Aerospike Statistics [percentage]: + ie, + ``` + telnet localhost 3003 + namespaces + ;;etc. + namespace/ + ... + ``` -Meta: -- units: percent (out of 100) +### Tags: -Measurement names: -- free_pct_disk -- free_pct_memory +All measurements have tags: -# Measurements: -#### Aerospike Namespace Statistics [values]: +- aerospike_host -Meta: -- units: Integer -- tags: `namespace=` +Namespace metrics have tags: -Measurement names: -- available_bin_names -- available_pct -- current_time -- data_used_bytes_memory -- index_used_bytes_memory -- master_objects -- max_evicted_ttl -- max_void_time -- non_expirable_objects -- objects -- prole_objects -- sindex_used_bytes_memory -- total_bytes_disk -- total_bytes_memory -- used_bytes_disk -- used_bytes_memory +- namespace_name -#### Aerospike Namespace Statistics [cumulative]: +### Example Output: -Meta: -- units: Integer -- tags: `namespace=` - -Measurement names: -- evicted_objects -- expired_objects -- set_deleted_objects -- set_evicted_objects - -#### Aerospike Namespace Statistics [percentage]: - -Meta: -- units: percent (out of 100) -- tags: `namespace=` - -Measurement names: -- free_pct_disk -- free_pct_memory +``` +% telegraf --config ~/db/ws/telegraf.conf --input-filter aerospike --test +* Plugin: aerospike, Collection 1 +> aerospike_node,aerospike_host=localhost:3000,host=tars 
batch_error=0i,batch_index_complete=0i,batch_index_created_buffers=0i,batch_index_destroyed_buffers=0i,batch_index_error=0i,batch_index_huge_buffers=0i,batch_index_initiate=0i,batch_index_queue="0:0,0:0,0:0,0:0",batch_index_timeout=0i,batch_index_unused_buffers=0i,batch_initiate=0i,batch_queue=0i,batch_timeout=0i,client_connections=6i,cluster_integrity=true,cluster_key="8AF422E05281249E",cluster_size=1i,delete_queue=0i,demarshal_error=0i,early_tsvc_batch_sub_error=0i,early_tsvc_client_error=0i,early_tsvc_udf_sub_error=0i,fabric_connections=16i,fabric_msgs_rcvd=0i,fabric_msgs_sent=0i,heartbeat_connections=0i,heartbeat_received_foreign=0i,heartbeat_received_self=0i,info_complete=47i,info_queue=0i,migrate_allowed=true,migrate_partitions_remaining=0i,migrate_progress_recv=0i,migrate_progress_send=0i,node_name="BB9020011AC4202",objects=0i,paxos_principal="BB9020011AC4202",proxy_in_progress=0i,proxy_retry=0i,query_long_running=0i,query_short_running=0i,reaped_fds=0i,record_refs=0i,rw_in_progress=0i,scans_active=0i,sindex_gc_activity_dur=0i,sindex_gc_garbage_cleaned=0i,sindex_gc_garbage_found=0i,sindex_gc_inactivity_dur=0i,sindex_gc_list_creation_time=0i,sindex_gc_list_deletion_time=0i,sindex_gc_locktimedout=0i,sindex_gc_objects_validated=0i,sindex_ucgarbage_found=0i,sub_objects=0i,system_free_mem_pct=92i,system_swapping=false,tsvc_queue=0i,uptime=1457i 1468923222000000000 +> aerospike_namespace,aerospike_host=localhost:3000,host=tars,namespace=test allow_nonxdr_writes=true,allow_xdr_writes=true,available_bin_names=32768i,batch_sub_proxy_complete=0i,batch_sub_proxy_error=0i,batch_sub_proxy_timeout=0i,batch_sub_read_error=0i,batch_sub_read_not_found=0i,batch_sub_read_success=0i,batch_sub_read_timeout=0i,batch_sub_tsvc_error=0i,batch_sub_tsvc_timeout=0i,client_delete_error=0i,client_delete_not_found=0i,client_delete_success=0i,client_delete_timeout=0i,client_lang_delete_success=0i,client_lang_error=0i,client_lang_read_success=0i,client_lang_write_success=0i,client_proxy_complete=0i,client_proxy_error=0i,client_proxy_timeout=0i,client_read_error=0i,client_read_not_found=0i,client_read_success=0i,client_read_timeout=0i,client_tsvc_error=0i,client_tsvc_timeout=0i,client_udf_complete=0i,client_udf_error=0i,client_udf_timeout=0i,client_write_error=0i,client_write_success=0i,client_write_timeout=0i,cold_start_evict_ttl=4294967295i,conflict_resolution_policy="generation",current_time=206619222i,data_in_index=false,default_ttl=432000i,device_available_pct=99i,device_free_pct=100i,device_total_bytes=4294967296i,device_used_bytes=0i,disallow_null_setname=false,enable_benchmarks_batch_sub=false,enable_benchmarks_read=false,enable_benchmarks_storage=false,enable_benchmarks_udf=false,enable_benchmarks_udf_sub=false,enable_benchmarks_write=false,enable_hist_proxy=false,enable_xdr=false,evict_hist_buckets=10000i,evict_tenths_pct=5i,evict_ttl=0i,evicted_objects=0i,expired_objects=0i,fail_generation=0i,fail_key_busy=0i,fail_record_too_big=0i,fail_xdr_forbidden=0i,geo2dsphere_within.earth_radius_meters=6371000i,geo2dsphere_within.level_mod=1i,geo2dsphere_within.max_cells=12i,geo2dsphere_within.max_level=30i,geo2dsphere_within.min_level=1i,geo2dsphere_within.strict=true,geo_region_query_cells=0i,geo_region_query_falsepos=0i,geo_region_query_points=0i,geo_region_query_reqs=0i,high_water_disk_pct=50i,high_water_memory_pct=60i,hwm_breached=false,ldt_enabled=false,ldt_gc_rate=0i,ldt_page_size=8192i,master_objects=0i,master_sub_objects=0i,max_ttl=315360000i,max_void_time=0i,memory_free_pct=100i,memory_size=1073741824i,memo
ry_used_bytes=0i,memory_used_data_bytes=0i,memory_used_index_bytes=0i,memory_used_sindex_bytes=0i,migrate_order=5i,migrate_record_receives=0i,migrate_record_retransmits=0i,migrate_records_skipped=0i,migrate_records_transmitted=0i,migrate_rx_instances=0i,migrate_rx_partitions_active=0i,migrate_rx_partitions_initial=0i,migrate_rx_partitions_remaining=0i,migrate_sleep=1i,migrate_tx_instances=0i,migrate_tx_partitions_active=0i,migrate_tx_partitions_imbalance=0i,migrate_tx_partitions_initial=0i,migrate_tx_partitions_remaining=0i,node_name="BB9020011AC4202",non_expirable_objects=0i,ns_forward_xdr_writes=false,nsup_cycle_duration=0i,nsup_cycle_sleep_pct=0i,objects=0i,prole_objects=0i,prole_sub_objects=0i,query_agg=0i,query_agg_abort=0i,query_agg_avg_rec_count=0i,query_agg_error=0i,query_agg_success=0i,query_fail=0i,query_long_queue_full=0i,query_long_reqs=0i,query_lookup_abort=0i,query_lookup_avg_rec_count=0i,query_lookup_error=0i,query_lookup_success=0i,query_lookups=0i,query_reqs=0i,query_short_queue_full=0i,query_short_reqs=0i,query_udf_bg_failure=0i,query_udf_bg_success=0i,read_consistency_level_override="off",repl_factor=1i,scan_aggr_abort=0i,scan_aggr_complete=0i,scan_aggr_error=0i,scan_basic_abort=0i,scan_basic_complete=0i,scan_basic_error=0i,scan_udf_bg_abort=0i,scan_udf_bg_complete=0i,scan_udf_bg_error=0i,set_deleted_objects=0i,sets_enable_xdr=true,sindex.data_max_memory="ULONG_MAX",sindex.num_partitions=32i,single_bin=false,stop_writes=false,stop_writes_pct=90i,storage_engine="device",storage_engine.cold_start_empty=false,storage_engine.data_in_memory=true,storage_engine.defrag_lwm_pct=50i,storage_engine.defrag_queue_min=0i,storage_engine.defrag_sleep=1000i,storage_engine.defrag_startup_minimum=10i,storage_engine.disable_odirect=false,storage_engine.enable_osync=false,storage_engine.file="/opt/aerospike/data/test.dat",storage_engine.filesize=4294967296i,storage_engine.flush_max_ms=1000i,storage_engine.fsync_max_sec=0i,storage_engine.max_write_cache=67108864i,storage_engine.min_avail_pct=5i,storage_engine.post_write_queue=0i,storage_engine.scheduler_mode="null",storage_engine.write_block_size=1048576i,storage_engine.write_threads=1i,sub_objects=0i,udf_sub_lang_delete_success=0i,udf_sub_lang_error=0i,udf_sub_lang_read_success=0i,udf_sub_lang_write_success=0i,udf_sub_tsvc_error=0i,udf_sub_tsvc_timeout=0i,udf_sub_udf_complete=0i,udf_sub_udf_error=0i,udf_sub_udf_timeout=0i,write_commit_level_override="off",xdr_write_error=0i,xdr_write_success=0i,xdr_write_timeout=0i,{test}_query_hist_track_back=300i,{test}_query_hist_track_slice=10i,{test}_query_hist_track_thresholds="1,8,64",{test}_read_hist_track_back=300i,{test}_read_hist_track_slice=10i,{test}_read_hist_track_thresholds="1,8,64",{test}_udf_hist_track_back=300i,{test}_udf_hist_track_slice=10i,{test}_udf_hist_track_thresholds="1,8,64",{test}_write_hist_track_back=300i,{test}_write_hist_track_slice=10i,{test}_write_hist_track_thresholds="1,8,64" 1468923222000000000 +``` \ No newline at end of file diff --git a/plugins/inputs/aerospike/aerospike.go b/plugins/inputs/aerospike/aerospike.go index 29e51cb82..eb608723e 100644 --- a/plugins/inputs/aerospike/aerospike.go +++ b/plugins/inputs/aerospike/aerospike.go @@ -72,18 +72,17 @@ func (a *Aerospike) gatherServer(hostport string, acc telegraf.Accumulator) erro nodes := c.GetNodes() for _, n := range nodes { tags := map[string]string{ - "node_name": n.GetName(), "aerospike_host": hostport, } - fields := make(map[string]interface{}) + fields := map[string]interface{}{ + "node_name": n.GetName(), + 
} stats, err := as.RequestNodeStats(n) if err != nil { return err } for k, v := range stats { - if iv, err := strconv.ParseInt(v, 10, 64); err == nil { - fields[strings.Replace(k, "-", "_", -1)] = iv - } + fields[strings.Replace(k, "-", "_", -1)] = parseValue(v) } acc.AddFields("aerospike_node", fields, tags, time.Now()) @@ -94,9 +93,13 @@ func (a *Aerospike) gatherServer(hostport string, acc telegraf.Accumulator) erro namespaces := strings.Split(info["namespaces"], ";") for _, namespace := range namespaces { - nTags := copyTags(tags) + nTags := map[string]string{ + "aerospike_host": hostport, + } nTags["namespace"] = namespace - nFields := make(map[string]interface{}) + nFields := map[string]interface{}{ + "node_name": n.GetName(), + } info, err := as.RequestNodeInfo(n, "namespace/"+namespace) if err != nil { continue @@ -107,9 +110,7 @@ func (a *Aerospike) gatherServer(hostport string, acc telegraf.Accumulator) erro if len(parts) < 2 { continue } - if iv, err := strconv.ParseInt(parts[1], 10, 64); err == nil { - nFields[strings.Replace(parts[0], "-", "_", -1)] = iv - } + nFields[strings.Replace(parts[0], "-", "_", -1)] = parseValue(parts[1]) } acc.AddFields("aerospike_namespace", nFields, nTags, time.Now()) } @@ -117,6 +118,16 @@ func (a *Aerospike) gatherServer(hostport string, acc telegraf.Accumulator) erro return nil } +func parseValue(v string) interface{} { + if parsed, err := strconv.ParseInt(v, 10, 64); err == nil { + return parsed + } else if parsed, err := strconv.ParseBool(v); err == nil { + return parsed + } else { + return v + } +} + func copyTags(m map[string]string) map[string]string { out := make(map[string]string) for k, v := range m { diff --git a/plugins/inputs/ceph/README.md b/plugins/inputs/ceph/README.md index 61b275650..ab358daaa 100644 --- a/plugins/inputs/ceph/README.md +++ b/plugins/inputs/ceph/README.md @@ -1,18 +1,18 @@ # Ceph Storage Input Plugin -Collects performance metrics from the MON and OSD nodes in a Ceph storage cluster. +Collects performance metrics from the MON and OSD nodes in a Ceph storage cluster. The plugin works by scanning the configured SocketDir for OSD and MON socket files. When it finds -a MON socket, it runs **ceph --admin-daemon $file perfcounters_dump**. For OSDs it runs **ceph --admin-daemon $file perf dump** +a MON socket, it runs **ceph --admin-daemon $file perfcounters_dump**. For OSDs it runs **ceph --admin-daemon $file perf dump** The resulting JSON is parsed and grouped into collections, based on top-level key. Top-level keys are used as collection tags, and all sub-keys are flattened. For example: ``` - { - "paxos": { + { + "paxos": { "refresh": 9363435, - "refresh_latency": { + "refresh_latency": { "avgcount": 9363435, "sum": 5378.794002000 } @@ -50,7 +50,7 @@ Would be parsed into the following metrics, all of which would be tagged with co ### Measurements & Fields: -All fields are collected under the **ceph** measurement and stored as float64s. For a full list of fields, see the sample perf dumps in ceph_test.go. +All fields are collected under the **ceph** measurement and stored as float64s. For a full list of fields, see the sample perf dumps in ceph_test.go. 
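To picture the flattening described above, here is a small sketch that walks one perf-dump collection; the "." field-name separator is an assumption for illustration, not necessarily what the plugin emits:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// flatten joins nested sub-keys under a collection into float64 fields.
// The "." separator here is illustrative only.
func flatten(prefix string, v interface{}, out map[string]float64) {
	switch val := v.(type) {
	case map[string]interface{}:
		for k, child := range val {
			key := k
			if prefix != "" {
				key = prefix + "." + k
			}
			flatten(key, child, out)
		}
	case float64:
		out[prefix] = val
	}
}

func main() {
	raw := []byte(`{"paxos": {"refresh": 9363435, "refresh_latency": {"avgcount": 9363435, "sum": 5378.794002}}}`)
	var dump map[string]interface{}
	if err := json.Unmarshal(raw, &dump); err != nil {
		panic(err)
	}
	// Each top-level key ("paxos") becomes the collection tag; its sub-keys
	// become flattened numeric fields.
	for collection, v := range dump {
		fields := map[string]float64{}
		flatten("", v, fields)
		fmt.Println(collection, fields)
	}
}
```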
### Tags: @@ -95,7 +95,7 @@ All measurements will have the following tags: - throttle-objecter_ops - throttle-osd_client_bytes - throttle-osd_client_messages - + ### Example Output: From cbf5a55c7df8e24cc9835a6d94e28ac5dfea47be Mon Sep 17 00:00:00 2001 From: Victor Garcia Date: Tue, 19 Jul 2016 13:47:12 +0200 Subject: [PATCH 4/9] MongoDB input plugin: Adding per DB stats (#1466) --- CHANGELOG.md | 1 + plugins/inputs/mongodb/README.md | 13 ++++ plugins/inputs/mongodb/mongodb.go | 10 +-- plugins/inputs/mongodb/mongodb_data.go | 46 +++++++++++++ plugins/inputs/mongodb/mongodb_server.go | 27 +++++++- plugins/inputs/mongodb/mongodb_server_test.go | 4 +- plugins/inputs/mongodb/mongostat.go | 65 +++++++++++++++++++ 7 files changed, 159 insertions(+), 7 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 60949047f..7ca37b1e7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -49,6 +49,7 @@ should now look like: - [#1500](https://github.com/influxdata/telegraf/pull/1500): Aerospike plugin refactored to use official client lib. - [#1434](https://github.com/influxdata/telegraf/pull/1434): Add measurement name arg to logparser plugin. - [#1479](https://github.com/influxdata/telegraf/pull/1479): logparser: change resp_code from a field to a tag. +- [#1466](https://github.com/influxdata/telegraf/pull/1466): MongoDB input plugin: adding per DB stats from db.stats() ### Bugfixes diff --git a/plugins/inputs/mongodb/README.md b/plugins/inputs/mongodb/README.md index 66ff2668e..72f87feb8 100644 --- a/plugins/inputs/mongodb/README.md +++ b/plugins/inputs/mongodb/README.md @@ -10,6 +10,7 @@ ## mongodb://10.10.3.33:18832, ## 10.0.0.1:10000, etc. servers = ["127.0.0.1:27017"] + gather_perdb_stats = false ``` For authenticated mongodb istances use connection mongdb connection URI @@ -52,3 +53,15 @@ and create a single measurement containing values e.g. * ttl_passes_per_sec * repl_lag * jumbo_chunks (only if mongos or mongo config) + +If gather_db_stats is set to true, it will also collect per database stats exposed by db.stats() +creating another measurement called mongodb_db_stats and containing values: + * collections + * objects + * avg_obj_size + * data_size + * storage_size + * num_extents + * indexes + * index_size + * ok diff --git a/plugins/inputs/mongodb/mongodb.go b/plugins/inputs/mongodb/mongodb.go index f38fa31ef..0fdb90f74 100644 --- a/plugins/inputs/mongodb/mongodb.go +++ b/plugins/inputs/mongodb/mongodb.go @@ -15,9 +15,10 @@ import ( ) type MongoDB struct { - Servers []string - Ssl Ssl - mongos map[string]*Server + Servers []string + Ssl Ssl + mongos map[string]*Server + GatherPerdbStats bool } type Ssl struct { @@ -32,6 +33,7 @@ var sampleConfig = ` ## mongodb://10.10.3.33:18832, ## 10.0.0.1:10000, etc. 
servers = ["127.0.0.1:27017"] + gather_perdb_stats = false ` func (m *MongoDB) SampleConfig() string { @@ -135,7 +137,7 @@ func (m *MongoDB) gatherServer(server *Server, acc telegraf.Accumulator) error { } server.Session = sess } - return server.gatherData(acc) + return server.gatherData(acc, m.GatherPerdbStats) } func init() { diff --git a/plugins/inputs/mongodb/mongodb_data.go b/plugins/inputs/mongodb/mongodb_data.go index 7a52d650a..afa4ddd2f 100644 --- a/plugins/inputs/mongodb/mongodb_data.go +++ b/plugins/inputs/mongodb/mongodb_data.go @@ -12,6 +12,12 @@ type MongodbData struct { StatLine *StatLine Fields map[string]interface{} Tags map[string]string + DbData []DbData +} + +type DbData struct { + Name string + Fields map[string]interface{} } func NewMongodbData(statLine *StatLine, tags map[string]string) *MongodbData { @@ -22,6 +28,7 @@ func NewMongodbData(statLine *StatLine, tags map[string]string) *MongodbData { StatLine: statLine, Tags: tags, Fields: make(map[string]interface{}), + DbData: []DbData{}, } } @@ -72,6 +79,34 @@ var WiredTigerStats = map[string]string{ "percent_cache_used": "CacheUsedPercent", } +var DbDataStats = map[string]string{ + "collections": "Collections", + "objects": "Objects", + "avg_obj_size": "AvgObjSize", + "data_size": "DataSize", + "storage_size": "StorageSize", + "num_extents": "NumExtents", + "indexes": "Indexes", + "index_size": "IndexSize", + "ok": "Ok", +} + +func (d *MongodbData) AddDbStats() { + for _, dbstat := range d.StatLine.DbStatsLines { + dbStatLine := reflect.ValueOf(&dbstat).Elem() + newDbData := &DbData{ + Name: dbstat.Name, + Fields: make(map[string]interface{}), + } + newDbData.Fields["type"] = "db_stat" + for key, value := range DbDataStats { + val := dbStatLine.FieldByName(value).Interface() + newDbData.Fields[key] = val + } + d.DbData = append(d.DbData, *newDbData) + } +} + func (d *MongodbData) AddDefaultStats() { statLine := reflect.ValueOf(d.StatLine).Elem() d.addStat(statLine, DefaultStats) @@ -113,4 +148,15 @@ func (d *MongodbData) flush(acc telegraf.Accumulator) { d.StatLine.Time, ) d.Fields = make(map[string]interface{}) + + for _, db := range d.DbData { + d.Tags["db_name"] = db.Name + acc.AddFields( + "mongodb_db_stats", + db.Fields, + d.Tags, + d.StatLine.Time, + ) + db.Fields = make(map[string]interface{}) + } } diff --git a/plugins/inputs/mongodb/mongodb_server.go b/plugins/inputs/mongodb/mongodb_server.go index e4213bbaf..e797fd6ab 100644 --- a/plugins/inputs/mongodb/mongodb_server.go +++ b/plugins/inputs/mongodb/mongodb_server.go @@ -22,7 +22,7 @@ func (s *Server) getDefaultTags() map[string]string { return tags } -func (s *Server) gatherData(acc telegraf.Accumulator) error { +func (s *Server) gatherData(acc telegraf.Accumulator, gatherDbStats bool) error { s.Session.SetMode(mgo.Eventual, true) s.Session.SetSocketTimeout(0) result_server := &ServerStatus{} @@ -42,10 +42,34 @@ func (s *Server) gatherData(acc telegraf.Accumulator) error { JumboChunksCount: int64(jumbo_chunks), } + result_db_stats := &DbStats{} + + if gatherDbStats == true { + names := []string{} + names, err = s.Session.DatabaseNames() + if err != nil { + log.Println("Error getting database names (" + err.Error() + ")") + } + for _, db_name := range names { + db_stat_line := &DbStatsData{} + err = s.Session.DB(db_name).Run(bson.D{{"dbStats", 1}}, db_stat_line) + if err != nil { + log.Println("Error getting db stats from " + db_name + "(" + err.Error() + ")") + } + db := &Db{ + Name: db_name, + DbStatsData: db_stat_line, + } + + result_db_stats.Dbs = 
append(result_db_stats.Dbs, *db) + } + } + result := &MongoStatus{ ServerStatus: result_server, ReplSetStatus: result_repl, ClusterStatus: result_cluster, + DbStats: result_db_stats, } defer func() { @@ -64,6 +88,7 @@ func (s *Server) gatherData(acc telegraf.Accumulator) error { s.getDefaultTags(), ) data.AddDefaultStats() + data.AddDbStats() data.flush(acc) } return nil diff --git a/plugins/inputs/mongodb/mongodb_server_test.go b/plugins/inputs/mongodb/mongodb_server_test.go index 52869724c..7ad0f38a2 100644 --- a/plugins/inputs/mongodb/mongodb_server_test.go +++ b/plugins/inputs/mongodb/mongodb_server_test.go @@ -29,12 +29,12 @@ func TestGetDefaultTags(t *testing.T) { func TestAddDefaultStats(t *testing.T) { var acc testutil.Accumulator - err := server.gatherData(&acc) + err := server.gatherData(&acc, false) require.NoError(t, err) time.Sleep(time.Duration(1) * time.Second) // need to call this twice so it can perform the diff - err = server.gatherData(&acc) + err = server.gatherData(&acc, false) require.NoError(t, err) for key, _ := range DefaultStats { diff --git a/plugins/inputs/mongodb/mongostat.go b/plugins/inputs/mongodb/mongostat.go index 23bd05f72..50f65333e 100644 --- a/plugins/inputs/mongodb/mongostat.go +++ b/plugins/inputs/mongodb/mongostat.go @@ -35,6 +35,7 @@ type MongoStatus struct { ServerStatus *ServerStatus ReplSetStatus *ReplSetStatus ClusterStatus *ClusterStatus + DbStats *DbStats } type ServerStatus struct { @@ -65,6 +66,32 @@ type ServerStatus struct { Metrics *MetricsStats `bson:"metrics"` } +// DbStats stores stats from all dbs +type DbStats struct { + Dbs []Db +} + +// Db represent a single DB +type Db struct { + Name string + DbStatsData *DbStatsData +} + +// DbStatsData stores stats from a db +type DbStatsData struct { + Db string `bson:"db"` + Collections int64 `bson:"collections"` + Objects int64 `bson:"objects"` + AvgObjSize float64 `bson:"avgObjSize"` + DataSize int64 `bson:"dataSize"` + StorageSize int64 `bson:"storageSize"` + NumExtents int64 `bson:"numExtents"` + Indexes int64 `bson:"indexes"` + IndexSize int64 `bson:"indexSize"` + Ok int64 `bson:"ok"` + GleStats interface{} `bson:"gleStats"` +} + // ClusterStatus stores information related to the whole cluster type ClusterStatus struct { JumboChunksCount int64 @@ -396,6 +423,22 @@ type StatLine struct { // Cluster fields JumboChunksCount int64 + + // DB stats field + DbStatsLines []DbStatLine +} + +type DbStatLine struct { + Name string + Collections int64 + Objects int64 + AvgObjSize float64 + DataSize int64 + StorageSize int64 + NumExtents int64 + Indexes int64 + IndexSize int64 + Ok int64 } func parseLocks(stat ServerStatus) map[string]LockUsage { @@ -677,5 +720,27 @@ func NewStatLine(oldMongo, newMongo MongoStatus, key string, all bool, sampleSec newClusterStat := *newMongo.ClusterStatus returnVal.JumboChunksCount = newClusterStat.JumboChunksCount + newDbStats := *newMongo.DbStats + for _, db := range newDbStats.Dbs { + dbStatsData := db.DbStatsData + // mongos doesn't have the db key, so setting the db name + if dbStatsData.Db == "" { + dbStatsData.Db = db.Name + } + dbStatLine := &DbStatLine{ + Name: dbStatsData.Db, + Collections: dbStatsData.Collections, + Objects: dbStatsData.Objects, + AvgObjSize: dbStatsData.AvgObjSize, + DataSize: dbStatsData.DataSize, + StorageSize: dbStatsData.StorageSize, + NumExtents: dbStatsData.NumExtents, + Indexes: dbStatsData.Indexes, + IndexSize: dbStatsData.IndexSize, + Ok: dbStatsData.Ok, + } + returnVal.DbStatsLines = append(returnVal.DbStatsLines, *dbStatLine) + 
} + return returnVal } From 82166a36d02e21524c65ef8fcfeb1f0da55bc100 Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Tue, 19 Jul 2016 14:03:28 +0100 Subject: [PATCH 5/9] Fix err race condition and partial failure issues closes #1439 closes #1440 closes #1441 closes #1442 closes #1443 closes #1444 closes #1445 --- CHANGELOG.md | 6 ++++++ plugins/inputs/dns_query/dns_query.go | 14 ++++++++------ plugins/inputs/dovecot/dovecot.go | 20 ++++++++------------ plugins/inputs/memcached/memcached.go | 12 +++++------- plugins/inputs/mongodb/mongodb.go | 10 ++++------ plugins/inputs/mysql/mysql.go | 25 ++++++++++++++----------- plugins/inputs/mysql/mysql_test.go | 1 - plugins/inputs/nginx/nginx.go | 8 ++++---- plugins/inputs/nsq/nsq.go | 9 ++++----- 9 files changed, 53 insertions(+), 52 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7ca37b1e7..76263dc69 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,11 @@ ## v1.0 [unreleased] +### Features + +### Bugfixes + +- [#1519](https://github.com/influxdata/telegraf/pull/1519): Fix error race conditions and partial failures. + ## v1.0 beta 3 [2016-07-18] ### Release Notes diff --git a/plugins/inputs/dns_query/dns_query.go b/plugins/inputs/dns_query/dns_query.go index 2231f2921..1bccc52c0 100644 --- a/plugins/inputs/dns_query/dns_query.go +++ b/plugins/inputs/dns_query/dns_query.go @@ -3,12 +3,14 @@ package dns_query import ( "errors" "fmt" - "github.com/influxdata/telegraf" - "github.com/influxdata/telegraf/plugins/inputs" "github.com/miekg/dns" "net" "strconv" "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal/errchan" + "github.com/influxdata/telegraf/plugins/inputs" ) type DnsQuery struct { @@ -55,12 +57,12 @@ func (d *DnsQuery) Description() string { } func (d *DnsQuery) Gather(acc telegraf.Accumulator) error { d.setDefaultValues() + + errChan := errchan.New(len(d.Domains) * len(d.Servers)) for _, domain := range d.Domains { for _, server := range d.Servers { dnsQueryTime, err := d.getDnsQueryTime(domain, server) - if err != nil { - return err - } + errChan.C <- err tags := map[string]string{ "server": server, "domain": domain, @@ -72,7 +74,7 @@ func (d *DnsQuery) Gather(acc telegraf.Accumulator) error { } } - return nil + return errChan.Error() } func (d *DnsQuery) setDefaultValues() { diff --git a/plugins/inputs/dovecot/dovecot.go b/plugins/inputs/dovecot/dovecot.go index 0347016d1..56290e759 100644 --- a/plugins/inputs/dovecot/dovecot.go +++ b/plugins/inputs/dovecot/dovecot.go @@ -12,6 +12,7 @@ import ( "time" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal/errchan" "github.com/influxdata/telegraf/plugins/inputs" ) @@ -51,7 +52,6 @@ const defaultPort = "24242" // Reads stats from all configured servers. 
func (d *Dovecot) Gather(acc telegraf.Accumulator) error { - if !validQuery[d.Type] { return fmt.Errorf("Error: %s is not a valid query type\n", d.Type) @@ -61,31 +61,27 @@ func (d *Dovecot) Gather(acc telegraf.Accumulator) error { d.Servers = append(d.Servers, "127.0.0.1:24242") } - var wg sync.WaitGroup - - var outerr error - if len(d.Filters) <= 0 { d.Filters = append(d.Filters, "") } - for _, serv := range d.Servers { + var wg sync.WaitGroup + errChan := errchan.New(len(d.Servers) * len(d.Filters)) + for _, server := range d.Servers { for _, filter := range d.Filters { wg.Add(1) - go func(serv string, filter string) { + go func(s string, f string) { defer wg.Done() - outerr = d.gatherServer(serv, acc, d.Type, filter) - }(serv, filter) + errChan.C <- d.gatherServer(s, acc, d.Type, f) + }(server, filter) } } wg.Wait() - - return outerr + return errChan.Error() } func (d *Dovecot) gatherServer(addr string, acc telegraf.Accumulator, qtype string, filter string) error { - _, _, err := net.SplitHostPort(addr) if err != nil { return fmt.Errorf("Error: %s on url %s\n", err, addr) diff --git a/plugins/inputs/memcached/memcached.go b/plugins/inputs/memcached/memcached.go index c631a1ed1..5ee538e93 100644 --- a/plugins/inputs/memcached/memcached.go +++ b/plugins/inputs/memcached/memcached.go @@ -9,6 +9,7 @@ import ( "time" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal/errchan" "github.com/influxdata/telegraf/plugins/inputs" ) @@ -73,19 +74,16 @@ func (m *Memcached) Gather(acc telegraf.Accumulator) error { return m.gatherServer(":11211", false, acc) } + errChan := errchan.New(len(m.Servers) + len(m.UnixSockets)) for _, serverAddress := range m.Servers { - if err := m.gatherServer(serverAddress, false, acc); err != nil { - return err - } + errChan.C <- m.gatherServer(serverAddress, false, acc) } for _, unixAddress := range m.UnixSockets { - if err := m.gatherServer(unixAddress, true, acc); err != nil { - return err - } + errChan.C <- m.gatherServer(unixAddress, true, acc) } - return nil + return errChan.Error() } func (m *Memcached) gatherServer( diff --git a/plugins/inputs/mongodb/mongodb.go b/plugins/inputs/mongodb/mongodb.go index 0fdb90f74..a4bdabd96 100644 --- a/plugins/inputs/mongodb/mongodb.go +++ b/plugins/inputs/mongodb/mongodb.go @@ -10,6 +10,7 @@ import ( "time" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal/errchan" "github.com/influxdata/telegraf/plugins/inputs" "gopkg.in/mgo.v2" ) @@ -55,9 +56,7 @@ func (m *MongoDB) Gather(acc telegraf.Accumulator) error { } var wg sync.WaitGroup - - var outerr error - + errChan := errchan.New(len(m.Servers)) for _, serv := range m.Servers { u, err := url.Parse(serv) if err != nil { @@ -73,13 +72,12 @@ func (m *MongoDB) Gather(acc telegraf.Accumulator) error { wg.Add(1) go func(srv *Server) { defer wg.Done() - outerr = m.gatherServer(srv, acc) + errChan.C <- m.gatherServer(srv, acc) }(m.getMongoServer(u)) } wg.Wait() - - return outerr + return errChan.Error() } func (m *MongoDB) getMongoServer(url *url.URL) *Server { diff --git a/plugins/inputs/mysql/mysql.go b/plugins/inputs/mysql/mysql.go index 5011e82b9..10b8c2f75 100644 --- a/plugins/inputs/mysql/mysql.go +++ b/plugins/inputs/mysql/mysql.go @@ -7,10 +7,12 @@ import ( "net/url" "strconv" "strings" + "sync" "time" _ "github.com/go-sql-driver/mysql" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal/errchan" "github.com/influxdata/telegraf/plugins/inputs" ) @@ -118,26 +120,27 @@ func (m *Mysql) InitMysql() { func (m 
*Mysql) Gather(acc telegraf.Accumulator) error { if len(m.Servers) == 0 { - // if we can't get stats in this case, thats fine, don't report - // an error. - m.gatherServer(localhost, acc) - return nil + // default to localhost if nothing specified. + return m.gatherServer(localhost, acc) } - // Initialise additional query intervals if !initDone { m.InitMysql() } + var wg sync.WaitGroup + errChan := errchan.New(len(m.Servers)) // Loop through each server and collect metrics - for _, serv := range m.Servers { - err := m.gatherServer(serv, acc) - if err != nil { - return err - } + for _, server := range m.Servers { + wg.Add(1) + go func(s string) { + defer wg.Done() + errChan.C <- m.gatherServer(s, acc) + }(server) } - return nil + wg.Wait() + return errChan.Error() } type mapping struct { diff --git a/plugins/inputs/mysql/mysql_test.go b/plugins/inputs/mysql/mysql_test.go index 989c21722..3ab9187b5 100644 --- a/plugins/inputs/mysql/mysql_test.go +++ b/plugins/inputs/mysql/mysql_test.go @@ -20,7 +20,6 @@ func TestMysqlDefaultsToLocal(t *testing.T) { } var acc testutil.Accumulator - err := m.Gather(&acc) require.NoError(t, err) diff --git a/plugins/inputs/nginx/nginx.go b/plugins/inputs/nginx/nginx.go index b15b539de..3fe8c04d1 100644 --- a/plugins/inputs/nginx/nginx.go +++ b/plugins/inputs/nginx/nginx.go @@ -12,6 +12,7 @@ import ( "time" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal/errchan" "github.com/influxdata/telegraf/plugins/inputs" ) @@ -34,7 +35,7 @@ func (n *Nginx) Description() string { func (n *Nginx) Gather(acc telegraf.Accumulator) error { var wg sync.WaitGroup - var outerr error + errChan := errchan.New(len(n.Urls)) for _, u := range n.Urls { addr, err := url.Parse(u) @@ -45,13 +46,12 @@ func (n *Nginx) Gather(acc telegraf.Accumulator) error { wg.Add(1) go func(addr *url.URL) { defer wg.Done() - outerr = n.gatherUrl(addr, acc) + errChan.C <- n.gatherUrl(addr, acc) }(addr) } wg.Wait() - - return outerr + return errChan.Error() } var tr = &http.Transport{ diff --git a/plugins/inputs/nsq/nsq.go b/plugins/inputs/nsq/nsq.go index 35ba76866..8bfd72788 100644 --- a/plugins/inputs/nsq/nsq.go +++ b/plugins/inputs/nsq/nsq.go @@ -32,6 +32,7 @@ import ( "time" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal/errchan" "github.com/influxdata/telegraf/plugins/inputs" ) @@ -65,19 +66,17 @@ func (n *NSQ) Description() string { func (n *NSQ) Gather(acc telegraf.Accumulator) error { var wg sync.WaitGroup - var outerr error - + errChan := errchan.New(len(n.Endpoints)) for _, e := range n.Endpoints { wg.Add(1) go func(e string) { defer wg.Done() - outerr = n.gatherEndpoint(e, acc) + errChan.C <- n.gatherEndpoint(e, acc) }(e) } wg.Wait() - - return outerr + return errChan.Error() } var tr = &http.Transport{ From d54b169d6798e160a4ecfd5061e568fc4d3c8a88 Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Tue, 19 Jul 2016 12:42:59 +0100 Subject: [PATCH 6/9] nstat: fix nstat setting path for snmp6 closes #1477 --- CHANGELOG.md | 1 + plugins/inputs/nstat/nstat.go | 8 ++++---- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 76263dc69..9c4a7e35b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ ### Bugfixes - [#1519](https://github.com/influxdata/telegraf/pull/1519): Fix error race conditions and partial failures. +- [#1477](https://github.com/influxdata/telegraf/issues/1477): nstat: fix inaccurate config panic. 
## v1.0 beta 3 [2016-07-18] diff --git a/plugins/inputs/nstat/nstat.go b/plugins/inputs/nstat/nstat.go index d32ef004c..5096d7b03 100644 --- a/plugins/inputs/nstat/nstat.go +++ b/plugins/inputs/nstat/nstat.go @@ -43,9 +43,9 @@ var sampleConfig = ` ## file paths for proc files. If empty default paths will be used: ## /proc/net/netstat, /proc/net/snmp, /proc/net/snmp6 ## These can also be overridden with env variables, see README. - proc_net_netstat = "" - proc_net_snmp = "" - proc_net_snmp6 = "" + proc_net_netstat = "/proc/net/netstat" + proc_net_snmp = "/proc/net/snmp" + proc_net_snmp6 = "/proc/net/snmp6" ## dump metrics with 0 values too dump_zeros = true ` @@ -141,7 +141,7 @@ func (ns *Nstat) loadPaths() { ns.ProcNetSNMP = proc(ENV_SNMP, NET_SNMP) } if ns.ProcNetSNMP6 == "" { - ns.ProcNetSNMP = proc(ENV_SNMP6, NET_SNMP6) + ns.ProcNetSNMP6 = proc(ENV_SNMP6, NET_SNMP6) } } From 42d9d5d237f92c3ebcc8a7ecfcae022625f85bd5 Mon Sep 17 00:00:00 2001 From: Pierre Fersing Date: Tue, 19 Jul 2016 16:24:10 +0200 Subject: [PATCH 7/9] Fix Redis url, an extra "tcp://" was added (#1521) --- CHANGELOG.md | 1 + plugins/inputs/redis/redis.go | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9c4a7e35b..84d7bae3f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -77,6 +77,7 @@ should now look like: - [#1436](https://github.com/influxdata/telegraf/issues/1436): logparser: honor modifiers in "pattern" config. - [#1418](https://github.com/influxdata/telegraf/issues/1418): logparser: error and exit on file permissions/missing errors. - [#1499](https://github.com/influxdata/telegraf/pull/1499): Make the user able to specify full path for HAproxy stats +- [#1521](https://github.com/influxdata/telegraf/pull/1521): Fix Redis url, an extra "tcp://" was added. ## v1.0 beta 2 [2016-06-21] diff --git a/plugins/inputs/redis/redis.go b/plugins/inputs/redis/redis.go index 649786c2c..b08eedee3 100644 --- a/plugins/inputs/redis/redis.go +++ b/plugins/inputs/redis/redis.go @@ -99,7 +99,7 @@ func (r *Redis) Gather(acc telegraf.Accumulator) error { var wg sync.WaitGroup errChan := errchan.New(len(r.Servers)) for _, serv := range r.Servers { - if !strings.HasPrefix(serv, "tcp://") || !strings.HasPrefix(serv, "unix://") { + if !strings.HasPrefix(serv, "tcp://") && !strings.HasPrefix(serv, "unix://") { serv = "tcp://" + serv } From 191608041f4e421c3e137afc342480f5211f8740 Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Tue, 19 Jul 2016 17:31:01 +0100 Subject: [PATCH 8/9] Strip container_version from container_image tag closes #1413 --- CHANGELOG.md | 2 ++ plugins/inputs/docker/docker.go | 13 +++++++++++-- plugins/inputs/docker/docker_test.go | 12 +++++++----- 3 files changed, 20 insertions(+), 7 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 84d7bae3f..729cf5a2b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,8 @@ ### Features +- [#1413](https://github.com/influxdata/telegraf/issues/1413): Separate container_version from container_image tag. + ### Bugfixes - [#1519](https://github.com/influxdata/telegraf/pull/1519): Fix error race conditions and partial failures. diff --git a/plugins/inputs/docker/docker.go b/plugins/inputs/docker/docker.go index 0af7820e1..dfd768c1a 100644 --- a/plugins/inputs/docker/docker.go +++ b/plugins/inputs/docker/docker.go @@ -207,9 +207,18 @@ func (d *Docker) gatherContainer( cname = strings.TrimPrefix(container.Names[0], "/") } + // the image name sometimes has a version part. 
+ // ie, rabbitmq:3-management + imageParts := strings.Split(container.Image, ":") + imageName := imageParts[0] + imageVersion := "unknown" + if len(imageParts) > 1 { + imageVersion = imageParts[1] + } tags := map[string]string{ - "container_name": cname, - "container_image": container.Image, + "container_name": cname, + "container_image": imageName, + "container_version": imageVersion, } if len(d.ContainerNames) > 0 { if !sliceContains(cname, d.ContainerNames) { diff --git a/plugins/inputs/docker/docker_test.go b/plugins/inputs/docker/docker_test.go index 1574009b8..b1c76f5af 100644 --- a/plugins/inputs/docker/docker_test.go +++ b/plugins/inputs/docker/docker_test.go @@ -378,9 +378,10 @@ func TestDockerGatherInfo(t *testing.T) { "container_id": "b7dfbb9478a6ae55e237d4d74f8bbb753f0817192b5081334dc78476296e2173", }, map[string]string{ - "container_name": "etcd2", - "container_image": "quay.io/coreos/etcd:v2.2.2", - "cpu": "cpu3", + "container_name": "etcd2", + "container_image": "quay.io/coreos/etcd", + "cpu": "cpu3", + "container_version": "v2.2.2", }, ) acc.AssertContainsTaggedFields(t, @@ -423,8 +424,9 @@ func TestDockerGatherInfo(t *testing.T) { "container_id": "b7dfbb9478a6ae55e237d4d74f8bbb753f0817192b5081334dc78476296e2173", }, map[string]string{ - "container_name": "etcd2", - "container_image": "quay.io/coreos/etcd:v2.2.2", + "container_name": "etcd2", + "container_image": "quay.io/coreos/etcd", + "container_version": "v2.2.2", }, ) From 0af0fa7c2e4063bcc11b975c514950a71a4d65a4 Mon Sep 17 00:00:00 2001 From: Torsten Rehn Date: Wed, 20 Jul 2016 15:47:04 +0200 Subject: [PATCH 9/9] jolokia: handle multiple multi-dimensional attributes (#1524) fixes #1481 --- CHANGELOG.md | 1 + plugins/inputs/jolokia/jolokia.go | 9 ++++++++- 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 729cf5a2b..dda3ba750 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ - [#1519](https://github.com/influxdata/telegraf/pull/1519): Fix error race conditions and partial failures. - [#1477](https://github.com/influxdata/telegraf/issues/1477): nstat: fix inaccurate config panic. +- [#1481](https://github.com/influxdata/telegraf/issues/1481): jolokia: fix handling multiple multi-dimensional attributes. ## v1.0 beta 3 [2016-07-18] diff --git a/plugins/inputs/jolokia/jolokia.go b/plugins/inputs/jolokia/jolokia.go index 244338559..53bb65fd0 100644 --- a/plugins/inputs/jolokia/jolokia.go +++ b/plugins/inputs/jolokia/jolokia.go @@ -249,7 +249,14 @@ func (j *Jolokia) Gather(acc telegraf.Accumulator) error { switch t := values.(type) { case map[string]interface{}: for k, v := range t { - fields[measurement+"_"+k] = v + switch t2 := v.(type) { + case map[string]interface{}: + for k2, v2 := range t2 { + fields[measurement+"_"+k+"_"+k2] = v2 + } + case interface{}: + fields[measurement+"_"+k] = t2 + } } case interface{}: fields[measurement] = t
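For reference, the nested-attribute flattening above behaves like this self-contained sketch; the measurement name and attribute map are hypothetical stand-ins for what the plugin decodes from a Jolokia response:

```go
package main

import "fmt"

func main() {
	// Hypothetical metric name and decoded "value" payload; in the plugin the
	// payload arrives as an untyped interface{} from the HTTP response body.
	measurement := "java_memory"
	var values interface{} = map[string]interface{}{
		"HeapMemoryUsage": map[string]interface{}{"used": 203425720.0, "max": 477626368.0},
		"Verbose":         false,
	}

	fields := make(map[string]interface{})
	switch t := values.(type) {
	case map[string]interface{}:
		for k, v := range t {
			switch t2 := v.(type) {
			case map[string]interface{}:
				// a multi-dimensional attribute: fold both key levels into the field name
				for k2, v2 := range t2 {
					fields[measurement+"_"+k+"_"+k2] = v2
				}
			case interface{}:
				fields[measurement+"_"+k] = t2
			}
		}
	case interface{}:
		fields[measurement] = t
	}

	// Prints flattened field names such as java_memory_HeapMemoryUsage_used,
	// java_memory_HeapMemoryUsage_max and java_memory_Verbose.
	fmt.Println(fields)
}
```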