From 54056f38084cacdbd2e31dfab923489361e40285 Mon Sep 17 00:00:00 2001 From: Greg Date: Fri, 29 Jun 2018 19:16:52 -0600 Subject: [PATCH] Handle mysql input variations in the user_statistics collecting (#4306) --- plugins/inputs/mysql/mysql.go | 349 +++++++++++++++++++--------------- 1 file changed, 193 insertions(+), 156 deletions(-) diff --git a/plugins/inputs/mysql/mysql.go b/plugins/inputs/mysql/mysql.go index 063452b7c..c17de3dcd 100644 --- a/plugins/inputs/mysql/mysql.go +++ b/plugins/inputs/mysql/mysql.go @@ -4,7 +4,6 @@ import ( "bytes" "database/sql" "fmt" - "log" "strconv" "strings" "sync" @@ -80,7 +79,7 @@ var sampleConfig = ` ## gather thread state counts from INFORMATION_SCHEMA.PROCESSLIST gather_process_list = true # - ## gather thread state counts from INFORMATION_SCHEMA.USER_STATISTICS + ## gather user statistics from INFORMATION_SCHEMA.USER_STATISTICS gather_user_statistics = true # ## gather auto_increment columns and max values from information schema @@ -282,9 +281,8 @@ const ( GROUP BY command,state ORDER BY null` infoSchemaUserStatisticsQuery = ` - SELECT *,count(*) - FROM information_schema.user_statistics - GROUP BY user` + SELECT * + FROM information_schema.user_statistics` infoSchemaAutoIncQuery = ` SELECT table_schema, table_name, column_name, auto_increment, CAST(pow(2, case data_type @@ -761,103 +759,6 @@ func (m *Mysql) gatherGlobalStatuses(db *sql.DB, serv string, acc telegraf.Accum if len(fields) > 0 { acc.AddFields("mysql", fields, tags) } - // gather connection metrics from processlist for each user - if m.GatherProcessList { - conn_rows, err := db.Query("SELECT user, sum(1) FROM INFORMATION_SCHEMA.PROCESSLIST GROUP BY user") - if err != nil { - log.Printf("E! MySQL Error gathering process list: %s", err) - } else { - for conn_rows.Next() { - var user string - var connections int64 - - err = conn_rows.Scan(&user, &connections) - if err != nil { - return err - } - - tags := map[string]string{"server": servtag, "user": user} - fields := make(map[string]interface{}) - - if err != nil { - return err - } - fields["connections"] = connections - acc.AddFields("mysql_users", fields, tags) - } - } - } - - // gather connection metrics from user_statistics for each user - if m.GatherUserStatistics { - conn_rows, err := db.Query("select user, total_connections, concurrent_connections, connected_time, busy_time, cpu_time, bytes_received, bytes_sent, binlog_bytes_written, rows_fetched, rows_updated, table_rows_read, select_commands, update_commands, other_commands, commit_transactions, rollback_transactions, denied_connections, lost_connections, access_denied, empty_queries, total_ssl_connections FROM INFORMATION_SCHEMA.USER_STATISTICS GROUP BY user") - if err != nil { - log.Printf("E! MySQL Error gathering user stats: %s", err) - } else { - for conn_rows.Next() { - var user string - var total_connections int64 - var concurrent_connections int64 - var connected_time int64 - var busy_time int64 - var cpu_time int64 - var bytes_received int64 - var bytes_sent int64 - var binlog_bytes_written int64 - var rows_fetched int64 - var rows_updated int64 - var table_rows_read int64 - var select_commands int64 - var update_commands int64 - var other_commands int64 - var commit_transactions int64 - var rollback_transactions int64 - var denied_connections int64 - var lost_connections int64 - var access_denied int64 - var empty_queries int64 - var total_ssl_connections int64 - - err = conn_rows.Scan(&user, &total_connections, &concurrent_connections, - &connected_time, &busy_time, &cpu_time, &bytes_received, &bytes_sent, &binlog_bytes_written, - &rows_fetched, &rows_updated, &table_rows_read, &select_commands, &update_commands, &other_commands, - &commit_transactions, &rollback_transactions, &denied_connections, &lost_connections, &access_denied, - &empty_queries, &total_ssl_connections, - ) - - if err != nil { - return err - } - - tags := map[string]string{"server": servtag, "user": user} - fields := map[string]interface{}{ - "total_connections": total_connections, - "concurrent_connections": concurrent_connections, - "connected_time": connected_time, - "busy_time": busy_time, - "cpu_time": cpu_time, - "bytes_received": bytes_received, - "bytes_sent": bytes_sent, - "binlog_bytes_written": binlog_bytes_written, - "rows_fetched": rows_fetched, - "rows_updated": rows_updated, - "table_rows_read": table_rows_read, - "select_commands": select_commands, - "update_commands": update_commands, - "other_commands": other_commands, - "commit_transactions": commit_transactions, - "rollback_transactions": rollback_transactions, - "denied_connections": denied_connections, - "lost_connections": lost_connections, - "access_denied": access_denied, - "empty_queries": empty_queries, - "total_ssl_connections": total_ssl_connections, - } - - acc.AddFields("mysql_user_stats", fields, tags) - } - } - } return nil } @@ -908,6 +809,29 @@ func (m *Mysql) GatherProcessListStatuses(db *sql.DB, serv string, acc telegraf. } else { acc.AddFields("mysql_process_list", fields, tags) } + + // get count of connections from each user + conn_rows, err := db.Query("SELECT user, sum(1) AS connections FROM INFORMATION_SCHEMA.PROCESSLIST GROUP BY user") + if err != nil { + return err + } + + for conn_rows.Next() { + var user string + var connections int64 + + err = conn_rows.Scan(&user, &connections) + if err != nil { + return err + } + + tags := map[string]string{"server": servtag, "user": user} + fields := make(map[string]interface{}) + + fields["connections"] = connections + acc.AddFields("mysql_users", fields, tags) + } + return nil } @@ -917,77 +841,190 @@ func (m *Mysql) GatherUserStatisticsStatuses(db *sql.DB, serv string, acc telegr // run query rows, err := db.Query(infoSchemaUserStatisticsQuery) if err != nil { + // disable collecting if table is not found (mysql specific error) + // (suppresses repeat errors) + if strings.Contains(err.Error(), "nknown table 'user_statistics'") { + m.GatherUserStatistics = false + } return err } defer rows.Close() - var ( - user string - total_connections int64 - concurrent_connections int64 - connected_time int64 - busy_time int64 - cpu_time int64 - bytes_received int64 - bytes_sent int64 - binlog_bytes_written int64 - rows_fetched int64 - rows_updated int64 - table_rows_read int64 - select_commands int64 - update_commands int64 - other_commands int64 - commit_transactions int64 - rollback_transactions int64 - denied_connections int64 - lost_connections int64 - access_denied int64 - empty_queries int64 - total_ssl_connections int64 - count uint32 - ) + + cols, err := columnsToLower(rows.Columns()) + if err != nil { + return err + } + + read, err := getColSlice(len(cols)) + if err != nil { + return err + } servtag := getDSNTag(serv) for rows.Next() { - err = rows.Scan(&user, &total_connections, &concurrent_connections, - &connected_time, &busy_time, &cpu_time, &bytes_received, &bytes_sent, &binlog_bytes_written, - &rows_fetched, &rows_updated, &table_rows_read, &select_commands, &update_commands, &other_commands, - &commit_transactions, &rollback_transactions, &denied_connections, &lost_connections, &access_denied, - &empty_queries, &total_ssl_connections, &count, - ) + err = rows.Scan(read...) if err != nil { return err } - tags := map[string]string{"server": servtag, "user": user} - fields := map[string]interface{}{ + tags := map[string]string{"server": servtag, "user": *read[0].(*string)} + fields := map[string]interface{}{} - "total_connections": total_connections, - "concurrent_connections": concurrent_connections, - "connected_time": connected_time, - "busy_time": busy_time, - "cpu_time": cpu_time, - "bytes_received": bytes_received, - "bytes_sent": bytes_sent, - "binlog_bytes_written": binlog_bytes_written, - "rows_fetched": rows_fetched, - "rows_updated": rows_updated, - "table_rows_read": table_rows_read, - "select_commands": select_commands, - "update_commands": update_commands, - "other_commands": other_commands, - "commit_transactions": commit_transactions, - "rollback_transactions": rollback_transactions, - "denied_connections": denied_connections, - "lost_connections": lost_connections, - "access_denied": access_denied, - "empty_queries": empty_queries, - "total_ssl_connections": total_ssl_connections, + for i := range cols { + if i == 0 { + continue // skip "user" + } + switch v := read[i].(type) { + case *int64: + fields[cols[i]] = *v + case *float64: + fields[cols[i]] = *v + case *string: + fields[cols[i]] = *v + default: + return fmt.Errorf("Unknown column type - %T", v) + } } acc.AddFields("mysql_user_stats", fields, tags) } return nil } +// columnsToLower converts selected column names to lowercase. +func columnsToLower(s []string, e error) ([]string, error) { + if e != nil { + return nil, e + } + d := make([]string, len(s)) + + for i := range s { + d[i] = strings.ToLower(s[i]) + } + return d, nil +} + +// getColSlice returns an in interface slice that can be used in the row.Scan(). +func getColSlice(l int) ([]interface{}, error) { + // list of all possible column names + var ( + user string + total_connections int64 + concurrent_connections int64 + connected_time int64 + busy_time int64 + cpu_time int64 + bytes_received int64 + bytes_sent int64 + binlog_bytes_written int64 + rows_read int64 + rows_sent int64 + rows_deleted int64 + rows_inserted int64 + rows_updated int64 + select_commands int64 + update_commands int64 + other_commands int64 + commit_transactions int64 + rollback_transactions int64 + denied_connections int64 + lost_connections int64 + access_denied int64 + empty_queries int64 + total_ssl_connections int64 + max_statement_time_exceeded int64 + // maria specific + fbusy_time float64 + fcpu_time float64 + // percona specific + rows_fetched int64 + table_rows_read int64 + ) + + switch l { + case 23: // maria5 + return []interface{}{ + &user, + &total_connections, + &concurrent_connections, + &connected_time, + &fbusy_time, + &fcpu_time, + &bytes_received, + &bytes_sent, + &binlog_bytes_written, + &rows_read, + &rows_sent, + &rows_deleted, + &rows_inserted, + &rows_updated, + &select_commands, + &update_commands, + &other_commands, + &commit_transactions, + &rollback_transactions, + &denied_connections, + &lost_connections, + &access_denied, + &empty_queries, + }, nil + case 25: // maria10 + return []interface{}{ + &user, + &total_connections, + &concurrent_connections, + &connected_time, + &fbusy_time, + &fcpu_time, + &bytes_received, + &bytes_sent, + &binlog_bytes_written, + &rows_read, + &rows_sent, + &rows_deleted, + &rows_inserted, + &rows_updated, + &select_commands, + &update_commands, + &other_commands, + &commit_transactions, + &rollback_transactions, + &denied_connections, + &lost_connections, + &access_denied, + &empty_queries, + &total_ssl_connections, + &max_statement_time_exceeded, + }, nil + case 22: // percona + return []interface{}{ + &user, + &total_connections, + &concurrent_connections, + &connected_time, + &busy_time, + &cpu_time, + &bytes_received, + &bytes_sent, + &binlog_bytes_written, + &rows_fetched, + &rows_updated, + &table_rows_read, + &select_commands, + &update_commands, + &other_commands, + &commit_transactions, + &rollback_transactions, + &denied_connections, + &lost_connections, + &access_denied, + &empty_queries, + &total_ssl_connections, + }, nil + } + + return nil, fmt.Errorf("Not Supported - %d columns", l) +} + // gatherPerfTableIOWaits can be used to get total count and time // of I/O wait event for each table and process func (m *Mysql) gatherPerfTableIOWaits(db *sql.DB, serv string, acc telegraf.Accumulator) error {