1// Copyright 2009 The Go Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style
3// license that can be found in the LICENSE file.
4
5/*
6	Package rpc provides access to the exported methods of an object across a
7	network or other I/O connection.  A server registers an object, making it visible
8	as a service with the name of the type of the object.  After registration, exported
9	methods of the object will be accessible remotely.  A server may register multiple
10	objects (services) of different types but it is an error to register multiple
11	objects of the same type.
12
13	Only methods that satisfy these criteria will be made available for remote access;
14	other methods will be ignored:
15
16		- the method's type is exported.
17		- the method is exported.
18		- the method has two arguments, both exported (or builtin) types.
19		- the method's second argument is a pointer.
20		- the method has return type error.
21
22	In effect, the method must look schematically like
23
24		func (t *T) MethodName(argType T1, replyType *T2) error
25
26	where T1 and T2 can be marshaled by encoding/gob.
27	These requirements apply even if a different codec is used.
28	(In the future, these requirements may soften for custom codecs.)
29
30	The method's first argument represents the arguments provided by the caller; the
31	second argument represents the result parameters to be returned to the caller.
32	The method's return value, if non-nil, is passed back as a string that the client
33	sees as if created by errors.New.  If an error is returned, the reply parameter
34	will not be sent back to the client.
35
36	The server may handle requests on a single connection by calling ServeConn.  More
37	typically it will create a network listener and call Accept or, for an HTTP
38	listener, HandleHTTP and http.Serve.
39
40	A client wishing to use the service establishes a connection and then invokes
41	NewClient on the connection.  The convenience function Dial (DialHTTP) performs
42	both steps for a raw network connection (an HTTP connection).  The resulting
43	Client object has two methods, Call and Go, that specify the service and method to
44	call, a pointer containing the arguments, and a pointer to receive the result
45	parameters.
46
47	The Call method waits for the remote call to complete while the Go method
48	launches the call asynchronously and signals completion using the Call
49	structure's Done channel.
50
51	Unless an explicit codec is set up, package encoding/gob is used to
52	transport the data.
53
54	Here is a simple example.  A server wishes to export an object of type Arith:
55
56		package server
57
58		import "errors"
59
60		type Args struct {
61			A, B int
62		}
63
64		type Quotient struct {
65			Quo, Rem int
66		}
67
68		type Arith int
69
70		func (t *Arith) Multiply(args *Args, reply *int) error {
71			*reply = args.A * args.B
72			return nil
73		}
74
75		func (t *Arith) Divide(args *Args, quo *Quotient) error {
76			if args.B == 0 {
77				return errors.New("divide by zero")
78			}
79			quo.Quo = args.A / args.B
80			quo.Rem = args.A % args.B
81			return nil
82		}
83
84	The server calls (for HTTP service):
85
86		arith := new(Arith)
87		rpc.Register(arith)
88		rpc.HandleHTTP()
89		l, e := net.Listen("tcp", ":1234")
90		if e != nil {
91			log.Fatal("listen error:", e)
92		}
93		go http.Serve(l, nil)
94
95	At this point, clients can see a service "Arith" with methods "Arith.Multiply" and
96	"Arith.Divide".  To invoke one, a client first dials the server:
97
98		client, err := rpc.DialHTTP("tcp", serverAddress + ":1234")
99		if err != nil {
100			log.Fatal("dialing:", err)
101		}
102
103	Then it can make a remote call:
104
105		// Synchronous call
106		args := &server.Args{7,8}
107		var reply int
108		err = client.Call("Arith.Multiply", args, &reply)
109		if err != nil {
110			log.Fatal("arith error:", err)
111		}
112		fmt.Printf("Arith: %d*%d=%d", args.A, args.B, reply)
113
114	or
115
116		// Asynchronous call
117		quotient := new(Quotient)
118		divCall := client.Go("Arith.Divide", args, quotient, nil)
119		replyCall := <-divCall.Done	// will be equal to divCall
120		// check errors, print, etc.
121
122	A server implementation will often provide a simple, type-safe wrapper for the
123	client.
124
125	The net/rpc package is frozen and is not accepting new features.
126*/
127package rpc
128
129import (
130	"bufio"
131	"encoding/gob"
132	"errors"
133	"go/token"
134	"io"
135	"log"
136	"net"
137	"net/http"
138	"reflect"
139	"strings"
140	"sync"
141)
142
143const (
144	// Defaults used by HandleHTTP
145	DefaultRPCPath   = "/_goRPC_"
146	DefaultDebugPath = "/debug/rpc"
147)
148
149// Precompute the reflect type for error. Can't use error directly
150// because Typeof takes an empty interface value. This is annoying.
151var typeOfError = reflect.TypeOf((*error)(nil)).Elem()
152
153type methodType struct {
154	sync.Mutex // protects counters
155	method     reflect.Method
156	ArgType    reflect.Type
157	ReplyType  reflect.Type
158	numCalls   uint
159}
160
161type service struct {
162	name   string                 // name of service
163	rcvr   reflect.Value          // receiver of methods for the service
164	typ    reflect.Type           // type of the receiver
165	method map[string]*methodType // registered methods
166}
167
168// Request is a header written before every RPC call. It is used internally
169// but documented here as an aid to debugging, such as when analyzing
170// network traffic.
171type Request struct {
172	ServiceMethod string   // format: "Service.Method"
173	Seq           uint64   // sequence number chosen by client
174	next          *Request // for free list in Server
175}
176
177// Response is a header written before every RPC return. It is used internally
178// but documented here as an aid to debugging, such as when analyzing
179// network traffic.
180type Response struct {
181	ServiceMethod string    // echoes that of the Request
182	Seq           uint64    // echoes that of the request
183	Error         string    // error, if any.
184	next          *Response // for free list in Server
185}
186
187// Server represents an RPC Server.
188type Server struct {
189	serviceMap sync.Map   // map[string]*service
190	reqLock    sync.Mutex // protects freeReq
191	freeReq    *Request
192	respLock   sync.Mutex // protects freeResp
193	freeResp   *Response
194}
195
196// NewServer returns a new Server.
197func NewServer() *Server {
198	return &Server{}
199}
200
201// DefaultServer is the default instance of *Server.
202var DefaultServer = NewServer()
203
204// Is this type exported or a builtin?
205func isExportedOrBuiltinType(t reflect.Type) bool {
206	for t.Kind() == reflect.Ptr {
207		t = t.Elem()
208	}
209	// PkgPath will be non-empty even for an exported type,
210	// so we need to check the type name as well.
211	return token.IsExported(t.Name()) || t.PkgPath() == ""
212}
213
214// Register publishes in the server the set of methods of the
215// receiver value that satisfy the following conditions:
216//	- exported method of exported type
217//	- two arguments, both of exported type
218//	- the second argument is a pointer
219//	- one return value, of type error
220// It returns an error if the receiver is not an exported type or has
221// no suitable methods. It also logs the error using package log.
222// The client accesses each method using a string of the form "Type.Method",
223// where Type is the receiver's concrete type.
224func (server *Server) Register(rcvr interface{}) error {
225	return server.register(rcvr, "", false)
226}
227
228// RegisterName is like Register but uses the provided name for the type
229// instead of the receiver's concrete type.
230func (server *Server) RegisterName(name string, rcvr interface{}) error {
231	return server.register(rcvr, name, true)
232}
233
234func (server *Server) register(rcvr interface{}, name string, useName bool) error {
235	s := new(service)
236	s.typ = reflect.TypeOf(rcvr)
237	s.rcvr = reflect.ValueOf(rcvr)
238	sname := reflect.Indirect(s.rcvr).Type().Name()
239	if useName {
240		sname = name
241	}
242	if sname == "" {
243		s := "rpc.Register: no service name for type " + s.typ.String()
244		log.Print(s)
245		return errors.New(s)
246	}
247	if !token.IsExported(sname) && !useName {
248		s := "rpc.Register: type " + sname + " is not exported"
249		log.Print(s)
250		return errors.New(s)
251	}
252	s.name = sname
253
254	// Install the methods
255	s.method = suitableMethods(s.typ, true)
256
257	if len(s.method) == 0 {
258		str := ""
259
260		// To help the user, see if a pointer receiver would work.
261		method := suitableMethods(reflect.PtrTo(s.typ), false)
262		if len(method) != 0 {
263			str = "rpc.Register: type " + sname + " has no exported methods of suitable type (hint: pass a pointer to value of that type)"
264		} else {
265			str = "rpc.Register: type " + sname + " has no exported methods of suitable type"
266		}
267		log.Print(str)
268		return errors.New(str)
269	}
270
271	if _, dup := server.serviceMap.LoadOrStore(sname, s); dup {
272		return errors.New("rpc: service already defined: " + sname)
273	}
274	return nil
275}
276
277// suitableMethods returns suitable Rpc methods of typ, it will report
278// error using log if reportErr is true.
279func suitableMethods(typ reflect.Type, reportErr bool) map[string]*methodType {
280	methods := make(map[string]*methodType)
281	for m := 0; m < typ.NumMethod(); m++ {
282		method := typ.Method(m)
283		mtype := method.Type
284		mname := method.Name
285		// Method must be exported.
286		if method.PkgPath != "" {
287			continue
288		}
289		// Method needs three ins: receiver, *args, *reply.
290		if mtype.NumIn() != 3 {
291			if reportErr {
292				log.Printf("rpc.Register: method %q has %d input parameters; needs exactly three\n", mname, mtype.NumIn())
293			}
294			continue
295		}
296		// First arg need not be a pointer.
297		argType := mtype.In(1)
298		if !isExportedOrBuiltinType(argType) {
299			if reportErr {
300				log.Printf("rpc.Register: argument type of method %q is not exported: %q\n", mname, argType)
301			}
302			continue
303		}
304		// Second arg must be a pointer.
305		replyType := mtype.In(2)
306		if replyType.Kind() != reflect.Ptr {
307			if reportErr {
308				log.Printf("rpc.Register: reply type of method %q is not a pointer: %q\n", mname, replyType)
309			}
310			continue
311		}
312		// Reply type must be exported.
313		if !isExportedOrBuiltinType(replyType) {
314			if reportErr {
315				log.Printf("rpc.Register: reply type of method %q is not exported: %q\n", mname, replyType)
316			}
317			continue
318		}
319		// Method needs one out.
320		if mtype.NumOut() != 1 {
321			if reportErr {
322				log.Printf("rpc.Register: method %q has %d output parameters; needs exactly one\n", mname, mtype.NumOut())
323			}
324			continue
325		}
326		// The return type of the method must be error.
327		if returnType := mtype.Out(0); returnType != typeOfError {
328			if reportErr {
329				log.Printf("rpc.Register: return type of method %q is %q, must be error\n", mname, returnType)
330			}
331			continue
332		}
333		methods[mname] = &methodType{method: method, ArgType: argType, ReplyType: replyType}
334	}
335	return methods
336}
337
338// A value sent as a placeholder for the server's response value when the server
339// receives an invalid request. It is never decoded by the client since the Response
340// contains an error when it is used.
341var invalidRequest = struct{}{}
342
343func (server *Server) sendResponse(sending *sync.Mutex, req *Request, reply interface{}, codec ServerCodec, errmsg string) {
344	resp := server.getResponse()
345	// Encode the response header
346	resp.ServiceMethod = req.ServiceMethod
347	if errmsg != "" {
348		resp.Error = errmsg
349		reply = invalidRequest
350	}
351	resp.Seq = req.Seq
352	sending.Lock()
353	err := codec.WriteResponse(resp, reply)
354	if debugLog && err != nil {
355		log.Println("rpc: writing response:", err)
356	}
357	sending.Unlock()
358	server.freeResponse(resp)
359}
360
361func (m *methodType) NumCalls() (n uint) {
362	m.Lock()
363	n = m.numCalls
364	m.Unlock()
365	return n
366}
367
368func (s *service) call(server *Server, sending *sync.Mutex, wg *sync.WaitGroup, mtype *methodType, req *Request, argv, replyv reflect.Value, codec ServerCodec) {
369	if wg != nil {
370		defer wg.Done()
371	}
372	mtype.Lock()
373	mtype.numCalls++
374	mtype.Unlock()
375	function := mtype.method.Func
376	// Invoke the method, providing a new value for the reply.
377	returnValues := function.Call([]reflect.Value{s.rcvr, argv, replyv})
378	// The return value for the method is an error.
379	errInter := returnValues[0].Interface()
380	errmsg := ""
381	if errInter != nil {
382		errmsg = errInter.(error).Error()
383	}
384	server.sendResponse(sending, req, replyv.Interface(), codec, errmsg)
385	server.freeRequest(req)
386}
387
388type gobServerCodec struct {
389	rwc    io.ReadWriteCloser
390	dec    *gob.Decoder
391	enc    *gob.Encoder
392	encBuf *bufio.Writer
393	closed bool
394}
395
396func (c *gobServerCodec) ReadRequestHeader(r *Request) error {
397	return c.dec.Decode(r)
398}
399
400func (c *gobServerCodec) ReadRequestBody(body interface{}) error {
401	return c.dec.Decode(body)
402}
403
404func (c *gobServerCodec) WriteResponse(r *Response, body interface{}) (err error) {
405	if err = c.enc.Encode(r); err != nil {
406		if c.encBuf.Flush() == nil {
407			// Gob couldn't encode the header. Should not happen, so if it does,
408			// shut down the connection to signal that the connection is broken.
409			log.Println("rpc: gob error encoding response:", err)
410			c.Close()
411		}
412		return
413	}
414	if err = c.enc.Encode(body); err != nil {
415		if c.encBuf.Flush() == nil {
416			// Was a gob problem encoding the body but the header has been written.
417			// Shut down the connection to signal that the connection is broken.
418			log.Println("rpc: gob error encoding body:", err)
419			c.Close()
420		}
421		return
422	}
423	return c.encBuf.Flush()
424}
425
426func (c *gobServerCodec) Close() error {
427	if c.closed {
428		// Only call c.rwc.Close once; otherwise the semantics are undefined.
429		return nil
430	}
431	c.closed = true
432	return c.rwc.Close()
433}
434
435// ServeConn runs the server on a single connection.
436// ServeConn blocks, serving the connection until the client hangs up.
437// The caller typically invokes ServeConn in a go statement.
438// ServeConn uses the gob wire format (see package gob) on the
439// connection. To use an alternate codec, use ServeCodec.
440// See NewClient's comment for information about concurrent access.
441func (server *Server) ServeConn(conn io.ReadWriteCloser) {
442	buf := bufio.NewWriter(conn)
443	srv := &gobServerCodec{
444		rwc:    conn,
445		dec:    gob.NewDecoder(conn),
446		enc:    gob.NewEncoder(buf),
447		encBuf: buf,
448	}
449	server.ServeCodec(srv)
450}
451
452// ServeCodec is like ServeConn but uses the specified codec to
453// decode requests and encode responses.
454func (server *Server) ServeCodec(codec ServerCodec) {
455	sending := new(sync.Mutex)
456	wg := new(sync.WaitGroup)
457	for {
458		service, mtype, req, argv, replyv, keepReading, err := server.readRequest(codec)
459		if err != nil {
460			if debugLog && err != io.EOF {
461				log.Println("rpc:", err)
462			}
463			if !keepReading {
464				break
465			}
466			// send a response if we actually managed to read a header.
467			if req != nil {
468				server.sendResponse(sending, req, invalidRequest, codec, err.Error())
469				server.freeRequest(req)
470			}
471			continue
472		}
473		wg.Add(1)
474		go service.call(server, sending, wg, mtype, req, argv, replyv, codec)
475	}
476	// We've seen that there are no more requests.
477	// Wait for responses to be sent before closing codec.
478	wg.Wait()
479	codec.Close()
480}
481
482// ServeRequest is like ServeCodec but synchronously serves a single request.
483// It does not close the codec upon completion.
484func (server *Server) ServeRequest(codec ServerCodec) error {
485	sending := new(sync.Mutex)
486	service, mtype, req, argv, replyv, keepReading, err := server.readRequest(codec)
487	if err != nil {
488		if !keepReading {
489			return err
490		}
491		// send a response if we actually managed to read a header.
492		if req != nil {
493			server.sendResponse(sending, req, invalidRequest, codec, err.Error())
494			server.freeRequest(req)
495		}
496		return err
497	}
498	service.call(server, sending, nil, mtype, req, argv, replyv, codec)
499	return nil
500}
501
502func (server *Server) getRequest() *Request {
503	server.reqLock.Lock()
504	req := server.freeReq
505	if req == nil {
506		req = new(Request)
507	} else {
508		server.freeReq = req.next
509		*req = Request{}
510	}
511	server.reqLock.Unlock()
512	return req
513}
514
515func (server *Server) freeRequest(req *Request) {
516	server.reqLock.Lock()
517	req.next = server.freeReq
518	server.freeReq = req
519	server.reqLock.Unlock()
520}
521
522func (server *Server) getResponse() *Response {
523	server.respLock.Lock()
524	resp := server.freeResp
525	if resp == nil {
526		resp = new(Response)
527	} else {
528		server.freeResp = resp.next
529		*resp = Response{}
530	}
531	server.respLock.Unlock()
532	return resp
533}
534
535func (server *Server) freeResponse(resp *Response) {
536	server.respLock.Lock()
537	resp.next = server.freeResp
538	server.freeResp = resp
539	server.respLock.Unlock()
540}
541
542func (server *Server) readRequest(codec ServerCodec) (service *service, mtype *methodType, req *Request, argv, replyv reflect.Value, keepReading bool, err error) {
543	service, mtype, req, keepReading, err = server.readRequestHeader(codec)
544	if err != nil {
545		if !keepReading {
546			return
547		}
548		// discard body
549		codec.ReadRequestBody(nil)
550		return
551	}
552
553	// Decode the argument value.
554	argIsValue := false // if true, need to indirect before calling.
555	if mtype.ArgType.Kind() == reflect.Ptr {
556		argv = reflect.New(mtype.ArgType.Elem())
557	} else {
558		argv = reflect.New(mtype.ArgType)
559		argIsValue = true
560	}
561	// argv guaranteed to be a pointer now.
562	if err = codec.ReadRequestBody(argv.Interface()); err != nil {
563		return
564	}
565	if argIsValue {
566		argv = argv.Elem()
567	}
568
569	replyv = reflect.New(mtype.ReplyType.Elem())
570
571	switch mtype.ReplyType.Elem().Kind() {
572	case reflect.Map:
573		replyv.Elem().Set(reflect.MakeMap(mtype.ReplyType.Elem()))
574	case reflect.Slice:
575		replyv.Elem().Set(reflect.MakeSlice(mtype.ReplyType.Elem(), 0, 0))
576	}
577	return
578}
579
580func (server *Server) readRequestHeader(codec ServerCodec) (svc *service, mtype *methodType, req *Request, keepReading bool, err error) {
581	// Grab the request header.
582	req = server.getRequest()
583	err = codec.ReadRequestHeader(req)
584	if err != nil {
585		req = nil
586		if err == io.EOF || err == io.ErrUnexpectedEOF {
587			return
588		}
589		err = errors.New("rpc: server cannot decode request: " + err.Error())
590		return
591	}
592
593	// We read the header successfully. If we see an error now,
594	// we can still recover and move on to the next request.
595	keepReading = true
596
597	dot := strings.LastIndex(req.ServiceMethod, ".")
598	if dot < 0 {
599		err = errors.New("rpc: service/method request ill-formed: " + req.ServiceMethod)
600		return
601	}
602	serviceName := req.ServiceMethod[:dot]
603	methodName := req.ServiceMethod[dot+1:]
604
605	// Look up the request.
606	svci, ok := server.serviceMap.Load(serviceName)
607	if !ok {
608		err = errors.New("rpc: can't find service " + req.ServiceMethod)
609		return
610	}
611	svc = svci.(*service)
612	mtype = svc.method[methodName]
613	if mtype == nil {
614		err = errors.New("rpc: can't find method " + req.ServiceMethod)
615	}
616	return
617}
618
619// Accept accepts connections on the listener and serves requests
620// for each incoming connection. Accept blocks until the listener
621// returns a non-nil error. The caller typically invokes Accept in a
622// go statement.
623func (server *Server) Accept(lis net.Listener) {
624	for {
625		conn, err := lis.Accept()
626		if err != nil {
627			log.Print("rpc.Serve: accept:", err.Error())
628			return
629		}
630		go server.ServeConn(conn)
631	}
632}
633
634// Register publishes the receiver's methods in the DefaultServer.
635func Register(rcvr interface{}) error { return DefaultServer.Register(rcvr) }
636
637// RegisterName is like Register but uses the provided name for the type
638// instead of the receiver's concrete type.
639func RegisterName(name string, rcvr interface{}) error {
640	return DefaultServer.RegisterName(name, rcvr)
641}
642
643// A ServerCodec implements reading of RPC requests and writing of
644// RPC responses for the server side of an RPC session.
645// The server calls ReadRequestHeader and ReadRequestBody in pairs
646// to read requests from the connection, and it calls WriteResponse to
647// write a response back. The server calls Close when finished with the
648// connection. ReadRequestBody may be called with a nil
649// argument to force the body of the request to be read and discarded.
650// See NewClient's comment for information about concurrent access.
651type ServerCodec interface {
652	ReadRequestHeader(*Request) error
653	ReadRequestBody(interface{}) error
654	WriteResponse(*Response, interface{}) error
655
656	// Close can be called multiple times and must be idempotent.
657	Close() error
658}
659
660// ServeConn runs the DefaultServer on a single connection.
661// ServeConn blocks, serving the connection until the client hangs up.
662// The caller typically invokes ServeConn in a go statement.
663// ServeConn uses the gob wire format (see package gob) on the
664// connection. To use an alternate codec, use ServeCodec.
665// See NewClient's comment for information about concurrent access.
666func ServeConn(conn io.ReadWriteCloser) {
667	DefaultServer.ServeConn(conn)
668}
669
670// ServeCodec is like ServeConn but uses the specified codec to
671// decode requests and encode responses.
672func ServeCodec(codec ServerCodec) {
673	DefaultServer.ServeCodec(codec)
674}
675
676// ServeRequest is like ServeCodec but synchronously serves a single request.
677// It does not close the codec upon completion.
678func ServeRequest(codec ServerCodec) error {
679	return DefaultServer.ServeRequest(codec)
680}
681
682// Accept accepts connections on the listener and serves requests
683// to DefaultServer for each incoming connection.
684// Accept blocks; the caller typically invokes it in a go statement.
685func Accept(lis net.Listener) { DefaultServer.Accept(lis) }
686
687// Can connect to RPC service using HTTP CONNECT to rpcPath.
688var connected = "200 Connected to Go RPC"
689
690// ServeHTTP implements an http.Handler that answers RPC requests.
691func (server *Server) ServeHTTP(w http.ResponseWriter, req *http.Request) {
692	if req.Method != "CONNECT" {
693		w.Header().Set("Content-Type", "text/plain; charset=utf-8")
694		w.WriteHeader(http.StatusMethodNotAllowed)
695		io.WriteString(w, "405 must CONNECT\n")
696		return
697	}
698	conn, _, err := w.(http.Hijacker).Hijack()
699	if err != nil {
700		log.Print("rpc hijacking ", req.RemoteAddr, ": ", err.Error())
701		return
702	}
703	io.WriteString(conn, "HTTP/1.0 "+connected+"\n\n")
704	server.ServeConn(conn)
705}
706
707// HandleHTTP registers an HTTP handler for RPC messages on rpcPath,
708// and a debugging handler on debugPath.
709// It is still necessary to invoke http.Serve(), typically in a go statement.
710func (server *Server) HandleHTTP(rpcPath, debugPath string) {
711	http.Handle(rpcPath, server)
712	http.Handle(debugPath, debugHTTP{server})
713}
714
715// HandleHTTP registers an HTTP handler for RPC messages to DefaultServer
716// on DefaultRPCPath and a debugging handler on DefaultDebugPath.
717// It is still necessary to invoke http.Serve(), typically in a go statement.
718func HandleHTTP() {
719	DefaultServer.HandleHTTP(DefaultRPCPath, DefaultDebugPath)
720}
721