1package addtransport
2
3import (
4	"bytes"
5	"context"
6	"encoding/json"
7	"errors"
8	"io/ioutil"
9	"net/http"
10	"net/url"
11	"strings"
12	"time"
13
14	"golang.org/x/time/rate"
15
16	stdopentracing "github.com/opentracing/opentracing-go"
17	stdzipkin "github.com/openzipkin/zipkin-go"
18	"github.com/sony/gobreaker"
19
20	"github.com/go-kit/kit/circuitbreaker"
21	"github.com/go-kit/kit/endpoint"
22	"github.com/go-kit/kit/log"
23	"github.com/go-kit/kit/ratelimit"
24	"github.com/go-kit/kit/tracing/opentracing"
25	"github.com/go-kit/kit/tracing/zipkin"
26	"github.com/go-kit/kit/transport"
27	httptransport "github.com/go-kit/kit/transport/http"
28
29	"github.com/go-kit/kit/examples/addsvc/pkg/addendpoint"
30	"github.com/go-kit/kit/examples/addsvc/pkg/addservice"
31)
32
33// NewHTTPHandler returns an HTTP handler that makes a set of endpoints
34// available on predefined paths.
35func NewHTTPHandler(endpoints addendpoint.Set, otTracer stdopentracing.Tracer, zipkinTracer *stdzipkin.Tracer, logger log.Logger) http.Handler {
36	// Zipkin HTTP Server Trace can either be instantiated per endpoint with a
37	// provided operation name or a global tracing service can be instantiated
38	// without an operation name and fed to each Go kit endpoint as ServerOption.
39	// In the latter case, the operation name will be the endpoint's http method.
40	// We demonstrate a global tracing service here.
41	zipkinServer := zipkin.HTTPServerTrace(zipkinTracer)
42
43	options := []httptransport.ServerOption{
44		httptransport.ServerErrorEncoder(errorEncoder),
45		httptransport.ServerErrorHandler(transport.NewLogErrorHandler(logger)),
46		zipkinServer,
47	}
48
49	m := http.NewServeMux()
50	m.Handle("/sum", httptransport.NewServer(
51		endpoints.SumEndpoint,
52		decodeHTTPSumRequest,
53		encodeHTTPGenericResponse,
54		append(options, httptransport.ServerBefore(opentracing.HTTPToContext(otTracer, "Sum", logger)))...,
55	))
56	m.Handle("/concat", httptransport.NewServer(
57		endpoints.ConcatEndpoint,
58		decodeHTTPConcatRequest,
59		encodeHTTPGenericResponse,
60		append(options, httptransport.ServerBefore(opentracing.HTTPToContext(otTracer, "Concat", logger)))...,
61	))
62	return m
63}
64
65// NewHTTPClient returns an AddService backed by an HTTP server living at the
66// remote instance. We expect instance to come from a service discovery system,
67// so likely of the form "host:port". We bake-in certain middlewares,
68// implementing the client library pattern.
69func NewHTTPClient(instance string, otTracer stdopentracing.Tracer, zipkinTracer *stdzipkin.Tracer, logger log.Logger) (addservice.Service, error) {
70	// Quickly sanitize the instance string.
71	if !strings.HasPrefix(instance, "http") {
72		instance = "http://" + instance
73	}
74	u, err := url.Parse(instance)
75	if err != nil {
76		return nil, err
77	}
78
79	// We construct a single ratelimiter middleware, to limit the total outgoing
80	// QPS from this client to all methods on the remote instance. We also
81	// construct per-endpoint circuitbreaker middlewares to demonstrate how
82	// that's done, although they could easily be combined into a single breaker
83	// for the entire remote instance, too.
84	limiter := ratelimit.NewErroringLimiter(rate.NewLimiter(rate.Every(time.Second), 100))
85
86	// Zipkin HTTP Client Trace can either be instantiated per endpoint with a
87	// provided operation name or a global tracing client can be instantiated
88	// without an operation name and fed to each Go kit endpoint as ClientOption.
89	// In the latter case, the operation name will be the endpoint's http method.
90	zipkinClient := zipkin.HTTPClientTrace(zipkinTracer)
91
92	// global client middlewares
93	options := []httptransport.ClientOption{
94		zipkinClient,
95	}
96
97	// Each individual endpoint is an http/transport.Client (which implements
98	// endpoint.Endpoint) that gets wrapped with various middlewares. If you
99	// made your own client library, you'd do this work there, so your server
100	// could rely on a consistent set of client behavior.
101	var sumEndpoint endpoint.Endpoint
102	{
103		sumEndpoint = httptransport.NewClient(
104			"POST",
105			copyURL(u, "/sum"),
106			encodeHTTPGenericRequest,
107			decodeHTTPSumResponse,
108			append(options, httptransport.ClientBefore(opentracing.ContextToHTTP(otTracer, logger)))...,
109		).Endpoint()
110		sumEndpoint = opentracing.TraceClient(otTracer, "Sum")(sumEndpoint)
111		sumEndpoint = zipkin.TraceEndpoint(zipkinTracer, "Sum")(sumEndpoint)
112		sumEndpoint = limiter(sumEndpoint)
113		sumEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{
114			Name:    "Sum",
115			Timeout: 30 * time.Second,
116		}))(sumEndpoint)
117	}
118
119	// The Concat endpoint is the same thing, with slightly different
120	// middlewares to demonstrate how to specialize per-endpoint.
121	var concatEndpoint endpoint.Endpoint
122	{
123		concatEndpoint = httptransport.NewClient(
124			"POST",
125			copyURL(u, "/concat"),
126			encodeHTTPGenericRequest,
127			decodeHTTPConcatResponse,
128			append(options, httptransport.ClientBefore(opentracing.ContextToHTTP(otTracer, logger)))...,
129		).Endpoint()
130		concatEndpoint = opentracing.TraceClient(otTracer, "Concat")(concatEndpoint)
131		concatEndpoint = zipkin.TraceEndpoint(zipkinTracer, "Concat")(concatEndpoint)
132		concatEndpoint = limiter(concatEndpoint)
133		concatEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{
134			Name:    "Concat",
135			Timeout: 10 * time.Second,
136		}))(concatEndpoint)
137	}
138
139	// Returning the endpoint.Set as a service.Service relies on the
140	// endpoint.Set implementing the Service methods. That's just a simple bit
141	// of glue code.
142	return addendpoint.Set{
143		SumEndpoint:    sumEndpoint,
144		ConcatEndpoint: concatEndpoint,
145	}, nil
146}
147
// copyURL returns a pointer to a shallow copy of base whose Path is replaced
// by path. The URL that base points to is left unmodified.
func copyURL(base *url.URL, path string) *url.URL {
	dup := new(url.URL)
	*dup = *base
	dup.Path = path
	return dup
}
153
154func errorEncoder(_ context.Context, err error, w http.ResponseWriter) {
155	w.WriteHeader(err2code(err))
156	json.NewEncoder(w).Encode(errorWrapper{Error: err.Error()})
157}
158
159func err2code(err error) int {
160	switch err {
161	case addservice.ErrTwoZeroes, addservice.ErrMaxSizeExceeded, addservice.ErrIntOverflow:
162		return http.StatusBadRequest
163	}
164	return http.StatusInternalServerError
165}
166
// errorDecoder reads a JSON-encoded errorWrapper from the response body and
// returns its message as an error. If the body cannot be decoded, the decoding
// error is returned instead. The result is therefore never nil.
func errorDecoder(r *http.Response) error {
	payload := errorWrapper{}
	decodeErr := json.NewDecoder(r.Body).Decode(&payload)
	if decodeErr == nil {
		return errors.New(payload.Error)
	}
	return decodeErr
}

