From 60c39ced69ed18ea23008ec6258a154394b3828b Mon Sep 17 00:00:00 2001 From: Mike Danko Date: Tue, 12 Dec 2017 16:22:11 -0500 Subject: [PATCH] Fix various mysql data type conversions (#3554) --- plugins/inputs/mysql/mysql.go | 321 ++++------------------------- plugins/inputs/mysql/mysql_test.go | 21 +- 2 files changed, 54 insertions(+), 288 deletions(-) diff --git a/plugins/inputs/mysql/mysql.go b/plugins/inputs/mysql/mysql.go index c3dc38423..1e75bf6b5 100644 --- a/plugins/inputs/mysql/mysql.go +++ b/plugins/inputs/mysql/mysql.go @@ -169,182 +169,6 @@ func (m *Mysql) Gather(acc telegraf.Accumulator) error { return nil } -type mapping struct { - onServer string - inExport string -} - -var mappings = []*mapping{ - { - onServer: "Aborted_", - inExport: "aborted_", - }, - { - onServer: "Bytes_", - inExport: "bytes_", - }, - { - onServer: "Com_", - inExport: "commands_", - }, - { - onServer: "Created_", - inExport: "created_", - }, - { - onServer: "Handler_", - inExport: "handler_", - }, - { - onServer: "Innodb_", - inExport: "innodb_", - }, - { - onServer: "Key_", - inExport: "key_", - }, - { - onServer: "Open_", - inExport: "open_", - }, - { - onServer: "Opened_", - inExport: "opened_", - }, - { - onServer: "Qcache_", - inExport: "qcache_", - }, - { - onServer: "Table_", - inExport: "table_", - }, - { - onServer: "Tokudb_", - inExport: "tokudb_", - }, - { - onServer: "Threads_", - inExport: "threads_", - }, - { - onServer: "Access_", - inExport: "access_", - }, - { - onServer: "Aria__", - inExport: "aria_", - }, - { - onServer: "Binlog__", - inExport: "binlog_", - }, - { - onServer: "Busy_", - inExport: "busy_", - }, - { - onServer: "Connection_", - inExport: "connection_", - }, - { - onServer: "Delayed_", - inExport: "delayed_", - }, - { - onServer: "Empty_", - inExport: "empty_", - }, - { - onServer: "Executed_", - inExport: "executed_", - }, - { - onServer: "Executed_", - inExport: "executed_", - }, - { - onServer: "Feature_", - inExport: "feature_", - }, - { - onServer: "Flush_", - inExport: "flush_", - }, - { - onServer: "Last_", - inExport: "last_", - }, - { - onServer: "Master_", - inExport: "master_", - }, - { - onServer: "Max_", - inExport: "max_", - }, - { - onServer: "Memory_", - inExport: "memory_", - }, - { - onServer: "Not_", - inExport: "not_", - }, - { - onServer: "Performance_", - inExport: "performance_", - }, - { - onServer: "Prepared_", - inExport: "prepared_", - }, - { - onServer: "Rows_", - inExport: "rows_", - }, - { - onServer: "Rpl_", - inExport: "rpl_", - }, - { - onServer: "Select_", - inExport: "select_", - }, - { - onServer: "Slave_", - inExport: "slave_", - }, - { - onServer: "Slow_", - inExport: "slow_", - }, - { - onServer: "Sort_", - inExport: "sort_", - }, - { - onServer: "Subquery_", - inExport: "subquery_", - }, - { - onServer: "Tc_", - inExport: "tc_", - }, - { - onServer: "Threadpool_", - inExport: "threadpool_", - }, - { - onServer: "wsrep_", - inExport: "wsrep_", - }, - { - onServer: "Uptime_", - inExport: "uptime_", - }, -} - var ( // status counter generalThreadStates = map[string]uint32{ @@ -717,9 +541,8 @@ func (m *Mysql) gatherGlobalVariables(db *sql.DB, serv string, acc telegraf.Accu fields[key] = string(val) tags[key] = string(val) } - // parse value, if it is numeric then save, otherwise ignore - if floatVal, ok := parseValue(val); ok { - fields[key] = floatVal + if value, ok := parseValue(val); ok { + fields[key] = value } // Send 20 fields at a time if len(fields) >= 20 { @@ -769,7 +592,7 @@ func (m *Mysql) gatherSlaveStatuses(db *sql.DB, serv string, acc telegraf.Accumu } // range over columns, and try to parse values for i, col := range cols { - // skip unparsable values + col = strings.ToLower(col) if value, ok := parseValue(*vals[i].(*sql.RawBytes)); ok { fields["slave_"+col] = value } @@ -820,98 +643,36 @@ func (m *Mysql) gatherBinaryLogs(db *sql.DB, serv string, acc telegraf.Accumulat // the mappings of actual names and names of each status to be exported // to output is provided on mappings variable func (m *Mysql) gatherGlobalStatuses(db *sql.DB, serv string, acc telegraf.Accumulator) error { - // If user forgot the '/', add it - if strings.HasSuffix(serv, ")") { - serv = serv + "/" - } else if serv == "localhost" { - serv = "" - } - // run query rows, err := db.Query(globalStatusQuery) if err != nil { return err } + defer rows.Close() // parse the DSN and save host name as a tag servtag := getDSNTag(serv) tags := map[string]string{"server": servtag} fields := make(map[string]interface{}) for rows.Next() { - var name string - var val interface{} + var key string + var val sql.RawBytes - err = rows.Scan(&name, &val) - if err != nil { + if err = rows.Scan(&key, &val); err != nil { return err } - var found bool + key = strings.ToLower(key) - // iterate over mappings and gather metrics that is provided on mapping - for _, mapped := range mappings { - if strings.HasPrefix(name, mapped.onServer) { - // convert numeric values to integer - i, _ := strconv.Atoi(string(val.([]byte))) - fields[mapped.inExport+name[len(mapped.onServer):]] = i - found = true - } + if value, ok := parseValue(val); ok { + fields[key] = value } + // Send 20 fields at a time if len(fields) >= 20 { acc.AddFields("mysql", fields, tags) fields = make(map[string]interface{}) } - - if found { - continue - } - - // search for specific values - switch name { - case "Queries": - i, err := strconv.ParseInt(string(val.([]byte)), 10, 64) - if err != nil { - acc.AddError(fmt.Errorf("E! Error mysql: parsing %s int value (%s)", name, err)) - } else { - fields["queries"] = i - } - case "Questions": - i, err := strconv.ParseInt(string(val.([]byte)), 10, 64) - if err != nil { - acc.AddError(fmt.Errorf("E! Error mysql: parsing %s int value (%s)", name, err)) - } else { - fields["questions"] = i - } - case "Slow_queries": - i, err := strconv.ParseInt(string(val.([]byte)), 10, 64) - if err != nil { - acc.AddError(fmt.Errorf("E! Error mysql: parsing %s int value (%s)", name, err)) - } else { - fields["slow_queries"] = i - } - case "Connections": - i, err := strconv.ParseInt(string(val.([]byte)), 10, 64) - if err != nil { - acc.AddError(fmt.Errorf("E! Error mysql: parsing %s int value (%s)", name, err)) - } else { - fields["connections"] = i - } - case "Syncs": - i, err := strconv.ParseInt(string(val.([]byte)), 10, 64) - if err != nil { - acc.AddError(fmt.Errorf("E! Error mysql: parsing %s int value (%s)", name, err)) - } else { - fields["syncs"] = i - } - case "Uptime": - i, err := strconv.ParseInt(string(val.([]byte)), 10, 64) - if err != nil { - acc.AddError(fmt.Errorf("E! Error mysql: parsing %s int value (%s)", name, err)) - } else { - fields["uptime"] = i - } - } } // Send any remaining fields if len(fields) > 0 { @@ -1059,7 +820,7 @@ func (m *Mysql) GatherProcessListStatuses(db *sql.DB, serv string, acc telegraf. for s, c := range stateCounts { fields[newNamespace("threads", s)] = c } - acc.AddFields("mysql_info_schema", fields, tags) + acc.AddFields("mysql_process_list", fields, tags) return nil } @@ -1272,7 +1033,7 @@ func (m *Mysql) gatherInfoSchemaAutoIncStatuses(db *sql.DB, serv string, acc tel fields["auto_increment_column"] = incValue fields["auto_increment_column_max"] = maxInt - acc.AddFields("mysql_info_schema", fields, tags) + acc.AddFields("mysql_table_schema", fields, tags) } return nil } @@ -1287,21 +1048,19 @@ func (m *Mysql) gatherInnoDBMetrics(db *sql.DB, serv string, acc telegraf.Accumu } defer rows.Close() - var key string - var val sql.RawBytes - // parse DSN and save server tag servtag := getDSNTag(serv) tags := map[string]string{"server": servtag} fields := make(map[string]interface{}) for rows.Next() { + var key string + var val sql.RawBytes if err := rows.Scan(&key, &val); err != nil { return err } key = strings.ToLower(key) - // parse value, if it is numeric then save, otherwise ignore - if floatVal, ok := parseValue(val); ok { - fields[key] = floatVal + if value, ok := parseValue(val); ok { + fields[key] = value } // Send 20 fields at a time if len(fields) >= 20 { @@ -1671,23 +1430,17 @@ func (m *Mysql) gatherTableSchema(db *sql.DB, serv string, acc telegraf.Accumula tags["schema"] = tableSchema tags["table"] = tableName - acc.AddFields(newNamespace("info_schema", "table_rows"), - map[string]interface{}{"value": tableRows}, tags) + acc.AddFields("mysql_table_schema", + map[string]interface{}{"rows": tableRows}, tags) - dlTags := copyTags(tags) - dlTags["component"] = "data_length" - acc.AddFields(newNamespace("info_schema", "table_size", "data_length"), - map[string]interface{}{"value": dataLength}, dlTags) + acc.AddFields("mysql_table_schema", + map[string]interface{}{"data_length": dataLength}, tags) - ilTags := copyTags(tags) - ilTags["component"] = "index_length" - acc.AddFields(newNamespace("info_schema", "table_size", "index_length"), - map[string]interface{}{"value": indexLength}, ilTags) + acc.AddFields("mysql_table_schema", + map[string]interface{}{"index_length": indexLength}, tags) - dfTags := copyTags(tags) - dfTags["component"] = "data_free" - acc.AddFields(newNamespace("info_schema", "table_size", "data_free"), - map[string]interface{}{"value": dataFree}, dfTags) + acc.AddFields("mysql_table_schema", + map[string]interface{}{"data_free": dataFree}, tags) versionTags := copyTags(tags) versionTags["type"] = tableType @@ -1695,24 +1448,34 @@ func (m *Mysql) gatherTableSchema(db *sql.DB, serv string, acc telegraf.Accumula versionTags["row_format"] = rowFormat versionTags["create_options"] = createOptions - acc.AddFields(newNamespace("info_schema", "table_version"), - map[string]interface{}{"value": version}, versionTags) + acc.AddFields("mysql_table_schema_version", + map[string]interface{}{"table_version": version}, versionTags) } } return nil } // parseValue can be used to convert values such as "ON","OFF","Yes","No" to 0,1 -func parseValue(value sql.RawBytes) (float64, bool) { - if bytes.Compare(value, []byte("Yes")) == 0 || bytes.Compare(value, []byte("ON")) == 0 { +func parseValue(value sql.RawBytes) (interface{}, bool) { + if bytes.EqualFold(value, []byte("YES")) || bytes.Compare(value, []byte("ON")) == 0 { return 1, true } - if bytes.Compare(value, []byte("No")) == 0 || bytes.Compare(value, []byte("OFF")) == 0 { + if bytes.EqualFold(value, []byte("NO")) || bytes.Compare(value, []byte("OFF")) == 0 { return 0, true } - n, err := strconv.ParseFloat(string(value), 64) - return n, err == nil + + if val, err := strconv.ParseInt(string(value), 10, 64); err == nil { + return val, true + } + if val, err := strconv.ParseFloat(string(value), 64); err == nil { + return val, true + } + + if len(string(value)) > 0 { + return string(value), true + } + return nil, false } // findThreadState can be used to find thread state by command and plain state diff --git a/plugins/inputs/mysql/mysql_test.go b/plugins/inputs/mysql/mysql_test.go index 5356e7bd4..1820c9347 100644 --- a/plugins/inputs/mysql/mysql_test.go +++ b/plugins/inputs/mysql/mysql_test.go @@ -127,26 +127,29 @@ func TestMysqlDNSAddTimeout(t *testing.T) { } } } - func TestParseValue(t *testing.T) { testCases := []struct { rawByte sql.RawBytes - value float64 + output interface{} boolValue bool }{ - {sql.RawBytes("Yes"), 1, true}, - {sql.RawBytes("No"), 0, false}, + {sql.RawBytes("123"), int64(123), true}, + {sql.RawBytes("abc"), "abc", true}, + {sql.RawBytes("10.1"), 10.1, true}, {sql.RawBytes("ON"), 1, true}, - {sql.RawBytes("OFF"), 0, false}, - {sql.RawBytes("ABC"), 0, false}, + {sql.RawBytes("OFF"), 0, true}, + {sql.RawBytes("NO"), 0, true}, + {sql.RawBytes("YES"), 1, true}, + {sql.RawBytes("No"), 0, true}, + {sql.RawBytes("Yes"), 1, true}, + {sql.RawBytes(""), nil, false}, } for _, cases := range testCases { - if value, ok := parseValue(cases.rawByte); value != cases.value && ok != cases.boolValue { - t.Errorf("want %d with %t, got %d with %t", int(cases.value), cases.boolValue, int(value), ok) + if got, ok := parseValue(cases.rawByte); got != cases.output && ok != cases.boolValue { + t.Errorf("for %s wanted %t, got %t", string(cases.rawByte), cases.output, got) } } } - func TestNewNamespace(t *testing.T) { testCases := []struct { words []string