diff --git a/outputs/influxdb/influxdb.go b/outputs/influxdb/influxdb.go index a9fa2edc3..14391884d 100644 --- a/outputs/influxdb/influxdb.go +++ b/outputs/influxdb/influxdb.go @@ -7,6 +7,7 @@ import ( "math/rand" "net/url" "strings" + "time" "github.com/influxdb/influxdb/client/v2" "github.com/influxdb/telegraf/internal" @@ -110,6 +111,7 @@ func (i *InfluxDB) Connect() error { } i.conns = conns + rand.Seed(time.Now().UnixNano()) return nil } diff --git a/plugins/rabbitmq/rabbitmq.go b/plugins/rabbitmq/rabbitmq.go index 27580a13a..7101ab431 100644 --- a/plugins/rabbitmq/rabbitmq.go +++ b/plugins/rabbitmq/rabbitmq.go @@ -5,6 +5,7 @@ import ( "fmt" "net/http" "strconv" + "time" "github.com/influxdb/telegraf/plugins" ) @@ -199,20 +200,20 @@ func gatherOverview(r *RabbitMQ, serv *Server, acc plugins.Accumulator, errChan if serv.Name != "" { tags["name"] = serv.Name } - - acc.Add("messages", overview.QueueTotals.Messages, tags) - acc.Add("messages_ready", overview.QueueTotals.MessagesReady, tags) - acc.Add("messages_unacked", overview.QueueTotals.MessagesUnacknowledged, tags) - - acc.Add("channels", overview.ObjectTotals.Channels, tags) - acc.Add("connections", overview.ObjectTotals.Connections, tags) - acc.Add("consumers", overview.ObjectTotals.Consumers, tags) - acc.Add("exchanges", overview.ObjectTotals.Exchanges, tags) - acc.Add("queues", overview.ObjectTotals.Queues, tags) - - acc.Add("messages_acked", overview.MessageStats.Ack, tags) - acc.Add("messages_delivered", overview.MessageStats.Deliver, tags) - acc.Add("messages_published", overview.MessageStats.Publish, tags) + fields := map[string]interface{}{ + "messages": overview.QueueTotals.Messages, + "messages_ready": overview.QueueTotals.MessagesReady, + "messages_unacked": overview.QueueTotals.MessagesUnacknowledged, + "channels": overview.ObjectTotals.Channels, + "connections": overview.ObjectTotals.Connections, + "consumers": overview.ObjectTotals.Consumers, + "exchanges": overview.ObjectTotals.Exchanges, + "queues": overview.ObjectTotals.Queues, + "messages_acked": overview.MessageStats.Ack, + "messages_delivered": overview.MessageStats.Deliver, + "messages_published": overview.MessageStats.Publish, + } + acc.AddFields("rabbitmq_overview", fields, tags) errChan <- nil } @@ -225,6 +226,7 @@ func gatherNodes(r *RabbitMQ, serv *Server, acc plugins.Accumulator, errChan cha errChan <- err return } + now := time.Now() for _, node := range nodes { if !shouldGatherNode(node, serv) { @@ -234,17 +236,20 @@ func gatherNodes(r *RabbitMQ, serv *Server, acc plugins.Accumulator, errChan cha tags := map[string]string{"url": serv.URL} tags["node"] = node.Name - acc.Add("disk_free", node.DiskFree, tags) - acc.Add("disk_free_limit", node.DiskFreeLimit, tags) - acc.Add("fd_total", node.FdTotal, tags) - acc.Add("fd_used", node.FdUsed, tags) - acc.Add("mem_limit", node.MemLimit, tags) - acc.Add("mem_used", node.MemUsed, tags) - acc.Add("proc_total", node.ProcTotal, tags) - acc.Add("proc_used", node.ProcUsed, tags) - acc.Add("run_queue", node.RunQueue, tags) - acc.Add("sockets_total", node.SocketsTotal, tags) - acc.Add("sockets_used", node.SocketsUsed, tags) + fields := map[string]interface{}{ + "disk_free": node.DiskFree, + "disk_free_limit": node.DiskFreeLimit, + "fd_total": node.FdTotal, + "fd_used": node.FdUsed, + "mem_limit": node.MemLimit, + "mem_used": node.MemUsed, + "proc_total": node.ProcTotal, + "proc_used": node.ProcUsed, + "run_queue": node.RunQueue, + "sockets_total": node.SocketsTotal, + "sockets_used": node.SocketsUsed, + } + acc.AddFields("rabbitmq_node", fields, tags, now) } errChan <- nil @@ -273,7 +278,7 @@ func gatherQueues(r *RabbitMQ, serv *Server, acc plugins.Accumulator, errChan ch } acc.AddFields( - "queue", + "rabbitmq_queue", map[string]interface{}{ // common information "consumers": queue.Consumers, diff --git a/plugins/redis/redis.go b/plugins/redis/redis.go index 151fb4f46..2e338ff19 100644 --- a/plugins/redis/redis.go +++ b/plugins/redis/redis.go @@ -164,6 +164,7 @@ func gatherInfoOutput( var keyspace_hits, keyspace_misses uint64 = 0, 0 scanner := bufio.NewScanner(rdr) + fields := make(map[string]interface{}) for scanner.Scan() { line := scanner.Text() if strings.Contains(line, "ERR") { @@ -199,7 +200,7 @@ func gatherInfoOutput( } if err == nil { - acc.Add(metric, ival, tags) + fields[metric] = ival continue } @@ -208,13 +209,14 @@ func gatherInfoOutput( return err } - acc.Add(metric, fval, tags) + fields[metric] = fval } var keyspace_hitrate float64 = 0.0 if keyspace_hits != 0 || keyspace_misses != 0 { keyspace_hitrate = float64(keyspace_hits) / float64(keyspace_hits+keyspace_misses) } - acc.Add("keyspace_hitrate", keyspace_hitrate, tags) + fields["keyspace_hitrate"] = keyspace_hitrate + acc.AddFields("redis", fields, tags) return nil } @@ -229,15 +231,17 @@ func gatherKeyspaceLine( tags map[string]string, ) { if strings.Contains(line, "keys=") { + fields := make(map[string]interface{}) tags["database"] = name dbparts := strings.Split(line, ",") for _, dbp := range dbparts { kv := strings.Split(dbp, "=") ival, err := strconv.ParseUint(kv[1], 10, 64) if err == nil { - acc.Add(kv[0], ival, tags) + fields[kv[0]] = ival } } + acc.AddFields("redis_keyspace", fields, tags) } }