From 338f01a8dab618c4890002a7b26e2d5f14388e58 Mon Sep 17 00:00:00 2001 From: acezellponce Date: Fri, 13 Jan 2017 06:25:25 -0800 Subject: [PATCH] Added userstats to mysql input plugin (#2137) * Added GatherUserStatistics, row Uptime in gatherGlobalStatuses, and version fields & tags * Updated README file * pulling in latest from master * ran go fmt to fix formatting * fix unreachable code * few fixes * cleaning up and applying suggestions from sparrc --- plugins/inputs/mysql/README.md | 30 ++++++ plugins/inputs/mysql/mysql.go | 177 ++++++++++++++++++++++++++++++++- 2 files changed, 206 insertions(+), 1 deletion(-) diff --git a/plugins/inputs/mysql/README.md b/plugins/inputs/mysql/README.md index 34bb07bef..a55ddb8ff 100644 --- a/plugins/inputs/mysql/README.md +++ b/plugins/inputs/mysql/README.md @@ -7,6 +7,7 @@ This plugin gathers the statistic data from MySQL server * Slave statuses * Binlog size * Process list +* User Statistics * Info schema auto increment columns * Table I/O waits * Index I/O waits @@ -44,6 +45,9 @@ This plugin gathers the statistic data from MySQL server ## gather thread state counts from INFORMATION_SCHEMA.PROCESSLIST gather_process_list = true # + ## gather thread state counts from INFORMATION_SCHEMA.USER_STATISTICS + gather_user_statistics = true + # ## gather auto_increment columns and max values from information schema gather_info_schema_auto_inc = true # @@ -89,6 +93,30 @@ Requires to be turned on in configuration. * binary_files_count(int, number) * Process list - connection metrics from processlist for each user. It has the following tags * connections(int, number) +* User Statistics - connection metrics from user statistics for each user. It has the following fields + * access_denied + * binlog_bytes_written + * busy_time + * bytes_received + * bytes_sent + * commit_transactions + * concurrent_connections + * connected_time + * cpu_time + * denied_connections + * empty_queries + * hostlost_connections + * other_commands + * rollback_transactions + * rows_fetched + * rows_updated + * select_commands + * server + * table_rows_read + * total_connections + * total_ssl_connections + * update_commands + * user * Perf Table IO waits - total count and time of I/O waits event for each table and process. It has following fields: * table_io_waits_total_fetch(float, number) @@ -158,6 +186,8 @@ The unit of fields varies by the tags. * server (the host name from which the metrics are gathered) * Process list measurement has following tags * user (username for whom the metrics are gathered) +* User Statistics measurement has following tags + * user (username for whom the metrics are gathered) * Perf table IO waits measurement has following tags * schema * name (object name for event or process) diff --git a/plugins/inputs/mysql/mysql.go b/plugins/inputs/mysql/mysql.go index 54f296586..c0a31aeed 100644 --- a/plugins/inputs/mysql/mysql.go +++ b/plugins/inputs/mysql/mysql.go @@ -23,6 +23,7 @@ type Mysql struct { PerfEventsStatementsTimeLimit int64 `toml:"perf_events_statemetns_time_limit"` TableSchemaDatabases []string `toml:"table_schema_databases"` GatherProcessList bool `toml:"gather_process_list"` + GatherUserStatistics bool `toml:"gather_user_statistics"` GatherInfoSchemaAutoInc bool `toml:"gather_info_schema_auto_inc"` GatherSlaveStatus bool `toml:"gather_slave_status"` GatherBinaryLogs bool `toml:"gather_binary_logs"` @@ -60,6 +61,9 @@ var sampleConfig = ` ## gather thread state counts from INFORMATION_SCHEMA.PROCESSLIST gather_process_list = true # + ## gather thread state counts from INFORMATION_SCHEMA.USER_STATISTICS + gather_user_statistics = true + # ## gather auto_increment columns and max values from information schema gather_info_schema_auto_inc = true # @@ -415,6 +419,10 @@ const ( WHERE ID != connection_id() GROUP BY command,state ORDER BY null` + infoSchemaUserStatisticsQuery = ` + SELECT *,count(*) + FROM information_schema.user_statistics + GROUP BY user` infoSchemaAutoIncQuery = ` SELECT table_schema, table_name, column_name, auto_increment, CAST(pow(2, case data_type @@ -530,7 +538,6 @@ const ( table_name FROM information_schema.tables WHERE table_schema = 'performance_schema' AND table_name = ? - ` ) @@ -582,6 +589,13 @@ func (m *Mysql) gatherServer(serv string, acc telegraf.Accumulator) error { } } + if m.GatherUserStatistics { + err = m.GatherUserStatisticsStatuses(db, serv, acc) + if err != nil { + return err + } + } + if m.GatherSlaveStatus { err = m.gatherSlaveStatuses(db, serv, acc) if err != nil { @@ -669,6 +683,11 @@ func (m *Mysql) gatherGlobalVariables(db *sql.DB, serv string, acc telegraf.Accu return err } key = strings.ToLower(key) + // parse mysql version and put into field and tag + if strings.Contains(key, "version") { + fields[key] = string(val) + tags[key] = string(val) + } // parse value, if it is numeric then save, otherwise ignore if floatVal, ok := parseValue(val); ok { fields[key] = floatVal @@ -854,6 +873,12 @@ func (m *Mysql) gatherGlobalStatuses(db *sql.DB, serv string, acc telegraf.Accum return err } fields["syncs"] = i + case "Uptime": + i, err := strconv.ParseInt(string(val.([]byte)), 10, 64) + if err != nil { + return err + } + fields["uptime"] = i } } // Send any remaining fields @@ -884,6 +909,74 @@ func (m *Mysql) gatherGlobalStatuses(db *sql.DB, serv string, acc telegraf.Accum } } + // gather connection metrics from user_statistics for each user + if m.GatherUserStatistics { + conn_rows, err := db.Query("select user, total_connections, concurrent_connections, connected_time, busy_time, cpu_time, bytes_received, bytes_sent, binlog_bytes_written, rows_fetched, rows_updated, table_rows_read, select_commands, update_commands, other_commands, commit_transactions, rollback_transactions, denied_connections, lost_connections, access_denied, empty_queries, total_ssl_connections FROM INFORMATION_SCHEMA.USER_STATISTICS GROUP BY user") + + for conn_rows.Next() { + var user string + var total_connections int64 + var concurrent_connections int64 + var connected_time int64 + var busy_time int64 + var cpu_time int64 + var bytes_received int64 + var bytes_sent int64 + var binlog_bytes_written int64 + var rows_fetched int64 + var rows_updated int64 + var table_rows_read int64 + var select_commands int64 + var update_commands int64 + var other_commands int64 + var commit_transactions int64 + var rollback_transactions int64 + var denied_connections int64 + var lost_connections int64 + var access_denied int64 + var empty_queries int64 + var total_ssl_connections int64 + + err = conn_rows.Scan(&user, &total_connections, &concurrent_connections, + &connected_time, &busy_time, &cpu_time, &bytes_received, &bytes_sent, &binlog_bytes_written, + &rows_fetched, &rows_updated, &table_rows_read, &select_commands, &update_commands, &other_commands, + &commit_transactions, &rollback_transactions, &denied_connections, &lost_connections, &access_denied, + &empty_queries, &total_ssl_connections, + ) + + if err != nil { + return err + } + + tags := map[string]string{"server": servtag, "user": user} + fields := map[string]interface{}{ + "total_connections": total_connections, + "concurrent_connections": concurrent_connections, + "connected_time": connected_time, + "busy_time": busy_time, + "cpu_time": cpu_time, + "bytes_received": bytes_received, + "bytes_sent": bytes_sent, + "binlog_bytes_written": binlog_bytes_written, + "rows_fetched": rows_fetched, + "rows_updated": rows_updated, + "table_rows_read": table_rows_read, + "select_commands": select_commands, + "update_commands": update_commands, + "other_commands": other_commands, + "commit_transactions": commit_transactions, + "rollback_transactions": rollback_transactions, + "denied_connections": denied_connections, + "lost_connections": lost_connections, + "access_denied": access_denied, + "empty_queries": empty_queries, + "total_ssl_connections": total_ssl_connections, + } + + acc.AddFields("mysql_user_stats", fields, tags) + } + } + return nil } @@ -932,6 +1025,88 @@ func (m *Mysql) GatherProcessListStatuses(db *sql.DB, serv string, acc telegraf. return nil } +// GatherUserStatistics can be used to collect metrics on each running command +// and its state with its running count +func (m *Mysql) GatherUserStatisticsStatuses(db *sql.DB, serv string, acc telegraf.Accumulator) error { + // run query + rows, err := db.Query(infoSchemaUserStatisticsQuery) + if err != nil { + return err + } + defer rows.Close() + var ( + user string + total_connections int64 + concurrent_connections int64 + connected_time int64 + busy_time int64 + cpu_time int64 + bytes_received int64 + bytes_sent int64 + binlog_bytes_written int64 + rows_fetched int64 + rows_updated int64 + table_rows_read int64 + select_commands int64 + update_commands int64 + other_commands int64 + commit_transactions int64 + rollback_transactions int64 + denied_connections int64 + lost_connections int64 + access_denied int64 + empty_queries int64 + total_ssl_connections int64 + count uint32 + ) + + var servtag string + servtag, err = parseDSN(serv) + if err != nil { + servtag = "localhost" + } + + for rows.Next() { + err = rows.Scan(&user, &total_connections, &concurrent_connections, + &connected_time, &busy_time, &cpu_time, &bytes_received, &bytes_sent, &binlog_bytes_written, + &rows_fetched, &rows_updated, &table_rows_read, &select_commands, &update_commands, &other_commands, + &commit_transactions, &rollback_transactions, &denied_connections, &lost_connections, &access_denied, + &empty_queries, &total_ssl_connections, &count, + ) + if err != nil { + return err + } + + tags := map[string]string{"server": servtag, "user": user} + fields := map[string]interface{}{ + + "total_connections": total_connections, + "concurrent_connections": concurrent_connections, + "connected_time": connected_time, + "busy_time": busy_time, + "cpu_time": cpu_time, + "bytes_received": bytes_received, + "bytes_sent": bytes_sent, + "binlog_bytes_written": binlog_bytes_written, + "rows_fetched": rows_fetched, + "rows_updated": rows_updated, + "table_rows_read": table_rows_read, + "select_commands": select_commands, + "update_commands": update_commands, + "other_commands": other_commands, + "commit_transactions": commit_transactions, + "rollback_transactions": rollback_transactions, + "denied_connections": denied_connections, + "lost_connections": lost_connections, + "access_denied": access_denied, + "empty_queries": empty_queries, + "total_ssl_connections": total_ssl_connections, + } + acc.AddFields("mysql_user_stats", fields, tags) + } + return nil +} + // gatherPerfTableIOWaits can be used to get total count and time // of I/O wait event for each table and process func (m *Mysql) gatherPerfTableIOWaits(db *sql.DB, serv string, acc telegraf.Accumulator) error {