• Home
  • History
  • Annotate
Name Date Size #Lines LOC

..03-May-2022-

README.mdH A D14-Jul-20214.7 KiB188137

doc.goH A D14-Jul-2021613 141

inspectmu.goH A D14-Jul-2021857 3620

pktbuf.goH A D14-Jul-2021927 6649

state.dotH A D14-Jul-20211.1 KiB2218

stream.goH A D14-Jul-202113.2 KiB507318

stream_test.goH A D14-Jul-20213.7 KiB172135

README.md

1# package drpcstream
2
3`import "storj.io/drpc/drpcstream"`
4
5Package drpcstream sends protobufs using the dprc wire protocol.
6
7![Stream state machine diagram](./state.png)
8
9## Usage
10
11#### type Options
12
13```go
14type Options struct {
15	// SplitSize controls the default size we split packets into frames.
16	SplitSize int
17
18	// ManualFlush controls if the stream will automatically flush after every
19	// message send. Note that flushing is not part of the drpc.Stream
20	// interface, so if you use this you must be ready to type assert and
21	// call RawFlush dynamically.
22	ManualFlush bool
23
24	// Internal contains options that are for internal use only.
25	Internal drpcopts.Stream
26}
27```
28
29Options controls configuration settings for a stream.
30
31#### type Stream
32
33```go
34type Stream struct {
35}
36```
37
38Stream represents an rpc actively happening on a transport.
39
40#### func  New
41
42```go
43func New(ctx context.Context, sid uint64, wr *drpcwire.Writer) *Stream
44```
45New returns a new stream bound to the context with the given stream id and will
46use the writer to write messages on. It is important use monotonically
47increasing stream ids within a single transport.
48
49#### func  NewWithOptions
50
51```go
52func NewWithOptions(ctx context.Context, sid uint64, wr *drpcwire.Writer, opts Options) *Stream
53```
54NewWithOptions returns a new stream bound to the context with the given stream
55id and will use the writer to write messages on. It is important use
56monotonically increasing stream ids within a single transport. The options are
57used to control details of how the Stream operates.
58
59#### func (*Stream) Cancel
60
61```go
62func (s *Stream) Cancel(err error)
63```
64Cancel transitions the stream into a state where all writes to the transport
65will return the provided error, and terminates the stream. It is a no-op if the
66stream is already terminated.
67
68#### func (*Stream) Close
69
70```go
71func (s *Stream) Close() (err error)
72```
73Close terminates the stream and sends that the stream has been closed to the
74remote. It is a no-op if the stream is already terminated.
75
76#### func (*Stream) CloseSend
77
78```go
79func (s *Stream) CloseSend() (err error)
80```
81CloseSend informs the remote that no more messages will be sent. If the remote
82has also already issued a CloseSend, the stream is terminated. It is a no-op if
83the stream already has sent a CloseSend or if it is terminated.
84
85#### func (*Stream) Context
86
87```go
88func (s *Stream) Context() context.Context
89```
90Context returns the context associated with the stream. It is closed when the
91Stream will no longer issue any writes or reads.
92
93#### func (*Stream) Finished
94
95```go
96func (s *Stream) Finished() <-chan struct{}
97```
98Finished returns a channel that is closed when the stream is fully finished and
99will no longer issue any writes or reads.
100
101#### func (*Stream) HandlePacket
102
103```go
104func (s *Stream) HandlePacket(pkt drpcwire.Packet) (err error)
105```
106HandlePacket advances the stream state machine by inspecting the packet. It
107returns any major errors that should terminate the transport the stream is
108operating on as well as a boolean indicating if the stream expects more packets.
109
110#### func (*Stream) ID
111
112```go
113func (s *Stream) ID() uint64
114```
115ID returns the stream id.
116
117#### func (*Stream) IsFinished
118
119```go
120func (s *Stream) IsFinished() bool
121```
122IsFinished returns true if the stream is fully finished and will no longer issue
123any writes or reads.
124
125#### func (*Stream) IsTerminated
126
127```go
128func (s *Stream) IsTerminated() bool
129```
130IsTerminated returns true if the stream has been terminated.
131
132#### func (*Stream) MsgRecv
133
134```go
135func (s *Stream) MsgRecv(msg drpc.Message, enc drpc.Encoding) (err error)
136```
137MsgRecv recives some message data and unmarshals it with enc into msg.
138
139#### func (*Stream) MsgSend
140
141```go
142func (s *Stream) MsgSend(msg drpc.Message, enc drpc.Encoding) (err error)
143```
144MsgSend marshals the message with the encoding, writes it, and flushes.
145
146#### func (*Stream) RawFlush
147
148```go
149func (s *Stream) RawFlush() (err error)
150```
151RawFlush flushes any buffers of data.
152
153#### func (*Stream) RawRecv
154
155```go
156func (s *Stream) RawRecv() (data []byte, err error)
157```
158RawRecv returns the raw bytes received for a message.
159
160#### func (*Stream) RawWrite
161
162```go
163func (s *Stream) RawWrite(kind drpcwire.Kind, data []byte) (err error)
164```
165RawWrite sends the data bytes with the given kind.
166
167#### func (*Stream) SendError
168
169```go
170func (s *Stream) SendError(serr error) (err error)
171```
172SendError terminates the stream and sends the error to the remote. It is a no-op
173if the stream is already terminated.
174
175#### func (*Stream) String
176
177```go
178func (s *Stream) String() string
179```
180String returns a string representation of the stream.
181
182#### func (*Stream) Terminated
183
184```go
185func (s *Stream) Terminated() <-chan struct{}
186```
187Terminated returns a channel that is closed when the stream has been terminated.
188