1// Copyright 2009 The Go Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style
3// license that can be found in the LICENSE file.
4
5/*
6	Package rpc provides access to the exported methods of an object across a
7	network or other I/O connection.  A server registers an object, making it visible
8	as a service with the name of the type of the object.  After registration, exported
9	methods of the object will be accessible remotely.  A server may register multiple
10	objects (services) of different types but it is an error to register multiple
11	objects of the same type.
12
13	Only methods that satisfy these criteria will be made available for remote access;
14	other methods will be ignored:
15
16		- the method's type is exported.
17		- the method is exported.
18		- the method has two arguments, both exported (or builtin) types.
19		- the method's second argument is a pointer.
20		- the method has return type error.
21
22	In effect, the method must look schematically like
23
24		func (t *T) MethodName(argType T1, replyType *T2) error
25
26	where T1 and T2 can be marshaled by encoding/gob.
27	These requirements apply even if a different codec is used.
28	(In the future, these requirements may soften for custom codecs.)
29
30	The method's first argument represents the arguments provided by the caller; the
31	second argument represents the result parameters to be returned to the caller.
32	The method's return value, if non-nil, is passed back as a string that the client
33	sees as if created by errors.New.  If an error is returned, the reply parameter
34	will not be sent back to the client.
35
36	The server may handle requests on a single connection by calling ServeConn.  More
37	typically it will create a network listener and call Accept or, for an HTTP
38	listener, HandleHTTP and http.Serve.
39
40	A client wishing to use the service establishes a connection and then invokes
41	NewClient on the connection.  The convenience function Dial (DialHTTP) performs
42	both steps for a raw network connection (an HTTP connection).  The resulting
43	Client object has two methods, Call and Go, that specify the service and method to
44	call, a pointer containing the arguments, and a pointer to receive the result
45	parameters.
46
47	The Call method waits for the remote call to complete while the Go method
48	launches the call asynchronously and signals completion using the Call
49	structure's Done channel.
50
51	Unless an explicit codec is set up, package encoding/gob is used to
52	transport the data.
53
54	Here is a simple example.  A server wishes to export an object of type Arith:
55
56		package server
57
58		import "errors"
59
60		type Args struct {
61			A, B int
62		}
63
64		type Quotient struct {
65			Quo, Rem int
66		}
67
68		type Arith int
69
70		func (t *Arith) Multiply(args *Args, reply *int) error {
71			*reply = args.A * args.B
72			return nil
73		}
74
75		func (t *Arith) Divide(args *Args, quo *Quotient) error {
76			if args.B == 0 {
77				return errors.New("divide by zero")
78			}
79			quo.Quo = args.A / args.B
80			quo.Rem = args.A % args.B
81			return nil
82		}
83
84	The server calls (for HTTP service):
85
86		arith := new(Arith)
87		rpc.Register(arith)
88		rpc.HandleHTTP()
89		l, e := net.Listen("tcp", ":1234")
90		if e != nil {
91			log.Fatal("listen error:", e)
92		}
93		go http.Serve(l, nil)
94
95	At this point, clients can see a service "Arith" with methods "Arith.Multiply" and
96	"Arith.Divide".  To invoke one, a client first dials the server:
97
98		client, err := rpc.DialHTTP("tcp", serverAddress + ":1234")
99		if err != nil {
100			log.Fatal("dialing:", err)
101		}
102
103	Then it can make a remote call:
104
105		// Synchronous call
106		args := &server.Args{7,8}
107		var reply int
108		err = client.Call("Arith.Multiply", args, &reply)
109		if err != nil {
110			log.Fatal("arith error:", err)
111		}
112		fmt.Printf("Arith: %d*%d=%d", args.A, args.B, reply)
113
114	or
115
116		// Asynchronous call
117		quotient := new(Quotient)
118		divCall := client.Go("Arith.Divide", args, quotient, nil)
119		replyCall := <-divCall.Done	// will be equal to divCall
120		// check errors, print, etc.
121
122	A server implementation will often provide a simple, type-safe wrapper for the
123	client.
124
125	The net/rpc package is frozen and is not accepting new features.
126*/
127package rpc
128
129import (
130	"bufio"
131	"encoding/gob"
132	"errors"
133	"io"
134	"log"
135	"net"
136	"net/http"
137	"reflect"
138	"strings"
139	"sync"
140	"unicode"
141	"unicode/utf8"
142)
143
144const (
145	// Defaults used by HandleHTTP
146	DefaultRPCPath   = "/_goRPC_"
147	DefaultDebugPath = "/debug/rpc"
148)
149
150// Precompute the reflect type for error. Can't use error directly
151// because Typeof takes an empty interface value. This is annoying.
152var typeOfError = reflect.TypeOf((*error)(nil)).Elem()
153
154type methodType struct {
155	sync.Mutex // protects counters
156	method     reflect.Method
157	ArgType    reflect.Type
158	ReplyType  reflect.Type
159	numCalls   uint
160}
161
162type service struct {
163	name   string                 // name of service
164	rcvr   reflect.Value          // receiver of methods for the service
165	typ    reflect.Type           // type of the receiver
166	method map[string]*methodType // registered methods
167}
168
169// Request is a header written before every RPC call. It is used internally
170// but documented here as an aid to debugging, such as when analyzing
171// network traffic.
172type Request struct {
173	ServiceMethod string   // format: "Service.Method"
174	Seq           uint64   // sequence number chosen by client
175	next          *Request // for free list in Server
176}
177
178// Response is a header written before every RPC return. It is used internally
179// but documented here as an aid to debugging, such as when analyzing
180// network traffic.
181type Response struct {
182	ServiceMethod string    // echoes that of the Request
183	Seq           uint64    // echoes that of the request
184	Error         string    // error, if any.
185	next          *Response // for free list in Server
186}
187
188// Server represents an RPC Server.
189type Server struct {
190	serviceMap sync.Map   // map[string]*service
191	reqLock    sync.Mutex // protects freeReq
192	freeReq    *Request
193	respLock   sync.Mutex // protects freeResp
194	freeResp   *Response
195}
196
197// NewServer returns a new Server.
198func NewServer() *Server {
199	return &Server{}
200}
201
202// DefaultServer is the default instance of *Server.
203var DefaultServer = NewServer()
204
205// Is this an exported - upper case - name?
206func isExported(name string) bool {
207	rune, _ := utf8.DecodeRuneInString(name)
208	return unicode.IsUpper(rune)
209}
210
211// Is this type exported or a builtin?
212func isExportedOrBuiltinType(t reflect.Type) bool {
213	for t.Kind() == reflect.Ptr {
214		t = t.Elem()
215	}
216	// PkgPath will be non-empty even for an exported type,
217	// so we need to check the type name as well.
218	return isExported(t.Name()) || t.PkgPath() == ""
219}
220
221// Register publishes in the server the set of methods of the
222// receiver value that satisfy the following conditions:
223//	- exported method of exported type
224//	- two arguments, both of exported type
225//	- the second argument is a pointer
226//	- one return value, of type error
227// It returns an error if the receiver is not an exported type or has
228// no suitable methods. It also logs the error using package log.
229// The client accesses each method using a string of the form "Type.Method",
230// where Type is the receiver's concrete type.
231func (server *Server) Register(rcvr interface{}) error {
232	return server.register(rcvr, "", false)
233}
234
235// RegisterName is like Register but uses the provided name for the type
236// instead of the receiver's concrete type.
237func (server *Server) RegisterName(name string, rcvr interface{}) error {
238	return server.register(rcvr, name, true)
239}
240
241func (server *Server) register(rcvr interface{}, name string, useName bool) error {
242	s := new(service)
243	s.typ = reflect.TypeOf(rcvr)
244	s.rcvr = reflect.ValueOf(rcvr)
245	sname := reflect.Indirect(s.rcvr).Type().Name()
246	if useName {
247		sname = name
248	}
249	if sname == "" {
250		s := "rpc.Register: no service name for type " + s.typ.String()
251		log.Print(s)
252		return errors.New(s)
253	}
254	if !isExported(sname) && !useName {
255		s := "rpc.Register: type " + sname + " is not exported"
256		log.Print(s)
257		return errors.New(s)
258	}
259	s.name = sname
260
261	// Install the methods
262	s.method = suitableMethods(s.typ, true)
263
264	if len(s.method) == 0 {
265		str := ""
266
267		// To help the user, see if a pointer receiver would work.
268		method := suitableMethods(reflect.PtrTo(s.typ), false)
269		if len(method) != 0 {
270			str = "rpc.Register: type " + sname + " has no exported methods of suitable type (hint: pass a pointer to value of that type)"
271		} else {
272			str = "rpc.Register: type " + sname + " has no exported methods of suitable type"
273		}
274		log.Print(str)
275		return errors.New(str)
276	}
277
278	if _, dup := server.serviceMap.LoadOrStore(sname, s); dup {
279		return errors.New("rpc: service already defined: " + sname)
280	}
281	return nil
282}
283
284// suitableMethods returns suitable Rpc methods of typ, it will report
285// error using log if reportErr is true.
286func suitableMethods(typ reflect.Type, reportErr bool) map[string]*methodType {
287	methods := make(map[string]*methodType)
288	for m := 0; m < typ.NumMethod(); m++ {
289		method := typ.Method(m)
290		mtype := method.Type
291		mname := method.Name
292		// Method must be exported.
293		if method.PkgPath != "" {
294			continue
295		}
296		// Method needs three ins: receiver, *args, *reply.
297		if mtype.NumIn() != 3 {
298			if reportErr {
299				log.Printf("rpc.Register: method %q has %d input parameters; needs exactly three\n", mname, mtype.NumIn())
300			}
301			continue
302		}
303		// First arg need not be a pointer.
304		argType := mtype.In(1)
305		if !isExportedOrBuiltinType(argType) {
306			if reportErr {
307				log.Printf("rpc.Register: argument type of method %q is not exported: %q\n", mname, argType)
308			}
309			continue
310		}
311		// Second arg must be a pointer.
312		replyType := mtype.In(2)
313		if replyType.Kind() != reflect.Ptr {
314			if reportErr {
315				log.Printf("rpc.Register: reply type of method %q is not a pointer: %q\n", mname, replyType)
316			}
317			continue
318		}
319		// Reply type must be exported.
320		if !isExportedOrBuiltinType(replyType) {
321			if reportErr {
322				log.Printf("rpc.Register: reply type of method %q is not exported: %q\n", mname, replyType)
323			}
324			continue
325		}
326		// Method needs one out.
327		if mtype.NumOut() != 1 {
328			if reportErr {
329				log.Printf("rpc.Register: method %q has %d output parameters; needs exactly one\n", mname, mtype.NumOut())
330			}
331			continue
332		}
333		// The return type of the method must be error.
334		if returnType := mtype.Out(0); returnType != typeOfError {
335			if reportErr {
336				log.Printf("rpc.Register: return type of method %q is %q, must be error\n", mname, returnType)
337			}
338			continue
339		}
340		methods[mname] = &methodType{method: method, ArgType: argType, ReplyType: replyType}
341	}
342	return methods
343}
344
345// A value sent as a placeholder for the server's response value when the server
346// receives an invalid request. It is never decoded by the client since the Response
347// contains an error when it is used.
348var invalidRequest = struct{}{}
349
350func (server *Server) sendResponse(sending *sync.Mutex, req *Request, reply interface{}, codec ServerCodec, errmsg string) {
351	resp := server.getResponse()
352	// Encode the response header
353	resp.ServiceMethod = req.ServiceMethod
354	if errmsg != "" {
355		resp.Error = errmsg
356		reply = invalidRequest
357	}
358	resp.Seq = req.Seq
359	sending.Lock()
360	err := codec.WriteResponse(resp, reply)
361	if debugLog && err != nil {
362		log.Println("rpc: writing response:", err)
363	}
364	sending.Unlock()
365	server.freeResponse(resp)
366}
367
368func (m *methodType) NumCalls() (n uint) {
369	m.Lock()
370	n = m.numCalls
371	m.Unlock()
372	return n
373}
374
375func (s *service) call(server *Server, sending *sync.Mutex, wg *sync.WaitGroup, mtype *methodType, req *Request, argv, replyv reflect.Value, codec ServerCodec) {
376	if wg != nil {
377		defer wg.Done()
378	}
379	mtype.Lock()
380	mtype.numCalls++
381	mtype.Unlock()
382	function := mtype.method.Func
383	// Invoke the method, providing a new value for the reply.
384	returnValues := function.Call([]reflect.Value{s.rcvr, argv, replyv})
385	// The return value for the method is an error.
386	errInter := returnValues[0].Interface()
387	errmsg := ""
388	if errInter != nil {
389		errmsg = errInter.(error).Error()
390	}
391	server.sendResponse(sending, req, replyv.Interface(), codec, errmsg)
392	server.freeRequest(req)
393}
394
395type gobServerCodec struct {
396	rwc    io.ReadWriteCloser
397	dec    *gob.Decoder
398	enc    *gob.Encoder
399	encBuf *bufio.Writer
400	closed bool
401}
402
403func (c *gobServerCodec) ReadRequestHeader(r *Request) error {
404	return c.dec.Decode(r)
405}
406
407func (c *gobServerCodec) ReadRequestBody(body interface{}) error {
408	return c.dec.Decode(body)
409}
410
411func (c *gobServerCodec) WriteResponse(r *Response, body interface{}) (err error) {
412	if err = c.enc.Encode(r); err != nil {
413		if c.encBuf.Flush() == nil {
414			// Gob couldn't encode the header. Should not happen, so if it does,
415			// shut down the connection to signal that the connection is broken.
416			log.Println("rpc: gob error encoding response:", err)
417			c.Close()
418		}
419		return
420	}
421	if err = c.enc.Encode(body); err != nil {
422		if c.encBuf.Flush() == nil {
423			// Was a gob problem encoding the body but the header has been written.
424			// Shut down the connection to signal that the connection is broken.
425			log.Println("rpc: gob error encoding body:", err)
426			c.Close()
427		}
428		return
429	}
430	return c.encBuf.Flush()
431}
432
433func (c *gobServerCodec) Close() error {
434	if c.closed {
435		// Only call c.rwc.Close once; otherwise the semantics are undefined.
436		return nil
437	}
438	c.closed = true
439	return c.rwc.Close()
440}
441
442// ServeConn runs the server on a single connection.
443// ServeConn blocks, serving the connection until the client hangs up.
444// The caller typically invokes ServeConn in a go statement.
445// ServeConn uses the gob wire format (see package gob) on the
446// connection. To use an alternate codec, use ServeCodec.
447func (server *Server) ServeConn(conn io.ReadWriteCloser) {
448	buf := bufio.NewWriter(conn)
449	srv := &gobServerCodec{
450		rwc:    conn,
451		dec:    gob.NewDecoder(conn),
452		enc:    gob.NewEncoder(buf),
453		encBuf: buf,
454	}
455	server.ServeCodec(srv)
456}
457
458// ServeCodec is like ServeConn but uses the specified codec to
459// decode requests and encode responses.
460func (server *Server) ServeCodec(codec ServerCodec) {
461	sending := new(sync.Mutex)
462	wg := new(sync.WaitGroup)
463	for {
464		service, mtype, req, argv, replyv, keepReading, err := server.readRequest(codec)
465		if err != nil {
466			if debugLog && err != io.EOF {
467				log.Println("rpc:", err)
468			}
469			if !keepReading {
470				break
471			}
472			// send a response if we actually managed to read a header.
473			if req != nil {
474				server.sendResponse(sending, req, invalidRequest, codec, err.Error())
475				server.freeRequest(req)
476			}
477			continue
478		}
479		wg.Add(1)
480		go service.call(server, sending, wg, mtype, req, argv, replyv, codec)
481	}
482	// We've seen that there are no more requests.
483	// Wait for responses to be sent before closing codec.
484	wg.Wait()
485	codec.Close()
486}
487
488// ServeRequest is like ServeCodec but synchronously serves a single request.
489// It does not close the codec upon completion.
490func (server *Server) ServeRequest(codec ServerCodec) error {
491	sending := new(sync.Mutex)
492	service, mtype, req, argv, replyv, keepReading, err := server.readRequest(codec)
493	if err != nil {
494		if !keepReading {
495			return err
496		}
497		// send a response if we actually managed to read a header.
498		if req != nil {
499			server.sendResponse(sending, req, invalidRequest, codec, err.Error())
500			server.freeRequest(req)
501		}
502		return err
503	}
504	service.call(server, sending, nil, mtype, req, argv, replyv, codec)
505	return nil
506}
507
508func (server *Server) getRequest() *Request {
509	server.reqLock.Lock()
510	req := server.freeReq
511	if req == nil {
512		req = new(Request)
513	} else {
514		server.freeReq = req.next
515		*req = Request{}
516	}
517	server.reqLock.Unlock()
518	return req
519}
520
521func (server *Server) freeRequest(req *Request) {
522	server.reqLock.Lock()
523	req.next = server.freeReq
524	server.freeReq = req
525	server.reqLock.Unlock()
526}
527
528func (server *Server) getResponse() *Response {
529	server.respLock.Lock()
530	resp := server.freeResp
531	if resp == nil {
532		resp = new(Response)
533	} else {
534		server.freeResp = resp.next
535		*resp = Response{}
536	}
537	server.respLock.Unlock()
538	return resp
539}
540
541func (server *Server) freeResponse(resp *Response) {
542	server.respLock.Lock()
543	resp.next = server.freeResp
544	server.freeResp = resp
545	server.respLock.Unlock()
546}
547
548func (server *Server) readRequest(codec ServerCodec) (service *service, mtype *methodType, req *Request, argv, replyv reflect.Value, keepReading bool, err error) {
549	service, mtype, req, keepReading, err = server.readRequestHeader(codec)
550	if err != nil {
551		if !keepReading {
552			return
553		}
554		// discard body
555		codec.ReadRequestBody(nil)
556		return
557	}
558
559	// Decode the argument value.
560	argIsValue := false // if true, need to indirect before calling.
561	if mtype.ArgType.Kind() == reflect.Ptr {
562		argv = reflect.New(mtype.ArgType.Elem())
563	} else {
564		argv = reflect.New(mtype.ArgType)
565		argIsValue = true
566	}
567	// argv guaranteed to be a pointer now.
568	if err = codec.ReadRequestBody(argv.Interface()); err != nil {
569		return
570	}
571	if argIsValue {
572		argv = argv.Elem()
573	}
574
575	replyv = reflect.New(mtype.ReplyType.Elem())
576
577	switch mtype.ReplyType.Elem().Kind() {
578	case reflect.Map:
579		replyv.Elem().Set(reflect.MakeMap(mtype.ReplyType.Elem()))
580	case reflect.Slice:
581		replyv.Elem().Set(reflect.MakeSlice(mtype.ReplyType.Elem(), 0, 0))
582	}
583	return
584}
585
586func (server *Server) readRequestHeader(codec ServerCodec) (svc *service, mtype *methodType, req *Request, keepReading bool, err error) {
587	// Grab the request header.
588	req = server.getRequest()
589	err = codec.ReadRequestHeader(req)
590	if err != nil {
591		req = nil
592		if err == io.EOF || err == io.ErrUnexpectedEOF {
593			return
594		}
595		err = errors.New("rpc: server cannot decode request: " + err.Error())
596		return
597	}
598
599	// We read the header successfully. If we see an error now,
600	// we can still recover and move on to the next request.
601	keepReading = true
602
603	dot := strings.LastIndex(req.ServiceMethod, ".")
604	if dot < 0 {
605		err = errors.New("rpc: service/method request ill-formed: " + req.ServiceMethod)
606		return
607	}
608	serviceName := req.ServiceMethod[:dot]
609	methodName := req.ServiceMethod[dot+1:]
610
611	// Look up the request.
612	svci, ok := server.serviceMap.Load(serviceName)
613	if !ok {
614		err = errors.New("rpc: can't find service " + req.ServiceMethod)
615		return
616	}
617	svc = svci.(*service)
618	mtype = svc.method[methodName]
619	if mtype == nil {
620		err = errors.New("rpc: can't find method " + req.ServiceMethod)
621	}
622	return
623}
624
625// Accept accepts connections on the listener and serves requests
626// for each incoming connection. Accept blocks until the listener
627// returns a non-nil error. The caller typically invokes Accept in a
628// go statement.
629func (server *Server) Accept(lis net.Listener) {
630	for {
631		conn, err := lis.Accept()
632		if err != nil {
633			log.Print("rpc.Serve: accept:", err.Error())
634			return
635		}
636		go server.ServeConn(conn)
637	}
638}
639
640// Register publishes the receiver's methods in the DefaultServer.
641func Register(rcvr interface{}) error { return DefaultServer.Register(rcvr) }
642
643// RegisterName is like Register but uses the provided name for the type
644// instead of the receiver's concrete type.
645func RegisterName(name string, rcvr interface{}) error {
646	return DefaultServer.RegisterName(name, rcvr)
647}
648
649// A ServerCodec implements reading of RPC requests and writing of
650// RPC responses for the server side of an RPC session.
651// The server calls ReadRequestHeader and ReadRequestBody in pairs
652// to read requests from the connection, and it calls WriteResponse to
653// write a response back. The server calls Close when finished with the
654// connection. ReadRequestBody may be called with a nil
655// argument to force the body of the request to be read and discarded.
656type ServerCodec interface {
657	ReadRequestHeader(*Request) error
658	ReadRequestBody(interface{}) error
659	// WriteResponse must be safe for concurrent use by multiple goroutines.
660	WriteResponse(*Response, interface{}) error
661
662	Close() error
663}
664
665// ServeConn runs the DefaultServer on a single connection.
666// ServeConn blocks, serving the connection until the client hangs up.
667// The caller typically invokes ServeConn in a go statement.
668// ServeConn uses the gob wire format (see package gob) on the
669// connection. To use an alternate codec, use ServeCodec.
670func ServeConn(conn io.ReadWriteCloser) {
671	DefaultServer.ServeConn(conn)
672}
673
674// ServeCodec is like ServeConn but uses the specified codec to
675// decode requests and encode responses.
676func ServeCodec(codec ServerCodec) {
677	DefaultServer.ServeCodec(codec)
678}
679
680// ServeRequest is like ServeCodec but synchronously serves a single request.
681// It does not close the codec upon completion.
682func ServeRequest(codec ServerCodec) error {
683	return DefaultServer.ServeRequest(codec)
684}
685
686// Accept accepts connections on the listener and serves requests
687// to DefaultServer for each incoming connection.
688// Accept blocks; the caller typically invokes it in a go statement.
689func Accept(lis net.Listener) { DefaultServer.Accept(lis) }
690
691// Can connect to RPC service using HTTP CONNECT to rpcPath.
692var connected = "200 Connected to Go RPC"
693
694// ServeHTTP implements an http.Handler that answers RPC requests.
695func (server *Server) ServeHTTP(w http.ResponseWriter, req *http.Request) {
696	if req.Method != "CONNECT" {
697		w.Header().Set("Content-Type", "text/plain; charset=utf-8")
698		w.WriteHeader(http.StatusMethodNotAllowed)
699		io.WriteString(w, "405 must CONNECT\n")
700		return
701	}
702	conn, _, err := w.(http.Hijacker).Hijack()
703	if err != nil {
704		log.Print("rpc hijacking ", req.RemoteAddr, ": ", err.Error())
705		return
706	}
707	io.WriteString(conn, "HTTP/1.0 "+connected+"\n\n")
708	server.ServeConn(conn)
709}
710
711// HandleHTTP registers an HTTP handler for RPC messages on rpcPath,
712// and a debugging handler on debugPath.
713// It is still necessary to invoke http.Serve(), typically in a go statement.
714func (server *Server) HandleHTTP(rpcPath, debugPath string) {
715	http.Handle(rpcPath, server)
716	http.Handle(debugPath, debugHTTP{server})
717}
718
719// HandleHTTP registers an HTTP handler for RPC messages to DefaultServer
720// on DefaultRPCPath and a debugging handler on DefaultDebugPath.
721// It is still necessary to invoke http.Serve(), typically in a go statement.
722func HandleHTTP() {
723	DefaultServer.HandleHTTP(DefaultRPCPath, DefaultDebugPath)
724}
725