1package nsqadmin 2 3import ( 4 "bytes" 5 "crypto/tls" 6 "crypto/x509" 7 "encoding/json" 8 "errors" 9 "fmt" 10 "io/ioutil" 11 "log" 12 "net" 13 "net/http" 14 "net/url" 15 "os" 16 "path" 17 "sync" 18 "sync/atomic" 19 20 "github.com/nsqio/nsq/internal/http_api" 21 "github.com/nsqio/nsq/internal/util" 22 "github.com/nsqio/nsq/internal/version" 23) 24 25type NSQAdmin struct { 26 sync.RWMutex 27 opts atomic.Value 28 httpListener net.Listener 29 waitGroup util.WaitGroupWrapper 30 notifications chan *AdminAction 31 graphiteURL *url.URL 32 httpClientTLSConfig *tls.Config 33} 34 35func New(opts *Options) (*NSQAdmin, error) { 36 if opts.Logger == nil { 37 opts.Logger = log.New(os.Stderr, opts.LogPrefix, log.Ldate|log.Ltime|log.Lmicroseconds) 38 } 39 40 n := &NSQAdmin{ 41 notifications: make(chan *AdminAction), 42 } 43 n.swapOpts(opts) 44 45 if len(opts.NSQDHTTPAddresses) == 0 && len(opts.NSQLookupdHTTPAddresses) == 0 { 46 return nil, errors.New("--nsqd-http-address or --lookupd-http-address required") 47 } 48 49 if len(opts.NSQDHTTPAddresses) != 0 && len(opts.NSQLookupdHTTPAddresses) != 0 { 50 return nil, errors.New("use --nsqd-http-address or --lookupd-http-address not both") 51 } 52 53 if opts.HTTPClientTLSCert != "" && opts.HTTPClientTLSKey == "" { 54 return nil, errors.New("--http-client-tls-key must be specified with --http-client-tls-cert") 55 } 56 57 if opts.HTTPClientTLSKey != "" && opts.HTTPClientTLSCert == "" { 58 return nil, errors.New("--http-client-tls-cert must be specified with --http-client-tls-key") 59 } 60 61 n.httpClientTLSConfig = &tls.Config{ 62 InsecureSkipVerify: opts.HTTPClientTLSInsecureSkipVerify, 63 } 64 if opts.HTTPClientTLSCert != "" && opts.HTTPClientTLSKey != "" { 65 cert, err := tls.LoadX509KeyPair(opts.HTTPClientTLSCert, opts.HTTPClientTLSKey) 66 if err != nil { 67 return nil, fmt.Errorf("failed to LoadX509KeyPair %s, %s - %s", 68 opts.HTTPClientTLSCert, opts.HTTPClientTLSKey, err) 69 } 70 n.httpClientTLSConfig.Certificates = []tls.Certificate{cert} 71 } 72 if opts.HTTPClientTLSRootCAFile != "" { 73 tlsCertPool := x509.NewCertPool() 74 caCertFile, err := ioutil.ReadFile(opts.HTTPClientTLSRootCAFile) 75 if err != nil { 76 return nil, fmt.Errorf("failed to read TLS root CA file %s - %s", 77 opts.HTTPClientTLSRootCAFile, err) 78 } 79 if !tlsCertPool.AppendCertsFromPEM(caCertFile) { 80 return nil, fmt.Errorf("failed to AppendCertsFromPEM %s", opts.HTTPClientTLSRootCAFile) 81 } 82 n.httpClientTLSConfig.RootCAs = tlsCertPool 83 } 84 85 for _, address := range opts.NSQLookupdHTTPAddresses { 86 _, err := net.ResolveTCPAddr("tcp", address) 87 if err != nil { 88 return nil, fmt.Errorf("failed to resolve --lookupd-http-address (%s) - %s", address, err) 89 } 90 } 91 92 for _, address := range opts.NSQDHTTPAddresses { 93 _, err := net.ResolveTCPAddr("tcp", address) 94 if err != nil { 95 return nil, fmt.Errorf("failed to resolve --nsqd-http-address (%s) - %s", address, err) 96 } 97 } 98 99 if opts.ProxyGraphite { 100 url, err := url.Parse(opts.GraphiteURL) 101 if err != nil { 102 return nil, fmt.Errorf("failed to parse --graphite-url (%s) - %s", opts.GraphiteURL, err) 103 os.Exit(1) 104 } 105 n.graphiteURL = url 106 } 107 108 if opts.AllowConfigFromCIDR != "" { 109 _, _, err := net.ParseCIDR(opts.AllowConfigFromCIDR) 110 if err != nil { 111 return nil, fmt.Errorf("failed to parse --allow-config-from-cidr (%s) - %s", opts.AllowConfigFromCIDR, err) 112 } 113 } 114 115 opts.BasePath = normalizeBasePath(opts.BasePath) 116 117 n.logf(LOG_INFO, version.String("nsqadmin")) 118 119 var err error 120 n.httpListener, err = net.Listen("tcp", n.getOpts().HTTPAddress) 121 if err != nil { 122 return nil, fmt.Errorf("listen (%s) failed - %s", n.getOpts().HTTPAddress, err) 123 } 124 125 return n, nil 126} 127 128func normalizeBasePath(p string) string { 129 if len(p) == 0 { 130 return "/" 131 } 132 // add leading slash 133 if p[0] != '/' { 134 p = "/" + p 135 } 136 return path.Clean(p) 137} 138 139func (n *NSQAdmin) getOpts() *Options { 140 return n.opts.Load().(*Options) 141} 142 143func (n *NSQAdmin) swapOpts(opts *Options) { 144 n.opts.Store(opts) 145} 146 147func (n *NSQAdmin) RealHTTPAddr() *net.TCPAddr { 148 return n.httpListener.Addr().(*net.TCPAddr) 149} 150 151func (n *NSQAdmin) handleAdminActions() { 152 for action := range n.notifications { 153 content, err := json.Marshal(action) 154 if err != nil { 155 n.logf(LOG_ERROR, "failed to serialize admin action - %s", err) 156 } 157 httpclient := &http.Client{ 158 Transport: http_api.NewDeadlineTransport(n.getOpts().HTTPClientConnectTimeout, n.getOpts().HTTPClientRequestTimeout), 159 } 160 n.logf(LOG_INFO, "POSTing notification to %s", n.getOpts().NotificationHTTPEndpoint) 161 resp, err := httpclient.Post(n.getOpts().NotificationHTTPEndpoint, 162 "application/json", bytes.NewBuffer(content)) 163 if err != nil { 164 n.logf(LOG_ERROR, "failed to POST notification - %s", err) 165 } 166 resp.Body.Close() 167 } 168} 169 170func (n *NSQAdmin) Main() error { 171 exitCh := make(chan error) 172 var once sync.Once 173 exitFunc := func(err error) { 174 once.Do(func() { 175 if err != nil { 176 n.logf(LOG_FATAL, "%s", err) 177 } 178 exitCh <- err 179 }) 180 } 181 182 httpServer := NewHTTPServer(n) 183 n.waitGroup.Wrap(func() { 184 exitFunc(http_api.Serve(n.httpListener, http_api.CompressHandler(httpServer), "HTTP", n.logf)) 185 }) 186 n.waitGroup.Wrap(n.handleAdminActions) 187 188 err := <-exitCh 189 return err 190} 191 192func (n *NSQAdmin) Exit() { 193 if n.httpListener != nil { 194 n.httpListener.Close() 195 } 196 close(n.notifications) 197 n.waitGroup.Wait() 198} 199