1// Copyright 2019 The Go Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style 3// license that can be found in the LICENSE file. 4 5package jsonrpc2 6 7import ( 8 "context" 9 "fmt" 10 "sync" 11 12 "golang.org/x/tools/internal/event" 13) 14 15// Handler is invoked to handle incoming requests. 16// The Replier sends a reply to the request and must be called exactly once. 17type Handler func(ctx context.Context, reply Replier, req Request) error 18 19// Replier is passed to handlers to allow them to reply to the request. 20// If err is set then result will be ignored. 21type Replier func(ctx context.Context, result interface{}, err error) error 22 23// MethodNotFound is a Handler that replies to all call requests with the 24// standard method not found response. 25// This should normally be the final handler in a chain. 26func MethodNotFound(ctx context.Context, reply Replier, req Request) error { 27 return reply(ctx, nil, fmt.Errorf("%w: %q", ErrMethodNotFound, req.Method())) 28} 29 30// MustReplyHandler creates a Handler that panics if the wrapped handler does 31// not call Reply for every request that it is passed. 32func MustReplyHandler(handler Handler) Handler { 33 return func(ctx context.Context, reply Replier, req Request) error { 34 called := false 35 err := handler(ctx, func(ctx context.Context, result interface{}, err error) error { 36 if called { 37 panic(fmt.Errorf("request %q replied to more than once", req.Method())) 38 } 39 called = true 40 return reply(ctx, result, err) 41 }, req) 42 if !called { 43 panic(fmt.Errorf("request %q was never replied to", req.Method())) 44 } 45 return err 46 } 47} 48 49// CancelHandler returns a handler that supports cancellation, and a function 50// that can be used to trigger canceling in progress requests. 51func CancelHandler(handler Handler) (Handler, func(id ID)) { 52 var mu sync.Mutex 53 handling := make(map[ID]context.CancelFunc) 54 wrapped := func(ctx context.Context, reply Replier, req Request) error { 55 if call, ok := req.(*Call); ok { 56 cancelCtx, cancel := context.WithCancel(ctx) 57 ctx = cancelCtx 58 mu.Lock() 59 handling[call.ID()] = cancel 60 mu.Unlock() 61 innerReply := reply 62 reply = func(ctx context.Context, result interface{}, err error) error { 63 mu.Lock() 64 delete(handling, call.ID()) 65 mu.Unlock() 66 return innerReply(ctx, result, err) 67 } 68 } 69 return handler(ctx, reply, req) 70 } 71 return wrapped, func(id ID) { 72 mu.Lock() 73 cancel, found := handling[id] 74 mu.Unlock() 75 if found { 76 cancel() 77 } 78 } 79} 80 81// AsyncHandler returns a handler that processes each request goes in its own 82// goroutine. 83// The handler returns immediately, without the request being processed. 84// Each request then waits for the previous request to finish before it starts. 85// This allows the stream to unblock at the cost of unbounded goroutines 86// all stalled on the previous one. 87func AsyncHandler(handler Handler) Handler { 88 nextRequest := make(chan struct{}) 89 close(nextRequest) 90 return func(ctx context.Context, reply Replier, req Request) error { 91 waitForPrevious := nextRequest 92 nextRequest = make(chan struct{}) 93 unlockNext := nextRequest 94 innerReply := reply 95 reply = func(ctx context.Context, result interface{}, err error) error { 96 close(unlockNext) 97 return innerReply(ctx, result, err) 98 } 99 _, queueDone := event.Start(ctx, "queued") 100 go func() { 101 <-waitForPrevious 102 queueDone() 103 if err := handler(ctx, reply, req); err != nil { 104 event.Error(ctx, "jsonrpc2 async message delivery failed", err) 105 } 106 }() 107 return nil 108 } 109} 110