Fix known mysql type conversion issues (#6647)
This commit is contained in:
parent
eb93dab70b
commit
d858d82a85
|
@ -14,6 +14,7 @@ import (
|
||||||
"github.com/influxdata/telegraf/internal/tls"
|
"github.com/influxdata/telegraf/internal/tls"
|
||||||
"github.com/influxdata/telegraf/plugins/inputs"
|
"github.com/influxdata/telegraf/plugins/inputs"
|
||||||
"github.com/influxdata/telegraf/plugins/inputs/mysql/v1"
|
"github.com/influxdata/telegraf/plugins/inputs/mysql/v1"
|
||||||
|
"github.com/influxdata/telegraf/plugins/inputs/mysql/v2"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Mysql struct {
|
type Mysql struct {
|
||||||
|
@ -37,6 +38,8 @@ type Mysql struct {
|
||||||
GatherPerfEventsStatements bool `toml:"gather_perf_events_statements"`
|
GatherPerfEventsStatements bool `toml:"gather_perf_events_statements"`
|
||||||
IntervalSlow string `toml:"interval_slow"`
|
IntervalSlow string `toml:"interval_slow"`
|
||||||
MetricVersion int `toml:"metric_version"`
|
MetricVersion int `toml:"metric_version"`
|
||||||
|
|
||||||
|
Log telegraf.Logger `toml:"-"`
|
||||||
tls.ClientConfig
|
tls.ClientConfig
|
||||||
lastT time.Time
|
lastT time.Time
|
||||||
initDone bool
|
initDone bool
|
||||||
|
@ -554,14 +557,20 @@ func (m *Mysql) gatherGlobalVariables(db *sql.DB, serv string, acc telegraf.Accu
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
key = strings.ToLower(key)
|
key = strings.ToLower(key)
|
||||||
|
|
||||||
// parse mysql version and put into field and tag
|
// parse mysql version and put into field and tag
|
||||||
if strings.Contains(key, "version") {
|
if strings.Contains(key, "version") {
|
||||||
fields[key] = string(val)
|
fields[key] = string(val)
|
||||||
tags[key] = string(val)
|
tags[key] = string(val)
|
||||||
}
|
}
|
||||||
if value, ok := m.parseValue(val); ok {
|
|
||||||
|
value, err := m.parseGlobalVariables(key, val)
|
||||||
|
if err != nil {
|
||||||
|
m.Log.Debugf("Error parsing global variable %q: %v", key, err)
|
||||||
|
} else {
|
||||||
fields[key] = value
|
fields[key] = value
|
||||||
}
|
}
|
||||||
|
|
||||||
// Send 20 fields at a time
|
// Send 20 fields at a time
|
||||||
if len(fields) >= 20 {
|
if len(fields) >= 20 {
|
||||||
acc.AddFields("mysql_variables", fields, tags)
|
acc.AddFields("mysql_variables", fields, tags)
|
||||||
|
@ -575,6 +584,18 @@ func (m *Mysql) gatherGlobalVariables(db *sql.DB, serv string, acc telegraf.Accu
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *Mysql) parseGlobalVariables(key string, value sql.RawBytes) (interface{}, error) {
|
||||||
|
if m.MetricVersion < 2 {
|
||||||
|
v, ok := v1.ParseValue(value)
|
||||||
|
if ok {
|
||||||
|
return v, nil
|
||||||
|
}
|
||||||
|
return v, fmt.Errorf("could not parse value: %q", string(value))
|
||||||
|
} else {
|
||||||
|
return v2.ConvertGlobalVariables(key, value)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// gatherSlaveStatuses can be used to get replication analytics
|
// gatherSlaveStatuses can be used to get replication analytics
|
||||||
// When the server is slave, then it returns only one row.
|
// When the server is slave, then it returns only one row.
|
||||||
// If the multi-source replication is set, then everything works differently
|
// If the multi-source replication is set, then everything works differently
|
||||||
|
@ -748,7 +769,10 @@ func (m *Mysql) gatherGlobalStatuses(db *sql.DB, serv string, acc telegraf.Accum
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
key = strings.ToLower(key)
|
key = strings.ToLower(key)
|
||||||
if value, ok := m.parseValue(val); ok {
|
value, err := v2.ConvertGlobalStatus(key, val)
|
||||||
|
if err != nil {
|
||||||
|
m.Log.Debugf("Error parsing global status: %v", err)
|
||||||
|
} else {
|
||||||
fields[key] = value
|
fields[key] = value
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,103 @@
|
||||||
|
package v2
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"database/sql"
|
||||||
|
"fmt"
|
||||||
|
"strconv"
|
||||||
|
)
|
||||||
|
|
||||||
|
type ConversionFunc func(value sql.RawBytes) (interface{}, error)
|
||||||
|
|
||||||
|
func ParseInt(value sql.RawBytes) (interface{}, error) {
|
||||||
|
v, err := strconv.ParseInt(string(value), 10, 64)
|
||||||
|
|
||||||
|
// Ignore ErrRange. When this error is set the returned value is "the
|
||||||
|
// maximum magnitude integer of the appropriate bitSize and sign."
|
||||||
|
if err, ok := err.(*strconv.NumError); ok && err.Err == strconv.ErrRange {
|
||||||
|
return v, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return v, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func ParseBoolAsInteger(value sql.RawBytes) (interface{}, error) {
|
||||||
|
if bytes.EqualFold(value, []byte("YES")) || bytes.EqualFold(value, []byte("ON")) {
|
||||||
|
return int64(1), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return int64(0), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func ParseGTIDMode(value sql.RawBytes) (interface{}, error) {
|
||||||
|
// https://dev.mysql.com/doc/refman/8.0/en/replication-mode-change-online-concepts.html
|
||||||
|
v := string(value)
|
||||||
|
switch v {
|
||||||
|
case "OFF":
|
||||||
|
return int64(0), nil
|
||||||
|
case "ON":
|
||||||
|
return int64(1), nil
|
||||||
|
case "OFF_PERMISSIVE":
|
||||||
|
return int64(0), nil
|
||||||
|
case "ON_PERMISSIVE":
|
||||||
|
return int64(1), nil
|
||||||
|
default:
|
||||||
|
return nil, fmt.Errorf("unrecognized gtid_mode: %q", v)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func ParseValue(value sql.RawBytes) (interface{}, error) {
|
||||||
|
if bytes.EqualFold(value, []byte("YES")) || bytes.Compare(value, []byte("ON")) == 0 {
|
||||||
|
return 1, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if bytes.EqualFold(value, []byte("NO")) || bytes.Compare(value, []byte("OFF")) == 0 {
|
||||||
|
return 0, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if val, err := strconv.ParseInt(string(value), 10, 64); err == nil {
|
||||||
|
return val, nil
|
||||||
|
}
|
||||||
|
if val, err := strconv.ParseFloat(string(value), 64); err == nil {
|
||||||
|
return val, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(string(value)) > 0 {
|
||||||
|
return string(value), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil, fmt.Errorf("unconvertible value: %q", string(value))
|
||||||
|
}
|
||||||
|
|
||||||
|
var GlobalStatusConversions = map[string]ConversionFunc{
|
||||||
|
"ssl_ctx_verify_depth": ParseInt,
|
||||||
|
"ssl_verify_depth": ParseInt,
|
||||||
|
}
|
||||||
|
|
||||||
|
var GlobalVariableConversions = map[string]ConversionFunc{
|
||||||
|
"gtid_mode": ParseGTIDMode,
|
||||||
|
}
|
||||||
|
|
||||||
|
func ConvertGlobalStatus(key string, value sql.RawBytes) (interface{}, error) {
|
||||||
|
if bytes.Equal(value, []byte("")) {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if conv, ok := GlobalStatusConversions[key]; ok {
|
||||||
|
return conv(value)
|
||||||
|
}
|
||||||
|
|
||||||
|
return ParseValue(value)
|
||||||
|
}
|
||||||
|
|
||||||
|
func ConvertGlobalVariables(key string, value sql.RawBytes) (interface{}, error) {
|
||||||
|
if bytes.Equal(value, []byte("")) {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if conv, ok := GlobalVariableConversions[key]; ok {
|
||||||
|
return conv(value)
|
||||||
|
}
|
||||||
|
|
||||||
|
return ParseValue(value)
|
||||||
|
}
|
|
@ -0,0 +1,86 @@
|
||||||
|
package v2
|
||||||
|
|
||||||
|
import (
|
||||||
|
"database/sql"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestConvertGlobalStatus(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
key string
|
||||||
|
value sql.RawBytes
|
||||||
|
expected interface{}
|
||||||
|
expectedErr error
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "default",
|
||||||
|
key: "ssl_ctx_verify_depth",
|
||||||
|
value: []byte("0"),
|
||||||
|
expected: int64(0),
|
||||||
|
expectedErr: nil,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "overflow int64",
|
||||||
|
key: "ssl_ctx_verify_depth",
|
||||||
|
value: []byte("18446744073709551615"),
|
||||||
|
expected: int64(9223372036854775807),
|
||||||
|
expectedErr: nil,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "defined variable but unset",
|
||||||
|
key: "ssl_ctx_verify_depth",
|
||||||
|
value: []byte(""),
|
||||||
|
expected: nil,
|
||||||
|
expectedErr: nil,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
actual, err := ConvertGlobalStatus(tt.key, tt.value)
|
||||||
|
require.Equal(t, tt.expectedErr, err)
|
||||||
|
require.Equal(t, tt.expected, actual)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCovertGlobalVariables(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
key string
|
||||||
|
value sql.RawBytes
|
||||||
|
expected interface{}
|
||||||
|
expectedErr error
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "boolean type mysql<=5.6",
|
||||||
|
key: "gtid_mode",
|
||||||
|
value: []byte("ON"),
|
||||||
|
expected: int64(1),
|
||||||
|
expectedErr: nil,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "enum type mysql>=5.7",
|
||||||
|
key: "gtid_mode",
|
||||||
|
value: []byte("ON_PERMISSIVE"),
|
||||||
|
expected: int64(1),
|
||||||
|
expectedErr: nil,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "defined variable but unset",
|
||||||
|
key: "ssl_ctx_verify_depth",
|
||||||
|
value: []byte(""),
|
||||||
|
expected: nil,
|
||||||
|
expectedErr: nil,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
actual, err := ConvertGlobalVariables(tt.key, tt.value)
|
||||||
|
require.Equal(t, tt.expectedErr, err)
|
||||||
|
require.Equal(t, tt.expected, actual)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue