package dependency

import (
	"encoding/gob"
	"fmt"
	"log"
	"net/url"
	"regexp"
	"sort"
	"strings"

	"github.com/hashicorp/consul/api"
	"github.com/pkg/errors"
)

const (
	// HealthAny is the filter name that accepts a service regardless of its
	// aggregated check status.
	HealthAny = "any"
	// HealthPassing is the "passing" status name; it is also the filter
	// applied when a query does not specify one.
	HealthPassing = "passing"
	// HealthWarning is the "warning" status name accepted as a filter.
	HealthWarning = "warning"
	// HealthCritical is the "critical" status name accepted as a filter.
	HealthCritical = "critical"
	// HealthMaint is the "maintenance" status name accepted as a filter.
	HealthMaint = "maintenance"

	// NodeMaint is not referenced in this file.
	// NOTE(review): presumably the ID of Consul's node maintenance check;
	// confirm against its users.
	NodeMaint = "_node_maintenance"
	// ServiceMaint is not referenced in this file.
	// NOTE(review): presumably the ID prefix of Consul's per-service
	// maintenance checks; confirm against its users.
	ServiceMaint = "_service_maintenance:"
)

var (
	// Compile-time check that *HealthServiceQuery implements Dependency.
	_ Dependency = (*HealthServiceQuery)(nil)

	// HealthServiceQueryRe is the regular expression a health service query
	// string must match in full. It is assembled from the tag, service name,
	// datacenter, near and filter sub-patterns, which are defined outside this
	// file.
	HealthServiceQueryRe = regexp.MustCompile(`\A` + tagRe + serviceNameRe + dcRe + nearRe + filterRe + `\z`)
)

// init registers []*HealthService with encoding/gob so that values of this
// type can be encoded and decoded when they are carried in interface values.
func init() {
	gob.Register([]*HealthService{})
}

// HealthService is a service entry in Consul.
type HealthService struct {
	// Node, NodeID, NodeAddress, NodeTaggedAddresses and NodeMeta are copied
	// from the node that the service instance is registered on.
	Node                string
	NodeID              string
	NodeAddress         string
	NodeTaggedAddresses map[string]string
	NodeMeta            map[string]string
	// ServiceMeta is the metadata of the service instance.
	ServiceMeta map[string]string
	// Address is the service address, or the node address when the service
	// does not define one.
	Address string
	// ID and Name are the service instance ID and the service name.
	ID   string
	Name string
	// Tags is a sorted copy of the service tags.
	Tags ServiceTags
	// Checks are the health checks returned for this entry.
	Checks api.HealthChecks
	// Status is the aggregated status computed from Checks.
	Status string
	// Port is the service port.
	Port int
	// Weights are the service weights as reported by Consul.
	Weights api.AgentWeights
}

// HealthServiceQuery is the representation of a service query in Consul.
type HealthServiceQuery struct {
	// stopCh is closed by Stop to make subsequent Fetch calls return early.
	stopCh chan struct{}

	dc      string   // datacenter to query; empty leaves it to the query options
	filters []string // sorted health status names an entry must match
	name    string   // service name
	near    string   // node to sort results near; disables the local sort when set
	tag     string   // optional service tag
	connect bool     // query the Connect endpoint instead of the service endpoint
}

// NewHealthServiceQuery processes the strings to build a service dependency.
func NewHealthServiceQuery(s string) (*HealthServiceQuery, error) {
	return healthServiceQuery(s, false)
}

