1// Copyright 2009 The Go Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style
3// license that can be found in the LICENSE file.
4
5/*
6	Package rpc provides access to the exported methods of an object across a
7	network or other I/O connection.  A server registers an object, making it visible
8	as a service with the name of the type of the object.  After registration, exported
9	methods of the object will be accessible remotely.  A server may register multiple
10	objects (services) of different types but it is an error to register multiple
11	objects of the same type.
12
13	Only methods that satisfy these criteria will be made available for remote access;
14	other methods will be ignored:
15
16		- the method's type is exported.
17		- the method is exported.
18		- the method has two arguments, both exported (or builtin) types.
19		- the method's second argument is a pointer.
20		- the method has return type error.
21
22	In effect, the method must look schematically like
23
24		func (t *T) MethodName(argType T1, replyType *T2) error
25
26	where T, T1 and T2 can be marshaled by encoding/gob.
27	These requirements apply even if a different codec is used.
28	(In the future, these requirements may soften for custom codecs.)
29
30	The method's first argument represents the arguments provided by the caller; the
31	second argument represents the result parameters to be returned to the caller.
32	The method's return value, if non-nil, is passed back as a string that the client
33	sees as if created by errors.New.  If an error is returned, the reply parameter
34	will not be sent back to the client.
35
36	The server may handle requests on a single connection by calling ServeConn.  More
37	typically it will create a network listener and call Accept or, for an HTTP
38	listener, HandleHTTP and http.Serve.
39
40	A client wishing to use the service establishes a connection and then invokes
41	NewClient on the connection.  The convenience function Dial (DialHTTP) performs
42	both steps for a raw network connection (an HTTP connection).  The resulting
43	Client object has two methods, Call and Go, that specify the service and method to
44	call, a pointer containing the arguments, and a pointer to receive the result
45	parameters.
46
47	The Call method waits for the remote call to complete while the Go method
48	launches the call asynchronously and signals completion using the Call
49	structure's Done channel.
50
51	Unless an explicit codec is set up, package encoding/gob is used to
52	transport the data.
53
54	Here is a simple example.  A server wishes to export an object of type Arith:
55
56		package server
57
58		type Args struct {
59			A, B int
60		}
61
62		type Quotient struct {
63			Quo, Rem int
64		}
65
66		type Arith int
67
68		func (t *Arith) Multiply(args *Args, reply *int) error {
69			*reply = args.A * args.B
70			return nil
71		}
72
73		func (t *Arith) Divide(args *Args, quo *Quotient) error {
74			if args.B == 0 {
75				return errors.New("divide by zero")
76			}
77			quo.Quo = args.A / args.B
78			quo.Rem = args.A % args.B
79			return nil
80		}
81
82	The server calls (for HTTP service):
83
84		arith := new(Arith)
85		rpc.Register(arith)
86		rpc.HandleHTTP()
87		l, e := net.Listen("tcp", ":1234")
88		if e != nil {
89			log.Fatal("listen error:", e)
90		}
91		go http.Serve(l, nil)
92
93	At this point, clients can see a service "Arith" with methods "Arith.Multiply" and
94	"Arith.Divide".  To invoke one, a client first dials the server:
95
96		client, err := rpc.DialHTTP("tcp", serverAddress + ":1234")
97		if err != nil {
98			log.Fatal("dialing:", err)
99		}
100
101	Then it can make a remote call:
102
103		// Synchronous call
104		args := &server.Args{7,8}
105		var reply int
106		err = client.Call("Arith.Multiply", args, &reply)
107		if err != nil {
108			log.Fatal("arith error:", err)
109		}
110		fmt.Printf("Arith: %d*%d=%d", args.A, args.B, reply)
111
112	or
113
114		// Asynchronous call
		quotient := new(server.Quotient)
116		divCall := client.Go("Arith.Divide", args, quotient, nil)
117		replyCall := <-divCall.Done	// will be equal to divCall
118		// check errors, print, etc.
119
120	A server implementation will often provide a simple, type-safe wrapper for the
121	client.
122*/
123package rpc
124
125import (
126	"bufio"
127	"encoding/gob"
128	"errors"
129	"io"
130	"log"
131	"net"
132	"net/http"
133	"reflect"
134	"strings"
135	"sync"
136	"unicode"
137	"unicode/utf8"
138)
139
// URL paths on which the package-level HandleHTTP installs its handlers.
const (
	// Defaults used by HandleHTTP
	// DefaultRPCPath is the path served by the RPC handler.
	DefaultRPCPath   = "/_goRPC_"
	// DefaultDebugPath is the path served by the debugging handler.
	DefaultDebugPath = "/debug/rpc"
)
145
// typeOfError is the precomputed reflect.Type of the error interface.
// reflect.TypeOf takes an empty interface value, so handing it an error
// would report the dynamic type stored in the interface rather than the
// interface type itself. Starting from a nil *error and calling Elem
// yields the type of error proper.
var typeOfError = reflect.TypeOf((*error)(nil)).Elem()
149
// methodType describes one remotely callable method of a registered service.
type methodType struct {
	sync.Mutex // protects counters
	// method is the reflected method; it is invoked through method.Func.
	method     reflect.Method
	// ArgType is the type of the method's first (argument) parameter.
	ArgType    reflect.Type
	// ReplyType is the type of the method's second (reply) parameter.
	// suitableMethods only accepts pointer types here.
	ReplyType  reflect.Type
	// numCalls counts invocations; it is read and written under the
	// embedded Mutex.
	numCalls   uint
}
157
// service is one registered receiver together with the subset of its
// methods that are callable remotely.
type service struct {
	name   string                 // name of service
	rcvr   reflect.Value          // receiver of methods for the service
	typ    reflect.Type           // type of the receiver
	method map[string]*methodType // registered methods, keyed by method name
}
164
// Request is a header written before every RPC call.  It is used internally
// but documented here as an aid to debugging, such as when analyzing
// network traffic.
//
// Request values are recycled by the Server through a free list linked
// by the unexported next field.
type Request struct {
	ServiceMethod string   // format: "Service.Method"
	Seq           uint64   // sequence number chosen by client
	next          *Request // for free list in Server
}
173
// Response is a header written before every RPC return.  It is used internally
// but documented here as an aid to debugging, such as when analyzing
// network traffic.
//
// Response values are recycled by the Server through a free list linked
// by the unexported next field.
type Response struct {
	ServiceMethod string    // echoes that of the Request
	Seq           uint64    // echoes that of the request
	Error         string    // error, if any; empty on success
	next          *Response // for free list in Server
}
183
// Server represents an RPC Server.
// It holds the registered services and two free lists from which Request
// and Response headers are recycled between calls.
type Server struct {
	mu         sync.RWMutex // protects the serviceMap
	// serviceMap holds the registered services keyed by service name.
	serviceMap map[string]*service
	reqLock    sync.Mutex // protects freeReq
	// freeReq is the head of the free list of Request headers.
	freeReq    *Request
	respLock   sync.Mutex // protects freeResp
	// freeResp is the head of the free list of Response headers.
	freeResp   *Response
}
193
194// NewServer returns a new Server.
195func NewServer() *Server {
196	return &Server{serviceMap: make(map[string]*service)}
197}
198
// DefaultServer is the default instance of *Server.
// The package-level Register, RegisterName, ServeConn, ServeCodec,
// ServeRequest, Accept and HandleHTTP functions all operate on it.
var DefaultServer = NewServer()
201
// isExported reports whether name starts with an upper-case letter,
// which is how Go marks an identifier as exported. The empty string
// and names starting with invalid UTF-8 are reported as not exported.
func isExported(name string) bool {
	first, _ := utf8.DecodeRuneInString(name)
	return unicode.IsUpper(first)
}
207
208// Is this type exported or a builtin?
209func isExportedOrBuiltinType(t reflect.Type) bool {
210	for t.Kind() == reflect.Ptr {
211		t = t.Elem()
212	}
213	// PkgPath will be non-empty even for an exported type,
214	// so we need to check the type name as well.
215	return isExported(t.Name()) || t.PkgPath() == ""
216}
217
218// Register publishes in the server the set of methods of the
219// receiver value that satisfy the following conditions:
220//	- exported method of exported type
221//	- two arguments, both of exported type
222//	- the second argument is a pointer
223//	- one return value, of type error
224// It returns an error if the receiver is not an exported type or has
225// no suitable methods. It also logs the error using package log.
226// The client accesses each method using a string of the form "Type.Method",
227// where Type is the receiver's concrete type.
228func (server *Server) Register(rcvr interface{}) error {
229	return server.register(rcvr, "", false)
230}
231
232// RegisterName is like Register but uses the provided name for the type
233// instead of the receiver's concrete type.
234func (server *Server) RegisterName(name string, rcvr interface{}) error {
235	return server.register(rcvr, name, true)
236}
237
238func (server *Server) register(rcvr interface{}, name string, useName bool) error {
239	server.mu.Lock()
240	defer server.mu.Unlock()
241	if server.serviceMap == nil {
242		server.serviceMap = make(map[string]*service)
243	}
244	s := new(service)
245	s.typ = reflect.TypeOf(rcvr)
246	s.rcvr = reflect.ValueOf(rcvr)
247	sname := reflect.Indirect(s.rcvr).Type().Name()
248	if useName {
249		sname = name
250	}
251	if sname == "" {
252		s := "rpc.Register: no service name for type " + s.typ.String()
253		log.Print(s)
254		return errors.New(s)
255	}
256	if !isExported(sname) && !useName {
257		s := "rpc.Register: type " + sname + " is not exported"
258		log.Print(s)
259		return errors.New(s)
260	}
261	if _, present := server.serviceMap[sname]; present {
262		return errors.New("rpc: service already defined: " + sname)
263	}
264	s.name = sname
265
266	// Install the methods
267	s.method = suitableMethods(s.typ, true)
268
269	if len(s.method) == 0 {
270		str := ""
271
272		// To help the user, see if a pointer receiver would work.
273		method := suitableMethods(reflect.PtrTo(s.typ), false)
274		if len(method) != 0 {
275			str = "rpc.Register: type " + sname + " has no exported methods of suitable type (hint: pass a pointer to value of that type)"
276		} else {
277			str = "rpc.Register: type " + sname + " has no exported methods of suitable type"
278		}
279		log.Print(str)
280		return errors.New(str)
281	}
282	server.serviceMap[s.name] = s
283	return nil
284}
285
286// suitableMethods returns suitable Rpc methods of typ, it will report
287// error using log if reportErr is true.
288func suitableMethods(typ reflect.Type, reportErr bool) map[string]*methodType {
289	methods := make(map[string]*methodType)
290	for m := 0; m < typ.NumMethod(); m++ {
291		method := typ.Method(m)
292		mtype := method.Type
293		mname := method.Name
294		// Method must be exported.
295		if method.PkgPath != "" {
296			continue
297		}
298		// Method needs three ins: receiver, *args, *reply.
299		if mtype.NumIn() != 3 {
300			if reportErr {
301				log.Println("method", mname, "has wrong number of ins:", mtype.NumIn())
302			}
303			continue
304		}
305		// First arg need not be a pointer.
306		argType := mtype.In(1)
307		if !isExportedOrBuiltinType(argType) {
308			if reportErr {
309				log.Println(mname, "argument type not exported:", argType)
310			}
311			continue
312		}
313		// Second arg must be a pointer.
314		replyType := mtype.In(2)
315		if replyType.Kind() != reflect.Ptr {
316			if reportErr {
317				log.Println("method", mname, "reply type not a pointer:", replyType)
318			}
319			continue
320		}
321		// Reply type must be exported.
322		if !isExportedOrBuiltinType(replyType) {
323			if reportErr {
324				log.Println("method", mname, "reply type not exported:", replyType)
325			}
326			continue
327		}
328		// Method needs one out.
329		if mtype.NumOut() != 1 {
330			if reportErr {
331				log.Println("method", mname, "has wrong number of outs:", mtype.NumOut())
332			}
333			continue
334		}
335		// The return type of the method must be error.
336		if returnType := mtype.Out(0); returnType != typeOfError {
337			if reportErr {
338				log.Println("method", mname, "returns", returnType.String(), "not error")
339			}
340			continue
341		}
342		methods[mname] = &methodType{method: method, ArgType: argType, ReplyType: replyType}
343	}
344	return methods
345}
346
// invalidRequest is sent as a placeholder for the server's response value
// when the server receives an invalid request or the called method returns
// an error. It is never decoded by the client since the Response contains
// an error when it is used.
var invalidRequest = struct{}{}
351
352func (server *Server) sendResponse(sending *sync.Mutex, req *Request, reply interface{}, codec ServerCodec, errmsg string) {
353	resp := server.getResponse()
354	// Encode the response header
355	resp.ServiceMethod = req.ServiceMethod
356	if errmsg != "" {
357		resp.Error = errmsg
358		reply = invalidRequest
359	}
360	resp.Seq = req.Seq
361	sending.Lock()
362	err := codec.WriteResponse(resp, reply)
363	if debugLog && err != nil {
364		log.Println("rpc: writing response:", err)
365	}
366	sending.Unlock()
367	server.freeResponse(resp)
368}
369
370func (m *methodType) NumCalls() (n uint) {
371	m.Lock()
372	n = m.numCalls
373	m.Unlock()
374	return n
375}
376
377func (s *service) call(server *Server, sending *sync.Mutex, mtype *methodType, req *Request, argv, replyv reflect.Value, codec ServerCodec) {
378	mtype.Lock()
379	mtype.numCalls++
380	mtype.Unlock()
381	function := mtype.method.Func
382	// Invoke the method, providing a new value for the reply.
383	returnValues := function.Call([]reflect.Value{s.rcvr, argv, replyv})
384	// The return value for the method is an error.
385	errInter := returnValues[0].Interface()
386	errmsg := ""
387	if errInter != nil {
388		errmsg = errInter.(error).Error()
389	}
390	server.sendResponse(sending, req, replyv.Interface(), codec, errmsg)
391	server.freeRequest(req)
392}
393
// gobServerCodec is the ServerCodec used by ServeConn. It speaks the gob
// wire format over a single connection.
type gobServerCodec struct {
	// rwc is the underlying connection; Close closes it.
	rwc    io.ReadWriteCloser
	// dec decodes request headers and bodies.
	dec    *gob.Decoder
	// enc encodes response headers and bodies into encBuf.
	enc    *gob.Encoder
	// encBuf buffers encoded output; WriteResponse flushes it.
	encBuf *bufio.Writer
	// closed records that Close has already closed rwc.
	closed bool
}
401
// ReadRequestHeader decodes the next gob value from the connection into r
// and returns the decoder's error, if any.
func (c *gobServerCodec) ReadRequestHeader(r *Request) error {
	return c.dec.Decode(r)
}
405
// ReadRequestBody decodes the next gob value from the connection into
// body and returns the decoder's error, if any. readRequest passes a nil
// body to have the value read and discarded.
func (c *gobServerCodec) ReadRequestBody(body interface{}) error {
	return c.dec.Decode(body)
}
409
410func (c *gobServerCodec) WriteResponse(r *Response, body interface{}) (err error) {
411	if err = c.enc.Encode(r); err != nil {
412		if c.encBuf.Flush() == nil {
413			// Gob couldn't encode the header. Should not happen, so if it does,
414			// shut down the connection to signal that the connection is broken.
415			log.Println("rpc: gob error encoding response:", err)
416			c.Close()
417		}
418		return
419	}
420	if err = c.enc.Encode(body); err != nil {
421		if c.encBuf.Flush() == nil {
422			// Was a gob problem encoding the body but the header has been written.
423			// Shut down the connection to signal that the connection is broken.
424			log.Println("rpc: gob error encoding body:", err)
425			c.Close()
426		}
427		return
428	}
429	return c.encBuf.Flush()
430}
431
432func (c *gobServerCodec) Close() error {
433	if c.closed {
434		// Only call c.rwc.Close once; otherwise the semantics are undefined.
435		return nil
436	}
437	c.closed = true
438	return c.rwc.Close()
439}
440
441// ServeConn runs the server on a single connection.
442// ServeConn blocks, serving the connection until the client hangs up.
443// The caller typically invokes ServeConn in a go statement.
444// ServeConn uses the gob wire format (see package gob) on the
445// connection.  To use an alternate codec, use ServeCodec.
446func (server *Server) ServeConn(conn io.ReadWriteCloser) {
447	buf := bufio.NewWriter(conn)
448	srv := &gobServerCodec{
449		rwc:    conn,
450		dec:    gob.NewDecoder(conn),
451		enc:    gob.NewEncoder(buf),
452		encBuf: buf,
453	}
454	server.ServeCodec(srv)
455}
456
457// ServeCodec is like ServeConn but uses the specified codec to
458// decode requests and encode responses.
459func (server *Server) ServeCodec(codec ServerCodec) {
460	sending := new(sync.Mutex)
461	for {
462		service, mtype, req, argv, replyv, keepReading, err := server.readRequest(codec)
463		if err != nil {
464			if debugLog && err != io.EOF {
465				log.Println("rpc:", err)
466			}
467			if !keepReading {
468				break
469			}
470			// send a response if we actually managed to read a header.
471			if req != nil {
472				server.sendResponse(sending, req, invalidRequest, codec, err.Error())
473				server.freeRequest(req)
474			}
475			continue
476		}
477		go service.call(server, sending, mtype, req, argv, replyv, codec)
478	}
479	codec.Close()
480}
481
482// ServeRequest is like ServeCodec but synchronously serves a single request.
483// It does not close the codec upon completion.
484func (server *Server) ServeRequest(codec ServerCodec) error {
485	sending := new(sync.Mutex)
486	service, mtype, req, argv, replyv, keepReading, err := server.readRequest(codec)
487	if err != nil {
488		if !keepReading {
489			return err
490		}
491		// send a response if we actually managed to read a header.
492		if req != nil {
493			server.sendResponse(sending, req, invalidRequest, codec, err.Error())
494			server.freeRequest(req)
495		}
496		return err
497	}
498	service.call(server, sending, mtype, req, argv, replyv, codec)
499	return nil
500}
501
502func (server *Server) getRequest() *Request {
503	server.reqLock.Lock()
504	req := server.freeReq
505	if req == nil {
506		req = new(Request)
507	} else {
508		server.freeReq = req.next
509		*req = Request{}
510	}
511	server.reqLock.Unlock()
512	return req
513}
514
515func (server *Server) freeRequest(req *Request) {
516	server.reqLock.Lock()
517	req.next = server.freeReq
518	server.freeReq = req
519	server.reqLock.Unlock()
520}
521
522func (server *Server) getResponse() *Response {
523	server.respLock.Lock()
524	resp := server.freeResp
525	if resp == nil {
526		resp = new(Response)
527	} else {
528		server.freeResp = resp.next
529		*resp = Response{}
530	}
531	server.respLock.Unlock()
532	return resp
533}
534
535func (server *Server) freeResponse(resp *Response) {
536	server.respLock.Lock()
537	resp.next = server.freeResp
538	server.freeResp = resp
539	server.respLock.Unlock()
540}
541
542func (server *Server) readRequest(codec ServerCodec) (service *service, mtype *methodType, req *Request, argv, replyv reflect.Value, keepReading bool, err error) {
543	service, mtype, req, keepReading, err = server.readRequestHeader(codec)
544	if err != nil {
545		if !keepReading {
546			return
547		}
548		// discard body
549		codec.ReadRequestBody(nil)
550		return
551	}
552
553	// Decode the argument value.
554	argIsValue := false // if true, need to indirect before calling.
555	if mtype.ArgType.Kind() == reflect.Ptr {
556		argv = reflect.New(mtype.ArgType.Elem())
557	} else {
558		argv = reflect.New(mtype.ArgType)
559		argIsValue = true
560	}
561	// argv guaranteed to be a pointer now.
562	if err = codec.ReadRequestBody(argv.Interface()); err != nil {
563		return
564	}
565	if argIsValue {
566		argv = argv.Elem()
567	}
568
569	replyv = reflect.New(mtype.ReplyType.Elem())
570	return
571}
572
573func (server *Server) readRequestHeader(codec ServerCodec) (service *service, mtype *methodType, req *Request, keepReading bool, err error) {
574	// Grab the request header.
575	req = server.getRequest()
576	err = codec.ReadRequestHeader(req)
577	if err != nil {
578		req = nil
579		if err == io.EOF || err == io.ErrUnexpectedEOF {
580			return
581		}
582		err = errors.New("rpc: server cannot decode request: " + err.Error())
583		return
584	}
585
586	// We read the header successfully.  If we see an error now,
587	// we can still recover and move on to the next request.
588	keepReading = true
589
590	dot := strings.LastIndex(req.ServiceMethod, ".")
591	if dot < 0 {
592		err = errors.New("rpc: service/method request ill-formed: " + req.ServiceMethod)
593		return
594	}
595	serviceName := req.ServiceMethod[:dot]
596	methodName := req.ServiceMethod[dot+1:]
597
598	// Look up the request.
599	server.mu.RLock()
600	service = server.serviceMap[serviceName]
601	server.mu.RUnlock()
602	if service == nil {
603		err = errors.New("rpc: can't find service " + req.ServiceMethod)
604		return
605	}
606	mtype = service.method[methodName]
607	if mtype == nil {
608		err = errors.New("rpc: can't find method " + req.ServiceMethod)
609	}
610	return
611}
612
613// Accept accepts connections on the listener and serves requests
614// for each incoming connection.  Accept blocks; the caller typically
615// invokes it in a go statement.
616func (server *Server) Accept(lis net.Listener) {
617	for {
618		conn, err := lis.Accept()
619		if err != nil {
620			log.Fatal("rpc.Serve: accept:", err.Error()) // TODO(r): exit?
621		}
622		go server.ServeConn(conn)
623	}
624}
625
// Register publishes the receiver's methods in the DefaultServer.
// It returns whatever DefaultServer.Register returns.
func Register(rcvr interface{}) error { return DefaultServer.Register(rcvr) }
628
// RegisterName is like Register but uses the provided name for the type
// instead of the receiver's concrete type. It operates on the
// DefaultServer.
func RegisterName(name string, rcvr interface{}) error {
	return DefaultServer.RegisterName(name, rcvr)
}
634
// A ServerCodec implements reading of RPC requests and writing of
// RPC responses for the server side of an RPC session.
// The server calls ReadRequestHeader and ReadRequestBody in pairs
// to read requests from the connection, and it calls WriteResponse to
// write a response back.  The server calls Close when finished with the
// connection. ReadRequestBody may be called with a nil
// argument to force the body of the request to be read and discarded.
type ServerCodec interface {
	// ReadRequestHeader reads the next request header into the Request.
	ReadRequestHeader(*Request) error
	// ReadRequestBody reads the body that follows the header into its
	// argument, or reads and discards the body if the argument is nil.
	ReadRequestBody(interface{}) error
	// WriteResponse must be safe for concurrent use by multiple goroutines.
	WriteResponse(*Response, interface{}) error

	// Close is called by the server when it is finished with the
	// connection.
	Close() error
}
650
// ServeConn runs the DefaultServer on a single connection.
// ServeConn blocks, serving the connection until the client hangs up.
// The caller typically invokes ServeConn in a go statement.
// ServeConn uses the gob wire format (see package gob) on the
// connection.  To use an alternate codec, use ServeCodec.
func ServeConn(conn io.ReadWriteCloser) {
	// Delegate to the DefaultServer's method of the same name.
	DefaultServer.ServeConn(conn)
}
659
// ServeCodec is like ServeConn but uses the specified codec to
// decode requests and encode responses. Requests are served by the
// DefaultServer.
func ServeCodec(codec ServerCodec) {
	DefaultServer.ServeCodec(codec)
}
665
// ServeRequest is like ServeCodec but synchronously serves a single request.
// It does not close the codec upon completion. The request is served by
// the DefaultServer, whose error is returned.
func ServeRequest(codec ServerCodec) error {
	return DefaultServer.ServeRequest(codec)
}
671
// Accept accepts connections on the listener and serves requests
// to DefaultServer for each incoming connection.
// Accept blocks; the caller typically invokes it in a go statement.
func Accept(lis net.Listener) { DefaultServer.Accept(lis) }
676
// connected is the status code and text that ServeHTTP sends in its
// "HTTP/1.0 ..." status line once a CONNECT request has been hijacked.
// Clients can connect to the RPC service using HTTP CONNECT to rpcPath.
var connected = "200 Connected to Go RPC"
679
680// ServeHTTP implements an http.Handler that answers RPC requests.
681func (server *Server) ServeHTTP(w http.ResponseWriter, req *http.Request) {
682	if req.Method != "CONNECT" {
683		w.Header().Set("Content-Type", "text/plain; charset=utf-8")
684		w.WriteHeader(http.StatusMethodNotAllowed)
685		io.WriteString(w, "405 must CONNECT\n")
686		return
687	}
688	conn, _, err := w.(http.Hijacker).Hijack()
689	if err != nil {
690		log.Print("rpc hijacking ", req.RemoteAddr, ": ", err.Error())
691		return
692	}
693	io.WriteString(conn, "HTTP/1.0 "+connected+"\n\n")
694	server.ServeConn(conn)
695}
696
// HandleHTTP registers an HTTP handler for RPC messages on rpcPath,
// and a debugging handler on debugPath.
// Both handlers are installed with http.Handle.
// It is still necessary to invoke http.Serve(), typically in a go statement.
func (server *Server) HandleHTTP(rpcPath, debugPath string) {
	// The server itself is the RPC handler (see ServeHTTP).
	http.Handle(rpcPath, server)
	http.Handle(debugPath, debugHTTP{server})
}
704
// HandleHTTP registers an HTTP handler for RPC messages to DefaultServer
// on DefaultRPCPath and a debugging handler on DefaultDebugPath.
// It is still necessary to invoke http.Serve(), typically in a go statement.
func HandleHTTP() {
	// Delegate to the DefaultServer using the default paths.
	DefaultServer.HandleHTTP(DefaultRPCPath, DefaultDebugPath)
}
711