Add Suricata input plugin (#3145)

This commit is contained in:
Sascha Steinbiss 2019-09-21 00:35:21 +02:00 committed by Daniel Nelson
parent f669ef4452
commit d2d6f1ab21
6 changed files with 870 additions and 0 deletions

View File

@ -140,6 +140,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/inputs/sqlserver" _ "github.com/influxdata/telegraf/plugins/inputs/sqlserver"
_ "github.com/influxdata/telegraf/plugins/inputs/stackdriver" _ "github.com/influxdata/telegraf/plugins/inputs/stackdriver"
_ "github.com/influxdata/telegraf/plugins/inputs/statsd" _ "github.com/influxdata/telegraf/plugins/inputs/statsd"
_ "github.com/influxdata/telegraf/plugins/inputs/suricata"
_ "github.com/influxdata/telegraf/plugins/inputs/swap" _ "github.com/influxdata/telegraf/plugins/inputs/swap"
_ "github.com/influxdata/telegraf/plugins/inputs/syslog" _ "github.com/influxdata/telegraf/plugins/inputs/syslog"
_ "github.com/influxdata/telegraf/plugins/inputs/sysstat" _ "github.com/influxdata/telegraf/plugins/inputs/sysstat"

View File

@ -0,0 +1,129 @@
# Suricata plugin for Telegraf
This plugin reports internal performance counters of the Suricata IDS/IPS
engine, such as captured traffic volume, memory usage, uptime, flow counters,
and much more. It provides a socket for the Suricata log output to write JSON
stats output to, and processes the incoming data to fit Telegraf's format.
### Configuration:
```toml
[[input.suricata]]
## Data sink for Suricata stats log.
# This is expected to be a filename of a
# unix socket to be created for listening.
source = "/var/run/suricata-stats.sock"
# Delimiter for flattening field keys, e.g. subitem "alert" of "detect"
# becomes "detect_alert" when delimiter is "_".
delimiter = "_"
```
### Measurements & Fields:
Fields in the 'suricata' measurement follow the JSON format used by Suricata's
stats output.
See http://suricata.readthedocs.io/en/latest/performance/statistics.html for
more information.
All fields are numeric.
- suricata
- app_layer_flow_dcerpc_udp
- app_layer_flow_dns_tcp
- app_layer_flow_dns_udp
- app_layer_flow_enip_udp
- app_layer_flow_failed_tcp
- app_layer_flow_failed_udp
- app_layer_flow_http
- app_layer_flow_ssh
- app_layer_flow_tls
- app_layer_tx_dns_tcp
- app_layer_tx_dns_udp
- app_layer_tx_enip_udp
- app_layer_tx_http
- app_layer_tx_smtp
- capture_kernel_drops
- capture_kernel_packets
- decoder_avg_pkt_size
- decoder_bytes
- decoder_ethernet
- decoder_gre
- decoder_icmpv4
- decoder_icmpv4_ipv4_unknown_ver
- decoder_icmpv6
- decoder_invalid
- decoder_ipv4
- decoder_ipv6
- decoder_max_pkt_size
- decoder_pkts
- decoder_tcp
- decoder_tcp_hlen_too_small
- decoder_tcp_invalid_optlen
- decoder_teredo
- decoder_udp
- decoder_vlan
- detect_alert
- dns_memcap_global
- dns_memuse
- flow_memuse
- flow_mgr_closed_pruned
- flow_mgr_est_pruned
- flow_mgr_flows_checked
- flow_mgr_flows_notimeout
- flow_mgr_flows_removed
- flow_mgr_flows_timeout
- flow_mgr_flows_timeout_inuse
- flow_mgr_new_pruned
- flow_mgr_rows_checked
- flow_mgr_rows_empty
- flow_mgr_rows_maxlen
- flow_mgr_rows_skipped
- flow_spare
- flow_tcp_reuse
- http_memuse
- tcp_memuse
- tcp_pseudo
- tcp_reassembly_gap
- tcp_reassembly_memuse
- tcp_rst
- tcp_sessions
- tcp_syn
- tcp_synack
- ...
### Tags:
The `suricata` measurement has the following tags:
- thread: `Global` for global statistics (if enabled), thread IDs (e.g. `W#03-enp0s31f6`) for thread-specific statistics
## Suricata configuration
Suricata needs to deliver the 'stats' event type to a given unix socket for
this plugin to pick up. This can be done, for example, by creating an additional
output in the Suricata configuration file:
```yaml
- eve-log:
enabled: yes
filetype: unix_stream
filename: /tmp/suricata-stats.sock
types:
- stats:
threads: yes
```
## Example Output:
```text
suricata,host=myhost,thread=FM#01 flow_mgr_rows_empty=0,flow_mgr_rows_checked=65536,flow_mgr_closed_pruned=0,flow_emerg_mode_over=0,flow_mgr_flows_timeout_inuse=0,flow_mgr_rows_skipped=65535,flow_mgr_bypassed_pruned=0,flow_mgr_flows_removed=0,flow_mgr_est_pruned=0,flow_mgr_flows_notimeout=1,flow_mgr_flows_checked=1,flow_mgr_rows_busy=0,flow_spare=10000,flow_mgr_rows_maxlen=1,flow_mgr_new_pruned=0,flow_emerg_mode_entered=0,flow_tcp_reuse=0,flow_mgr_flows_timeout=0 1568368562545197545
suricata,host=myhost,thread=W#04-wlp4s0 decoder_ltnull_pkt_too_small=0,decoder_ipraw_invalid_ip_version=0,defrag_ipv4_reassembled=0,tcp_no_flow=0,app_layer_flow_tls=1,decoder_udp=25,defrag_ipv6_fragments=0,defrag_ipv4_fragments=0,decoder_tcp=59,decoder_vlan=0,decoder_pkts=84,decoder_vlan_qinq=0,decoder_avg_pkt_size=574,flow_memcap=0,defrag_max_frag_hits=0,tcp_ssn_memcap_drop=0,capture_kernel_packets=84,app_layer_flow_dcerpc_udp=0,app_layer_tx_dns_tcp=0,tcp_rst=0,decoder_icmpv4=0,app_layer_tx_tls=0,decoder_ipv4=84,decoder_erspan=0,decoder_ltnull_unsupported_type=0,decoder_invalid=0,app_layer_flow_ssh=0,capture_kernel_drops=0,app_layer_flow_ftp=0,app_layer_tx_http=0,tcp_pseudo_failed=0,defrag_ipv6_reassembled=0,defrag_ipv6_timeouts=0,tcp_pseudo=0,tcp_sessions=1,decoder_ethernet=84,decoder_raw=0,decoder_sctp=0,app_layer_flow_dns_udp=1,decoder_gre=0,app_layer_flow_http=0,app_layer_flow_imap=0,tcp_segment_memcap_drop=0,detect_alert=0,app_layer_flow_failed_tcp=0,decoder_teredo=0,decoder_mpls=0,decoder_ppp=0,decoder_max_pkt_size=1422,decoder_ipv6=0,tcp_reassembly_gap=0,app_layer_flow_dcerpc_tcp=0,decoder_ipv4_in_ipv6=0,tcp_stream_depth_reached=0,app_layer_flow_dns_tcp=0,app_layer_flow_smtp=0,tcp_syn=1,decoder_sll=0,tcp_invalid_checksum=0,app_layer_tx_dns_udp=1,decoder_bytes=48258,defrag_ipv4_timeouts=0,app_layer_flow_msn=0,decoder_pppoe=0,decoder_null=0,app_layer_flow_failed_udp=3,app_layer_tx_smtp=0,decoder_icmpv6=0,decoder_ipv6_in_ipv6=0,tcp_synack=1,app_layer_flow_smb=0,decoder_dce_pkt_too_small=0 1568368562545174807
suricata,host=myhost,thread=W#01-wlp4s0 tcp_synack=0,app_layer_flow_imap=0,decoder_ipv4_in_ipv6=0,decoder_max_pkt_size=684,decoder_gre=0,defrag_ipv4_timeouts=0,tcp_invalid_checksum=0,decoder_ipv4=53,flow_memcap=0,app_layer_tx_http=0,app_layer_tx_smtp=0,decoder_null=0,tcp_no_flow=0,app_layer_tx_tls=0,app_layer_flow_ssh=0,app_layer_flow_smtp=0,decoder_pppoe=0,decoder_teredo=0,decoder_ipraw_invalid_ip_version=0,decoder_ltnull_pkt_too_small=0,tcp_rst=0,decoder_ppp=0,decoder_ipv6=29,app_layer_flow_dns_udp=3,decoder_vlan=0,app_layer_flow_dcerpc_tcp=0,tcp_syn=0,defrag_ipv4_fragments=0,defrag_ipv6_timeouts=0,decoder_raw=0,defrag_ipv6_reassembled=0,tcp_reassembly_gap=0,tcp_sessions=0,decoder_udp=44,tcp_segment_memcap_drop=0,app_layer_tx_dns_udp=3,app_layer_flow_tls=0,decoder_tcp=37,defrag_ipv4_reassembled=0,app_layer_flow_failed_udp=6,app_layer_flow_ftp=0,decoder_icmpv6=1,tcp_stream_depth_reached=0,capture_kernel_drops=0,decoder_sll=0,decoder_bytes=15883,decoder_ethernet=91,tcp_pseudo=0,app_layer_flow_http=0,decoder_sctp=0,decoder_pkts=91,decoder_avg_pkt_size=174,decoder_erspan=0,app_layer_flow_msn=0,app_layer_flow_smb=0,capture_kernel_packets=91,decoder_icmpv4=0,decoder_ipv6_in_ipv6=0,tcp_ssn_memcap_drop=0,decoder_vlan_qinq=0,decoder_ltnull_unsupported_type=0,decoder_invalid=0,defrag_max_frag_hits=0,tcp_pseudo_failed=0,detect_alert=0,app_layer_tx_dns_tcp=0,app_layer_flow_failed_tcp=0,app_layer_flow_dcerpc_udp=0,app_layer_flow_dns_tcp=0,defrag_ipv6_fragments=0,decoder_mpls=0,decoder_dce_pkt_too_small=0 1568368562545148438
suricata,host=myhost flow_memuse=7094464,tcp_memuse=3276800,tcp_reassembly_memuse=12332832,dns_memuse=0,dns_memcap_state=0,dns_memcap_global=0,http_memuse=0,http_memcap=0 1568368562545144569
suricata,host=myhost,thread=W#07-wlp4s0 app_layer_tx_http=0,app_layer_tx_dns_tcp=0,decoder_vlan=0,decoder_pppoe=0,decoder_sll=0,decoder_tcp=0,flow_memcap=0,app_layer_flow_msn=0,tcp_no_flow=0,tcp_rst=0,tcp_segment_memcap_drop=0,tcp_sessions=0,detect_alert=0,defrag_ipv6_reassembled=0,decoder_ipraw_invalid_ip_version=0,decoder_erspan=0,decoder_icmpv4=0,app_layer_tx_dns_udp=2,decoder_ltnull_pkt_too_small=0,decoder_bytes=1998,decoder_ipv6=1,defrag_ipv4_fragments=0,defrag_ipv6_fragments=0,app_layer_tx_smtp=0,decoder_ltnull_unsupported_type=0,decoder_max_pkt_size=342,app_layer_flow_ftp=0,decoder_ipv6_in_ipv6=0,defrag_ipv4_reassembled=0,defrag_ipv6_timeouts=0,app_layer_flow_dns_tcp=0,decoder_avg_pkt_size=181,defrag_ipv4_timeouts=0,tcp_stream_depth_reached=0,decoder_mpls=0,app_layer_flow_dns_udp=2,tcp_ssn_memcap_drop=0,app_layer_flow_dcerpc_tcp=0,app_layer_flow_failed_udp=2,app_layer_flow_smb=0,app_layer_flow_failed_tcp=0,decoder_invalid=0,decoder_null=0,decoder_gre=0,decoder_ethernet=11,app_layer_flow_ssh=0,defrag_max_frag_hits=0,capture_kernel_drops=0,tcp_pseudo_failed=0,app_layer_flow_smtp=0,decoder_udp=10,decoder_sctp=0,decoder_teredo=0,decoder_icmpv6=1,tcp_pseudo=0,tcp_synack=0,app_layer_tx_tls=0,app_layer_flow_imap=0,capture_kernel_packets=11,decoder_pkts=11,decoder_raw=0,decoder_ppp=0,tcp_syn=0,tcp_invalid_checksum=0,app_layer_flow_tls=0,decoder_ipv4_in_ipv6=0,app_layer_flow_http=0,decoder_dce_pkt_too_small=0,decoder_ipv4=10,decoder_vlan_qinq=0,tcp_reassembly_gap=0,app_layer_flow_dcerpc_udp=0 1568368562545110847
suricata,host=myhost,thread=W#06-wlp4s0 app_layer_tx_smtp=0,decoder_ipv6_in_ipv6=0,decoder_dce_pkt_too_small=0,tcp_segment_memcap_drop=0,tcp_sessions=1,decoder_ppp=0,tcp_pseudo_failed=0,app_layer_tx_dns_tcp=0,decoder_invalid=0,defrag_ipv4_timeouts=0,app_layer_flow_smb=0,app_layer_flow_ssh=0,decoder_bytes=19407,decoder_null=0,app_layer_flow_tls=1,decoder_avg_pkt_size=473,decoder_pkts=41,decoder_pppoe=0,decoder_tcp=32,defrag_ipv4_reassembled=0,tcp_reassembly_gap=0,decoder_raw=0,flow_memcap=0,defrag_ipv6_timeouts=0,app_layer_flow_smtp=0,app_layer_tx_http=0,decoder_sll=0,decoder_udp=8,decoder_ltnull_pkt_too_small=0,decoder_ltnull_unsupported_type=0,decoder_ipv4_in_ipv6=0,decoder_vlan=0,decoder_max_pkt_size=1422,tcp_no_flow=0,app_layer_flow_failed_tcp=0,app_layer_flow_dns_tcp=0,app_layer_flow_ftp=0,decoder_icmpv4=0,defrag_max_frag_hits=0,tcp_rst=0,app_layer_flow_msn=0,app_layer_flow_failed_udp=2,app_layer_flow_dns_udp=0,app_layer_flow_dcerpc_udp=0,decoder_ipv4=39,decoder_ethernet=41,defrag_ipv6_reassembled=0,tcp_ssn_memcap_drop=0,app_layer_tx_tls=0,decoder_gre=0,decoder_vlan_qinq=0,tcp_pseudo=0,app_layer_flow_imap=0,app_layer_flow_dcerpc_tcp=0,defrag_ipv4_fragments=0,defrag_ipv6_fragments=0,tcp_synack=1,app_layer_flow_http=0,app_layer_tx_dns_udp=0,capture_kernel_packets=41,decoder_ipv6=2,tcp_invalid_checksum=0,tcp_stream_depth_reached=0,decoder_ipraw_invalid_ip_version=0,decoder_icmpv6=1,tcp_syn=1,detect_alert=0,capture_kernel_drops=0,decoder_teredo=0,decoder_erspan=0,decoder_sctp=0,decoder_mpls=0 1568368562545084670
suricata,host=myhost,thread=W#02-wlp4s0 decoder_tcp=53,tcp_rst=3,tcp_reassembly_gap=0,defrag_ipv6_timeouts=0,tcp_ssn_memcap_drop=0,app_layer_flow_dcerpc_tcp=0,decoder_max_pkt_size=1422,decoder_ipv6_in_ipv6=0,tcp_no_flow=0,app_layer_flow_ftp=0,app_layer_flow_ssh=0,decoder_pkts=82,decoder_sctp=0,tcp_invalid_checksum=0,app_layer_flow_dns_tcp=0,decoder_ipraw_invalid_ip_version=0,decoder_bytes=26441,decoder_erspan=0,tcp_pseudo_failed=0,tcp_syn=1,app_layer_tx_http=0,app_layer_tx_smtp=0,decoder_teredo=0,decoder_ipv4=80,defrag_ipv4_fragments=0,tcp_stream_depth_reached=0,app_layer_flow_smb=0,capture_kernel_packets=82,decoder_null=0,decoder_ltnull_pkt_too_small=0,decoder_ppp=0,decoder_icmpv6=1,app_layer_flow_dns_udp=2,app_layer_flow_http=0,app_layer_tx_dns_udp=3,decoder_mpls=0,decoder_sll=0,defrag_ipv4_reassembled=0,tcp_segment_memcap_drop=0,app_layer_flow_imap=0,decoder_ltnull_unsupported_type=0,decoder_icmpv4=0,decoder_raw=0,defrag_ipv4_timeouts=0,app_layer_flow_failed_udp=8,decoder_gre=0,capture_kernel_drops=0,defrag_ipv6_reassembled=0,tcp_pseudo=0,app_layer_flow_tls=1,decoder_avg_pkt_size=322,decoder_dce_pkt_too_small=0,decoder_ethernet=82,defrag_ipv6_fragments=0,tcp_sessions=1,tcp_synack=1,app_layer_tx_dns_tcp=0,decoder_vlan=0,flow_memcap=0,decoder_vlan_qinq=0,decoder_udp=28,decoder_invalid=0,detect_alert=0,app_layer_flow_failed_tcp=0,app_layer_tx_tls=0,decoder_pppoe=0,decoder_ipv6=2,decoder_ipv4_in_ipv6=0,defrag_max_frag_hits=0,app_layer_flow_dcerpc_udp=0,app_layer_flow_smtp=0,app_layer_flow_msn=0 1568368562545061864
suricata,host=myhost,thread=W#08-wlp4s0 decoder_dce_pkt_too_small=0,app_layer_tx_dns_tcp=0,decoder_pkts=58,decoder_ppp=0,decoder_raw=0,decoder_ipv4_in_ipv6=0,decoder_max_pkt_size=1392,tcp_invalid_checksum=0,tcp_syn=0,decoder_ipv4=51,decoder_ipv6_in_ipv6=0,decoder_tcp=0,decoder_ltnull_pkt_too_small=0,flow_memcap=0,decoder_udp=58,tcp_ssn_memcap_drop=0,tcp_pseudo=0,app_layer_flow_dcerpc_udp=0,app_layer_flow_dns_udp=5,app_layer_tx_http=0,capture_kernel_drops=0,decoder_vlan=0,tcp_segment_memcap_drop=0,app_layer_flow_ftp=0,app_layer_flow_imap=0,app_layer_flow_http=0,app_layer_flow_tls=0,decoder_icmpv4=0,decoder_sctp=0,defrag_ipv4_timeouts=0,tcp_reassembly_gap=0,detect_alert=0,decoder_ethernet=58,tcp_pseudo_failed=0,decoder_teredo=0,defrag_ipv4_reassembled=0,tcp_sessions=0,app_layer_flow_msn=0,decoder_ipraw_invalid_ip_version=0,tcp_no_flow=0,app_layer_flow_dns_tcp=0,decoder_null=0,defrag_ipv4_fragments=0,app_layer_flow_dcerpc_tcp=0,app_layer_flow_failed_udp=8,app_layer_tx_tls=0,decoder_bytes=15800,decoder_ipv6=7,tcp_stream_depth_reached=0,decoder_invalid=0,decoder_ltnull_unsupported_type=0,app_layer_tx_dns_udp=6,decoder_pppoe=0,decoder_avg_pkt_size=272,decoder_erspan=0,defrag_ipv6_timeouts=0,app_layer_flow_failed_tcp=0,decoder_gre=0,decoder_sll=0,defrag_max_frag_hits=0,app_layer_flow_ssh=0,capture_kernel_packets=58,decoder_mpls=0,decoder_vlan_qinq=0,tcp_rst=0,app_layer_flow_smb=0,app_layer_tx_smtp=0,decoder_icmpv6=0,defrag_ipv6_fragments=0,defrag_ipv6_reassembled=0,tcp_synack=0,app_layer_flow_smtp=0 1568368562545035575
suricata,host=myhost,thread=W#05-wlp4s0 tcp_reassembly_gap=0,capture_kernel_drops=0,decoder_ltnull_unsupported_type=0,tcp_sessions=0,tcp_stream_depth_reached=0,tcp_pseudo_failed=0,app_layer_flow_failed_tcp=0,app_layer_tx_dns_tcp=0,decoder_null=0,decoder_dce_pkt_too_small=0,decoder_udp=7,tcp_rst=3,app_layer_flow_dns_tcp=0,decoder_invalid=0,defrag_ipv4_reassembled=0,tcp_synack=0,app_layer_flow_ftp=0,decoder_bytes=3117,decoder_pppoe=0,app_layer_flow_dcerpc_tcp=0,app_layer_flow_smb=0,decoder_ipv6_in_ipv6=0,decoder_ipraw_invalid_ip_version=0,app_layer_flow_imap=0,app_layer_tx_dns_udp=2,decoder_ppp=0,decoder_ipv4=21,decoder_tcp=14,flow_memcap=0,tcp_syn=0,tcp_invalid_checksum=0,decoder_teredo=0,decoder_ltnull_pkt_too_small=0,defrag_max_frag_hits=0,app_layer_tx_tls=0,decoder_pkts=24,decoder_sll=0,defrag_ipv6_fragments=0,app_layer_flow_dcerpc_udp=0,app_layer_flow_smtp=0,decoder_icmpv6=3,defrag_ipv6_timeouts=0,decoder_ipv6=3,decoder_raw=0,defrag_ipv6_reassembled=0,tcp_no_flow=0,detect_alert=0,app_layer_flow_tls=0,decoder_ethernet=24,decoder_vlan=0,decoder_icmpv4=0,decoder_ipv4_in_ipv6=0,app_layer_flow_failed_udp=1,decoder_mpls=0,decoder_max_pkt_size=653,decoder_sctp=0,defrag_ipv4_timeouts=0,tcp_ssn_memcap_drop=0,app_layer_flow_dns_udp=1,app_layer_tx_smtp=0,capture_kernel_packets=24,decoder_vlan_qinq=0,decoder_gre=0,app_layer_flow_ssh=0,app_layer_flow_msn=0,defrag_ipv4_fragments=0,app_layer_flow_http=0,tcp_segment_memcap_drop=0,tcp_pseudo=0,app_layer_tx_http=0,decoder_erspan=0,decoder_avg_pkt_size=129 1568368562545009684
suricata,host=myhost,thread=W#03-wlp4s0 app_layer_flow_failed_tcp=0,decoder_teredo=0,decoder_ipv6_in_ipv6=0,tcp_pseudo_failed=0,tcp_stream_depth_reached=0,tcp_syn=0,decoder_gre=0,tcp_segment_memcap_drop=0,tcp_ssn_memcap_drop=0,app_layer_tx_smtp=0,decoder_raw=0,decoder_ltnull_pkt_too_small=0,tcp_sessions=0,tcp_reassembly_gap=0,app_layer_flow_ssh=0,app_layer_flow_imap=0,decoder_ipv4=463,decoder_ethernet=463,capture_kernel_packets=463,decoder_pppoe=0,defrag_ipv4_reassembled=0,app_layer_flow_tls=0,app_layer_flow_dcerpc_udp=0,app_layer_flow_dns_udp=0,decoder_vlan=0,decoder_ipraw_invalid_ip_version=0,decoder_mpls=0,tcp_no_flow=0,decoder_avg_pkt_size=445,decoder_udp=432,flow_memcap=0,app_layer_tx_dns_udp=0,app_layer_flow_msn=0,app_layer_flow_http=0,app_layer_flow_dcerpc_tcp=0,decoder_ipv6=0,decoder_ipv4_in_ipv6=0,defrag_ipv4_timeouts=0,defrag_ipv4_fragments=0,defrag_ipv6_timeouts=0,decoder_sctp=0,defrag_ipv6_fragments=0,app_layer_flow_dns_tcp=0,app_layer_tx_tls=0,defrag_max_frag_hits=0,decoder_bytes=206345,decoder_vlan_qinq=0,decoder_invalid=0,decoder_ppp=0,tcp_rst=0,detect_alert=0,capture_kernel_drops=0,app_layer_flow_failed_udp=4,decoder_null=0,decoder_icmpv4=0,decoder_icmpv6=0,decoder_ltnull_unsupported_type=0,defrag_ipv6_reassembled=0,tcp_invalid_checksum=0,tcp_synack=0,decoder_tcp=31,tcp_pseudo=0,app_layer_flow_smb=0,app_layer_flow_smtp=0,decoder_max_pkt_size=1463,decoder_dce_pkt_too_small=0,app_layer_tx_http=0,decoder_pkts=463,decoder_sll=0,app_layer_flow_ftp=0,app_layer_tx_dns_tcp=0,decoder_erspan=0 1568368562544966078
```

View File

@ -0,0 +1,229 @@
package suricata
import (
"bufio"
"context"
"encoding/json"
"fmt"
"io"
"net"
"strings"
"sync"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
)
const (
// InBufSize is the input buffer size for JSON received via socket.
// Set to 10MB, as depending on the number of threads the output might be
// large.
InBufSize = 10 * 1024 * 1024
)
// Suricata is a Telegraf input plugin for Suricata runtime statistics.
type Suricata struct {
Source string `toml:"source"`
Delimiter string `toml:"delimiter"`
inputListener *net.UnixListener
cancel context.CancelFunc
Log telegraf.Logger `toml:"-"`
wg sync.WaitGroup
}
// Description returns the plugin description.
func (s *Suricata) Description() string {
return "Suricata stats plugin"
}
const sampleConfig = `
## Data sink for Suricata stats log
# This is expected to be a filename of a
# unix socket to be created for listening.
source = "/var/run/suricata-stats.sock"
# Delimiter for flattening field keys, e.g. subitem "alert" of "detect"
# becomes "detect_alert" when delimiter is "_".
delimiter = "_"
`
// SampleConfig returns a sample TOML section to illustrate configuration
// options.
func (s *Suricata) SampleConfig() string {
return sampleConfig
}
// Start initiates background collection of JSON data from the socket
// provided to Suricata.
func (s *Suricata) Start(acc telegraf.Accumulator) error {
var err error
s.inputListener, err = net.ListenUnix("unix", &net.UnixAddr{
Name: s.Source,
Net: "unix",
})
if err != nil {
return err
}
ctx, cancel := context.WithCancel(context.Background())
s.cancel = cancel
s.inputListener.SetUnlinkOnClose(true)
s.wg.Add(1)
go func() {
defer s.wg.Done()
go s.handleServerConnection(ctx, acc)
}()
return nil
}
// Stop causes the plugin to cease collecting JSON data from the socket provided
// to Suricata.
func (s *Suricata) Stop() {
s.inputListener.Close()
if s.cancel != nil {
s.cancel()
}
s.wg.Wait()
}
func (s *Suricata) readInput(ctx context.Context, acc telegraf.Accumulator, conn net.Conn) error {
reader := bufio.NewReaderSize(conn, InBufSize)
for {
select {
case <-ctx.Done():
return nil
default:
line, rerr := reader.ReadBytes('\n')
if rerr != nil {
return rerr
} else if len(line) > 0 {
s.parse(acc, line)
}
}
}
}
func (s *Suricata) handleServerConnection(ctx context.Context, acc telegraf.Accumulator) {
var err error
for {
select {
case <-ctx.Done():
return
default:
var conn net.Conn
conn, err = s.inputListener.Accept()
if err != nil {
if !strings.HasSuffix(err.Error(), ": use of closed network connection") {
acc.AddError(err)
}
continue
}
err = s.readInput(ctx, acc, conn)
// we want to handle EOF as an opportunity to wait for a new
// connection -- this could, for example, happen when Suricata is
// restarted while Telegraf is running.
if err != io.EOF {
acc.AddError(err)
return
}
}
}
}
func flexFlatten(outmap map[string]interface{}, field string, v interface{}, delimiter string) error {
switch t := v.(type) {
case map[string]interface{}:
for k, v := range t {
var err error
if field == "" {
err = flexFlatten(outmap, k, v, delimiter)
} else {
err = flexFlatten(outmap, fmt.Sprintf("%s%s%s", field, delimiter, k), v, delimiter)
}
if err != nil {
return err
}
}
case float64:
outmap[field] = v.(float64)
default:
return fmt.Errorf("Unsupported type %T encountered", t)
}
return nil
}
func (s *Suricata) parse(acc telegraf.Accumulator, sjson []byte) {
// initial parsing
var result map[string]interface{}
err := json.Unmarshal([]byte(sjson), &result)
if err != nil {
acc.AddError(err)
return
}
// check for presence of relevant stats
if _, ok := result["stats"]; !ok {
s.Log.Debug("Input does not contain necessary 'stats' sub-object")
return
}
if _, ok := result["stats"].(map[string]interface{}); !ok {
s.Log.Debug("The 'stats' sub-object does not have required structure")
return
}
fields := make(map[string](map[string]interface{}))
totalmap := make(map[string]interface{})
for k, v := range result["stats"].(map[string]interface{}) {
if k == "threads" {
if v, ok := v.(map[string]interface{}); ok {
for k, t := range v {
outmap := make(map[string]interface{})
if threadStruct, ok := t.(map[string]interface{}); ok {
err = flexFlatten(outmap, "", threadStruct, s.Delimiter)
if err != nil {
s.Log.Debug(err)
// we skip this thread as something did not parse correctly
continue
}
fields[k] = outmap
}
}
} else {
s.Log.Debug("The 'threads' sub-object does not have required structure")
}
} else {
err = flexFlatten(totalmap, k, v, s.Delimiter)
if err != nil {
s.Log.Debug(err.Error())
// we skip this subitem as something did not parse correctly
}
}
}
fields["total"] = totalmap
for k := range fields {
if k == "Global" {
acc.AddFields("suricata", fields[k], nil)
} else {
acc.AddFields("suricata", fields[k], map[string]string{"thread": k})
}
}
}
// Gather measures and submits one full set of telemetry to Telegraf.
// Not used here, submission is completely input-driven.
func (s *Suricata) Gather(acc telegraf.Accumulator) error {
return nil
}
func init() {
inputs.Add("suricata", func() telegraf.Input {
return &Suricata{
Source: "/var/run/suricata-stats.sock",
Delimiter: "_",
}
})
}

View File

@ -0,0 +1,472 @@
package suricata
import (
"bytes"
"fmt"
"io/ioutil"
"log"
"math/rand"
"net"
"os"
"path/filepath"
"regexp"
"strings"
"testing"
"time"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert"
)
var ex2 = `{"timestamp":"2017-03-06T07:43:39.000397+0000","event_type":"stats","stats":{"capture":{"kernel_packets":905344474,"kernel_drops":78355440,"kernel_packets_delta":2376742,"kernel_drops_delta":82049}}}`
var ex3 = `{"timestamp":"2017-03-06T07:43:39.000397+0000","event_type":"stats","stats":{"threads": { "foo": { "capture":{"kernel_packets":905344474,"kernel_drops":78355440}}}}}`
var ex4 = `{"timestamp":"2017-03-06T07:43:39.000397+0000","event_type":"stats","stats":{"threads": { "W1#en..bar1": { "capture":{"kernel_packets":905344474,"kernel_drops":78355440}}}}}`
var brokenType1 = `{"timestamp":"2017-03-06T07:43:39.000397+0000","event_type":"stats","stats":{"threads": { "W1#en..bar1": { "capture":{"kernel_packets":905344474,"kernel_drops": true}}}}}`
var brokenType2 = `{"timestamp":"2017-03-06T07:43:39.000397+0000","event_type":"stats","stats":{"threads": { "W1#en..bar1": { "capture":{"kernel_packets":905344474,"kernel_drops": ["foo"]}}}}}`
var brokenType3 = `{"timestamp":"2017-03-06T07:43:39.000397+0000","event_type":"stats","stats":{"threads": { "W1#en..bar1": { "capture":{"kernel_packets":905344474,"kernel_drops":"none this time"}}}}}`
var brokenType4 = `{"timestamp":"2017-03-06T07:43:39.000397+0000","event_type":"stats","stats":{"threads": { "W1#en..bar1": { "capture":{"kernel_packets":905344474,"kernel_drops":null}}}}}`
var brokenType5 = `{"timestamp":"2017-03-06T07:43:39.000397+0000","event_type":"stats","stats":{"foo": null}}`
var brokenStruct1 = `{"timestamp":"2017-03-06T07:43:39.000397+0000","event_type":"stats","stats":{"threads": ["foo"]}}`
var brokenStruct2 = `{"timestamp":"2017-03-06T07:43:39.000397+0000","event_type":"stats"}`
var brokenStruct3 = `{"timestamp":"2017-03-06T07:43:39.000397+0000","event_type":"stats","stats": "foobar"}`
var brokenStruct4 = `{"timestamp":"2017-03-06T07:43:39.000397+0000","event_type":"stats","stats": null}`
var singleDotRegexp = regexp.MustCompilePOSIX(`[^.]\.[^.]`)
func TestSuricataLarge(t *testing.T) {
dir, err := ioutil.TempDir("", "test")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(dir)
tmpfn := filepath.Join(dir, fmt.Sprintf("t%d", rand.Int63()))
s := Suricata{
Source: tmpfn,
Delimiter: ".",
Log: testutil.Logger{
Name: "inputs.suricata",
},
}
acc := testutil.Accumulator{}
acc.SetDebug(true)
assert.NoError(t, s.Start(&acc))
data, err := ioutil.ReadFile("testdata/test1.json")
if err != nil {
t.Fatal(err)
}
c, err := net.Dial("unix", tmpfn)
if err != nil {
t.Fatal(err)
}
c.Write([]byte(data))
c.Write([]byte("\n"))
c.Close()
acc.Wait(1)
s.Stop()
}
func TestSuricata(t *testing.T) {
dir, err := ioutil.TempDir("", "test")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(dir)
tmpfn := filepath.Join(dir, fmt.Sprintf("t%d", rand.Int63()))
s := Suricata{
Source: tmpfn,
Delimiter: ".",
Log: testutil.Logger{
Name: "inputs.suricata",
},
}
acc := testutil.Accumulator{}
acc.SetDebug(true)
assert.NoError(t, s.Start(&acc))
c, err := net.Dial("unix", tmpfn)
if err != nil {
t.Fatalf("failed: %s", err.Error())
}
c.Write([]byte(ex2))
c.Write([]byte("\n"))
c.Close()
acc.Wait(1)
s.Stop()
s = Suricata{
Source: tmpfn,
Delimiter: ".",
Log: testutil.Logger{
Name: "inputs.suricata",
},
}
acc.AssertContainsTaggedFields(t, "suricata",
map[string]interface{}{
"capture.kernel_packets": float64(905344474),
"capture.kernel_drops": float64(78355440),
"capture.kernel_packets_delta": float64(2376742),
"capture.kernel_drops_delta": float64(82049),
},
map[string]string{"thread": "total"})
acc = testutil.Accumulator{}
acc.SetDebug(true)
assert.NoError(t, s.Start(&acc))
c, err = net.Dial("unix", tmpfn)
if err != nil {
log.Println(err)
}
c.Write([]byte(""))
c.Write([]byte("\n"))
c.Write([]byte("foobard}\n"))
c.Write([]byte(ex3))
c.Write([]byte("\n"))
c.Close()
acc.Wait(1)
s.Stop()
acc.AssertContainsTaggedFields(t, "suricata",
map[string]interface{}{
"capture.kernel_packets": float64(905344474),
"capture.kernel_drops": float64(78355440),
},
map[string]string{"thread": "foo"})
}
func TestSuricataInvalid(t *testing.T) {
dir, err := ioutil.TempDir("", "test")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(dir)
tmpfn := filepath.Join(dir, fmt.Sprintf("t%d", rand.Int63()))
s := Suricata{
Source: tmpfn,
Log: testutil.Logger{
Name: "inputs.suricata",
},
}
acc := testutil.Accumulator{}
acc.SetDebug(true)
assert.NoError(t, s.Start(&acc))
c, err := net.Dial("unix", tmpfn)
if err != nil {
log.Println(err)
}
c.Write([]byte("sfjiowef"))
c.Write([]byte("\n"))
c.Close()
acc.WaitError(1)
s.Stop()
}
func splitAtSingleDot(in string) []string {
res := singleDotRegexp.FindAllStringIndex(in, -1)
if res == nil {
return []string{in}
}
ret := make([]string, 0)
startpos := 0
for _, v := range res {
ret = append(ret, in[startpos:v[0]+1])
startpos = v[1] - 1
}
return append(ret, in[startpos:])
}
func TestSuricataSplitDots(t *testing.T) {
dir, err := ioutil.TempDir("", "test")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(dir)
tmpfn := filepath.Join(dir, fmt.Sprintf("t%d", rand.Int63()))
out := splitAtSingleDot("foo")
if len(out) != 1 {
t.Fatalf("splitting 'foo' should yield one result")
}
if out[0] != "foo" {
t.Fatalf("splitting 'foo' should yield one result, 'foo'")
}
s := Suricata{
Source: tmpfn,
Delimiter: ".",
Log: testutil.Logger{
Name: "inputs.suricata",
},
}
acc := testutil.Accumulator{}
acc.SetDebug(true)
assert.NoError(t, s.Start(&acc))
c, err := net.Dial("unix", tmpfn)
if err != nil {
log.Println(err)
}
c.Write([]byte(ex4))
c.Write([]byte("\n"))
c.Close()
acc.Wait(1)
acc.AssertContainsTaggedFields(t, "suricata",
map[string]interface{}{
"capture.kernel_packets": float64(905344474),
"capture.kernel_drops": float64(78355440),
},
map[string]string{"thread": "W1#en..bar1"})
s.Stop()
}
func TestSuricataInvalidPath(t *testing.T) {
tmpfn := fmt.Sprintf("/t%d/X", rand.Int63())
s := Suricata{
Source: tmpfn,
Log: testutil.Logger{
Name: "inputs.suricata",
},
}
acc := testutil.Accumulator{}
acc.SetDebug(true)
assert.Error(t, s.Start(&acc))
}
func TestSuricataTooLongLine(t *testing.T) {
dir, err := ioutil.TempDir("", "test")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(dir)
tmpfn := filepath.Join(dir, fmt.Sprintf("t%d", rand.Int63()))
s := Suricata{
Source: tmpfn,
Log: testutil.Logger{
Name: "inputs.suricata",
},
}
acc := testutil.Accumulator{}
acc.SetDebug(true)
assert.NoError(t, s.Start(&acc))
c, err := net.Dial("unix", tmpfn)
if err != nil {
log.Println(err)
}
c.Write([]byte(strings.Repeat("X", 20000000)))
c.Write([]byte("\n"))
c.Close()
acc.WaitError(1)
s.Stop()
}
func TestSuricataEmptyJSON(t *testing.T) {
dir, err := ioutil.TempDir("", "test")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(dir)
tmpfn := filepath.Join(dir, fmt.Sprintf("t%d", rand.Int63()))
s := Suricata{
Source: tmpfn,
Log: testutil.Logger{
Name: "inputs.suricata",
},
}
acc := testutil.Accumulator{}
acc.SetDebug(true)
assert.NoError(t, s.Start(&acc))
c, err := net.Dial("unix", tmpfn)
if err != nil {
log.Println(err)
}
c.Write([]byte("\n"))
c.Close()
acc.WaitError(1)
s.Stop()
}
func TestSuricataInvalidInputs(t *testing.T) {
dir, err := ioutil.TempDir("", "test")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(dir)
defer func() {
log.SetOutput(os.Stderr)
}()
tmpfn := filepath.Join(dir, fmt.Sprintf("t%d", rand.Int63()))
for input, errmsg := range map[string]string{
brokenType1: `Unsupported type bool encountered`,
brokenType2: `Unsupported type []interface {} encountered`,
brokenType3: `Unsupported type string encountered`,
brokenType4: `Unsupported type <nil> encountered`,
brokenType5: `Unsupported type <nil> encountered`,
brokenStruct1: `The 'threads' sub-object does not have required structure`,
brokenStruct2: `Input does not contain necessary 'stats' sub-object`,
brokenStruct3: `The 'stats' sub-object does not have required structure`,
brokenStruct4: `The 'stats' sub-object does not have required structure`,
} {
var logBuf buffer
logBuf.Reset()
log.SetOutput(&logBuf)
acc := testutil.Accumulator{}
acc.SetDebug(true)
s := Suricata{
Source: tmpfn,
Delimiter: ".",
Log: testutil.Logger{
Name: "inputs.suricata",
},
}
assert.NoError(t, s.Start(&acc))
c, err := net.Dial("unix", tmpfn)
if err != nil {
t.Fatal(err)
}
c.Write([]byte(input))
c.Write([]byte("\n"))
c.Close()
for {
if bytes.Count(logBuf.Bytes(), []byte{'\n'}) > 0 {
break
}
time.Sleep(50 * time.Millisecond)
}
assert.Contains(t, logBuf.String(), errmsg)
s.Stop()
}
}
func TestSuricataDisconnectSocket(t *testing.T) {
dir, err := ioutil.TempDir("", "test")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(dir)
tmpfn := filepath.Join(dir, fmt.Sprintf("t%d", rand.Int63()))
s := Suricata{
Source: tmpfn,
Log: testutil.Logger{
Name: "inputs.suricata",
},
}
acc := testutil.Accumulator{}
acc.SetDebug(true)
assert.NoError(t, s.Start(&acc))
c, err := net.Dial("unix", tmpfn)
if err != nil {
log.Println(err)
}
c.Write([]byte(ex2))
c.Write([]byte("\n"))
c.Close()
c, err = net.Dial("unix", tmpfn)
if err != nil {
log.Println(err)
}
c.Write([]byte(ex3))
c.Write([]byte("\n"))
c.Close()
acc.Wait(2)
s.Stop()
}
func TestSuricataPluginDesc(t *testing.T) {
v, ok := inputs.Inputs["suricata"]
if !ok {
t.Fatal("suricata plugin not registered")
}
desc := v().Description()
if desc != "Suricata stats plugin" {
t.Fatal("invalid description ", desc)
}
}
func TestSuricataStartStop(t *testing.T) {
dir, err := ioutil.TempDir("", "test")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(dir)
tmpfn := filepath.Join(dir, fmt.Sprintf("t%d", rand.Int63()))
s := Suricata{
Source: tmpfn,
Log: testutil.Logger{
Name: "inputs.suricata",
},
}
acc := testutil.Accumulator{}
acc.SetDebug(true)
assert.NoError(t, s.Start(&acc))
s.Stop()
}
func TestSuricataGather(t *testing.T) {
dir, err := ioutil.TempDir("", "test")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(dir)
tmpfn := filepath.Join(dir, fmt.Sprintf("t%d", rand.Int63()))
s := Suricata{
Source: tmpfn,
Log: testutil.Logger{
Name: "inputs.suricata",
},
}
acc := testutil.Accumulator{}
acc.SetDebug(true)
assert.NoError(t, s.Gather(&acc))
}
func TestSuricataSampleConfig(t *testing.T) {
v, ok := inputs.Inputs["suricata"]
if !ok {
t.Fatal("suricata plugin not registered")
}
if v().SampleConfig() != sampleConfig {
t.Fatal("wrong sampleconfig")
}
}

View File

@ -0,0 +1,38 @@
package suricata
import (
"bytes"
"sync"
)
// A thread-safe Buffer wrapper to enable concurrent access to log output.
type buffer struct {
b bytes.Buffer
m sync.Mutex
}
func (b *buffer) Read(p []byte) (n int, err error) {
b.m.Lock()
defer b.m.Unlock()
return b.b.Read(p)
}
func (b *buffer) Write(p []byte) (n int, err error) {
b.m.Lock()
defer b.m.Unlock()
return b.b.Write(p)
}
func (b *buffer) String() string {
b.m.Lock()
defer b.m.Unlock()
return b.b.String()
}
func (b *buffer) Reset() {
b.m.Lock()
defer b.m.Unlock()
b.b.Reset()
}
func (b *buffer) Bytes() []byte {
b.m.Lock()
defer b.m.Unlock()
return b.b.Bytes()
}

File diff suppressed because one or more lines are too long