// NewHealthConnectQuery processes the strings to build a connect dependency.
75func NewHealthConnectQuery(s string) (*HealthServiceQuery, error) { 76 return healthServiceQuery(s, true) 77} 78 79func healthServiceQuery(s string, connect bool) (*HealthServiceQuery, error) { 80 if !HealthServiceQueryRe.MatchString(s) { 81 return nil, fmt.Errorf("health.service: invalid format: %q", s) 82 } 83 84 m := regexpMatch(HealthServiceQueryRe, s) 85 86 var filters []string 87 if filter := m["filter"]; filter != "" { 88 split := strings.Split(filter, ",") 89 for _, f := range split { 90 f = strings.TrimSpace(f) 91 switch f { 92 case HealthAny, 93 HealthPassing, 94 HealthWarning, 95 HealthCritical, 96 HealthMaint: 97 filters = append(filters, f) 98 case "": 99 default: 100 return nil, fmt.Errorf( 101 "health.service: invalid filter: %q in %q", f, s) 102 } 103 } 104 sort.Strings(filters) 105 } else { 106 filters = []string{HealthPassing} 107 } 108 109 return &HealthServiceQuery{ 110 stopCh: make(chan struct{}, 1), 111 dc: m["dc"], 112 filters: filters, 113 name: m["name"], 114 near: m["near"], 115 tag: m["tag"], 116 connect: connect, 117 }, nil 118} 119 120// Fetch queries the Consul API defined by the given client and returns a slice 121// of HealthService objects. 122func (d *HealthServiceQuery) Fetch(clients *ClientSet, opts *QueryOptions) (interface{}, *ResponseMetadata, error) { 123 select { 124 case <-d.stopCh: 125 return nil, nil, ErrStopped 126 default: 127 } 128 129 opts = opts.Merge(&QueryOptions{ 130 Datacenter: d.dc, 131 Near: d.near, 132 }) 133 134 u := &url.URL{ 135 Path: "/v1/health/service/" + d.name, 136 RawQuery: opts.String(), 137 } 138 if d.tag != "" { 139 q := u.Query() 140 q.Set("tag", d.tag) 141 u.RawQuery = q.Encode() 142 } 143 log.Printf("[TRACE] %s: GET %s", d, u) 144 145 // Check if a user-supplied filter was given. If so, we may be querying for 146 // more than healthy services, so we need to implement client-side 147 // filtering. 
148 passingOnly := len(d.filters) == 1 && d.filters[0] == HealthPassing 149 150 nodes := clients.Consul().Health().Service 151 if d.connect { 152 nodes = clients.Consul().Health().Connect 153 } 154 entries, qm, err := nodes(d.name, d.tag, passingOnly, opts.ToConsulOpts()) 155 if err != nil { 156 return nil, nil, errors.Wrap(err, d.String()) 157 } 158 159 log.Printf("[TRACE] %s: returned %d results", d, len(entries)) 160 161 list := make([]*HealthService, 0, len(entries)) 162 for _, entry := range entries { 163 // Get the status of this service from its checks. 164 status := entry.Checks.AggregatedStatus() 165 166 // If we are not checking only healthy services, filter out services 167 // that do not match the given filter. 168 if !acceptStatus(d.filters, status) { 169 continue 170 } 171 172 // Get the address of the service, falling back to the address of the 173 // node. 174 address := entry.Service.Address 175 if address == "" { 176 address = entry.Node.Address 177 } 178 179 list = append(list, &HealthService{ 180 Node: entry.Node.Node, 181 NodeID: entry.Node.ID, 182 NodeAddress: entry.Node.Address, 183 NodeTaggedAddresses: entry.Node.TaggedAddresses, 184 NodeMeta: entry.Node.Meta, 185 ServiceMeta: entry.Service.Meta, 186 Address: address, 187 ID: entry.Service.ID, 188 Name: entry.Service.Service, 189 Tags: ServiceTags( 190 deepCopyAndSortTags(entry.Service.Tags)), 191 Status: status, 192 Checks: entry.Checks, 193 Port: entry.Service.Port, 194 Weights: entry.Service.Weights, 195 }) 196 } 197 198 log.Printf("[TRACE] %s: returned %d results after filtering", d, len(list)) 199 200 // Sort unless the user explicitly asked for nearness 201 if d.near == "" { 202 sort.Stable(ByNodeThenID(list)) 203 } 204 205 rm := &ResponseMetadata{ 206 LastIndex: qm.LastIndex, 207 LastContact: qm.LastContact, 208 } 209 210 return list, rm, nil 211} 212 213// CanShare returns a boolean if this dependency is shareable. 
214func (d *HealthServiceQuery) CanShare() bool { 215 return true 216} 217 218// Stop halts the dependency's fetch function. 219func (d *HealthServiceQuery) Stop() { 220 close(d.stopCh) 221} 222 223// String returns the human-friendly version of this dependency. 224func (d *HealthServiceQuery) String() string { 225 name := d.name 226 if d.tag != "" { 227 name = d.tag + "." + name 228 } 229 if d.dc != "" { 230 name = name + "@" + d.dc 231 } 232 if d.near != "" { 233 name = name + "~" + d.near 234 } 235 if len(d.filters) > 0 { 236 name = name + "|" + strings.Join(d.filters, ",") 237 } 238 return fmt.Sprintf("health.service(%s)", name) 239} 240 241// Type returns the type of this dependency. 242func (d *HealthServiceQuery) Type() Type { 243 return TypeConsul 244} 245 246// acceptStatus allows us to check if a slice of health checks pass this filter. 247func acceptStatus(list []string, s string) bool { 248 for _, status := range list { 249 if status == s || status == HealthAny { 250 return true 251 } 252 } 253 return false 254} 255 256// ByNodeThenID is a sortable slice of Service 257type ByNodeThenID []*HealthService 258 259// Len, Swap, and Less are used to implement the sort.Sort interface. 260func (s ByNodeThenID) Len() int { return len(s) } 261func (s ByNodeThenID) Swap(i, j int) { s[i], s[j] = s[j], s[i] } 262func (s ByNodeThenID) Less(i, j int) bool { 263 if s[i].Node < s[j].Node { 264 return true 265 } else if s[i].Node == s[j].Node { 266 return s[i].ID <= s[j].ID 267 } 268 return false 269} 270