Ensure admin interface shuts down with Telegraf

This commit is contained in:
Tim Raymond 2016-02-10 16:48:00 -05:00
parent f3ff0e8be8
commit 8c81034051
2 changed files with 89 additions and 9 deletions

View File

@ -3,8 +3,10 @@ package enterprise
import ( import (
"fmt" "fmt"
"log" "log"
"net"
"net/http" "net/http"
"os" "os"
"time"
"github.com/influxdata/enterprise-client/v2" "github.com/influxdata/enterprise-client/v2"
"github.com/influxdata/enterprise-client/v2/admin" "github.com/influxdata/enterprise-client/v2/admin"
@ -20,14 +22,17 @@ type Service struct {
logger *log.Logger logger *log.Logger
hostname string hostname string
adminPort string adminPort string
shutdown chan struct{}
} }
func NewEnterprise(c Config, hostname string) *Service { func NewEnterprise(c Config, hostname string, shutdown chan struct{}) *Service {
return &Service{ return &Service{
hosts: c.Hosts, hosts: c.Hosts,
hostname: hostname, hostname: hostname,
logger: log.New(os.Stdout, "[enterprise]", log.Ldate|log.Ltime), logger: log.New(os.Stdout, "[enterprise]", log.Ldate|log.Ltime),
adminPort: fmt.Sprintf(":%d", c.AdminPort), adminPort: fmt.Sprintf(":%d", c.AdminPort),
shutdown: shutdown,
} }
} }
@ -37,22 +42,51 @@ func (s *Service) Open() {
s.logger.Printf("Unable to contact one or more Enterprise hosts. err: %s", err.Error()) s.logger.Printf("Unable to contact one or more Enterprise hosts. err: %s", err.Error())
return return
} }
go s.registerProduct(cl) go func() {
go s.startAdminInterface() token, secret, err := s.registerProduct(cl)
if err == nil {
s.startAdminInterface(token, secret)
}
}()
} }
func (s *Service) registerProduct(cl *client.Client) { func (s *Service) registerProduct(cl *client.Client) (token string, secret string, err error) {
p := client.Product{ p := client.Product{
ProductID: "telegraf", ProductID: "telegraf",
Host: s.hostname, Host: s.hostname,
} }
_, err := cl.Register(&p) _, err = cl.Register(&p)
if err != nil { if err != nil {
s.logger.Println("Unable to register Telegraf with Enterprise") s.logger.Println("Unable to register Telegraf with Enterprise")
} return
} }
func (s *Service) startAdminInterface() { for _, host := range cl.Hosts {
go http.ListenAndServe(s.adminPort, admin.App("foo", []byte("bar"))) if host.Primary {
token = host.Token
secret = host.SecretKey
}
}
return
}
func (s *Service) startAdminInterface(token, secret string) {
srv := &http.Server{
ReadTimeout: 5 * time.Second,
WriteTimeout: 5 * time.Second,
Handler: admin.App(token, []byte(secret)),
}
l, err := net.Listen("tcp", s.adminPort)
if err != nil {
s.logger.Printf("Unable to bind to admin interface port: err: %s", err.Error())
return
}
go srv.Serve(l)
select {
case <-s.shutdown:
s.logger.Printf("Shutting down enterprise admin interface")
l.Close()
}
return
} }

View File

@ -44,7 +44,10 @@ func Test_RegistersWithEnterprise(t *testing.T) {
&client.Host{URL: srv.URL}, &client.Host{URL: srv.URL},
}, },
} }
e := enterprise.NewEnterprise(c, expected)
shutdown := make(chan struct{})
defer close(shutdown)
e := enterprise.NewEnterprise(c, expected, shutdown)
e.Open() e.Open()
timeout := time.After(1 * time.Millisecond) timeout := time.After(1 * time.Millisecond)
@ -75,7 +78,9 @@ func Test_StartsAdminInterface(t *testing.T) {
AdminPort: 2300, AdminPort: 2300,
} }
e := enterprise.NewEnterprise(c, hostname) shutdown := make(chan struct{})
defer close(shutdown)
e := enterprise.NewEnterprise(c, hostname, shutdown)
e.Open() e.Open()
timeout := time.After(1 * time.Millisecond) timeout := time.After(1 * time.Millisecond)
@ -92,3 +97,44 @@ func Test_StartsAdminInterface(t *testing.T) {
} }
} }
} }
func Test_ClosesAdminInterface(t *testing.T) {
hostname := "localhost"
adminPort := 2300
success, srv := mockEnterprise(func(c *client.Product, err error) {})
defer srv.Close()
c := enterprise.Config{
Hosts: []*client.Host{
&client.Host{URL: srv.URL},
},
AdminPort: 2300,
}
shutdown := make(chan struct{})
e := enterprise.NewEnterprise(c, hostname, shutdown)
e.Open()
timeout := time.After(1 * time.Millisecond)
for {
select {
case <-success:
// Ensure that the admin interface is running
_, err := http.Get(fmt.Sprintf("http://%s:%d", hostname, adminPort))
if err != nil {
t.Errorf("Unable to connect to admin interface: err: %s", err)
}
close(shutdown)
// ...and that it's not running after we shut it down
_, err = http.Get(fmt.Sprintf("http://%s:%d", hostname, adminPort))
if err == nil {
t.Errorf("Admin interface continued running after shutdown")
}
return
case <-timeout:
t.Fatal("Expected to receive call to Enterprise API, but received none")
}
}
}