1package main 2 3import ( 4 "context" 5 "errors" 6 "fmt" 7 "net/url" 8 "strings" 9 "time" 10 11 "golang.org/x/time/rate" 12 13 "github.com/sony/gobreaker" 14 15 "github.com/go-kit/kit/circuitbreaker" 16 "github.com/go-kit/kit/endpoint" 17 "github.com/go-kit/kit/log" 18 "github.com/go-kit/kit/ratelimit" 19 "github.com/go-kit/kit/sd" 20 "github.com/go-kit/kit/sd/lb" 21 httptransport "github.com/go-kit/kit/transport/http" 22) 23 24func proxyingMiddleware(ctx context.Context, instances string, logger log.Logger) ServiceMiddleware { 25 // If instances is empty, don't proxy. 26 if instances == "" { 27 logger.Log("proxy_to", "none") 28 return func(next StringService) StringService { return next } 29 } 30 31 // Set some parameters for our client. 32 var ( 33 qps = 100 // beyond which we will return an error 34 maxAttempts = 3 // per request, before giving up 35 maxTime = 250 * time.Millisecond // wallclock time, before giving up 36 ) 37 38 // Otherwise, construct an endpoint for each instance in the list, and add 39 // it to a fixed set of endpoints. In a real service, rather than doing this 40 // by hand, you'd probably use package sd's support for your service 41 // discovery system. 42 var ( 43 instanceList = split(instances) 44 endpointer sd.FixedEndpointer 45 ) 46 logger.Log("proxy_to", fmt.Sprint(instanceList)) 47 for _, instance := range instanceList { 48 var e endpoint.Endpoint 49 e = makeUppercaseProxy(ctx, instance) 50 e = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{}))(e) 51 e = ratelimit.NewErroringLimiter(rate.NewLimiter(rate.Every(time.Second), qps))(e) 52 endpointer = append(endpointer, e) 53 } 54 55 // Now, build a single, retrying, load-balancing endpoint out of all of 56 // those individual endpoints. 57 balancer := lb.NewRoundRobin(endpointer) 58 retry := lb.Retry(maxAttempts, maxTime, balancer) 59 60 // And finally, return the ServiceMiddleware, implemented by proxymw. 61 return func(next StringService) StringService { 62 return proxymw{ctx, next, retry} 63 } 64} 65 66// proxymw implements StringService, forwarding Uppercase requests to the 67// provided endpoint, and serving all other (i.e. Count) requests via the 68// next StringService. 69type proxymw struct { 70 ctx context.Context 71 next StringService // Serve most requests via this service... 72 uppercase endpoint.Endpoint // ...except Uppercase, which gets served by this endpoint 73} 74 75func (mw proxymw) Count(s string) int { 76 return mw.next.Count(s) 77} 78 79func (mw proxymw) Uppercase(s string) (string, error) { 80 response, err := mw.uppercase(mw.ctx, uppercaseRequest{S: s}) 81 if err != nil { 82 return "", err 83 } 84 85 resp := response.(uppercaseResponse) 86 if resp.Err != "" { 87 return resp.V, errors.New(resp.Err) 88 } 89 return resp.V, nil 90} 91 92func makeUppercaseProxy(ctx context.Context, instance string) endpoint.Endpoint { 93 if !strings.HasPrefix(instance, "http") { 94 instance = "http://" + instance 95 } 96 u, err := url.Parse(instance) 97 if err != nil { 98 panic(err) 99 } 100 if u.Path == "" { 101 u.Path = "/uppercase" 102 } 103 return httptransport.NewClient( 104 "GET", 105 u, 106 encodeRequest, 107 decodeUppercaseResponse, 108 ).Endpoint() 109} 110 111func split(s string) []string { 112 a := strings.Split(s, ",") 113 for i := range a { 114 a[i] = strings.TrimSpace(a[i]) 115 } 116 return a 117} 118