1package addtransport 2 3import ( 4 "bytes" 5 "context" 6 "encoding/json" 7 "errors" 8 "io/ioutil" 9 "net/http" 10 "net/url" 11 "strings" 12 "time" 13 14 "golang.org/x/time/rate" 15 16 stdopentracing "github.com/opentracing/opentracing-go" 17 stdzipkin "github.com/openzipkin/zipkin-go" 18 "github.com/sony/gobreaker" 19 20 "github.com/go-kit/kit/circuitbreaker" 21 "github.com/go-kit/kit/endpoint" 22 "github.com/go-kit/kit/log" 23 "github.com/go-kit/kit/ratelimit" 24 "github.com/go-kit/kit/tracing/opentracing" 25 "github.com/go-kit/kit/tracing/zipkin" 26 "github.com/go-kit/kit/transport" 27 httptransport "github.com/go-kit/kit/transport/http" 28 29 "github.com/go-kit/kit/examples/addsvc/pkg/addendpoint" 30 "github.com/go-kit/kit/examples/addsvc/pkg/addservice" 31) 32 33// NewHTTPHandler returns an HTTP handler that makes a set of endpoints 34// available on predefined paths. 35func NewHTTPHandler(endpoints addendpoint.Set, otTracer stdopentracing.Tracer, zipkinTracer *stdzipkin.Tracer, logger log.Logger) http.Handler { 36 // Zipkin HTTP Server Trace can either be instantiated per endpoint with a 37 // provided operation name or a global tracing service can be instantiated 38 // without an operation name and fed to each Go kit endpoint as ServerOption. 39 // In the latter case, the operation name will be the endpoint's http method. 40 // We demonstrate a global tracing service here. 41 zipkinServer := zipkin.HTTPServerTrace(zipkinTracer) 42 43 options := []httptransport.ServerOption{ 44 httptransport.ServerErrorEncoder(errorEncoder), 45 httptransport.ServerErrorHandler(transport.NewLogErrorHandler(logger)), 46 zipkinServer, 47 } 48 49 m := http.NewServeMux() 50 m.Handle("/sum", httptransport.NewServer( 51 endpoints.SumEndpoint, 52 decodeHTTPSumRequest, 53 encodeHTTPGenericResponse, 54 append(options, httptransport.ServerBefore(opentracing.HTTPToContext(otTracer, "Sum", logger)))..., 55 )) 56 m.Handle("/concat", httptransport.NewServer( 57 endpoints.ConcatEndpoint, 58 decodeHTTPConcatRequest, 59 encodeHTTPGenericResponse, 60 append(options, httptransport.ServerBefore(opentracing.HTTPToContext(otTracer, "Concat", logger)))..., 61 )) 62 return m 63} 64 65// NewHTTPClient returns an AddService backed by an HTTP server living at the 66// remote instance. We expect instance to come from a service discovery system, 67// so likely of the form "host:port". We bake-in certain middlewares, 68// implementing the client library pattern. 69func NewHTTPClient(instance string, otTracer stdopentracing.Tracer, zipkinTracer *stdzipkin.Tracer, logger log.Logger) (addservice.Service, error) { 70 // Quickly sanitize the instance string. 71 if !strings.HasPrefix(instance, "http") { 72 instance = "http://" + instance 73 } 74 u, err := url.Parse(instance) 75 if err != nil { 76 return nil, err 77 } 78 79 // We construct a single ratelimiter middleware, to limit the total outgoing 80 // QPS from this client to all methods on the remote instance. We also 81 // construct per-endpoint circuitbreaker middlewares to demonstrate how 82 // that's done, although they could easily be combined into a single breaker 83 // for the entire remote instance, too. 84 limiter := ratelimit.NewErroringLimiter(rate.NewLimiter(rate.Every(time.Second), 100)) 85 86 // Zipkin HTTP Client Trace can either be instantiated per endpoint with a 87 // provided operation name or a global tracing client can be instantiated 88 // without an operation name and fed to each Go kit endpoint as ClientOption. 89 // In the latter case, the operation name will be the endpoint's http method. 90 zipkinClient := zipkin.HTTPClientTrace(zipkinTracer) 91 92 // global client middlewares 93 options := []httptransport.ClientOption{ 94 zipkinClient, 95 } 96 97 // Each individual endpoint is an http/transport.Client (which implements 98 // endpoint.Endpoint) that gets wrapped with various middlewares. If you 99 // made your own client library, you'd do this work there, so your server 100 // could rely on a consistent set of client behavior. 101 var sumEndpoint endpoint.Endpoint 102 { 103 sumEndpoint = httptransport.NewClient( 104 "POST", 105 copyURL(u, "/sum"), 106 encodeHTTPGenericRequest, 107 decodeHTTPSumResponse, 108 append(options, httptransport.ClientBefore(opentracing.ContextToHTTP(otTracer, logger)))..., 109 ).Endpoint() 110 sumEndpoint = opentracing.TraceClient(otTracer, "Sum")(sumEndpoint) 111 sumEndpoint = zipkin.TraceEndpoint(zipkinTracer, "Sum")(sumEndpoint) 112 sumEndpoint = limiter(sumEndpoint) 113 sumEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{ 114 Name: "Sum", 115 Timeout: 30 * time.Second, 116 }))(sumEndpoint) 117 } 118 119 // The Concat endpoint is the same thing, with slightly different 120 // middlewares to demonstrate how to specialize per-endpoint. 121 var concatEndpoint endpoint.Endpoint 122 { 123 concatEndpoint = httptransport.NewClient( 124 "POST", 125 copyURL(u, "/concat"), 126 encodeHTTPGenericRequest, 127 decodeHTTPConcatResponse, 128 append(options, httptransport.ClientBefore(opentracing.ContextToHTTP(otTracer, logger)))..., 129 ).Endpoint() 130 concatEndpoint = opentracing.TraceClient(otTracer, "Concat")(concatEndpoint) 131 concatEndpoint = zipkin.TraceEndpoint(zipkinTracer, "Concat")(concatEndpoint) 132 concatEndpoint = limiter(concatEndpoint) 133 concatEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{ 134 Name: "Concat", 135 Timeout: 10 * time.Second, 136 }))(concatEndpoint) 137 } 138 139 // Returning the endpoint.Set as a service.Service relies on the 140 // endpoint.Set implementing the Service methods. That's just a simple bit 141 // of glue code. 142 return addendpoint.Set{ 143 SumEndpoint: sumEndpoint, 144 ConcatEndpoint: concatEndpoint, 145 }, nil 146} 147 148func copyURL(base *url.URL, path string) *url.URL { 149 next := *base 150 next.Path = path 151 return &next 152} 153 154func errorEncoder(_ context.Context, err error, w http.ResponseWriter) { 155 w.WriteHeader(err2code(err)) 156 json.NewEncoder(w).Encode(errorWrapper{Error: err.Error()}) 157} 158 159func err2code(err error) int { 160 switch err { 161 case addservice.ErrTwoZeroes, addservice.ErrMaxSizeExceeded, addservice.ErrIntOverflow: 162 return http.StatusBadRequest 163 } 164 return http.StatusInternalServerError 165} 166 167func errorDecoder(r *http.Response) error { 168 var w errorWrapper 169 if err := json.NewDecoder(r.Body).Decode(&w); err != nil { 170 return err 171 } 172 return errors.New(w.Error) 173} 174 175type errorWrapper struct { 176 Error string `json:"error"` 177} 178 179// decodeHTTPSumRequest is a transport/http.DecodeRequestFunc that decodes a 180// JSON-encoded sum request from the HTTP request body. Primarily useful in a 181// server. 182func decodeHTTPSumRequest(_ context.Context, r *http.Request) (interface{}, error) { 183 var req addendpoint.SumRequest 184 err := json.NewDecoder(r.Body).Decode(&req) 185 return req, err 186} 187 188// decodeHTTPConcatRequest is a transport/http.DecodeRequestFunc that decodes a 189// JSON-encoded concat request from the HTTP request body. Primarily useful in a 190// server. 191func decodeHTTPConcatRequest(_ context.Context, r *http.Request) (interface{}, error) { 192 var req addendpoint.ConcatRequest 193 err := json.NewDecoder(r.Body).Decode(&req) 194 return req, err 195} 196 197// decodeHTTPSumResponse is a transport/http.DecodeResponseFunc that decodes a 198// JSON-encoded sum response from the HTTP response body. If the response has a 199// non-200 status code, we will interpret that as an error and attempt to decode 200// the specific error message from the response body. Primarily useful in a 201// client. 202func decodeHTTPSumResponse(_ context.Context, r *http.Response) (interface{}, error) { 203 if r.StatusCode != http.StatusOK { 204 return nil, errors.New(r.Status) 205 } 206 var resp addendpoint.SumResponse 207 err := json.NewDecoder(r.Body).Decode(&resp) 208 return resp, err 209} 210 211// decodeHTTPConcatResponse is a transport/http.DecodeResponseFunc that decodes 212// a JSON-encoded concat response from the HTTP response body. If the response 213// has a non-200 status code, we will interpret that as an error and attempt to 214// decode the specific error message from the response body. Primarily useful in 215// a client. 216func decodeHTTPConcatResponse(_ context.Context, r *http.Response) (interface{}, error) { 217 if r.StatusCode != http.StatusOK { 218 return nil, errors.New(r.Status) 219 } 220 var resp addendpoint.ConcatResponse 221 err := json.NewDecoder(r.Body).Decode(&resp) 222 return resp, err 223} 224 225// encodeHTTPGenericRequest is a transport/http.EncodeRequestFunc that 226// JSON-encodes any request to the request body. Primarily useful in a client. 227func encodeHTTPGenericRequest(_ context.Context, r *http.Request, request interface{}) error { 228 var buf bytes.Buffer 229 if err := json.NewEncoder(&buf).Encode(request); err != nil { 230 return err 231 } 232 r.Body = ioutil.NopCloser(&buf) 233 return nil 234} 235 236// encodeHTTPGenericResponse is a transport/http.EncodeResponseFunc that encodes 237// the response as JSON to the response writer. Primarily useful in a server. 238func encodeHTTPGenericResponse(ctx context.Context, w http.ResponseWriter, response interface{}) error { 239 if f, ok := response.(endpoint.Failer); ok && f.Failed() != nil { 240 errorEncoder(ctx, f.Failed(), w) 241 return nil 242 } 243 w.Header().Set("Content-Type", "application/json; charset=utf-8") 244 return json.NewEncoder(w).Encode(response) 245} 246