1package rpc 2 3import ( 4 "sync" 5 6 "golang.org/x/net/context" 7) 8 9type call struct { 10 ctx context.Context 11 12 resultCh chan *rpcResponseMessage 13 14 method string 15 seqid SeqNumber 16 arg interface{} 17 res interface{} 18 ctype CompressionType 19 errorUnwrapper ErrorUnwrapper 20 instrumenter *NetworkInstrumenter 21} 22 23type callContainer struct { 24 callsMtx sync.RWMutex 25 calls map[SeqNumber]*call 26 seqMtx sync.Mutex 27 seqid SeqNumber 28} 29 30func newCallContainer() *callContainer { 31 return &callContainer{ 32 calls: make(map[SeqNumber]*call), 33 seqid: 0, 34 } 35} 36 37func (cc *callContainer) NewCall(ctx context.Context, m string, arg interface{}, res interface{}, 38 ctype CompressionType, u ErrorUnwrapper, instrumenter *NetworkInstrumenter) *call { 39 // Buffer one response to take into account that a call stops 40 // waiting for its result when its canceled. (See 41 // https://github.com/keybase/go-framed-msgpack-rpc/issues/62 42 // .) 43 return &call{ 44 ctx: ctx, 45 resultCh: make(chan *rpcResponseMessage, 1), 46 method: m, 47 arg: arg, 48 res: res, 49 ctype: ctype, 50 errorUnwrapper: u, 51 seqid: cc.nextSeqid(), 52 instrumenter: instrumenter, 53 } 54} 55 56func (cc *callContainer) nextSeqid() SeqNumber { 57 cc.seqMtx.Lock() 58 defer cc.seqMtx.Unlock() 59 60 ret := cc.seqid 61 cc.seqid++ 62 return ret 63} 64 65func (cc *callContainer) AddCall(c *call) { 66 cc.callsMtx.Lock() 67 defer cc.callsMtx.Unlock() 68 69 cc.calls[c.seqid] = c 70} 71 72func (cc *callContainer) RetrieveCall(seqid SeqNumber) *call { 73 cc.callsMtx.RLock() 74 defer cc.callsMtx.RUnlock() 75 76 return cc.calls[seqid] 77} 78 79func (cc *callContainer) RemoveCall(seqid SeqNumber) { 80 cc.callsMtx.Lock() 81 defer cc.callsMtx.Unlock() 82 83 delete(cc.calls, seqid) 84} 85