1package addtransport 2 3import ( 4 "bytes" 5 "context" 6 "encoding/json" 7 "errors" 8 "io/ioutil" 9 "net/http" 10 "net/url" 11 "strings" 12 "time" 13 14 "golang.org/x/time/rate" 15 16 stdopentracing "github.com/opentracing/opentracing-go" 17 stdzipkin "github.com/openzipkin/zipkin-go" 18 "github.com/sony/gobreaker" 19 20 "github.com/go-kit/kit/circuitbreaker" 21 "github.com/go-kit/kit/endpoint" 22 "github.com/go-kit/kit/log" 23 "github.com/go-kit/kit/ratelimit" 24 "github.com/go-kit/kit/tracing/opentracing" 25 "github.com/go-kit/kit/tracing/zipkin" 26 "github.com/go-kit/kit/transport" 27 httptransport "github.com/go-kit/kit/transport/http" 28 29 "github.com/go-kit/kit/examples/addsvc/pkg/addendpoint" 30 "github.com/go-kit/kit/examples/addsvc/pkg/addservice" 31) 32 33// NewHTTPHandler returns an HTTP handler that makes a set of endpoints 34// available on predefined paths. 35func NewHTTPHandler(endpoints addendpoint.Set, otTracer stdopentracing.Tracer, zipkinTracer *stdzipkin.Tracer, logger log.Logger) http.Handler { 36 options := []httptransport.ServerOption{ 37 httptransport.ServerErrorEncoder(errorEncoder), 38 httptransport.ServerErrorHandler(transport.NewLogErrorHandler(logger)), 39 } 40 41 if zipkinTracer != nil { 42 // Zipkin HTTP Server Trace can either be instantiated per endpoint with a 43 // provided operation name or a global tracing service can be instantiated 44 // without an operation name and fed to each Go kit endpoint as ServerOption. 45 // In the latter case, the operation name will be the endpoint's http method. 46 // We demonstrate a global tracing service here. 47 options = append(options, zipkin.HTTPServerTrace(zipkinTracer)) 48 } 49 50 m := http.NewServeMux() 51 m.Handle("/sum", httptransport.NewServer( 52 endpoints.SumEndpoint, 53 decodeHTTPSumRequest, 54 encodeHTTPGenericResponse, 55 append(options, httptransport.ServerBefore(opentracing.HTTPToContext(otTracer, "Sum", logger)))..., 56 )) 57 m.Handle("/concat", httptransport.NewServer( 58 endpoints.ConcatEndpoint, 59 decodeHTTPConcatRequest, 60 encodeHTTPGenericResponse, 61 append(options, httptransport.ServerBefore(opentracing.HTTPToContext(otTracer, "Concat", logger)))..., 62 )) 63 return m 64} 65 66// NewHTTPClient returns an AddService backed by an HTTP server living at the 67// remote instance. We expect instance to come from a service discovery system, 68// so likely of the form "host:port". We bake-in certain middlewares, 69// implementing the client library pattern. 70func NewHTTPClient(instance string, otTracer stdopentracing.Tracer, zipkinTracer *stdzipkin.Tracer, logger log.Logger) (addservice.Service, error) { 71 // Quickly sanitize the instance string. 72 if !strings.HasPrefix(instance, "http") { 73 instance = "http://" + instance 74 } 75 u, err := url.Parse(instance) 76 if err != nil { 77 return nil, err 78 } 79 80 // We construct a single ratelimiter middleware, to limit the total outgoing 81 // QPS from this client to all methods on the remote instance. We also 82 // construct per-endpoint circuitbreaker middlewares to demonstrate how 83 // that's done, although they could easily be combined into a single breaker 84 // for the entire remote instance, too. 85 limiter := ratelimit.NewErroringLimiter(rate.NewLimiter(rate.Every(time.Second), 100)) 86 87 // global client middlewares 88 var options []httptransport.ClientOption 89 90 if zipkinTracer != nil { 91 // Zipkin HTTP Client Trace can either be instantiated per endpoint with a 92 // provided operation name or a global tracing client can be instantiated 93 // without an operation name and fed to each Go kit endpoint as ClientOption. 94 // In the latter case, the operation name will be the endpoint's http method. 95 options = append(options, zipkin.HTTPClientTrace(zipkinTracer)) 96 } 97 98 // Each individual endpoint is an http/transport.Client (which implements 99 // endpoint.Endpoint) that gets wrapped with various middlewares. If you 100 // made your own client library, you'd do this work there, so your server 101 // could rely on a consistent set of client behavior. 102 var sumEndpoint endpoint.Endpoint 103 { 104 sumEndpoint = httptransport.NewClient( 105 "POST", 106 copyURL(u, "/sum"), 107 encodeHTTPGenericRequest, 108 decodeHTTPSumResponse, 109 append(options, httptransport.ClientBefore(opentracing.ContextToHTTP(otTracer, logger)))..., 110 ).Endpoint() 111 sumEndpoint = opentracing.TraceClient(otTracer, "Sum")(sumEndpoint) 112 if zipkinTracer != nil { 113 sumEndpoint = zipkin.TraceEndpoint(zipkinTracer, "Sum")(sumEndpoint) 114 } 115 sumEndpoint = limiter(sumEndpoint) 116 sumEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{ 117 Name: "Sum", 118 Timeout: 30 * time.Second, 119 }))(sumEndpoint) 120 } 121 122 // The Concat endpoint is the same thing, with slightly different 123 // middlewares to demonstrate how to specialize per-endpoint. 124 var concatEndpoint endpoint.Endpoint 125 { 126 concatEndpoint = httptransport.NewClient( 127 "POST", 128 copyURL(u, "/concat"), 129 encodeHTTPGenericRequest, 130 decodeHTTPConcatResponse, 131 append(options, httptransport.ClientBefore(opentracing.ContextToHTTP(otTracer, logger)))..., 132 ).Endpoint() 133 concatEndpoint = opentracing.TraceClient(otTracer, "Concat")(concatEndpoint) 134 if zipkinTracer != nil { 135 concatEndpoint = zipkin.TraceEndpoint(zipkinTracer, "Concat")(concatEndpoint) 136 } 137 concatEndpoint = limiter(concatEndpoint) 138 concatEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{ 139 Name: "Concat", 140 Timeout: 10 * time.Second, 141 }))(concatEndpoint) 142 } 143 144 // Returning the endpoint.Set as a service.Service relies on the 145 // endpoint.Set implementing the Service methods. That's just a simple bit 146 // of glue code. 147 return addendpoint.Set{ 148 SumEndpoint: sumEndpoint, 149 ConcatEndpoint: concatEndpoint, 150 }, nil 151} 152 153func copyURL(base *url.URL, path string) *url.URL { 154 next := *base 155 next.Path = path 156 return &next 157} 158 159func errorEncoder(_ context.Context, err error, w http.ResponseWriter) { 160 w.WriteHeader(err2code(err)) 161 json.NewEncoder(w).Encode(errorWrapper{Error: err.Error()}) 162} 163 164func err2code(err error) int { 165 switch err { 166 case addservice.ErrTwoZeroes, addservice.ErrMaxSizeExceeded, addservice.ErrIntOverflow: 167 return http.StatusBadRequest 168 } 169 return http.StatusInternalServerError 170} 171 172func errorDecoder(r *http.Response) error { 173 var w errorWrapper 174 if err := json.NewDecoder(r.Body).Decode(&w); err != nil { 175 return err 176 } 177 return errors.New(w.Error) 178} 179 180type errorWrapper struct { 181 Error string `json:"error"` 182} 183 184// decodeHTTPSumRequest is a transport/http.DecodeRequestFunc that decodes a 185// JSON-encoded sum request from the HTTP request body. Primarily useful in a 186// server. 187func decodeHTTPSumRequest(_ context.Context, r *http.Request) (interface{}, error) { 188 var req addendpoint.SumRequest 189 err := json.NewDecoder(r.Body).Decode(&req) 190 return req, err 191} 192 193// decodeHTTPConcatRequest is a transport/http.DecodeRequestFunc that decodes a 194// JSON-encoded concat request from the HTTP request body. Primarily useful in a 195// server. 196func decodeHTTPConcatRequest(_ context.Context, r *http.Request) (interface{}, error) { 197 var req addendpoint.ConcatRequest 198 err := json.NewDecoder(r.Body).Decode(&req) 199 return req, err 200} 201 202// decodeHTTPSumResponse is a transport/http.DecodeResponseFunc that decodes a 203// JSON-encoded sum response from the HTTP response body. If the response has a 204// non-200 status code, we will interpret that as an error and attempt to decode 205// the specific error message from the response body. Primarily useful in a 206// client. 207func decodeHTTPSumResponse(_ context.Context, r *http.Response) (interface{}, error) { 208 if r.StatusCode != http.StatusOK { 209 return nil, errors.New(r.Status) 210 } 211 var resp addendpoint.SumResponse 212 err := json.NewDecoder(r.Body).Decode(&resp) 213 return resp, err 214} 215 216// decodeHTTPConcatResponse is a transport/http.DecodeResponseFunc that decodes 217// a JSON-encoded concat response from the HTTP response body. If the response 218// has a non-200 status code, we will interpret that as an error and attempt to 219// decode the specific error message from the response body. Primarily useful in 220// a client. 221func decodeHTTPConcatResponse(_ context.Context, r *http.Response) (interface{}, error) { 222 if r.StatusCode != http.StatusOK { 223 return nil, errors.New(r.Status) 224 } 225 var resp addendpoint.ConcatResponse 226 err := json.NewDecoder(r.Body).Decode(&resp) 227 return resp, err 228} 229 230// encodeHTTPGenericRequest is a transport/http.EncodeRequestFunc that 231// JSON-encodes any request to the request body. Primarily useful in a client. 232func encodeHTTPGenericRequest(_ context.Context, r *http.Request, request interface{}) error { 233 var buf bytes.Buffer 234 if err := json.NewEncoder(&buf).Encode(request); err != nil { 235 return err 236 } 237 r.Body = ioutil.NopCloser(&buf) 238 return nil 239} 240 241// encodeHTTPGenericResponse is a transport/http.EncodeResponseFunc that encodes 242// the response as JSON to the response writer. Primarily useful in a server. 243func encodeHTTPGenericResponse(ctx context.Context, w http.ResponseWriter, response interface{}) error { 244 if f, ok := response.(endpoint.Failer); ok && f.Failed() != nil { 245 errorEncoder(ctx, f.Failed(), w) 246 return nil 247 } 248 w.Header().Set("Content-Type", "application/json; charset=utf-8") 249 return json.NewEncoder(w).Encode(response) 250} 251