/*
  Copyright (C) 2000-2005 SKYRIX Software AG

  This file is part of SOPE.

  SOPE is free software; you can redistribute it and/or modify it under
  the terms of the GNU Lesser General Public License as published by the
  Free Software Foundation; either version 2, or (at your option) any
  later version.

  SOPE is distributed in the hope that it will be useful, but WITHOUT ANY
  WARRANTY; without even the implied warranty of MERCHANTABILITY or
  FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public
  License for more details.

  You should have received a copy of the GNU Lesser General Public
  License along with SOPE; see the file COPYING.  If not, write to the
  Free Software Foundation, 59 Temple Place - Suite 330, Boston, MA
  02111-1307, USA.
*/

#include <NGStreams/NGBufferedStream.h>
#include "common.h"

#define NEWLINE_CHAR    '\n'
#define WRITE_WARN_SIZE (1024 * 1024 * 100) /* 100MB */

/*
  NGBufferedStream

  A filter stream which keeps a read buffer and a write buffer in front of
  its source stream. A buffer size of 0 means "unbuffered" for the
  respective direction: reads and writes are then passed straight to the
  source.
*/
@implementation NGBufferedStream

static const unsigned DEFAULT_BUFFER_SIZE = 512;
static Class DataStreamClass = Nil;

+ (void)initialize {
  DataStreamClass = NSClassFromString(@"NGDataStream");
}

// returns the number of bytes which were already read from the buffer
#define numberOfConsumedReadBufferBytes(self) \
  ((self->readBufferSize == 0) ? 0 : (self->readBufferPos - self->readBuffer))

// returns the number of bytes which can be read from the buffer
// (without accessing the source)
#define numberOfAvailableReadBufferBytes(self) \
  (self->readBufferFillSize - numberOfConsumedReadBufferBytes(self))

// checks whether all bytes in the buffer were consumed; if so, resets the
// buffer (wrapped in do/while so it behaves like a single statement)
#define checkReadBufferFillState(self) \
  do { \
    if (numberOfAvailableReadBufferBytes(self) == 0) { \
      self->readBufferPos      = self->readBuffer; \
      self->readBufferFillSize = 0; \
    } \
  } while (0)

// ******************** constructors ********************

/*
  Returns an autoreleased buffered stream wrapping _source, nil if _source
  is nil. If the class pointer of _source is exactly NGDataStream, the
  source itself is returned unwrapped.
*/
+ (id)filterWithSource:(id<NGStream>)_source bufferSize:(unsigned)_size {
  if (_source == nil)
    return nil;
  if (*(Class *)_source == DataStreamClass)
    return _source;
  return [[[self alloc] initWithSource:_source bufferSize:_size] autorelease];
}

// TODO: can we reduce duplicate code here ...

/*
  Sets up a stream with a read and a write buffer of _size bytes each.

  Returns nil (releasing self) if _source is nil. If _source is an
  NGDataStream, self is released and the retained source is returned
  instead. If a buffer cannot be allocated, the respective direction falls
  back to unbuffered operation (size 0).
*/
- (id)initWithSource:(id<NGStream>)_source bufferSize:(unsigned)_size {
  if (_source == nil) {
    [self release];
    return nil;
  }
  if (*(Class *)_source == DataStreamClass) {
    [self release];
    return (id)[_source retain];
  }

  if ((self = [super initWithSource:_source])) {
    self->readBuffer  = calloc(_size, 1);
    self->writeBuffer = calloc(_size, 1);

    self->readBufferPos       = self->readBuffer;
    self->readBufferSize      = (self->readBuffer != NULL) ? _size : 0;
    self->readBufferFillSize  = 0; // no bytes are read from source
    self->writeBufferFillSize = 0;
    self->writeBufferSize     = (self->writeBuffer != NULL) ? _size : 0;
    self->flags._flushOnNewline = 1;
  }
  return self;
}

/*
  Like -initWithSource:bufferSize:, but only a read buffer of _s bytes is
  allocated; no write buffer is set up here.
*/
- (id)initWithInputSource:(id<NGInputStream>)_source bufferSize:(unsigned)_s {
  if (_source == nil) {
    [self release];
    return nil;
  }
  if (*(Class *)_source == DataStreamClass) {
    [self release];
    return (id)[_source retain];
  }

  if ((self = [super initWithInputSource:_source])) {
    self->readBuffer         = calloc(_s, 1);
    self->readBufferPos      = self->readBuffer;
    self->readBufferSize     = (self->readBuffer != NULL) ? _s : 0;
    self->readBufferFillSize = 0; // no bytes are read from source
    self->flags._flushOnNewline = 1;
  }
  return self;
}

