1 /*=========================================================================== 2 * 3 * PUBLIC DOMAIN NOTICE 4 * National Center for Biotechnology Information 5 * 6 * This software/database is a "United States Government Work" under the 7 * terms of the United States Copyright Act. It was written as part of 8 * the author's official duties as a United States Government employee and 9 * thus cannot be copyrighted. This software/database is freely available 10 * to the public for use. The National Library of Medicine and the U.S. 11 * Government have not placed any restriction on its use or reproduction. 12 * 13 * Although all reasonable efforts have been taken to ensure the accuracy 14 * and reliability of the software and data, the NLM and the U.S. 15 * Government do not and cannot warrant the performance or results that 16 * may be obtained by using this software or data. The NLM and the U.S. 17 * Government disclaim all warranties, express or implied, including 18 * warranties of performance, merchantability or fitness for any particular 19 * purpose. 20 * 21 * Please cite the author in any work or product based on this material. 22 * 23 * =========================================================================== 24 * 25 */ 26 27 #include <kfc/stream.hpp> 28 #include <kfc/callstk.hpp> 29 #include <kfc/except.hpp> 30 #include <kfc/caps.hpp> 31 #include <kfc/rsrc.hpp> 32 33 namespace vdb3 34 { 35 36 /*------------------------------------------------------------------ 37 * StreamItf 38 * an interface representing a stream of bytes ( bits? ) 39 */ 40 read(const bytes_t & num_bytes,Mem & dst,const bytes_t & start)41 bytes_t StreamItf :: read ( const bytes_t & num_bytes, 42 Mem & dst, const bytes_t & start ) 43 { 44 // should never have had the capabilities to issue message 45 // so this is an unsupported capability 46 // that should not have been granted 47 CONST_THROW ( xc_caps_over_extended_err, "unsupported read message" ); 48 } 49 write(const bytes_t & num_bytes,const Mem & src,const bytes_t & start)50 bytes_t StreamItf :: write ( const bytes_t & num_bytes, 51 const Mem & src, const bytes_t & start ) 52 { 53 // should never have had the capabilities to issue message 54 // so this is an unsupported capability 55 // that should not have been granted 56 CONST_THROW ( xc_caps_over_extended_err, "unsupported write message" ); 57 } 58 make_ref(Refcount * obj,caps_t caps)59 Stream StreamItf :: make_ref ( Refcount * obj, caps_t caps ) 60 { 61 return Stream ( obj, this, caps ); 62 } 63 cast(Refcount * obj)64 void * StreamItf :: cast ( Refcount * obj ) 65 { 66 return ( void * ) dynamic_cast < StreamItf * > ( obj ); 67 } 68 69 70 /*------------------------------------------------------------------ 71 * Stream 72 * an object representing a stream of bytes ( bits? ) 73 */ 74 75 const U64 dflt_mtu = 128 * 1024; 76 77 // copy from source 78 // return the number of bytes actually copied copy(const Stream & src) const79 bytes_t Stream :: copy ( const Stream & src ) const 80 { 81 FUNC_ENTRY (); 82 83 // a null ref should act like nothing was there 84 // the behavior of /dev/null is a different concept 85 if ( null_ref () ) 86 return bytes_t ( 0 ); 87 88 // access stream 89 StreamItf * itf = get_itf ( CAP_PROP_READ | CAP_WRITE ); 90 91 // allocate buffer 92 bytes_t mtu = itf -> get_mtu (); 93 Mem buffer = rsrc -> mmgr . alloc ( mtu, false ); 94 95 // read from source 96 bytes_t num_read = src . read ( mtu, buffer, 0 ); 97 if ( num_read == ( U64 ) 0 ) 98 return num_read; 99 100 // write everything read 101 return write_all ( num_read, buffer, 0 ); 102 103 } 104 copy(const bytes_t & num_bytes,const Stream & src) const105 bytes_t Stream :: copy ( const bytes_t & num_bytes, const Stream & src ) const 106 { 107 FUNC_ENTRY (); 108 109 // a null ref should act like nothing was there 110 // the behavior of /dev/null is a different concept 111 if ( null_ref () ) 112 return bytes_t ( 0 ); 113 114 // access stream 115 StreamItf * itf = get_itf ( CAP_PROP_READ | CAP_WRITE ); 116 117 // allocate buffer 118 bytes_t mtu = itf -> get_mtu (); 119 if ( num_bytes < mtu ) 120 mtu = num_bytes; 121 Mem buffer = rsrc -> mmgr . alloc ( mtu, false ); 122 123 // read from source 124 bytes_t num_read = src . read ( mtu, buffer, 0 ); 125 if ( num_read == ( U64 ) 0 ) 126 return num_read; 127 128 // write everything read 129 return write_all ( num_read, buffer, 0 ); 130 } 131 copy_all(const Stream & src) const132 bytes_t Stream :: copy_all ( const Stream & src ) const 133 { 134 FUNC_ENTRY (); 135 136 // a null ref should act like nothing was there 137 // the behavior of /dev/null is a different concept 138 if ( null_ref () ) 139 return bytes_t ( 0 ); 140 141 // access stream 142 StreamItf * itf = get_itf ( CAP_PROP_READ | CAP_WRITE ); 143 144 // allocate buffer 145 bytes_t mtu = itf -> get_mtu (); 146 Mem buffer = rsrc -> mmgr . alloc ( mtu, false ); 147 148 // read from source 149 bytes_t num_read = src . read ( mtu, buffer, 0 ); 150 if ( num_read == ( U64 ) 0 ) 151 return num_read; 152 153 // write everything read 154 bytes_t total = write_all ( num_read, buffer, 0 ); 155 assert ( total == num_read ); 156 157 // enter loop to copy until end of stream 158 while ( 1 ) 159 { 160 num_read = src . read ( mtu, buffer, 0 ); 161 if ( num_read == ( U64 ) 0 ) 162 break; 163 164 bytes_t num_writ = write_all ( num_read, buffer, 0 ); 165 assert ( num_writ == num_read ); 166 total += num_writ; 167 } 168 169 return total; 170 } 171 copy_all(const bytes_t & num_bytes,const Stream & src) const172 bytes_t Stream :: copy_all ( const bytes_t & num_bytes, const Stream & src ) const 173 { 174 FUNC_ENTRY (); 175 176 // a null ref should act like nothing was there 177 // the behavior of /dev/null is a different concept 178 if ( null_ref () ) 179 return bytes_t ( 0 ); 180 181 // access stream 182 StreamItf * itf = get_itf ( CAP_PROP_READ | CAP_WRITE ); 183 184 // allocate buffer 185 bytes_t mtu = itf -> get_mtu (); 186 if ( num_bytes < mtu ) 187 mtu = num_bytes; 188 Mem buffer = rsrc -> mmgr . alloc ( mtu, false ); 189 190 // read from source 191 bytes_t num_read = src . read ( mtu, buffer, 0 ); 192 if ( num_read == ( U64 ) 0 ) 193 return num_read; 194 195 // write everything read 196 bytes_t total = write_all ( num_read, buffer, 0 ); 197 assert ( total == num_read ); 198 199 // enter loop to copy until end of stream 200 while ( 1 ) 201 { 202 // TBD - must read with non-blocking timeout 203 num_read = src . read ( mtu, buffer, 0 ); 204 if ( num_read == ( U64 ) 0 ) 205 break; 206 207 bytes_t num_writ = write_all ( num_read, buffer, 0 ); 208 assert ( num_writ == num_read ); 209 total += num_writ; 210 } 211 212 return total; 213 } 214 read(Mem & dst,index_t start) const215 bytes_t Stream :: read ( Mem & dst, index_t start ) const 216 { 217 FUNC_ENTRY (); 218 219 if ( start < 0 ) 220 THROW ( xc_param_err, "bad start index: %ld", start ); 221 222 bytes_t size = dst . size (); 223 if ( ( U64 ) start >= size ) 224 return bytes_t ( 0 ); 225 226 bytes_t to_read = size - ( U64 ) start; 227 228 if ( null_ref () ) 229 return bytes_t ( 0 ); 230 231 StreamItf * itf = get_itf ( CAP_PROP_READ | CAP_READ ); 232 233 bytes_t mtu = itf -> get_mtu (); 234 if ( mtu < to_read ) 235 to_read = mtu; 236 237 return itf -> read ( to_read, dst, start ); 238 } 239 read(const bytes_t & num_bytes,Mem & dst,index_t start) const240 bytes_t Stream :: read ( const bytes_t & num_bytes, Mem & dst, index_t start ) const 241 { 242 FUNC_ENTRY (); 243 244 if ( start < 0 ) 245 THROW ( xc_param_err, "bad start index: %ld", start ); 246 247 bytes_t size = dst . size (); 248 if ( ( U64 ) start >= size ) 249 return bytes_t ( 0 ); 250 251 bytes_t to_read = size - ( U64 ) start; 252 if ( to_read > num_bytes ) 253 to_read = num_bytes; 254 255 if ( null_ref () ) 256 return bytes_t ( 0 ); 257 258 StreamItf * itf = get_itf ( CAP_PROP_READ | CAP_READ ); 259 260 bytes_t mtu = itf -> get_mtu (); 261 if ( mtu < to_read ) 262 to_read = mtu; 263 264 return itf -> read ( to_read, dst, start ); 265 } 266 write(const Mem & src,index_t start) const267 bytes_t Stream :: write ( const Mem & src, index_t start ) const 268 { 269 FUNC_ENTRY (); 270 271 if ( start < 0 ) 272 THROW ( xc_param_err, "bad start index: %ld", start ); 273 274 bytes_t size = src . size (); 275 if ( ( U64 ) start >= size ) 276 return bytes_t ( 0 ); 277 278 bytes_t to_write = size - ( U64 ) start; 279 280 if ( null_ref () ) 281 return bytes_t ( 0 ); 282 283 StreamItf * itf = get_itf ( CAP_WRITE ); 284 285 bytes_t mtu = itf -> get_mtu (); 286 if ( mtu < to_write ) 287 to_write = mtu; 288 289 return itf -> write ( to_write, src, start ); 290 } 291 write_all(const Mem & src,index_t start) const292 bytes_t Stream :: write_all ( const Mem & src, index_t start ) const 293 { 294 FUNC_ENTRY (); 295 296 if ( start < 0 ) 297 THROW ( xc_param_err, "bad start index: %ld", start ); 298 299 bytes_t size = src . size (); 300 if ( ( U64 ) start >= size ) 301 return bytes_t ( 0 ); 302 303 bytes_t all_bytes = size - ( U64 ) start; 304 305 if ( null_ref () ) 306 THROW ( xc_null_self_err, "wrote 0 of %lu bytes", ( U64 ) all_bytes ); 307 308 StreamItf * itf = get_itf ( CAP_WRITE ); 309 310 bytes_t mtu = itf -> get_mtu (); 311 bytes_t to_write = ( mtu < all_bytes ) ? mtu : all_bytes; 312 313 bytes_t total = itf -> write ( to_write, src, start ); 314 while ( total < all_bytes ) 315 { 316 to_write = all_bytes - total; 317 if ( mtu < to_write ) 318 to_write = mtu; 319 bytes_t num_writ = itf -> write ( to_write, src, start + ( U64 ) total ); 320 if ( num_writ == ( U64 ) 0 ) 321 THROW ( xc_transfer_incomplete_err, "wrote %lu of %lu bytes", ( U64 ) total, ( U64 ) all_bytes ); 322 total += num_writ; 323 } 324 325 return total; 326 } 327 write_all(const bytes_t & num_bytes,const Mem & src,index_t start) const328 bytes_t Stream :: write_all ( const bytes_t & num_bytes, const Mem & src, index_t start ) const 329 { 330 FUNC_ENTRY (); 331 332 if ( start < 0 ) 333 THROW ( xc_param_err, "bad start index: %ld", start ); 334 335 bytes_t size = src . size (); 336 if ( ( U64 ) start >= size ) 337 return bytes_t ( 0 ); 338 339 bytes_t all_bytes = size - ( U64 ) start; 340 if ( all_bytes > num_bytes ) 341 all_bytes = num_bytes; 342 343 if ( null_ref () ) 344 THROW ( xc_null_self_err, "wrote 0 of %lu bytes", ( U64 ) all_bytes ); 345 346 StreamItf * itf = get_itf ( CAP_WRITE ); 347 348 bytes_t mtu = itf -> get_mtu (); 349 bytes_t to_write = ( mtu < all_bytes ) ? mtu : all_bytes; 350 351 bytes_t total = itf -> write ( to_write, src, start ); 352 while ( total < all_bytes ) 353 { 354 to_write = all_bytes - total; 355 if ( mtu < to_write ) 356 to_write = mtu; 357 bytes_t num_writ = itf -> write ( to_write, src, start + ( U64 ) total ); 358 if ( num_writ == ( U64 ) 0 ) 359 THROW ( xc_transfer_incomplete_err, "wrote %lu of %lu bytes", ( U64 ) total, ( U64 ) all_bytes ); 360 total += num_writ; 361 } 362 363 return total; 364 } 365 Stream()366 Stream :: Stream () 367 { 368 } 369 Stream(const Stream & r)370 Stream :: Stream ( const Stream & r ) 371 : Ref < StreamItf > ( r ) 372 { 373 } 374 operator =(const Stream & r)375 void Stream :: operator = ( const Stream & r ) 376 { 377 Ref < StreamItf > :: operator = ( r ); 378 } 379 Stream(const Stream & r,caps_t reduce)380 Stream :: Stream ( const Stream & r, caps_t reduce ) 381 : Ref < StreamItf > ( r, reduce ) 382 { 383 } 384 Stream(const OpaqueRef & r)385 Stream :: Stream ( const OpaqueRef & r ) 386 : Ref < StreamItf > ( r, StreamItf :: cast ) 387 { 388 } 389 Stream(Refcount * obj,StreamItf * itf,caps_t caps)390 Stream :: Stream ( Refcount * obj, StreamItf * itf, caps_t caps ) 391 : Ref < StreamItf > ( obj, itf, caps ) 392 { 393 } 394 395 } 396