1// Copyright (c) 2012-2018 Ugorji Nwoke. All rights reserved. 2// Use of this source code is governed by a MIT license found in the LICENSE file. 3 4package codec 5 6import ( 7 "bufio" 8 "errors" 9 "io" 10 "net/rpc" 11) 12 13var errRpcJsonNeedsTermWhitespace = errors.New("rpc requires JsonHandle with TermWhitespace=true") 14 15// Rpc provides a rpc Server or Client Codec for rpc communication. 16type Rpc interface { 17 ServerCodec(conn io.ReadWriteCloser, h Handle) rpc.ServerCodec 18 ClientCodec(conn io.ReadWriteCloser, h Handle) rpc.ClientCodec 19} 20 21// RPCOptions holds options specific to rpc functionality 22type RPCOptions struct { 23 // RPCNoBuffer configures whether we attempt to buffer reads and writes during RPC calls. 24 // 25 // Set RPCNoBuffer=true to turn buffering off. 26 // Buffering can still be done if buffered connections are passed in, or 27 // buffering is configured on the handle. 28 RPCNoBuffer bool 29} 30 31// rpcCodec defines the struct members and common methods. 32type rpcCodec struct { 33 c io.Closer 34 r io.Reader 35 w io.Writer 36 f ioFlusher 37 38 dec *Decoder 39 enc *Encoder 40 // bw *bufio.Writer 41 // br *bufio.Reader 42 h Handle 43 44 cls atomicClsErr 45} 46 47func newRPCCodec(conn io.ReadWriteCloser, h Handle) rpcCodec { 48 // return newRPCCodec2(bufio.NewReader(conn), bufio.NewWriter(conn), conn, h) 49 return newRPCCodec2(conn, conn, conn, h) 50} 51 52func newRPCCodec2(r io.Reader, w io.Writer, c io.Closer, h Handle) rpcCodec { 53 // defensive: ensure that jsonH has TermWhitespace turned on. 54 if jsonH, ok := h.(*JsonHandle); ok && !jsonH.TermWhitespace { 55 panic(errRpcJsonNeedsTermWhitespace) 56 } 57 // always ensure that we use a flusher, and always flush what was written to the connection. 58 // we lose nothing by using a buffered writer internally. 59 f, ok := w.(ioFlusher) 60 bh := basicHandle(h) 61 if !bh.RPCNoBuffer { 62 if bh.WriterBufferSize <= 0 { 63 if !ok { 64 bw := bufio.NewWriter(w) 65 f, w = bw, bw 66 } 67 } 68 if bh.ReaderBufferSize <= 0 { 69 if _, ok = w.(ioPeeker); !ok { 70 if _, ok = w.(ioBuffered); !ok { 71 br := bufio.NewReader(r) 72 r = br 73 } 74 } 75 } 76 } 77 return rpcCodec{ 78 c: c, 79 w: w, 80 r: r, 81 f: f, 82 h: h, 83 enc: NewEncoder(w, h), 84 dec: NewDecoder(r, h), 85 } 86} 87 88func (c *rpcCodec) write(obj1, obj2 interface{}, writeObj2 bool) (err error) { 89 if c.c != nil { 90 cls := c.cls.load() 91 if cls.closed { 92 return cls.errClosed 93 } 94 } 95 err = c.enc.Encode(obj1) 96 if err == nil { 97 if writeObj2 { 98 err = c.enc.Encode(obj2) 99 } 100 // if err == nil && c.f != nil { 101 // err = c.f.Flush() 102 // } 103 } 104 if c.f != nil { 105 if err == nil { 106 err = c.f.Flush() 107 } else { 108 _ = c.f.Flush() // swallow flush error, so we maintain prior error on write 109 } 110 } 111 return 112} 113 114func (c *rpcCodec) swallow(err *error) { 115 defer panicToErr(c.dec, err) 116 c.dec.swallow() 117} 118 119func (c *rpcCodec) read(obj interface{}) (err error) { 120 if c.c != nil { 121 cls := c.cls.load() 122 if cls.closed { 123 return cls.errClosed 124 } 125 } 126 //If nil is passed in, we should read and discard 127 if obj == nil { 128 // var obj2 interface{} 129 // return c.dec.Decode(&obj2) 130 c.swallow(&err) 131 return 132 } 133 return c.dec.Decode(obj) 134} 135 136func (c *rpcCodec) Close() error { 137 if c.c == nil { 138 return nil 139 } 140 cls := c.cls.load() 141 if cls.closed { 142 return cls.errClosed 143 } 144 cls.errClosed = c.c.Close() 145 cls.closed = true 146 c.cls.store(cls) 147 return cls.errClosed 148} 149 150func (c *rpcCodec) ReadResponseBody(body interface{}) error { 151 return c.read(body) 152} 153 154// ------------------------------------- 155 156type goRpcCodec struct { 157 rpcCodec 158} 159 160func (c *goRpcCodec) WriteRequest(r *rpc.Request, body interface{}) error { 161 return c.write(r, body, true) 162} 163 164func (c *goRpcCodec) WriteResponse(r *rpc.Response, body interface{}) error { 165 return c.write(r, body, true) 166} 167 168func (c *goRpcCodec) ReadResponseHeader(r *rpc.Response) error { 169 return c.read(r) 170} 171 172func (c *goRpcCodec) ReadRequestHeader(r *rpc.Request) error { 173 return c.read(r) 174} 175 176func (c *goRpcCodec) ReadRequestBody(body interface{}) error { 177 return c.read(body) 178} 179 180// ------------------------------------- 181 182// goRpc is the implementation of Rpc that uses the communication protocol 183// as defined in net/rpc package. 184type goRpc struct{} 185 186// GoRpc implements Rpc using the communication protocol defined in net/rpc package. 187// 188// Note: network connection (from net.Dial, of type io.ReadWriteCloser) is not buffered. 189// 190// For performance, you should configure WriterBufferSize and ReaderBufferSize on the handle. 191// This ensures we use an adequate buffer during reading and writing. 192// If not configured, we will internally initialize and use a buffer during reads and writes. 193// This can be turned off via the RPCNoBuffer option on the Handle. 194// var handle codec.JsonHandle 195// handle.RPCNoBuffer = true // turns off attempt by rpc module to initialize a buffer 196// 197// Example 1: one way of configuring buffering explicitly: 198// var handle codec.JsonHandle // codec handle 199// handle.ReaderBufferSize = 1024 200// handle.WriterBufferSize = 1024 201// var conn io.ReadWriteCloser // connection got from a socket 202// var serverCodec = GoRpc.ServerCodec(conn, handle) 203// var clientCodec = GoRpc.ClientCodec(conn, handle) 204// 205// Example 2: you can also explicitly create a buffered connection yourself, 206// and not worry about configuring the buffer sizes in the Handle. 207// var handle codec.Handle // codec handle 208// var conn io.ReadWriteCloser // connection got from a socket 209// var bufconn = struct { // bufconn here is a buffered io.ReadWriteCloser 210// io.Closer 211// *bufio.Reader 212// *bufio.Writer 213// }{conn, bufio.NewReader(conn), bufio.NewWriter(conn)} 214// var serverCodec = GoRpc.ServerCodec(bufconn, handle) 215// var clientCodec = GoRpc.ClientCodec(bufconn, handle) 216// 217var GoRpc goRpc 218 219func (x goRpc) ServerCodec(conn io.ReadWriteCloser, h Handle) rpc.ServerCodec { 220 return &goRpcCodec{newRPCCodec(conn, h)} 221} 222 223func (x goRpc) ClientCodec(conn io.ReadWriteCloser, h Handle) rpc.ClientCodec { 224 return &goRpcCodec{newRPCCodec(conn, h)} 225} 226