Handle mysql input variations in the user_statistics collecting (#4306)
(cherry picked from commit 54056f3808
)
This commit is contained in:
parent
d112323d81
commit
70c20b0bc4
|
@ -4,7 +4,6 @@ import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"database/sql"
|
"database/sql"
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
|
@ -80,7 +79,7 @@ var sampleConfig = `
|
||||||
## gather thread state counts from INFORMATION_SCHEMA.PROCESSLIST
|
## gather thread state counts from INFORMATION_SCHEMA.PROCESSLIST
|
||||||
gather_process_list = true
|
gather_process_list = true
|
||||||
#
|
#
|
||||||
## gather thread state counts from INFORMATION_SCHEMA.USER_STATISTICS
|
## gather user statistics from INFORMATION_SCHEMA.USER_STATISTICS
|
||||||
gather_user_statistics = true
|
gather_user_statistics = true
|
||||||
#
|
#
|
||||||
## gather auto_increment columns and max values from information schema
|
## gather auto_increment columns and max values from information schema
|
||||||
|
@ -282,9 +281,8 @@ const (
|
||||||
GROUP BY command,state
|
GROUP BY command,state
|
||||||
ORDER BY null`
|
ORDER BY null`
|
||||||
infoSchemaUserStatisticsQuery = `
|
infoSchemaUserStatisticsQuery = `
|
||||||
SELECT *,count(*)
|
SELECT *
|
||||||
FROM information_schema.user_statistics
|
FROM information_schema.user_statistics`
|
||||||
GROUP BY user`
|
|
||||||
infoSchemaAutoIncQuery = `
|
infoSchemaAutoIncQuery = `
|
||||||
SELECT table_schema, table_name, column_name, auto_increment,
|
SELECT table_schema, table_name, column_name, auto_increment,
|
||||||
CAST(pow(2, case data_type
|
CAST(pow(2, case data_type
|
||||||
|
@ -761,103 +759,6 @@ func (m *Mysql) gatherGlobalStatuses(db *sql.DB, serv string, acc telegraf.Accum
|
||||||
if len(fields) > 0 {
|
if len(fields) > 0 {
|
||||||
acc.AddFields("mysql", fields, tags)
|
acc.AddFields("mysql", fields, tags)
|
||||||
}
|
}
|
||||||
// gather connection metrics from processlist for each user
|
|
||||||
if m.GatherProcessList {
|
|
||||||
conn_rows, err := db.Query("SELECT user, sum(1) FROM INFORMATION_SCHEMA.PROCESSLIST GROUP BY user")
|
|
||||||
if err != nil {
|
|
||||||
log.Printf("E! MySQL Error gathering process list: %s", err)
|
|
||||||
} else {
|
|
||||||
for conn_rows.Next() {
|
|
||||||
var user string
|
|
||||||
var connections int64
|
|
||||||
|
|
||||||
err = conn_rows.Scan(&user, &connections)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
tags := map[string]string{"server": servtag, "user": user}
|
|
||||||
fields := make(map[string]interface{})
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
fields["connections"] = connections
|
|
||||||
acc.AddFields("mysql_users", fields, tags)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// gather connection metrics from user_statistics for each user
|
|
||||||
if m.GatherUserStatistics {
|
|
||||||
conn_rows, err := db.Query("select user, total_connections, concurrent_connections, connected_time, busy_time, cpu_time, bytes_received, bytes_sent, binlog_bytes_written, rows_fetched, rows_updated, table_rows_read, select_commands, update_commands, other_commands, commit_transactions, rollback_transactions, denied_connections, lost_connections, access_denied, empty_queries, total_ssl_connections FROM INFORMATION_SCHEMA.USER_STATISTICS GROUP BY user")
|
|
||||||
if err != nil {
|
|
||||||
log.Printf("E! MySQL Error gathering user stats: %s", err)
|
|
||||||
} else {
|
|
||||||
for conn_rows.Next() {
|
|
||||||
var user string
|
|
||||||
var total_connections int64
|
|
||||||
var concurrent_connections int64
|
|
||||||
var connected_time int64
|
|
||||||
var busy_time int64
|
|
||||||
var cpu_time int64
|
|
||||||
var bytes_received int64
|
|
||||||
var bytes_sent int64
|
|
||||||
var binlog_bytes_written int64
|
|
||||||
var rows_fetched int64
|
|
||||||
var rows_updated int64
|
|
||||||
var table_rows_read int64
|
|
||||||
var select_commands int64
|
|
||||||
var update_commands int64
|
|
||||||
var other_commands int64
|
|
||||||
var commit_transactions int64
|
|
||||||
var rollback_transactions int64
|
|
||||||
var denied_connections int64
|
|
||||||
var lost_connections int64
|
|
||||||
var access_denied int64
|
|
||||||
var empty_queries int64
|
|
||||||
var total_ssl_connections int64
|
|
||||||
|
|
||||||
err = conn_rows.Scan(&user, &total_connections, &concurrent_connections,
|
|
||||||
&connected_time, &busy_time, &cpu_time, &bytes_received, &bytes_sent, &binlog_bytes_written,
|
|
||||||
&rows_fetched, &rows_updated, &table_rows_read, &select_commands, &update_commands, &other_commands,
|
|
||||||
&commit_transactions, &rollback_transactions, &denied_connections, &lost_connections, &access_denied,
|
|
||||||
&empty_queries, &total_ssl_connections,
|
|
||||||
)
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
tags := map[string]string{"server": servtag, "user": user}
|
|
||||||
fields := map[string]interface{}{
|
|
||||||
"total_connections": total_connections,
|
|
||||||
"concurrent_connections": concurrent_connections,
|
|
||||||
"connected_time": connected_time,
|
|
||||||
"busy_time": busy_time,
|
|
||||||
"cpu_time": cpu_time,
|
|
||||||
"bytes_received": bytes_received,
|
|
||||||
"bytes_sent": bytes_sent,
|
|
||||||
"binlog_bytes_written": binlog_bytes_written,
|
|
||||||
"rows_fetched": rows_fetched,
|
|
||||||
"rows_updated": rows_updated,
|
|
||||||
"table_rows_read": table_rows_read,
|
|
||||||
"select_commands": select_commands,
|
|
||||||
"update_commands": update_commands,
|
|
||||||
"other_commands": other_commands,
|
|
||||||
"commit_transactions": commit_transactions,
|
|
||||||
"rollback_transactions": rollback_transactions,
|
|
||||||
"denied_connections": denied_connections,
|
|
||||||
"lost_connections": lost_connections,
|
|
||||||
"access_denied": access_denied,
|
|
||||||
"empty_queries": empty_queries,
|
|
||||||
"total_ssl_connections": total_ssl_connections,
|
|
||||||
}
|
|
||||||
|
|
||||||
acc.AddFields("mysql_user_stats", fields, tags)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -908,6 +809,29 @@ func (m *Mysql) GatherProcessListStatuses(db *sql.DB, serv string, acc telegraf.
|
||||||
} else {
|
} else {
|
||||||
acc.AddFields("mysql_process_list", fields, tags)
|
acc.AddFields("mysql_process_list", fields, tags)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// get count of connections from each user
|
||||||
|
conn_rows, err := db.Query("SELECT user, sum(1) AS connections FROM INFORMATION_SCHEMA.PROCESSLIST GROUP BY user")
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
for conn_rows.Next() {
|
||||||
|
var user string
|
||||||
|
var connections int64
|
||||||
|
|
||||||
|
err = conn_rows.Scan(&user, &connections)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
tags := map[string]string{"server": servtag, "user": user}
|
||||||
|
fields := make(map[string]interface{})
|
||||||
|
|
||||||
|
fields["connections"] = connections
|
||||||
|
acc.AddFields("mysql_users", fields, tags)
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -917,9 +841,71 @@ func (m *Mysql) GatherUserStatisticsStatuses(db *sql.DB, serv string, acc telegr
|
||||||
// run query
|
// run query
|
||||||
rows, err := db.Query(infoSchemaUserStatisticsQuery)
|
rows, err := db.Query(infoSchemaUserStatisticsQuery)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
// disable collecting if table is not found (mysql specific error)
|
||||||
|
// (suppresses repeat errors)
|
||||||
|
if strings.Contains(err.Error(), "nknown table 'user_statistics'") {
|
||||||
|
m.GatherUserStatistics = false
|
||||||
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
defer rows.Close()
|
defer rows.Close()
|
||||||
|
|
||||||
|
cols, err := columnsToLower(rows.Columns())
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
read, err := getColSlice(len(cols))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
servtag := getDSNTag(serv)
|
||||||
|
for rows.Next() {
|
||||||
|
err = rows.Scan(read...)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
tags := map[string]string{"server": servtag, "user": *read[0].(*string)}
|
||||||
|
fields := map[string]interface{}{}
|
||||||
|
|
||||||
|
for i := range cols {
|
||||||
|
if i == 0 {
|
||||||
|
continue // skip "user"
|
||||||
|
}
|
||||||
|
switch v := read[i].(type) {
|
||||||
|
case *int64:
|
||||||
|
fields[cols[i]] = *v
|
||||||
|
case *float64:
|
||||||
|
fields[cols[i]] = *v
|
||||||
|
case *string:
|
||||||
|
fields[cols[i]] = *v
|
||||||
|
default:
|
||||||
|
return fmt.Errorf("Unknown column type - %T", v)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
acc.AddFields("mysql_user_stats", fields, tags)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// columnsToLower converts selected column names to lowercase.
|
||||||
|
func columnsToLower(s []string, e error) ([]string, error) {
|
||||||
|
if e != nil {
|
||||||
|
return nil, e
|
||||||
|
}
|
||||||
|
d := make([]string, len(s))
|
||||||
|
|
||||||
|
for i := range s {
|
||||||
|
d[i] = strings.ToLower(s[i])
|
||||||
|
}
|
||||||
|
return d, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// getColSlice returns an in interface slice that can be used in the row.Scan().
|
||||||
|
func getColSlice(l int) ([]interface{}, error) {
|
||||||
|
// list of all possible column names
|
||||||
var (
|
var (
|
||||||
user string
|
user string
|
||||||
total_connections int64
|
total_connections int64
|
||||||
|
@ -930,9 +916,11 @@ func (m *Mysql) GatherUserStatisticsStatuses(db *sql.DB, serv string, acc telegr
|
||||||
bytes_received int64
|
bytes_received int64
|
||||||
bytes_sent int64
|
bytes_sent int64
|
||||||
binlog_bytes_written int64
|
binlog_bytes_written int64
|
||||||
rows_fetched int64
|
rows_read int64
|
||||||
|
rows_sent int64
|
||||||
|
rows_deleted int64
|
||||||
|
rows_inserted int64
|
||||||
rows_updated int64
|
rows_updated int64
|
||||||
table_rows_read int64
|
|
||||||
select_commands int64
|
select_commands int64
|
||||||
update_commands int64
|
update_commands int64
|
||||||
other_commands int64
|
other_commands int64
|
||||||
|
@ -943,49 +931,98 @@ func (m *Mysql) GatherUserStatisticsStatuses(db *sql.DB, serv string, acc telegr
|
||||||
access_denied int64
|
access_denied int64
|
||||||
empty_queries int64
|
empty_queries int64
|
||||||
total_ssl_connections int64
|
total_ssl_connections int64
|
||||||
count uint32
|
max_statement_time_exceeded int64
|
||||||
|
// maria specific
|
||||||
|
fbusy_time float64
|
||||||
|
fcpu_time float64
|
||||||
|
// percona specific
|
||||||
|
rows_fetched int64
|
||||||
|
table_rows_read int64
|
||||||
)
|
)
|
||||||
|
|
||||||
servtag := getDSNTag(serv)
|
switch l {
|
||||||
for rows.Next() {
|
case 23: // maria5
|
||||||
err = rows.Scan(&user, &total_connections, &concurrent_connections,
|
return []interface{}{
|
||||||
&connected_time, &busy_time, &cpu_time, &bytes_received, &bytes_sent, &binlog_bytes_written,
|
&user,
|
||||||
&rows_fetched, &rows_updated, &table_rows_read, &select_commands, &update_commands, &other_commands,
|
&total_connections,
|
||||||
&commit_transactions, &rollback_transactions, &denied_connections, &lost_connections, &access_denied,
|
&concurrent_connections,
|
||||||
&empty_queries, &total_ssl_connections, &count,
|
&connected_time,
|
||||||
)
|
&fbusy_time,
|
||||||
if err != nil {
|
&fcpu_time,
|
||||||
return err
|
&bytes_received,
|
||||||
|
&bytes_sent,
|
||||||
|
&binlog_bytes_written,
|
||||||
|
&rows_read,
|
||||||
|
&rows_sent,
|
||||||
|
&rows_deleted,
|
||||||
|
&rows_inserted,
|
||||||
|
&rows_updated,
|
||||||
|
&select_commands,
|
||||||
|
&update_commands,
|
||||||
|
&other_commands,
|
||||||
|
&commit_transactions,
|
||||||
|
&rollback_transactions,
|
||||||
|
&denied_connections,
|
||||||
|
&lost_connections,
|
||||||
|
&access_denied,
|
||||||
|
&empty_queries,
|
||||||
|
}, nil
|
||||||
|
case 25: // maria10
|
||||||
|
return []interface{}{
|
||||||
|
&user,
|
||||||
|
&total_connections,
|
||||||
|
&concurrent_connections,
|
||||||
|
&connected_time,
|
||||||
|
&fbusy_time,
|
||||||
|
&fcpu_time,
|
||||||
|
&bytes_received,
|
||||||
|
&bytes_sent,
|
||||||
|
&binlog_bytes_written,
|
||||||
|
&rows_read,
|
||||||
|
&rows_sent,
|
||||||
|
&rows_deleted,
|
||||||
|
&rows_inserted,
|
||||||
|
&rows_updated,
|
||||||
|
&select_commands,
|
||||||
|
&update_commands,
|
||||||
|
&other_commands,
|
||||||
|
&commit_transactions,
|
||||||
|
&rollback_transactions,
|
||||||
|
&denied_connections,
|
||||||
|
&lost_connections,
|
||||||
|
&access_denied,
|
||||||
|
&empty_queries,
|
||||||
|
&total_ssl_connections,
|
||||||
|
&max_statement_time_exceeded,
|
||||||
|
}, nil
|
||||||
|
case 22: // percona
|
||||||
|
return []interface{}{
|
||||||
|
&user,
|
||||||
|
&total_connections,
|
||||||
|
&concurrent_connections,
|
||||||
|
&connected_time,
|
||||||
|
&busy_time,
|
||||||
|
&cpu_time,
|
||||||
|
&bytes_received,
|
||||||
|
&bytes_sent,
|
||||||
|
&binlog_bytes_written,
|
||||||
|
&rows_fetched,
|
||||||
|
&rows_updated,
|
||||||
|
&table_rows_read,
|
||||||
|
&select_commands,
|
||||||
|
&update_commands,
|
||||||
|
&other_commands,
|
||||||
|
&commit_transactions,
|
||||||
|
&rollback_transactions,
|
||||||
|
&denied_connections,
|
||||||
|
&lost_connections,
|
||||||
|
&access_denied,
|
||||||
|
&empty_queries,
|
||||||
|
&total_ssl_connections,
|
||||||
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
tags := map[string]string{"server": servtag, "user": user}
|
return nil, fmt.Errorf("Not Supported - %d columns", l)
|
||||||
fields := map[string]interface{}{
|
|
||||||
|
|
||||||
"total_connections": total_connections,
|
|
||||||
"concurrent_connections": concurrent_connections,
|
|
||||||
"connected_time": connected_time,
|
|
||||||
"busy_time": busy_time,
|
|
||||||
"cpu_time": cpu_time,
|
|
||||||
"bytes_received": bytes_received,
|
|
||||||
"bytes_sent": bytes_sent,
|
|
||||||
"binlog_bytes_written": binlog_bytes_written,
|
|
||||||
"rows_fetched": rows_fetched,
|
|
||||||
"rows_updated": rows_updated,
|
|
||||||
"table_rows_read": table_rows_read,
|
|
||||||
"select_commands": select_commands,
|
|
||||||
"update_commands": update_commands,
|
|
||||||
"other_commands": other_commands,
|
|
||||||
"commit_transactions": commit_transactions,
|
|
||||||
"rollback_transactions": rollback_transactions,
|
|
||||||
"denied_connections": denied_connections,
|
|
||||||
"lost_connections": lost_connections,
|
|
||||||
"access_denied": access_denied,
|
|
||||||
"empty_queries": empty_queries,
|
|
||||||
"total_ssl_connections": total_ssl_connections,
|
|
||||||
}
|
|
||||||
acc.AddFields("mysql_user_stats", fields, tags)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// gatherPerfTableIOWaits can be used to get total count and time
|
// gatherPerfTableIOWaits can be used to get total count and time
|
||||||
|
|
Loading…
Reference in New Issue