1package rpc
2
3import (
4	"sync"
5
6	"golang.org/x/net/context"
7)
8
9type call struct {
10	ctx context.Context
11
12	resultCh chan *rpcResponseMessage
13
14	method         string
15	seqid          SeqNumber
16	arg            interface{}
17	res            interface{}
18	ctype          CompressionType
19	errorUnwrapper ErrorUnwrapper
20	instrumenter   *NetworkInstrumenter
21}
22
23type callContainer struct {
24	callsMtx sync.RWMutex
25	calls    map[SeqNumber]*call
26	seqMtx   sync.Mutex
27	seqid    SeqNumber
28}
29
30func newCallContainer() *callContainer {
31	return &callContainer{
32		calls: make(map[SeqNumber]*call),
33		seqid: 0,
34	}
35}
36
37func (cc *callContainer) NewCall(ctx context.Context, m string, arg interface{}, res interface{},
38	ctype CompressionType, u ErrorUnwrapper, instrumenter *NetworkInstrumenter) *call {
39	// Buffer one response to take into account that a call stops
40	// waiting for its result when its canceled. (See
41	// https://github.com/keybase/go-framed-msgpack-rpc/issues/62
42	// .)
43	return &call{
44		ctx:            ctx,
45		resultCh:       make(chan *rpcResponseMessage, 1),
46		method:         m,
47		arg:            arg,
48		res:            res,
49		ctype:          ctype,
50		errorUnwrapper: u,
51		seqid:          cc.nextSeqid(),
52		instrumenter:   instrumenter,
53	}
54}
55
56func (cc *callContainer) nextSeqid() SeqNumber {
57	cc.seqMtx.Lock()
58	defer cc.seqMtx.Unlock()
59
60	ret := cc.seqid
61	cc.seqid++
62	return ret
63}
64
65func (cc *callContainer) AddCall(c *call) {
66	cc.callsMtx.Lock()
67	defer cc.callsMtx.Unlock()
68
69	cc.calls[c.seqid] = c
70}
71
72func (cc *callContainer) RetrieveCall(seqid SeqNumber) *call {
73	cc.callsMtx.RLock()
74	defer cc.callsMtx.RUnlock()
75
76	return cc.calls[seqid]
77}
78
79func (cc *callContainer) RemoveCall(seqid SeqNumber) {
80	cc.callsMtx.Lock()
81	defer cc.callsMtx.Unlock()
82
83	delete(cc.calls, seqid)
84}
85