1// Copyright 2009 The Go Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style
3// license that can be found in the LICENSE file.
4
5/*
6	Package rpc provides access to the exported methods of an object across a
7	network or other I/O connection.  A server registers an object, making it visible
8	as a service with the name of the type of the object.  After registration, exported
9	methods of the object will be accessible remotely.  A server may register multiple
10	objects (services) of different types but it is an error to register multiple
11	objects of the same type.
12
13	Only methods that satisfy these criteria will be made available for remote access;
14	other methods will be ignored:
15
16		- the method's type is exported.
17		- the method is exported.
18		- the method has two arguments, both exported (or builtin) types.
19		- the method's second argument is a pointer.
20		- the method has return type error.
21
22	In effect, the method must look schematically like
23
24		func (t *T) MethodName(argType T1, replyType *T2) error
25
26	where T1 and T2 can be marshaled by encoding/gob.
27	These requirements apply even if a different codec is used.
28	(In the future, these requirements may soften for custom codecs.)
29
30	The method's first argument represents the arguments provided by the caller; the
31	second argument represents the result parameters to be returned to the caller.
32	The method's return value, if non-nil, is passed back as a string that the client
33	sees as if created by errors.New.  If an error is returned, the reply parameter
34	will not be sent back to the client.
35
36	The server may handle requests on a single connection by calling ServeConn.  More
37	typically it will create a network listener and call Accept or, for an HTTP
38	listener, HandleHTTP and http.Serve.
39
40	A client wishing to use the service establishes a connection and then invokes
41	NewClient on the connection.  The convenience function Dial (DialHTTP) performs
42	both steps for a raw network connection (an HTTP connection).  The resulting
43	Client object has two methods, Call and Go, that specify the service and method to
44	call, a pointer containing the arguments, and a pointer to receive the result
45	parameters.
46
47	The Call method waits for the remote call to complete while the Go method
48	launches the call asynchronously and signals completion using the Call
49	structure's Done channel.
50
51	Unless an explicit codec is set up, package encoding/gob is used to
52	transport the data.
53
54	Here is a simple example.  A server wishes to export an object of type Arith:
55
56		package server
57
58		import "errors"
59
60		type Args struct {
61			A, B int
62		}
63
64		type Quotient struct {
65			Quo, Rem int
66		}
67
68		type Arith int
69
70		func (t *Arith) Multiply(args *Args, reply *int) error {
71			*reply = args.A * args.B
72			return nil
73		}
74
75		func (t *Arith) Divide(args *Args, quo *Quotient) error {
76			if args.B == 0 {
77				return errors.New("divide by zero")
78			}
79			quo.Quo = args.A / args.B
80			quo.Rem = args.A % args.B
81			return nil
82		}
83
84	The server calls (for HTTP service):
85
86		arith := new(Arith)
87		rpc.Register(arith)
88		rpc.HandleHTTP()
89		l, e := net.Listen("tcp", ":1234")
90		if e != nil {
91			log.Fatal("listen error:", e)
92		}
93		go http.Serve(l, nil)
94
95	At this point, clients can see a service "Arith" with methods "Arith.Multiply" and
96	"Arith.Divide".  To invoke one, a client first dials the server:
97
98		client, err := rpc.DialHTTP("tcp", serverAddress + ":1234")
99		if err != nil {
100			log.Fatal("dialing:", err)
101		}
102
103	Then it can make a remote call:
104
105		// Synchronous call
106		args := &server.Args{7,8}
107		var reply int
108		err = client.Call("Arith.Multiply", args, &reply)
109		if err != nil {
110			log.Fatal("arith error:", err)
111		}
112		fmt.Printf("Arith: %d*%d=%d", args.A, args.B, reply)
113
114	or
115
116		// Asynchronous call
117		quotient := new(Quotient)
118		divCall := client.Go("Arith.Divide", args, quotient, nil)
119		replyCall := <-divCall.Done	// will be equal to divCall
120		// check errors, print, etc.
121
122	A server implementation will often provide a simple, type-safe wrapper for the
123	client.
124
125	The net/rpc package is frozen and is not accepting new features.
126*/
127package rpc
128
129import (
130	"bufio"
131	"encoding/gob"
132	"errors"
133	"go/token"
134	"io"
135	"log"
136	"net"
137	"net/http"
138	"reflect"
139	"strings"
140	"sync"
141)
142
143const (
144	// Defaults used by HandleHTTP
145	DefaultRPCPath   = "/_goRPC_"
146	DefaultDebugPath = "/debug/rpc"
147)
148
149// Precompute the reflect type for error. Can't use error directly
150// because Typeof takes an empty interface value. This is annoying.
151var typeOfError = reflect.TypeOf((*error)(nil)).Elem()
152
153type methodType struct {
154	sync.Mutex // protects counters
155	method     reflect.Method
156	ArgType    reflect.Type
157	ReplyType  reflect.Type
158	numCalls   uint
159}
160
161type service struct {
162	name   string                 // name of service
163	rcvr   reflect.Value          // receiver of methods for the service
164	typ    reflect.Type           // type of the receiver
165	method map[string]*methodType // registered methods
166}
167
168// Request is a header written before every RPC call. It is used internally
169// but documented here as an aid to debugging, such as when analyzing
170// network traffic.
171type Request struct {
172	ServiceMethod string   // format: "Service.Method"
173	Seq           uint64   // sequence number chosen by client
174	next          *Request // for free list in Server
175}
176
177// Response is a header written before every RPC return. It is used internally
178// but documented here as an aid to debugging, such as when analyzing
179// network traffic.
180type Response struct {
181	ServiceMethod string    // echoes that of the Request
182	Seq           uint64    // echoes that of the request
183	Error         string    // error, if any.
184	next          *Response // for free list in Server
185}
186
187// Server represents an RPC Server.
188type Server struct {
189	serviceMap sync.Map   // map[string]*service
190	reqLock    sync.Mutex // protects freeReq
191	freeReq    *Request
192	respLock   sync.Mutex // protects freeResp
193	freeResp   *Response
194}
195
196// NewServer returns a new Server.
197func NewServer() *Server {
198	return &Server{}
199}
200
201// DefaultServer is the default instance of *Server.
202var DefaultServer = NewServer()
203
204// Is this type exported or a builtin?
205func isExportedOrBuiltinType(t reflect.Type) bool {
206	for t.Kind() == reflect.Pointer {
207		t = t.Elem()
208	}
209	// PkgPath will be non-empty even for an exported type,
210	// so we need to check the type name as well.
211	return token.IsExported(t.Name()) || t.PkgPath() == ""
212}
213
214// Register publishes in the server the set of methods of the
215// receiver value that satisfy the following conditions:
216//	- exported method of exported type
217//	- two arguments, both of exported type
218//	- the second argument is a pointer
219//	- one return value, of type error
220// It returns an error if the receiver is not an exported type or has
221// no suitable methods. It also logs the error using package log.
222// The client accesses each method using a string of the form "Type.Method",
223// where Type is the receiver's concrete type.
224func (server *Server) Register(rcvr any) error {
225	return server.register(rcvr, "", false)
226}
227
228// RegisterName is like Register but uses the provided name for the type
229// instead of the receiver's concrete type.
230func (server *Server) RegisterName(name string, rcvr any) error {
231	return server.register(rcvr, name, true)
232}
233
234// logRegisterError specifies whether to log problems during method registration.
235// To debug registration, recompile the package with this set to true.
236const logRegisterError = false
237
238func (server *Server) register(rcvr any, name string, useName bool) error {
239	s := new(service)
240	s.typ = reflect.TypeOf(rcvr)
241	s.rcvr = reflect.ValueOf(rcvr)
242	sname := reflect.Indirect(s.rcvr).Type().Name()
243	if useName {
244		sname = name
245	}
246	if sname == "" {
247		s := "rpc.Register: no service name for type " + s.typ.String()
248		log.Print(s)
249		return errors.New(s)
250	}
251	if !token.IsExported(sname) && !useName {
252		s := "rpc.Register: type " + sname + " is not exported"
253		log.Print(s)
254		return errors.New(s)
255	}
256	s.name = sname
257
258	// Install the methods
259	s.method = suitableMethods(s.typ, logRegisterError)
260
261	if len(s.method) == 0 {
262		str := ""
263
264		// To help the user, see if a pointer receiver would work.
265		method := suitableMethods(reflect.PointerTo(s.typ), false)
266		if len(method) != 0 {
267			str = "rpc.Register: type " + sname + " has no exported methods of suitable type (hint: pass a pointer to value of that type)"
268		} else {
269			str = "rpc.Register: type " + sname + " has no exported methods of suitable type"
270		}
271		log.Print(str)
272		return errors.New(str)
273	}
274
275	if _, dup := server.serviceMap.LoadOrStore(sname, s); dup {
276		return errors.New("rpc: service already defined: " + sname)
277	}
278	return nil
279}
280
281// suitableMethods returns suitable Rpc methods of typ. It will log
282// errors if logErr is true.
283func suitableMethods(typ reflect.Type, logErr bool) map[string]*methodType {
284	methods := make(map[string]*methodType)
285	for m := 0; m < typ.NumMethod(); m++ {
286		method := typ.Method(m)
287		mtype := method.Type
288		mname := method.Name
289		// Method must be exported.
290		if !method.IsExported() {
291			continue
292		}
293		// Method needs three ins: receiver, *args, *reply.
294		if mtype.NumIn() != 3 {
295			if logErr {
296				log.Printf("rpc.Register: method %q has %d input parameters; needs exactly three\n", mname, mtype.NumIn())
297			}
298			continue
299		}
300		// First arg need not be a pointer.
301		argType := mtype.In(1)
302		if !isExportedOrBuiltinType(argType) {
303			if logErr {
304				log.Printf("rpc.Register: argument type of method %q is not exported: %q\n", mname, argType)
305			}
306			continue
307		}
308		// Second arg must be a pointer.
309		replyType := mtype.In(2)
310		if replyType.Kind() != reflect.Pointer {
311			if logErr {
312				log.Printf("rpc.Register: reply type of method %q is not a pointer: %q\n", mname, replyType)
313			}
314			continue
315		}
316		// Reply type must be exported.
317		if !isExportedOrBuiltinType(replyType) {
318			if logErr {
319				log.Printf("rpc.Register: reply type of method %q is not exported: %q\n", mname, replyType)
320			}
321			continue
322		}
323		// Method needs one out.
324		if mtype.NumOut() != 1 {
325			if logErr {
326				log.Printf("rpc.Register: method %q has %d output parameters; needs exactly one\n", mname, mtype.NumOut())
327			}
328			continue
329		}
330		// The return type of the method must be error.
331		if returnType := mtype.Out(0); returnType != typeOfError {
332			if logErr {
333				log.Printf("rpc.Register: return type of method %q is %q, must be error\n", mname, returnType)
334			}
335			continue
336		}
337		methods[mname] = &methodType{method: method, ArgType: argType, ReplyType: replyType}
338	}
339	return methods
340}
341
342// A value sent as a placeholder for the server's response value when the server
343// receives an invalid request. It is never decoded by the client since the Response
344// contains an error when it is used.
345var invalidRequest = struct{}{}
346
347func (server *Server) sendResponse(sending *sync.Mutex, req *Request, reply any, codec ServerCodec, errmsg string) {
348	resp := server.getResponse()
349	// Encode the response header
350	resp.ServiceMethod = req.ServiceMethod
351	if errmsg != "" {
352		resp.Error = errmsg
353		reply = invalidRequest
354	}
355	resp.Seq = req.Seq
356	sending.Lock()
357	err := codec.WriteResponse(resp, reply)
358	if debugLog && err != nil {
359		log.Println("rpc: writing response:", err)
360	}
361	sending.Unlock()
362	server.freeResponse(resp)
363}
364
365func (m *methodType) NumCalls() (n uint) {
366	m.Lock()
367	n = m.numCalls
368	m.Unlock()
369	return n
370}
371
372func (s *service) call(server *Server, sending *sync.Mutex, wg *sync.WaitGroup, mtype *methodType, req *Request, argv, replyv reflect.Value, codec ServerCodec) {
373	if wg != nil {
374		defer wg.Done()
375	}
376	mtype.Lock()
377	mtype.numCalls++
378	mtype.Unlock()
379	function := mtype.method.Func
380	// Invoke the method, providing a new value for the reply.
381	returnValues := function.Call([]reflect.Value{s.rcvr, argv, replyv})
382	// The return value for the method is an error.
383	errInter := returnValues[0].Interface()
384	errmsg := ""
385	if errInter != nil {
386		errmsg = errInter.(error).Error()
387	}
388	server.sendResponse(sending, req, replyv.Interface(), codec, errmsg)
389	server.freeRequest(req)
390}
391
392type gobServerCodec struct {
393	rwc    io.ReadWriteCloser
394	dec    *gob.Decoder
395	enc    *gob.Encoder
396	encBuf *bufio.Writer
397	closed bool
398}
399
400func (c *gobServerCodec) ReadRequestHeader(r *Request) error {
401	return c.dec.Decode(r)
402}
403
404func (c *gobServerCodec) ReadRequestBody(body any) error {
405	return c.dec.Decode(body)
406}
407
408func (c *gobServerCodec) WriteResponse(r *Response, body any) (err error) {
409	if err = c.enc.Encode(r); err != nil {
410		if c.encBuf.Flush() == nil {
411			// Gob couldn't encode the header. Should not happen, so if it does,
412			// shut down the connection to signal that the connection is broken.
413			log.Println("rpc: gob error encoding response:", err)
414			c.Close()
415		}
416		return
417	}
418	if err = c.enc.Encode(body); err != nil {
419		if c.encBuf.Flush() == nil {
420			// Was a gob problem encoding the body but the header has been written.
421			// Shut down the connection to signal that the connection is broken.
422			log.Println("rpc: gob error encoding body:", err)
423			c.Close()
424		}
425		return
426	}
427	return c.encBuf.Flush()
428}
429
430func (c *gobServerCodec) Close() error {
431	if c.closed {
432		// Only call c.rwc.Close once; otherwise the semantics are undefined.
433		return nil
434	}
435	c.closed = true
436	return c.rwc.Close()
437}
438
439// ServeConn runs the server on a single connection.
440// ServeConn blocks, serving the connection until the client hangs up.
441// The caller typically invokes ServeConn in a go statement.
442// ServeConn uses the gob wire format (see package gob) on the
443// connection. To use an alternate codec, use ServeCodec.
444// See NewClient's comment for information about concurrent access.
445func (server *Server) ServeConn(conn io.ReadWriteCloser) {
446	buf := bufio.NewWriter(conn)
447	srv := &gobServerCodec{
448		rwc:    conn,
449		dec:    gob.NewDecoder(conn),
450		enc:    gob.NewEncoder(buf),
451		encBuf: buf,
452	}
453	server.ServeCodec(srv)
454}
455
456// ServeCodec is like ServeConn but uses the specified codec to
457// decode requests and encode responses.
458func (server *Server) ServeCodec(codec ServerCodec) {
459	sending := new(sync.Mutex)
460	wg := new(sync.WaitGroup)
461	for {
462		service, mtype, req, argv, replyv, keepReading, err := server.readRequest(codec)
463		if err != nil {
464			if debugLog && err != io.EOF {
465				log.Println("rpc:", err)
466			}
467			if !keepReading {
468				break
469			}
470			// send a response if we actually managed to read a header.
471			if req != nil {
472				server.sendResponse(sending, req, invalidRequest, codec, err.Error())
473				server.freeRequest(req)
474			}
475			continue
476		}
477		wg.Add(1)
478		go service.call(server, sending, wg, mtype, req, argv, replyv, codec)
479	}
480	// We've seen that there are no more requests.
481	// Wait for responses to be sent before closing codec.
482	wg.Wait()
483	codec.Close()
484}
485
486// ServeRequest is like ServeCodec but synchronously serves a single request.
487// It does not close the codec upon completion.
488func (server *Server) ServeRequest(codec ServerCodec) error {
489	sending := new(sync.Mutex)
490	service, mtype, req, argv, replyv, keepReading, err := server.readRequest(codec)
491	if err != nil {
492		if !keepReading {
493			return err
494		}
495		// send a response if we actually managed to read a header.
496		if req != nil {
497			server.sendResponse(sending, req, invalidRequest, codec, err.Error())
498			server.freeRequest(req)
499		}
500		return err
501	}
502	service.call(server, sending, nil, mtype, req, argv, replyv, codec)
503	return nil
504}
505
506func (server *Server) getRequest() *Request {
507	server.reqLock.Lock()
508	req := server.freeReq
509	if req == nil {
510		req = new(Request)
511	} else {
512		server.freeReq = req.next
513		*req = Request{}
514	}
515	server.reqLock.Unlock()
516	return req
517}
518
519func (server *Server) freeRequest(req *Request) {
520	server.reqLock.Lock()
521	req.next = server.freeReq
522	server.freeReq = req
523	server.reqLock.Unlock()
524}
525
526func (server *Server) getResponse() *Response {
527	server.respLock.Lock()
528	resp := server.freeResp
529	if resp == nil {
530		resp = new(Response)
531	} else {
532		server.freeResp = resp.next
533		*resp = Response{}
534	}
535	server.respLock.Unlock()
536	return resp
537}
538
539func (server *Server) freeResponse(resp *Response) {
540	server.respLock.Lock()
541	resp.next = server.freeResp
542	server.freeResp = resp
543	server.respLock.Unlock()
544}
545
546func (server *Server) readRequest(codec ServerCodec) (service *service, mtype *methodType, req *Request, argv, replyv reflect.Value, keepReading bool, err error) {
547	service, mtype, req, keepReading, err = server.readRequestHeader(codec)
548	if err != nil {
549		if !keepReading {
550			return
551		}
552		// discard body
553		codec.ReadRequestBody(nil)
554		return
555	}
556
557	// Decode the argument value.
558	argIsValue := false // if true, need to indirect before calling.
559	if mtype.ArgType.Kind() == reflect.Pointer {
560		argv = reflect.New(mtype.ArgType.Elem())
561	} else {
562		argv = reflect.New(mtype.ArgType)
563		argIsValue = true
564	}
565	// argv guaranteed to be a pointer now.
566	if err = codec.ReadRequestBody(argv.Interface()); err != nil {
567		return
568	}
569	if argIsValue {
570		argv = argv.Elem()
571	}
572
573	replyv = reflect.New(mtype.ReplyType.Elem())
574
575	switch mtype.ReplyType.Elem().Kind() {
576	case reflect.Map:
577		replyv.Elem().Set(reflect.MakeMap(mtype.ReplyType.Elem()))
578	case reflect.Slice:
579		replyv.Elem().Set(reflect.MakeSlice(mtype.ReplyType.Elem(), 0, 0))
580	}
581	return
582}
583
584func (server *Server) readRequestHeader(codec ServerCodec) (svc *service, mtype *methodType, req *Request, keepReading bool, err error) {
585	// Grab the request header.
586	req = server.getRequest()
587	err = codec.ReadRequestHeader(req)
588	if err != nil {
589		req = nil
590		if err == io.EOF || err == io.ErrUnexpectedEOF {
591			return
592		}
593		err = errors.New("rpc: server cannot decode request: " + err.Error())
594		return
595	}
596
597	// We read the header successfully. If we see an error now,
598	// we can still recover and move on to the next request.
599	keepReading = true
600
601	dot := strings.LastIndex(req.ServiceMethod, ".")
602	if dot < 0 {
603		err = errors.New("rpc: service/method request ill-formed: " + req.ServiceMethod)
604		return
605	}
606	serviceName := req.ServiceMethod[:dot]
607	methodName := req.ServiceMethod[dot+1:]
608
609	// Look up the request.
610	svci, ok := server.serviceMap.Load(serviceName)
611	if !ok {
612		err = errors.New("rpc: can't find service " + req.ServiceMethod)
613		return
614	}
615	svc = svci.(*service)
616	mtype = svc.method[methodName]
617	if mtype == nil {
618		err = errors.New("rpc: can't find method " + req.ServiceMethod)
619	}
620	return
621}
622
623// Accept accepts connections on the listener and serves requests
624// for each incoming connection. Accept blocks until the listener
625// returns a non-nil error. The caller typically invokes Accept in a
626// go statement.
627func (server *Server) Accept(lis net.Listener) {
628	for {
629		conn, err := lis.Accept()
630		if err != nil {
631			log.Print("rpc.Serve: accept:", err.Error())
632			return
633		}
634		go server.ServeConn(conn)
635	}
636}
637
638// Register publishes the receiver's methods in the DefaultServer.
639func Register(rcvr any) error { return DefaultServer.Register(rcvr) }
640
641// RegisterName is like Register but uses the provided name for the type
642// instead of the receiver's concrete type.
643func RegisterName(name string, rcvr any) error {
644	return DefaultServer.RegisterName(name, rcvr)
645}
646
647// A ServerCodec implements reading of RPC requests and writing of
648// RPC responses for the server side of an RPC session.
649// The server calls ReadRequestHeader and ReadRequestBody in pairs
650// to read requests from the connection, and it calls WriteResponse to
651// write a response back. The server calls Close when finished with the
652// connection. ReadRequestBody may be called with a nil
653// argument to force the body of the request to be read and discarded.
654// See NewClient's comment for information about concurrent access.
655type ServerCodec interface {
656	ReadRequestHeader(*Request) error
657	ReadRequestBody(any) error
658	WriteResponse(*Response, any) error
659
660	// Close can be called multiple times and must be idempotent.
661	Close() error
662}
663
664// ServeConn runs the DefaultServer on a single connection.
665// ServeConn blocks, serving the connection until the client hangs up.
666// The caller typically invokes ServeConn in a go statement.
667// ServeConn uses the gob wire format (see package gob) on the
668// connection. To use an alternate codec, use ServeCodec.
669// See NewClient's comment for information about concurrent access.
670func ServeConn(conn io.ReadWriteCloser) {
671	DefaultServer.ServeConn(conn)
672}
673
674// ServeCodec is like ServeConn but uses the specified codec to
675// decode requests and encode responses.
676func ServeCodec(codec ServerCodec) {
677	DefaultServer.ServeCodec(codec)
678}
679
680// ServeRequest is like ServeCodec but synchronously serves a single request.
681// It does not close the codec upon completion.
682func ServeRequest(codec ServerCodec) error {
683	return DefaultServer.ServeRequest(codec)
684}
685
686// Accept accepts connections on the listener and serves requests
687// to DefaultServer for each incoming connection.
688// Accept blocks; the caller typically invokes it in a go statement.
689func Accept(lis net.Listener) { DefaultServer.Accept(lis) }
690
691// Can connect to RPC service using HTTP CONNECT to rpcPath.
692var connected = "200 Connected to Go RPC"
693
694// ServeHTTP implements an http.Handler that answers RPC requests.
695func (server *Server) ServeHTTP(w http.ResponseWriter, req *http.Request) {
696	if req.Method != "CONNECT" {
697		w.Header().Set("Content-Type", "text/plain; charset=utf-8")
698		w.WriteHeader(http.StatusMethodNotAllowed)
699		io.WriteString(w, "405 must CONNECT\n")
700		return
701	}
702	conn, _, err := w.(http.Hijacker).Hijack()
703	if err != nil {
704		log.Print("rpc hijacking ", req.RemoteAddr, ": ", err.Error())
705		return
706	}
707	io.WriteString(conn, "HTTP/1.0 "+connected+"\n\n")
708	server.ServeConn(conn)
709}
710
711// HandleHTTP registers an HTTP handler for RPC messages on rpcPath,
712// and a debugging handler on debugPath.
713// It is still necessary to invoke http.Serve(), typically in a go statement.
714func (server *Server) HandleHTTP(rpcPath, debugPath string) {
715	http.Handle(rpcPath, server)
716	http.Handle(debugPath, debugHTTP{server})
717}
718
719// HandleHTTP registers an HTTP handler for RPC messages to DefaultServer
720// on DefaultRPCPath and a debugging handler on DefaultDebugPath.
721// It is still necessary to invoke http.Serve(), typically in a go statement.
722func HandleHTTP() {
723	DefaultServer.HandleHTTP(DefaultRPCPath, DefaultDebugPath)
724}
725