// errorWrapper is the JSON shape used to carry an error message in an HTTP
// body: a single "error" string field.
type errorWrapper struct {
	Error string `json:"error"`
}
178
179// decodeHTTPSumRequest is a transport/http.DecodeRequestFunc that decodes a
180// JSON-encoded sum request from the HTTP request body. Primarily useful in a
181// server.
182func decodeHTTPSumRequest(_ context.Context, r *http.Request) (interface{}, error) {
183	var req addendpoint.SumRequest
184	err := json.NewDecoder(r.Body).Decode(&req)
185	return req, err
186}
187
188// decodeHTTPConcatRequest is a transport/http.DecodeRequestFunc that decodes a
189// JSON-encoded concat request from the HTTP request body. Primarily useful in a
190// server.
191func decodeHTTPConcatRequest(_ context.Context, r *http.Request) (interface{}, error) {
192	var req addendpoint.ConcatRequest
193	err := json.NewDecoder(r.Body).Decode(&req)
194	return req, err
195}
196
197// decodeHTTPSumResponse is a transport/http.DecodeResponseFunc that decodes a
198// JSON-encoded sum response from the HTTP response body. If the response has a
199// non-200 status code, we will interpret that as an error and attempt to decode
200// the specific error message from the response body. Primarily useful in a
201// client.
202func decodeHTTPSumResponse(_ context.Context, r *http.Response) (interface{}, error) {
203	if r.StatusCode != http.StatusOK {
204		return nil, errors.New(r.Status)
205	}
206	var resp addendpoint.SumResponse
207	err := json.NewDecoder(r.Body).Decode(&resp)
208	return resp, err
209}
210
211// decodeHTTPConcatResponse is a transport/http.DecodeResponseFunc that decodes
212// a JSON-encoded concat response from the HTTP response body. If the response
213// has a non-200 status code, we will interpret that as an error and attempt to
214// decode the specific error message from the response body. Primarily useful in
215// a client.
216func decodeHTTPConcatResponse(_ context.Context, r *http.Response) (interface{}, error) {
217	if r.StatusCode != http.StatusOK {
218		return nil, errors.New(r.Status)
219	}
220	var resp addendpoint.ConcatResponse
221	err := json.NewDecoder(r.Body).Decode(&resp)
222	return resp, err
223}
224
// encodeHTTPGenericRequest is a transport/http.EncodeRequestFunc that
// JSON-encodes any request to the request body and labels it with a JSON
// Content-Type header. If encoding fails, the error is returned and r is left
// untouched. Primarily useful in a client.
func encodeHTTPGenericRequest(_ context.Context, r *http.Request, request interface{}) error {
	// Encode into a buffer first so that a failure never leaves r with a
	// partially written body.
	var buf bytes.Buffer
	if err := json.NewEncoder(&buf).Encode(request); err != nil {
		return err
	}
	// Declare the payload type, mirroring what encodeHTTPGenericResponse does
	// on the server side.
	r.Header.Set("Content-Type", "application/json; charset=utf-8")
	r.Body = ioutil.NopCloser(&buf)
	return nil
}
235
236// encodeHTTPGenericResponse is a transport/http.EncodeResponseFunc that encodes
237// the response as JSON to the response writer. Primarily useful in a server.
238func encodeHTTPGenericResponse(ctx context.Context, w http.ResponseWriter, response interface{}) error {
239	if f, ok := response.(endpoint.Failer); ok && f.Failed() != nil {
240		errorEncoder(ctx, f.Failed(), w)
241		return nil
242	}
243	w.Header().Set("Content-Type", "application/json; charset=utf-8")
244	return json.NewEncoder(w).Encode(response)
245}
246