/*
  Like -initWithSource:bufferSize:, but only a write buffer of _s bytes is
  allocated; no read buffer is set up here.
*/
- (id)initWithOutputSource:(id<NGOutputStream>)_src bufferSize:(unsigned)_s {
  if (_src == nil) {
    [self release];
    return nil;
  }
  if (*(Class *)_src == DataStreamClass) {
    [self release];
    return (id)[_src retain];
  }

  if ((self = [super initWithOutputSource:_src])) {
    self->writeBuffer         = calloc(_s, 1);
    self->writeBufferFillSize = 0;
    self->writeBufferSize     = (self->writeBuffer != NULL) ? _s : 0;
    self->flags._flushOnNewline = 1;
  }
  return self;
}

/* convenience initializers using DEFAULT_BUFFER_SIZE */

- (id)initWithSource:(id<NGStream>)_source {
  return [self initWithSource:_source bufferSize:DEFAULT_BUFFER_SIZE];
}
- (id)initWithInputSource:(id<NGInputStream>)_source {
  return [self initWithInputSource:_source bufferSize:DEFAULT_BUFFER_SIZE];
}
- (id)initWithOutputSource:(id<NGOutputStream>)_source {
  return [self initWithOutputSource:_source bufferSize:DEFAULT_BUFFER_SIZE];
}

/* Flushes pending output (result ignored) and frees both buffers. */
- (void)dealloc {
  [self flush];

  if (self->readBuffer) {
    free(self->readBuffer);
    self->readBuffer    = NULL;
    self->readBufferPos = NULL;
  }
  self->readBufferFillSize = 0;
  self->readBufferSize     = 0;

  if (self->writeBuffer) {
    free(self->writeBuffer);
    self->writeBuffer = NULL;
  }
  self->writeBufferFillSize = 0;
  self->writeBufferSize     = 0;
  [super dealloc];
}

/* accessors */

/*
  Changes the size of the read buffer; 0 disables read buffering.

  Pending output is flushed first. Bytes which are buffered but not yet
  consumed are discarded when the size actually changes. If the new buffer
  cannot be allocated, the previous buffer and size stay in effect.
*/
- (void)setReadBufferSize:(unsigned)_size {
  void *newBuffer;

  [self flush];

  if (_size == self->readBufferSize)
    return;

  if (_size == 0) {
    if (self->readBuffer != NULL) {
      free(self->readBuffer);
      self->readBuffer = NULL;
    }
    self->readBufferSize     = 0;
    self->readBufferPos      = NULL;
    self->readBufferFillSize = 0; // nothing left that could be consumed
    return;
  }

  newBuffer = (self->readBuffer != NULL)
    ? realloc(self->readBuffer, _size)
    : calloc(_size, 1);
  if (newBuffer == NULL)
    // allocation failed: realloc left the old block intact, keep using it
    return;

  self->readBuffer         = newBuffer;
  self->readBufferSize     = _size;
  self->readBufferPos      = self->readBuffer;
  self->readBufferFillSize = 0; // no bytes are read from source
}
- (unsigned)readBufferSize {
  return self->readBufferSize;
}

/*
  Changes the size of the write buffer; 0 disables write buffering.

  Pending output is flushed first. If the flush failed and the bytes still
  pending would not fit into the new size, or if the new buffer cannot be
  allocated, the previous buffer and size stay in effect.
*/
- (void)setWriteBufferSize:(unsigned)_size {
  void *newBuffer;

  [self flush];

  if (_size == self->writeBufferSize)
    return;

  if (self->writeBufferFillSize > _size)
    // flush failed; shrinking now would put the fill size past the buffer
    return;

  if (_size == 0) {
    // realloc(ptr, 0) is implementation-defined, so free explicitly
    if (self->writeBuffer != NULL) {
      free(self->writeBuffer);
      self->writeBuffer = NULL;
    }
    self->writeBufferSize = 0;
    return;
  }

  newBuffer = realloc(self->writeBuffer, _size);
  if (newBuffer == NULL)
    // allocation failed: the old block is still valid, keep using it
    return;

  self->writeBuffer     = newBuffer;
  self->writeBufferSize = _size;
}
- (unsigned)writeBufferSize {
  return self->writeBufferSize;
}

/* blocking .. */

