1// Copyright 2020 The Go Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style
3// license that can be found in the LICENSE file.
4
5// Package lsprpc implements a jsonrpc2.StreamServer that may be used to
6// serve the LSP on a jsonrpc2 channel.
7package lsprpc
8
9import (
10	"context"
11	"fmt"
12	"log"
13	"net"
14	"os"
15	"time"
16
17	"golang.org/x/sync/errgroup"
18	"golang.org/x/tools/internal/jsonrpc2"
19	"golang.org/x/tools/internal/lsp"
20	"golang.org/x/tools/internal/lsp/cache"
21	"golang.org/x/tools/internal/lsp/protocol"
22)
23
24// The StreamServer type is a jsonrpc2.StreamServer that handles incoming
25// streams as a new LSP session, using a shared cache.
26type StreamServer struct {
27	withTelemetry bool
28
29	// accept is mutable for testing.
30	accept func(protocol.Client) protocol.Server
31}
32
33// NewStreamServer creates a StreamServer using the shared cache. If
34// withTelemetry is true, each session is instrumented with telemetry that
35// records RPC statistics.
36func NewStreamServer(cache *cache.Cache, withTelemetry bool) *StreamServer {
37	s := &StreamServer{
38		withTelemetry: withTelemetry,
39	}
40	s.accept = func(c protocol.Client) protocol.Server {
41		session := cache.NewSession()
42		return lsp.NewServer(session, c)
43	}
44	return s
45}
46
47// ServeStream implements the jsonrpc2.StreamServer interface, by handling
48// incoming streams using a new lsp server.
49func (s *StreamServer) ServeStream(ctx context.Context, stream jsonrpc2.Stream) error {
50	conn := jsonrpc2.NewConn(stream)
51	client := protocol.ClientDispatcher(conn)
52	server := s.accept(client)
53	conn.AddHandler(protocol.ServerHandler(server))
54	conn.AddHandler(protocol.Canceller{})
55	if s.withTelemetry {
56		conn.AddHandler(telemetryHandler{})
57	}
58	return conn.Run(protocol.WithClient(ctx, client))
59}
60
61// A Forwarder is a jsonrpc2.StreamServer that handles an LSP stream by
62// forwarding it to a remote. This is used when the gopls process started by
63// the editor is in the `-remote` mode, which means it finds and connects to a
64// separate gopls daemon. In these cases, we still want the forwarder gopls to
65// be instrumented with telemetry, and want to be able to in some cases hijack
66// the jsonrpc2 connection with the daemon.
67type Forwarder struct {
68	network, addr string
69
70	// Configuration. Right now, not all of this may be customizable, but in the
71	// future it probably will be.
72	withTelemetry bool
73	dialTimeout   time.Duration
74	retries       int
75}
76
77// NewForwarder creates a new Forwarder, ready to forward connections to the
78// remote server specified by network and addr.
79func NewForwarder(network, addr string, withTelemetry bool) *Forwarder {
80	return &Forwarder{
81		network:       network,
82		addr:          addr,
83		withTelemetry: withTelemetry,
84		dialTimeout:   1 * time.Second,
85		retries:       5,
86	}
87}
88
89// ServeStream dials the forwarder remote and binds the remote to serve the LSP
90// on the incoming stream.
91func (f *Forwarder) ServeStream(ctx context.Context, stream jsonrpc2.Stream) error {
92	clientConn := jsonrpc2.NewConn(stream)
93	client := protocol.ClientDispatcher(clientConn)
94
95	var (
96		netConn net.Conn
97		err     error
98	)
99	// Sometimes the forwarder will be started immediately after the server is
100	// started. To account for these cases, add in some simple retrying.
101	// Note that the number of total attempts is f.retries + 1.
102	for attempt := 0; attempt <= f.retries; attempt++ {
103		startDial := time.Now()
104		netConn, err = net.DialTimeout(f.network, f.addr, f.dialTimeout)
105		if err == nil {
106			break
107		}
108		log.Printf("failed an attempt to connect to remote: %v\n", err)
109		// In case our failure was a fast-failure, ensure we wait at least
110		// f.dialTimeout before trying again.
111		if attempt != f.retries {
112			time.Sleep(f.dialTimeout - time.Since(startDial))
113		}
114	}
115	if err != nil {
116		return fmt.Errorf("forwarder: dialing remote: %v", err)
117	}
118	serverConn := jsonrpc2.NewConn(jsonrpc2.NewHeaderStream(netConn, netConn))
119	server := protocol.ServerDispatcher(serverConn)
120
121	// Forward between connections.
122	serverConn.AddHandler(protocol.ClientHandler(client))
123	serverConn.AddHandler(protocol.Canceller{})
124	clientConn.AddHandler(protocol.ServerHandler(server))
125	clientConn.AddHandler(protocol.Canceller{})
126	clientConn.AddHandler(forwarderHandler{})
127	if f.withTelemetry {
128		clientConn.AddHandler(telemetryHandler{})
129	}
130
131	g, ctx := errgroup.WithContext(ctx)
132	g.Go(func() error {
133		return serverConn.Run(ctx)
134	})
135	g.Go(func() error {
136		return clientConn.Run(ctx)
137	})
138	return g.Wait()
139}
140
141// ForwarderExitFunc is used to exit the forwarder process. It is mutable for
142// testing purposes.
143var ForwarderExitFunc = os.Exit
144
145// forwarderHandler intercepts 'exit' messages to prevent the shared gopls
146// instance from exiting. In the future it may also intercept 'shutdown' to
147// provide more graceful shutdown of the client connection.
148type forwarderHandler struct {
149	jsonrpc2.EmptyHandler
150}
151
152func (forwarderHandler) Deliver(ctx context.Context, r *jsonrpc2.Request, delivered bool) bool {
153	// TODO(golang.org/issues/34111): we should more gracefully disconnect here,
154	// once that process exists.
155	if r.Method == "exit" {
156		ForwarderExitFunc(0)
157		// Still return true here to prevent the message from being delivered: in
158		// tests, ForwarderExitFunc may be overridden to something that doesn't
159		// exit the process.
160		return true
161	}
162	return false
163}
164