1package nsqadmin
2
3import (
4	"bytes"
5	"crypto/tls"
6	"crypto/x509"
7	"encoding/json"
8	"errors"
9	"fmt"
10	"io/ioutil"
11	"log"
12	"net"
13	"net/http"
14	"net/url"
15	"os"
16	"path"
17	"sync"
18	"sync/atomic"
19
20	"github.com/nsqio/nsq/internal/http_api"
21	"github.com/nsqio/nsq/internal/util"
22	"github.com/nsqio/nsq/internal/version"
23)
24
25type NSQAdmin struct {
26	sync.RWMutex
27	opts                atomic.Value
28	httpListener        net.Listener
29	waitGroup           util.WaitGroupWrapper
30	notifications       chan *AdminAction
31	graphiteURL         *url.URL
32	httpClientTLSConfig *tls.Config
33}
34
35func New(opts *Options) (*NSQAdmin, error) {
36	if opts.Logger == nil {
37		opts.Logger = log.New(os.Stderr, opts.LogPrefix, log.Ldate|log.Ltime|log.Lmicroseconds)
38	}
39
40	n := &NSQAdmin{
41		notifications: make(chan *AdminAction),
42	}
43	n.swapOpts(opts)
44
45	if len(opts.NSQDHTTPAddresses) == 0 && len(opts.NSQLookupdHTTPAddresses) == 0 {
46		return nil, errors.New("--nsqd-http-address or --lookupd-http-address required")
47	}
48
49	if len(opts.NSQDHTTPAddresses) != 0 && len(opts.NSQLookupdHTTPAddresses) != 0 {
50		return nil, errors.New("use --nsqd-http-address or --lookupd-http-address not both")
51	}
52
53	if opts.HTTPClientTLSCert != "" && opts.HTTPClientTLSKey == "" {
54		return nil, errors.New("--http-client-tls-key must be specified with --http-client-tls-cert")
55	}
56
57	if opts.HTTPClientTLSKey != "" && opts.HTTPClientTLSCert == "" {
58		return nil, errors.New("--http-client-tls-cert must be specified with --http-client-tls-key")
59	}
60
61	n.httpClientTLSConfig = &tls.Config{
62		InsecureSkipVerify: opts.HTTPClientTLSInsecureSkipVerify,
63	}
64	if opts.HTTPClientTLSCert != "" && opts.HTTPClientTLSKey != "" {
65		cert, err := tls.LoadX509KeyPair(opts.HTTPClientTLSCert, opts.HTTPClientTLSKey)
66		if err != nil {
67			return nil, fmt.Errorf("failed to LoadX509KeyPair %s, %s - %s",
68				opts.HTTPClientTLSCert, opts.HTTPClientTLSKey, err)
69		}
70		n.httpClientTLSConfig.Certificates = []tls.Certificate{cert}
71	}
72	if opts.HTTPClientTLSRootCAFile != "" {
73		tlsCertPool := x509.NewCertPool()
74		caCertFile, err := ioutil.ReadFile(opts.HTTPClientTLSRootCAFile)
75		if err != nil {
76			return nil, fmt.Errorf("failed to read TLS root CA file %s - %s",
77				opts.HTTPClientTLSRootCAFile, err)
78		}
79		if !tlsCertPool.AppendCertsFromPEM(caCertFile) {
80			return nil, fmt.Errorf("failed to AppendCertsFromPEM %s", opts.HTTPClientTLSRootCAFile)
81		}
82		n.httpClientTLSConfig.RootCAs = tlsCertPool
83	}
84
85	for _, address := range opts.NSQLookupdHTTPAddresses {
86		_, err := net.ResolveTCPAddr("tcp", address)
87		if err != nil {
88			return nil, fmt.Errorf("failed to resolve --lookupd-http-address (%s) - %s", address, err)
89		}
90	}
91
92	for _, address := range opts.NSQDHTTPAddresses {
93		_, err := net.ResolveTCPAddr("tcp", address)
94		if err != nil {
95			return nil, fmt.Errorf("failed to resolve --nsqd-http-address (%s) - %s", address, err)
96		}
97	}
98
99	if opts.ProxyGraphite {
100		url, err := url.Parse(opts.GraphiteURL)
101		if err != nil {
102			return nil, fmt.Errorf("failed to parse --graphite-url (%s) - %s", opts.GraphiteURL, err)
103			os.Exit(1)
104		}
105		n.graphiteURL = url
106	}
107
108	if opts.AllowConfigFromCIDR != "" {
109		_, _, err := net.ParseCIDR(opts.AllowConfigFromCIDR)
110		if err != nil {
111			return nil, fmt.Errorf("failed to parse --allow-config-from-cidr (%s) - %s", opts.AllowConfigFromCIDR, err)
112		}
113	}
114
115	opts.BasePath = normalizeBasePath(opts.BasePath)
116
117	n.logf(LOG_INFO, version.String("nsqadmin"))
118
119	var err error
120	n.httpListener, err = net.Listen("tcp", n.getOpts().HTTPAddress)
121	if err != nil {
122		return nil, fmt.Errorf("listen (%s) failed - %s", n.getOpts().HTTPAddress, err)
123	}
124
125	return n, nil
126}
127
128func normalizeBasePath(p string) string {
129	if len(p) == 0 {
130		return "/"
131	}
132	// add leading slash
133	if p[0] != '/' {
134		p = "/" + p
135	}
136	return path.Clean(p)
137}
138
139func (n *NSQAdmin) getOpts() *Options {
140	return n.opts.Load().(*Options)
141}
142
143func (n *NSQAdmin) swapOpts(opts *Options) {
144	n.opts.Store(opts)
145}
146
147func (n *NSQAdmin) RealHTTPAddr() *net.TCPAddr {
148	return n.httpListener.Addr().(*net.TCPAddr)
149}
150
151func (n *NSQAdmin) handleAdminActions() {
152	for action := range n.notifications {
153		content, err := json.Marshal(action)
154		if err != nil {
155			n.logf(LOG_ERROR, "failed to serialize admin action - %s", err)
156		}
157		httpclient := &http.Client{
158			Transport: http_api.NewDeadlineTransport(n.getOpts().HTTPClientConnectTimeout, n.getOpts().HTTPClientRequestTimeout),
159		}
160		n.logf(LOG_INFO, "POSTing notification to %s", n.getOpts().NotificationHTTPEndpoint)
161		resp, err := httpclient.Post(n.getOpts().NotificationHTTPEndpoint,
162			"application/json", bytes.NewBuffer(content))
163		if err != nil {
164			n.logf(LOG_ERROR, "failed to POST notification - %s", err)
165		}
166		resp.Body.Close()
167	}
168}
169
170func (n *NSQAdmin) Main() error {
171	exitCh := make(chan error)
172	var once sync.Once
173	exitFunc := func(err error) {
174		once.Do(func() {
175			if err != nil {
176				n.logf(LOG_FATAL, "%s", err)
177			}
178			exitCh <- err
179		})
180	}
181
182	httpServer := NewHTTPServer(n)
183	n.waitGroup.Wrap(func() {
184		exitFunc(http_api.Serve(n.httpListener, http_api.CompressHandler(httpServer), "HTTP", n.logf))
185	})
186	n.waitGroup.Wrap(n.handleAdminActions)
187
188	err := <-exitCh
189	return err
190}
191
192func (n *NSQAdmin) Exit() {
193	if n.httpListener != nil {
194		n.httpListener.Close()
195	}
196	close(n.notifications)
197	n.waitGroup.Wait()
198}
199