/*
  Returns NO if the buffers alone decide that the requested mode does not
  block; otherwise asks the source (if it responds to -wouldBlockInMode:)
  and returns YES if the source cannot be asked.
*/
- (BOOL)wouldBlockInMode:(NGStreamMode)_mode {
  BOOL canRead, canWrite;

  if (self->readBufferSize == 0)
    canRead = NO;
  else
    canRead = (numberOfAvailableReadBufferBytes(self) > 0);

  // NOTE(review): "can write" is derived from bytes already *in* the write
  // buffer rather than from free space - confirm that this is intended
  canWrite = (self->writeBufferSize == 0)
    ? NO
    : (self->writeBufferFillSize > 0);

  if ((_mode == NGStreamMode_readWrite) && canRead && canWrite)
    return NO;
  if ((_mode == NGStreamMode_readOnly) && canRead)
    return NO;
  if ((_mode == NGStreamMode_writeOnly) && canWrite)
    return NO;

  return ([self->source respondsToSelector:@selector(wouldBlockInMode:)])
    ? [(id)self->source wouldBlockInMode:_mode]
    : YES;
}

/* primitives */

/*
  Reads up to _len bytes into _buf and returns the number of bytes stored,
  or NGStreamError if the source reported an error.

  Bytes are served from the read buffer if it holds any (possibly fewer
  than _len). With an empty buffer, requests bigger than the buffer bypass
  it, smaller ones trigger a refill of the whole buffer from the source.
*/
- (unsigned)readBytes:(void *)_buf count:(unsigned)_len {
  unsigned availBytes = numberOfAvailableReadBufferBytes(self);

  if (self->readBufferSize == 0) { // no read buffering is done (buffersize==0)
    return (readBytes != NULL)
      ? readBytes(source, _cmd, _buf, _len)
      : [source readBytes:_buf count:_len];
  }

  if (availBytes >= _len) {
    // there are enough bytes in the buffer to fulfill the request
    if (_len == 1) {
      // single byte fast path, avoids the memcpy call
      *(unsigned char *)_buf = *(unsigned char *)self->readBufferPos;
      self->readBufferPos++;
    }
    else {
      memcpy(_buf, self->readBufferPos, _len);
      self->readBufferPos += _len; // update read position (consumed-size)
    }
    checkReadBufferFillState(self); // check whether all bytes were consumed
    return _len;
  }
  else if (availBytes > 0) {
    // there are some bytes in the buffer, these are returned

    memcpy(_buf, self->readBufferPos, availBytes); // copy all bytes from buffer
    self->readBufferPos      = self->readBuffer;   // reset position
    self->readBufferFillSize = 0; // no bytes available in buffer anymore
    return availBytes;
  }
  else if (_len > self->readBufferSize) {
    /*
      requested _len is bigger than the buffersize, so we can bypass the
      buffer (which is empty, as guaranteed by the previous 'ifs')
    */

    NSAssert(self->readBufferPos == self->readBuffer,
             @"read buffer position is not reset");
    NSAssert(self->readBufferFillSize == 0, @"there are bytes in the buffer");

    availBytes = (readBytes != NULL)
      ? (unsigned)readBytes(source, _cmd, _buf, _len)
      : [source readBytes:_buf count:_len];

    if (availBytes == NGStreamError)
      return NGStreamError;

    NSAssert(availBytes != 0, @"readBytes:count: may never return zero !");

    return availBytes; // return the number of bytes which could be read
  }
  else {
    /*
      no bytes are available and the requested _len is not bigger than the
      buffer size, so we have to read the next block of input from the
      source
    */

    NSAssert(self->readBufferPos == self->readBuffer,
             @"read buffer position is not reset");
    NSAssert(self->readBufferFillSize == 0, @"there are bytes in the buffer");

    self->readBufferFillSize = (readBytes != NULL)
      ? (unsigned)readBytes(source,_cmd, self->readBuffer,self->readBufferSize)
      : [source readBytes:self->readBuffer count:self->readBufferSize];

    if (self->readBufferFillSize == NGStreamError) {
      self->readBufferFillSize = 0;
      return NGStreamError;
    }

    NSAssert(self->readBufferFillSize != 0,
             @"readBytes:count: may never return zero !");

    /*
      now comes a section which is roughly the same as the first two
      conditionals in this method
    */
    if (self->readBufferFillSize >= _len) {
      // there are enough bytes in the buffer to fulfill the request

      memcpy(_buf, self->readBufferPos, _len);
      self->readBufferPos += _len;    // update read position (consumed-size)
      checkReadBufferFillState(self); // check whether all bytes were consumed
      return _len;
    }
    else { // (readBufferFillSize > 0) (this is ensured by the above assert)
      // there are some bytes in the buffer, these are returned

      availBytes = self->readBufferFillSize;
      // copy all bytes from buffer
      memcpy(_buf, self->readBufferPos, self->readBufferFillSize);
      self->readBufferPos      = self->readBuffer; // reset position
      self->readBufferFillSize = 0; // no bytes available in buffer anymore
      return availBytes;
    }
  }
}

