1// Copyright 2019 The Go Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style
3// license that can be found in the LICENSE file.
4
5package jsonrpc2
6
7import (
8	"context"
9	"fmt"
10	"sync"
11
12	"golang.org/x/tools/internal/event"
13)
14
15// Handler is invoked to handle incoming requests.
16// The Replier sends a reply to the request and must be called exactly once.
17type Handler func(ctx context.Context, reply Replier, req Request) error
18
19// Replier is passed to handlers to allow them to reply to the request.
20// If err is set then result will be ignored.
21type Replier func(ctx context.Context, result interface{}, err error) error
22
23// MethodNotFound is a Handler that replies to all call requests with the
24// standard method not found response.
25// This should normally be the final handler in a chain.
26func MethodNotFound(ctx context.Context, reply Replier, req Request) error {
27	return reply(ctx, nil, fmt.Errorf("%w: %q", ErrMethodNotFound, req.Method()))
28}
29
30// MustReplyHandler creates a Handler that panics if the wrapped handler does
31// not call Reply for every request that it is passed.
32func MustReplyHandler(handler Handler) Handler {
33	return func(ctx context.Context, reply Replier, req Request) error {
34		called := false
35		err := handler(ctx, func(ctx context.Context, result interface{}, err error) error {
36			if called {
37				panic(fmt.Errorf("request %q replied to more than once", req.Method()))
38			}
39			called = true
40			return reply(ctx, result, err)
41		}, req)
42		if !called {
43			panic(fmt.Errorf("request %q was never replied to", req.Method()))
44		}
45		return err
46	}
47}
48
49// CancelHandler returns a handler that supports cancellation, and a function
50// that can be used to trigger canceling in progress requests.
51func CancelHandler(handler Handler) (Handler, func(id ID)) {
52	var mu sync.Mutex
53	handling := make(map[ID]context.CancelFunc)
54	wrapped := func(ctx context.Context, reply Replier, req Request) error {
55		if call, ok := req.(*Call); ok {
56			cancelCtx, cancel := context.WithCancel(ctx)
57			ctx = cancelCtx
58			mu.Lock()
59			handling[call.ID()] = cancel
60			mu.Unlock()
61			innerReply := reply
62			reply = func(ctx context.Context, result interface{}, err error) error {
63				mu.Lock()
64				delete(handling, call.ID())
65				mu.Unlock()
66				return innerReply(ctx, result, err)
67			}
68		}
69		return handler(ctx, reply, req)
70	}
71	return wrapped, func(id ID) {
72		mu.Lock()
73		cancel, found := handling[id]
74		mu.Unlock()
75		if found {
76			cancel()
77		}
78	}
79}
80
81// AsyncHandler returns a handler that processes each request goes in its own
82// goroutine.
83// The handler returns immediately, without the request being processed.
84// Each request then waits for the previous request to finish before it starts.
85// This allows the stream to unblock at the cost of unbounded goroutines
86// all stalled on the previous one.
87func AsyncHandler(handler Handler) Handler {
88	nextRequest := make(chan struct{})
89	close(nextRequest)
90	return func(ctx context.Context, reply Replier, req Request) error {
91		waitForPrevious := nextRequest
92		nextRequest = make(chan struct{})
93		unlockNext := nextRequest
94		innerReply := reply
95		reply = func(ctx context.Context, result interface{}, err error) error {
96			close(unlockNext)
97			return innerReply(ctx, result, err)
98		}
99		_, queueDone := event.Start(ctx, "queued")
100		go func() {
101			<-waitForPrevious
102			queueDone()
103			if err := handler(ctx, reply, req); err != nil {
104				event.Error(ctx, "jsonrpc2 async message delivery failed", err)
105			}
106		}()
107		return nil
108	}
109}
110