1package addtransport
2
3import (
4	"bytes"
5	"context"
6	"encoding/json"
7	"errors"
8	"io/ioutil"
9	"net/http"
10	"net/url"
11	"strings"
12	"time"
13
14	"golang.org/x/time/rate"
15
16	stdopentracing "github.com/opentracing/opentracing-go"
17	stdzipkin "github.com/openzipkin/zipkin-go"
18	"github.com/sony/gobreaker"
19
20	"github.com/go-kit/kit/circuitbreaker"
21	"github.com/go-kit/kit/endpoint"
22	"github.com/go-kit/kit/log"
23	"github.com/go-kit/kit/ratelimit"
24	"github.com/go-kit/kit/tracing/opentracing"
25	"github.com/go-kit/kit/tracing/zipkin"
26	"github.com/go-kit/kit/transport"
27	httptransport "github.com/go-kit/kit/transport/http"
28
29	"github.com/go-kit/kit/examples/addsvc/pkg/addendpoint"
30	"github.com/go-kit/kit/examples/addsvc/pkg/addservice"
31)
32
33// NewHTTPHandler returns an HTTP handler that makes a set of endpoints
34// available on predefined paths.
35func NewHTTPHandler(endpoints addendpoint.Set, otTracer stdopentracing.Tracer, zipkinTracer *stdzipkin.Tracer, logger log.Logger) http.Handler {
36	options := []httptransport.ServerOption{
37		httptransport.ServerErrorEncoder(errorEncoder),
38		httptransport.ServerErrorHandler(transport.NewLogErrorHandler(logger)),
39	}
40
41	if zipkinTracer != nil {
42		// Zipkin HTTP Server Trace can either be instantiated per endpoint with a
43		// provided operation name or a global tracing service can be instantiated
44		// without an operation name and fed to each Go kit endpoint as ServerOption.
45		// In the latter case, the operation name will be the endpoint's http method.
46		// We demonstrate a global tracing service here.
47		options = append(options, zipkin.HTTPServerTrace(zipkinTracer))
48	}
49
50	m := http.NewServeMux()
51	m.Handle("/sum", httptransport.NewServer(
52		endpoints.SumEndpoint,
53		decodeHTTPSumRequest,
54		encodeHTTPGenericResponse,
55		append(options, httptransport.ServerBefore(opentracing.HTTPToContext(otTracer, "Sum", logger)))...,
56	))
57	m.Handle("/concat", httptransport.NewServer(
58		endpoints.ConcatEndpoint,
59		decodeHTTPConcatRequest,
60		encodeHTTPGenericResponse,
61		append(options, httptransport.ServerBefore(opentracing.HTTPToContext(otTracer, "Concat", logger)))...,
62	))
63	return m
64}
65
66// NewHTTPClient returns an AddService backed by an HTTP server living at the
67// remote instance. We expect instance to come from a service discovery system,
68// so likely of the form "host:port". We bake-in certain middlewares,
69// implementing the client library pattern.
70func NewHTTPClient(instance string, otTracer stdopentracing.Tracer, zipkinTracer *stdzipkin.Tracer, logger log.Logger) (addservice.Service, error) {
71	// Quickly sanitize the instance string.
72	if !strings.HasPrefix(instance, "http") {
73		instance = "http://" + instance
74	}
75	u, err := url.Parse(instance)
76	if err != nil {
77		return nil, err
78	}
79
80	// We construct a single ratelimiter middleware, to limit the total outgoing
81	// QPS from this client to all methods on the remote instance. We also
82	// construct per-endpoint circuitbreaker middlewares to demonstrate how
83	// that's done, although they could easily be combined into a single breaker
84	// for the entire remote instance, too.
85	limiter := ratelimit.NewErroringLimiter(rate.NewLimiter(rate.Every(time.Second), 100))
86
87	// global client middlewares
88	var options []httptransport.ClientOption
89
90	if zipkinTracer != nil {
91		// Zipkin HTTP Client Trace can either be instantiated per endpoint with a
92		// provided operation name or a global tracing client can be instantiated
93		// without an operation name and fed to each Go kit endpoint as ClientOption.
94		// In the latter case, the operation name will be the endpoint's http method.
95		options = append(options, zipkin.HTTPClientTrace(zipkinTracer))
96	}
97
98	// Each individual endpoint is an http/transport.Client (which implements
99	// endpoint.Endpoint) that gets wrapped with various middlewares. If you
100	// made your own client library, you'd do this work there, so your server
101	// could rely on a consistent set of client behavior.
102	var sumEndpoint endpoint.Endpoint
103	{
104		sumEndpoint = httptransport.NewClient(
105			"POST",
106			copyURL(u, "/sum"),
107			encodeHTTPGenericRequest,
108			decodeHTTPSumResponse,
109			append(options, httptransport.ClientBefore(opentracing.ContextToHTTP(otTracer, logger)))...,
110		).Endpoint()
111		sumEndpoint = opentracing.TraceClient(otTracer, "Sum")(sumEndpoint)
112		if zipkinTracer != nil {
113			sumEndpoint = zipkin.TraceEndpoint(zipkinTracer, "Sum")(sumEndpoint)
114		}
115		sumEndpoint = limiter(sumEndpoint)
116		sumEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{
117			Name:    "Sum",
118			Timeout: 30 * time.Second,
119		}))(sumEndpoint)
120	}
121
122	// The Concat endpoint is the same thing, with slightly different
123	// middlewares to demonstrate how to specialize per-endpoint.
124	var concatEndpoint endpoint.Endpoint
125	{
126		concatEndpoint = httptransport.NewClient(
127			"POST",
128			copyURL(u, "/concat"),
129			encodeHTTPGenericRequest,
130			decodeHTTPConcatResponse,
131			append(options, httptransport.ClientBefore(opentracing.ContextToHTTP(otTracer, logger)))...,
132		).Endpoint()
133		concatEndpoint = opentracing.TraceClient(otTracer, "Concat")(concatEndpoint)
134		if zipkinTracer != nil {
135			concatEndpoint = zipkin.TraceEndpoint(zipkinTracer, "Concat")(concatEndpoint)
136		}
137		concatEndpoint = limiter(concatEndpoint)
138		concatEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{
139			Name:    "Concat",
140			Timeout: 10 * time.Second,
141		}))(concatEndpoint)
142	}
143
144	// Returning the endpoint.Set as a service.Service relies on the
145	// endpoint.Set implementing the Service methods. That's just a simple bit
146	// of glue code.
147	return addendpoint.Set{
148		SumEndpoint:    sumEndpoint,
149		ConcatEndpoint: concatEndpoint,
150	}, nil
151}
152
153func copyURL(base *url.URL, path string) *url.URL {
154	next := *base
155	next.Path = path
156	return &next
157}
158
159func errorEncoder(_ context.Context, err error, w http.ResponseWriter) {
160	w.WriteHeader(err2code(err))
161	json.NewEncoder(w).Encode(errorWrapper{Error: err.Error()})
162}
163
164func err2code(err error) int {
165	switch err {
166	case addservice.ErrTwoZeroes, addservice.ErrMaxSizeExceeded, addservice.ErrIntOverflow:
167		return http.StatusBadRequest
168	}
169	return http.StatusInternalServerError
170}
171
172func errorDecoder(r *http.Response) error {
173	var w errorWrapper
174	if err := json.NewDecoder(r.Body).Decode(&w); err != nil {
175		return err
176	}
177	return errors.New(w.Error)
178}
179
180type errorWrapper struct {
181	Error string `json:"error"`
182}
183
184// decodeHTTPSumRequest is a transport/http.DecodeRequestFunc that decodes a
185// JSON-encoded sum request from the HTTP request body. Primarily useful in a
186// server.
187func decodeHTTPSumRequest(_ context.Context, r *http.Request) (interface{}, error) {
188	var req addendpoint.SumRequest
189	err := json.NewDecoder(r.Body).Decode(&req)
190	return req, err
191}
192
193// decodeHTTPConcatRequest is a transport/http.DecodeRequestFunc that decodes a
194// JSON-encoded concat request from the HTTP request body. Primarily useful in a
195// server.
196func decodeHTTPConcatRequest(_ context.Context, r *http.Request) (interface{}, error) {
197	var req addendpoint.ConcatRequest
198	err := json.NewDecoder(r.Body).Decode(&req)
199	return req, err
200}
201
202// decodeHTTPSumResponse is a transport/http.DecodeResponseFunc that decodes a
203// JSON-encoded sum response from the HTTP response body. If the response has a
204// non-200 status code, we will interpret that as an error and attempt to decode
205// the specific error message from the response body. Primarily useful in a
206// client.
207func decodeHTTPSumResponse(_ context.Context, r *http.Response) (interface{}, error) {
208	if r.StatusCode != http.StatusOK {
209		return nil, errors.New(r.Status)
210	}
211	var resp addendpoint.SumResponse
212	err := json.NewDecoder(r.Body).Decode(&resp)
213	return resp, err
214}
215
216// decodeHTTPConcatResponse is a transport/http.DecodeResponseFunc that decodes
217// a JSON-encoded concat response from the HTTP response body. If the response
218// has a non-200 status code, we will interpret that as an error and attempt to
219// decode the specific error message from the response body. Primarily useful in
220// a client.
221func decodeHTTPConcatResponse(_ context.Context, r *http.Response) (interface{}, error) {
222	if r.StatusCode != http.StatusOK {
223		return nil, errors.New(r.Status)
224	}
225	var resp addendpoint.ConcatResponse
226	err := json.NewDecoder(r.Body).Decode(&resp)
227	return resp, err
228}
229
230// encodeHTTPGenericRequest is a transport/http.EncodeRequestFunc that
231// JSON-encodes any request to the request body. Primarily useful in a client.
232func encodeHTTPGenericRequest(_ context.Context, r *http.Request, request interface{}) error {
233	var buf bytes.Buffer
234	if err := json.NewEncoder(&buf).Encode(request); err != nil {
235		return err
236	}
237	r.Body = ioutil.NopCloser(&buf)
238	return nil
239}
240
241// encodeHTTPGenericResponse is a transport/http.EncodeResponseFunc that encodes
242// the response as JSON to the response writer. Primarily useful in a server.
243func encodeHTTPGenericResponse(ctx context.Context, w http.ResponseWriter, response interface{}) error {
244	if f, ok := response.(endpoint.Failer); ok && f.Failed() != nil {
245		errorEncoder(ctx, f.Failed(), w)
246		return nil
247	}
248	w.Header().Set("Content-Type", "application/json; charset=utf-8")
249	return json.NewEncoder(w).Encode(response)
250}
251