1 /*===========================================================================
2 *
3 *                            PUBLIC DOMAIN NOTICE
4 *               National Center for Biotechnology Information
5 *
6 *  This software/database is a "United States Government Work" under the
7 *  terms of the United States Copyright Act.  It was written as part of
8 *  the author's official duties as a United States Government employee and
9 *  thus cannot be copyrighted.  This software/database is freely available
10 *  to the public for use. The National Library of Medicine and the U.S.
11 *  Government have not placed any restriction on its use or reproduction.
12 *
13 *  Although all reasonable efforts have been taken to ensure the accuracy
14 *  and reliability of the software and data, the NLM and the U.S.
15 *  Government do not and cannot warrant the performance or results that
16 *  may be obtained by using this software or data. The NLM and the U.S.
17 *  Government disclaim all warranties, express or implied, including
18 *  warranties of performance, merchantability or fitness for any particular
19 *  purpose.
20 *
21 *  Please cite the author in any work or product based on this material.
22 *
23 * ===========================================================================
24 *
25 */
26 
27 #include <kfc/stream.hpp>
28 #include <kfc/callstk.hpp>
29 #include <kfc/except.hpp>
30 #include <kfc/caps.hpp>
31 #include <kfc/rsrc.hpp>
32 
33 namespace vdb3
34 {
35 
36     /*------------------------------------------------------------------
37      * StreamItf
38      *  an interface representing a stream of bytes ( bits? )
39      */
40 
read(const bytes_t & num_bytes,Mem & dst,const bytes_t & start)41     bytes_t StreamItf :: read ( const bytes_t & num_bytes,
42         Mem & dst, const bytes_t & start )
43     {
44         // should never have had the capabilities to issue message
45         // so this is an unsupported capability
46         // that should not have been granted
47         CONST_THROW ( xc_caps_over_extended_err, "unsupported read message" );
48     }
49 
write(const bytes_t & num_bytes,const Mem & src,const bytes_t & start)50     bytes_t StreamItf :: write ( const bytes_t & num_bytes,
51         const Mem & src, const bytes_t & start )
52     {
53         // should never have had the capabilities to issue message
54         // so this is an unsupported capability
55         // that should not have been granted
56         CONST_THROW ( xc_caps_over_extended_err, "unsupported write message" );
57     }
58 
make_ref(Refcount * obj,caps_t caps)59     Stream StreamItf :: make_ref ( Refcount * obj, caps_t caps )
60     {
61         return Stream ( obj, this, caps );
62     }
63 
cast(Refcount * obj)64     void * StreamItf :: cast ( Refcount * obj )
65     {
66         return ( void * ) dynamic_cast < StreamItf * > ( obj );
67     }
68 
69 
70     /*------------------------------------------------------------------
71      * Stream
72      *  an object representing a stream of bytes ( bits? )
73      */
74 
75     const U64 dflt_mtu = 128 * 1024;
76 
77     // copy from source
78     // return the number of bytes actually copied
copy(const Stream & src) const79     bytes_t Stream :: copy ( const Stream & src ) const
80     {
81         FUNC_ENTRY ();
82 
83         // a null ref should act like nothing was there
84         // the behavior of /dev/null is a different concept
85         if ( null_ref () )
86             return bytes_t ( 0 );
87 
88         // access stream
89         StreamItf * itf = get_itf ( CAP_PROP_READ | CAP_WRITE );
90 
91         // allocate buffer
92         bytes_t mtu = itf -> get_mtu ();
93         Mem buffer = rsrc -> mmgr . alloc ( mtu, false );
94 
95         // read from source
96         bytes_t num_read = src . read ( mtu, buffer, 0 );
97         if ( num_read == ( U64 ) 0 )
98             return num_read;
99 
100         // write everything read
101         return write_all ( num_read, buffer, 0 );
102 
103     }
104 
copy(const bytes_t & num_bytes,const Stream & src) const105     bytes_t Stream :: copy ( const bytes_t & num_bytes, const Stream & src ) const
106     {
107         FUNC_ENTRY ();
108 
109         // a null ref should act like nothing was there
110         // the behavior of /dev/null is a different concept
111         if ( null_ref () )
112             return bytes_t ( 0 );
113 
114         // access stream
115         StreamItf * itf = get_itf ( CAP_PROP_READ | CAP_WRITE );
116 
117         // allocate buffer
118         bytes_t mtu = itf -> get_mtu ();
119         if ( num_bytes < mtu )
120             mtu = num_bytes;
121         Mem buffer = rsrc -> mmgr . alloc ( mtu, false );
122 
123         // read from source
124         bytes_t num_read = src . read ( mtu, buffer, 0 );
125         if ( num_read == ( U64 ) 0 )
126             return num_read;
127 
128         // write everything read
129         return write_all ( num_read, buffer, 0 );
130     }
131 
copy_all(const Stream & src) const132     bytes_t Stream :: copy_all ( const Stream & src ) const
133     {
134         FUNC_ENTRY ();
135 
136         // a null ref should act like nothing was there
137         // the behavior of /dev/null is a different concept
138         if ( null_ref () )
139             return bytes_t ( 0 );
140 
141         // access stream
142         StreamItf * itf = get_itf ( CAP_PROP_READ | CAP_WRITE );
143 
144         // allocate buffer
145         bytes_t mtu = itf -> get_mtu ();
146         Mem buffer = rsrc -> mmgr . alloc ( mtu, false );
147 
148         // read from source
149         bytes_t num_read = src . read ( mtu, buffer, 0 );
150         if ( num_read == ( U64 ) 0 )
151             return num_read;
152 
153         // write everything read
154         bytes_t total = write_all ( num_read, buffer, 0 );
155         assert ( total == num_read );
156 
157         // enter loop to copy until end of stream
158         while ( 1 )
159         {
160             num_read = src . read ( mtu, buffer, 0 );
161             if ( num_read == ( U64 ) 0 )
162                 break;
163 
164             bytes_t num_writ = write_all ( num_read, buffer, 0 );
165             assert ( num_writ == num_read );
166             total += num_writ;
167         }
168 
169         return total;
170     }
171 
copy_all(const bytes_t & num_bytes,const Stream & src) const172     bytes_t Stream :: copy_all ( const bytes_t & num_bytes, const Stream & src ) const
173     {
174         FUNC_ENTRY ();
175 
176         // a null ref should act like nothing was there
177         // the behavior of /dev/null is a different concept
178         if ( null_ref () )
179             return bytes_t ( 0 );
180 
181         // access stream
182         StreamItf * itf = get_itf ( CAP_PROP_READ | CAP_WRITE );
183 
184         // allocate buffer
185         bytes_t mtu = itf -> get_mtu ();
186         if ( num_bytes < mtu )
187             mtu = num_bytes;
188         Mem buffer = rsrc -> mmgr . alloc ( mtu, false );
189 
190         // read from source
191         bytes_t num_read = src . read ( mtu, buffer, 0 );
192         if ( num_read == ( U64 ) 0 )
193             return num_read;
194 
195         // write everything read
196         bytes_t total = write_all ( num_read, buffer, 0 );
197         assert ( total == num_read );
198 
199         // enter loop to copy until end of stream
200         while ( 1 )
201         {
202             // TBD - must read with non-blocking timeout
203             num_read = src . read ( mtu, buffer, 0 );
204             if ( num_read == ( U64 ) 0 )
205                 break;
206 
207             bytes_t num_writ = write_all ( num_read, buffer, 0 );
208             assert ( num_writ == num_read );
209             total += num_writ;
210         }
211 
212         return total;
213     }
214 
read(Mem & dst,index_t start) const215     bytes_t Stream :: read ( Mem & dst, index_t start ) const
216     {
217         FUNC_ENTRY ();
218 
219         if ( start < 0 )
220             THROW ( xc_param_err, "bad start index: %ld", start );
221 
222         bytes_t size = dst . size ();
223         if ( ( U64 ) start >= size )
224             return bytes_t ( 0 );
225 
226         bytes_t to_read = size - ( U64 ) start;
227 
228         if ( null_ref () )
229             return bytes_t ( 0 );
230 
231         StreamItf * itf = get_itf ( CAP_PROP_READ | CAP_READ );
232 
233         bytes_t mtu = itf -> get_mtu ();
234         if ( mtu < to_read )
235             to_read = mtu;
236 
237         return itf -> read ( to_read, dst, start );
238     }
239 
read(const bytes_t & num_bytes,Mem & dst,index_t start) const240     bytes_t Stream :: read ( const bytes_t & num_bytes, Mem & dst, index_t start ) const
241     {
242         FUNC_ENTRY ();
243 
244         if ( start < 0 )
245             THROW ( xc_param_err, "bad start index: %ld", start );
246 
247         bytes_t size = dst . size ();
248         if ( ( U64 ) start >= size )
249             return bytes_t ( 0 );
250 
251         bytes_t to_read = size - ( U64 ) start;
252         if ( to_read > num_bytes )
253             to_read = num_bytes;
254 
255         if ( null_ref () )
256             return bytes_t ( 0 );
257 
258         StreamItf * itf = get_itf ( CAP_PROP_READ | CAP_READ );
259 
260         bytes_t mtu = itf -> get_mtu ();
261         if ( mtu < to_read )
262             to_read = mtu;
263 
264         return itf -> read ( to_read, dst, start );
265     }
266 
write(const Mem & src,index_t start) const267     bytes_t Stream :: write ( const Mem & src, index_t start ) const
268     {
269         FUNC_ENTRY ();
270 
271         if ( start < 0 )
272             THROW ( xc_param_err, "bad start index: %ld", start );
273 
274         bytes_t size = src . size ();
275         if ( ( U64 ) start >= size )
276             return bytes_t ( 0 );
277 
278         bytes_t to_write = size - ( U64 ) start;
279 
280         if ( null_ref () )
281             return bytes_t ( 0 );
282 
283         StreamItf * itf = get_itf ( CAP_WRITE );
284 
285         bytes_t mtu = itf -> get_mtu ();
286         if ( mtu < to_write )
287             to_write = mtu;
288 
289         return itf -> write ( to_write, src, start );
290     }
291 
write_all(const Mem & src,index_t start) const292     bytes_t Stream :: write_all ( const Mem & src, index_t start ) const
293     {
294         FUNC_ENTRY ();
295 
296         if ( start < 0 )
297             THROW ( xc_param_err, "bad start index: %ld", start );
298 
299         bytes_t size = src . size ();
300         if ( ( U64 ) start >= size )
301             return bytes_t ( 0 );
302 
303         bytes_t all_bytes = size - ( U64 ) start;
304 
305         if ( null_ref () )
306             THROW ( xc_null_self_err, "wrote 0 of %lu bytes", ( U64 ) all_bytes );
307 
308         StreamItf * itf = get_itf ( CAP_WRITE );
309 
310         bytes_t mtu = itf -> get_mtu ();
311         bytes_t to_write = ( mtu < all_bytes ) ? mtu : all_bytes;
312 
313         bytes_t total = itf -> write ( to_write, src, start );
314         while ( total < all_bytes )
315         {
316             to_write = all_bytes - total;
317             if ( mtu < to_write )
318                 to_write = mtu;
319             bytes_t num_writ = itf -> write ( to_write, src, start + ( U64 ) total );
320             if ( num_writ == ( U64 ) 0 )
321                 THROW ( xc_transfer_incomplete_err, "wrote %lu of %lu bytes", ( U64 ) total, ( U64 ) all_bytes );
322             total += num_writ;
323         }
324 
325         return total;
326     }
327 
write_all(const bytes_t & num_bytes,const Mem & src,index_t start) const328     bytes_t Stream :: write_all ( const bytes_t & num_bytes, const Mem & src, index_t start ) const
329     {
330         FUNC_ENTRY ();
331 
332         if ( start < 0 )
333             THROW ( xc_param_err, "bad start index: %ld", start );
334 
335         bytes_t size = src . size ();
336         if ( ( U64 ) start >= size )
337             return bytes_t ( 0 );
338 
339         bytes_t all_bytes = size - ( U64 ) start;
340         if ( all_bytes > num_bytes )
341             all_bytes = num_bytes;
342 
343         if ( null_ref () )
344             THROW ( xc_null_self_err, "wrote 0 of %lu bytes", ( U64 ) all_bytes );
345 
346         StreamItf * itf = get_itf ( CAP_WRITE );
347 
348         bytes_t mtu = itf -> get_mtu ();
349         bytes_t to_write = ( mtu < all_bytes ) ? mtu : all_bytes;
350 
351         bytes_t total = itf -> write ( to_write, src, start );
352         while ( total < all_bytes )
353         {
354             to_write = all_bytes - total;
355             if ( mtu < to_write )
356                 to_write = mtu;
357             bytes_t num_writ = itf -> write ( to_write, src, start + ( U64 ) total );
358             if ( num_writ == ( U64 ) 0 )
359                 THROW ( xc_transfer_incomplete_err, "wrote %lu of %lu bytes", ( U64 ) total, ( U64 ) all_bytes );
360             total += num_writ;
361         }
362 
363         return total;
364     }
365 
Stream()366     Stream :: Stream ()
367     {
368     }
369 
Stream(const Stream & r)370     Stream :: Stream ( const Stream & r )
371         : Ref < StreamItf > ( r )
372     {
373     }
374 
operator =(const Stream & r)375     void Stream :: operator = ( const Stream & r )
376     {
377         Ref < StreamItf > :: operator = ( r );
378     }
379 
Stream(const Stream & r,caps_t reduce)380     Stream :: Stream ( const Stream & r, caps_t reduce )
381         : Ref < StreamItf > ( r, reduce )
382     {
383     }
384 
Stream(const OpaqueRef & r)385     Stream :: Stream ( const OpaqueRef & r )
386         : Ref < StreamItf > ( r, StreamItf :: cast )
387     {
388     }
389 
Stream(Refcount * obj,StreamItf * itf,caps_t caps)390     Stream :: Stream ( Refcount * obj, StreamItf * itf, caps_t caps )
391         : Ref < StreamItf > ( obj, itf, caps )
392     {
393     }
394 
395 }
396