/*
  Returns the next byte. It is taken from the read buffer if one is
  available there, otherwise the superclass implementation is used.
*/
- (int)readByte {
  if (self->readBufferSize == 0) // no read buffering is done (buffersize==0)
    return [super readByte];

  if (numberOfAvailableReadBufferBytes(self) >= 1) {
    unsigned char byte = *(unsigned char *)self->readBufferPos;
    self->readBufferPos++;
    checkReadBufferFillState(self); // check whether all bytes were consumed
    return byte;
  }
  return [super readByte];
}

/*
  Appends _len bytes from _buf to the write buffer, handing the buffer to
  the source each time it runs full. Returns _len on success and
  NGStreamError if writing to the source failed.

  Without a write buffer (size 0) the bytes are written to the source
  directly. If flush-on-newline is enabled and the written bytes contain a
  newline, the buffer is flushed before returning.
*/
- (unsigned)writeBytes:(const void *)_buf count:(unsigned)_len {
  const unsigned char *track     = _buf;
  unsigned            remaining  = _len;
  unsigned            chunk;

#if DEBUG
  if (_len > WRITE_WARN_SIZE) {
    NSLog(@"WARNING(%s): got passed in length %uMB (%u bytes, errcode=%u) ...",
          __PRETTY_FUNCTION__, (_len / 1024 / 1024), _len, NGStreamError);
  }
#endif

  if (self->writeBufferSize == 0) {
    // no write buffering is done; the copy loop below could never make
    // progress with a zero-sized buffer, so write through to the source
    if (_len == 0)
      return 0;
    return [self->source safeWriteBytes:_buf count:_len]
      ? _len
      : NGStreamError;
  }

  while (remaining > 0) {
    // how many bytes still fit into the buffer ?
    chunk = self->writeBufferSize - self->writeBufferFillSize;
    if (chunk > remaining)
      chunk = remaining;

    memcpy((self->writeBuffer + self->writeBufferFillSize), track, chunk);
    track     += chunk;
    remaining -= chunk;
    self->writeBufferFillSize += chunk;

    if (self->writeBufferFillSize == self->writeBufferSize) {
      // buffer is full, hand it over to the source
      BOOL ok;

      ok = [self->source safeWriteBytes:self->writeBuffer
                         count:self->writeBufferFillSize];
      if (!ok) return NGStreamError;

      self->writeBufferFillSize = 0;
    }
  }

  if (self->flags._flushOnNewline == 1) {
    // look at the written *bytes* (not at their index) for a newline; if
    // one is found, flush the buffer
    if ((_len > 0) && (memchr(_buf, NEWLINE_CHAR, _len) != NULL)) {
      if (![self flush])
        return NGStreamError;
    }
  }

  return _len;
}

/*
  Flushes pending output, frees both buffers and closes the stream using
  the superclass implementation. Returns NO, leaving the buffers untouched,
  if the flush failed.
*/
- (BOOL)close {
  if (![self flush])
    return NO;

  if (self->readBuffer) {
    free(self->readBuffer);
    self->readBuffer    = NULL;
    self->readBufferPos = NULL;
  }
  self->readBufferFillSize = 0;
  self->readBufferSize     = 0;

  if (self->writeBuffer) {
    free(self->writeBuffer);
    self->writeBuffer = NULL;
  }
  self->writeBufferFillSize = 0;
  self->writeBufferSize     = 0;

  return [super close];
}

/*
  Writes the content of the write buffer to the source. Returns YES if
  nothing was pending or the write succeeded; on failure NO is returned and
  the buffer content is kept.
*/
- (BOOL)flush {
  if (self->writeBufferFillSize > 0) {
    BOOL ok;

#if DEBUG
    if (self->writeBufferFillSize > WRITE_WARN_SIZE) {
      NSLog(@"WARNING(%s): shall flush %uMB (%u bytes, errcode=%u) ...",
            __PRETTY_FUNCTION__, (self->writeBufferFillSize/1024/1024),
            self->writeBufferFillSize, NGStreamError);
      //abort();
    }
#endif

    ok = [self->source
              safeWriteBytes:self->writeBuffer
              count:self->writeBufferFillSize];
    if (!ok) {
      /* should check exception for fill size ? ... */
      return NO;
    }

    self->writeBufferFillSize = 0;
  }
  return YES;
}

@end /* NGBufferedStream */

@implementation NGStream(NGBufferedStreamExtensions)

/* Returns the receiver wrapped by +[NGBufferedStream filterWithSource:]. */
- (NGBufferedStream *)bufferedStream {
  return [NGBufferedStream filterWithSource:self];
}

@end /* NGStream(NGBufferedStreamExtensions) */

@implementation NGBufferedStream(NGBufferedStreamExtensions)

/* A buffered stream is already buffered, so it just returns itself. */
- (NGBufferedStream *)bufferedStream {
  return self;
}

@end /* NGBufferedStream(NGBufferedStreamExtensions) */