1// Copyright 2020 The Go Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style 3// license that can be found in the LICENSE file. 4 5// Package lsprpc implements a jsonrpc2.StreamServer that may be used to 6// serve the LSP on a jsonrpc2 channel. 7package lsprpc 8 9import ( 10 "context" 11 "fmt" 12 "log" 13 "net" 14 "os" 15 "time" 16 17 "golang.org/x/sync/errgroup" 18 "golang.org/x/tools/internal/jsonrpc2" 19 "golang.org/x/tools/internal/lsp" 20 "golang.org/x/tools/internal/lsp/cache" 21 "golang.org/x/tools/internal/lsp/protocol" 22) 23 24// The StreamServer type is a jsonrpc2.StreamServer that handles incoming 25// streams as a new LSP session, using a shared cache. 26type StreamServer struct { 27 withTelemetry bool 28 29 // accept is mutable for testing. 30 accept func(protocol.Client) protocol.Server 31} 32 33// NewStreamServer creates a StreamServer using the shared cache. If 34// withTelemetry is true, each session is instrumented with telemetry that 35// records RPC statistics. 36func NewStreamServer(cache *cache.Cache, withTelemetry bool) *StreamServer { 37 s := &StreamServer{ 38 withTelemetry: withTelemetry, 39 } 40 s.accept = func(c protocol.Client) protocol.Server { 41 session := cache.NewSession() 42 return lsp.NewServer(session, c) 43 } 44 return s 45} 46 47// ServeStream implements the jsonrpc2.StreamServer interface, by handling 48// incoming streams using a new lsp server. 49func (s *StreamServer) ServeStream(ctx context.Context, stream jsonrpc2.Stream) error { 50 conn := jsonrpc2.NewConn(stream) 51 client := protocol.ClientDispatcher(conn) 52 server := s.accept(client) 53 conn.AddHandler(protocol.ServerHandler(server)) 54 conn.AddHandler(protocol.Canceller{}) 55 if s.withTelemetry { 56 conn.AddHandler(telemetryHandler{}) 57 } 58 return conn.Run(protocol.WithClient(ctx, client)) 59} 60 61// A Forwarder is a jsonrpc2.StreamServer that handles an LSP stream by 62// forwarding it to a remote. This is used when the gopls process started by 63// the editor is in the `-remote` mode, which means it finds and connects to a 64// separate gopls daemon. In these cases, we still want the forwarder gopls to 65// be instrumented with telemetry, and want to be able to in some cases hijack 66// the jsonrpc2 connection with the daemon. 67type Forwarder struct { 68 network, addr string 69 70 // Configuration. Right now, not all of this may be customizable, but in the 71 // future it probably will be. 72 withTelemetry bool 73 dialTimeout time.Duration 74 retries int 75} 76 77// NewForwarder creates a new Forwarder, ready to forward connections to the 78// remote server specified by network and addr. 79func NewForwarder(network, addr string, withTelemetry bool) *Forwarder { 80 return &Forwarder{ 81 network: network, 82 addr: addr, 83 withTelemetry: withTelemetry, 84 dialTimeout: 1 * time.Second, 85 retries: 5, 86 } 87} 88 89// ServeStream dials the forwarder remote and binds the remote to serve the LSP 90// on the incoming stream. 91func (f *Forwarder) ServeStream(ctx context.Context, stream jsonrpc2.Stream) error { 92 clientConn := jsonrpc2.NewConn(stream) 93 client := protocol.ClientDispatcher(clientConn) 94 95 var ( 96 netConn net.Conn 97 err error 98 ) 99 // Sometimes the forwarder will be started immediately after the server is 100 // started. To account for these cases, add in some simple retrying. 101 // Note that the number of total attempts is f.retries + 1. 102 for attempt := 0; attempt <= f.retries; attempt++ { 103 startDial := time.Now() 104 netConn, err = net.DialTimeout(f.network, f.addr, f.dialTimeout) 105 if err == nil { 106 break 107 } 108 log.Printf("failed an attempt to connect to remote: %v\n", err) 109 // In case our failure was a fast-failure, ensure we wait at least 110 // f.dialTimeout before trying again. 111 if attempt != f.retries { 112 time.Sleep(f.dialTimeout - time.Since(startDial)) 113 } 114 } 115 if err != nil { 116 return fmt.Errorf("forwarder: dialing remote: %v", err) 117 } 118 serverConn := jsonrpc2.NewConn(jsonrpc2.NewHeaderStream(netConn, netConn)) 119 server := protocol.ServerDispatcher(serverConn) 120 121 // Forward between connections. 122 serverConn.AddHandler(protocol.ClientHandler(client)) 123 serverConn.AddHandler(protocol.Canceller{}) 124 clientConn.AddHandler(protocol.ServerHandler(server)) 125 clientConn.AddHandler(protocol.Canceller{}) 126 clientConn.AddHandler(forwarderHandler{}) 127 if f.withTelemetry { 128 clientConn.AddHandler(telemetryHandler{}) 129 } 130 131 g, ctx := errgroup.WithContext(ctx) 132 g.Go(func() error { 133 return serverConn.Run(ctx) 134 }) 135 g.Go(func() error { 136 return clientConn.Run(ctx) 137 }) 138 return g.Wait() 139} 140 141// ForwarderExitFunc is used to exit the forwarder process. It is mutable for 142// testing purposes. 143var ForwarderExitFunc = os.Exit 144 145// forwarderHandler intercepts 'exit' messages to prevent the shared gopls 146// instance from exiting. In the future it may also intercept 'shutdown' to 147// provide more graceful shutdown of the client connection. 148type forwarderHandler struct { 149 jsonrpc2.EmptyHandler 150} 151 152func (forwarderHandler) Deliver(ctx context.Context, r *jsonrpc2.Request, delivered bool) bool { 153 // TODO(golang.org/issues/34111): we should more gracefully disconnect here, 154 // once that process exists. 155 if r.Method == "exit" { 156 ForwarderExitFunc(0) 157 // Still return true here to prevent the message from being delivered: in 158 // tests, ForwarderExitFunc may be overridden to something that doesn't 159 // exit the process. 160 return true 161 } 162 return false 163} 164