1/*
2  Copyright (C) 2000-2005 SKYRIX Software AG
3
4  This file is part of SOPE.
5
6  SOPE is free software; you can redistribute it and/or modify it under
7  the terms of the GNU Lesser General Public License as published by the
8  Free Software Foundation; either version 2, or (at your option) any
9  later version.
10
11  SOPE is distributed in the hope that it will be useful, but WITHOUT ANY
12  WARRANTY; without even the implied warranty of MERCHANTABILITY or
13  FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public
14  License for more details.
15
16  You should have received a copy of the GNU Lesser General Public
17  License along with SOPE; see the file COPYING.  If not, write to the
18  Free Software Foundation, 59 Temple Place - Suite 330, Boston, MA
19  02111-1307, USA.
20*/
21
22#include <NGStreams/NGBufferedStream.h>
23#include "common.h"
24
25#define NEWLINE_CHAR '\n'
26#define WRITE_WARN_SIZE (1024 * 1024 * 100) /* 100MB */
27
28@implementation NGBufferedStream
29
30static const unsigned DEFAULT_BUFFER_SIZE = 512;
31static Class DataStreamClass = Nil;
32
33+ (void)initialize {
34  DataStreamClass = NSClassFromString(@"NGDataStream");
35}
36
37// returns the number of bytes which where read from the buffer
38#define numberOfConsumedReadBufferBytes(self) \
39  ((self->readBufferSize == 0) ? 0 : (self->readBufferPos - self->readBuffer))
40
41// returns the number of bytes which can be read from buffer (without source access)
42#define numberOfAvailableReadBufferBytes(self) \
43  (self->readBufferFillSize - numberOfConsumedReadBufferBytes(self))
44
45// look whether all bytes in the buffer where consumed, if so, reset the buffer
46#define checkReadBufferFillState(self) \
47  if (numberOfAvailableReadBufferBytes(self) == 0) { \
48    self->readBufferPos = self->readBuffer; \
49    self->readBufferFillSize = 0;  \
50  }
51
52// ******************** constructors ********************
53
54+ (id)filterWithSource:(id<NGStream>)_source bufferSize:(unsigned)_size {
55  if (_source == nil) return nil;
56  if (*(Class *)_source == DataStreamClass) return _source;
57  return [[[self alloc] initWithSource:_source bufferSize:_size] autorelease];
58}
59
60// TODO: can we reduced duplicate code here ...
61
62- (id)initWithSource:(id<NGStream>)_source bufferSize:(unsigned)_size {
63  if (_source == nil) {
64    [self release];
65    return nil;
66  }
67  if (*(Class *)_source == DataStreamClass) {
68    [self release];
69    return (id)[_source retain];
70  }
71
72  if ((self = [super initWithSource:_source])) {
73    self->readBuffer  = calloc(_size, 1);
74    self->writeBuffer = calloc(_size, 1);
75
76    self->readBufferPos       = self->readBuffer;
77    self->readBufferSize      = _size;
78    self->readBufferFillSize  = 0; // no bytes are read from source
79    self->writeBufferFillSize = 0;
80    self->writeBufferSize     = _size;
81    self->flags._flushOnNewline = 1;
82  }
83  return self;
84}
85
86- (id)initWithInputSource:(id<NGInputStream>)_source bufferSize:(unsigned)_s {
87  if (_source == nil) {
88    [self release];
89    return nil;
90  }
91  if (*(Class *)_source == DataStreamClass) {
92    [self release];
93    return (id)[_source retain];
94  }
95
96  if ((self = [super initWithInputSource:_source])) {
97    self->readBuffer            = calloc(_s, 1);
98    self->readBufferPos         = self->readBuffer;
99    self->readBufferSize        = _s;
100    self->readBufferFillSize    = 0; // no bytes are read from source
101    self->flags._flushOnNewline = 1;
102  }
103  return self;
104}
105- (id)initWithOutputSource:(id<NGOutputStream>)_src bufferSize:(unsigned)_s {
106  if (_src == nil) {
107    [self release];
108    return nil;
109  }
110  if (*(Class *)_src == DataStreamClass) {
111    [self release];
112    return (id)[_src retain];
113  }
114
115  if ((self = [super initWithOutputSource:_src])) {
116    self->writeBuffer           = calloc(_s, 1);
117    self->writeBufferFillSize   = 0;
118    self->writeBufferSize       = _s;
119    self->flags._flushOnNewline = 1;
120  }
121  return self;
122}
123
124- (id)initWithSource:(id<NGStream>)_source {
125  return [self initWithSource:_source bufferSize:DEFAULT_BUFFER_SIZE];
126}
127- (id)initWithInputSource:(id<NGInputStream>)_source {
128  return [self initWithInputSource:_source bufferSize:DEFAULT_BUFFER_SIZE];
129}
130- (id)initWithOutputSource:(id<NGOutputStream>)_source {
131  return [self initWithOutputSource:_source bufferSize:DEFAULT_BUFFER_SIZE];
132}
133
134- (void)dealloc {
135  [self flush];
136
137  if (self->readBuffer) {
138    free(self->readBuffer);
139    self->readBuffer    = NULL;
140    self->readBufferPos = NULL;
141  }
142  self->readBufferFillSize = 0;
143  self->readBufferSize     = 0;
144
145  if (self->writeBuffer) {
146    free(self->writeBuffer);
147    self->writeBuffer = NULL;
148  }
149  self->writeBufferFillSize = 0;
150  self->writeBufferSize     = 0;
151  [super dealloc];
152}
153
154/* accessors */
155
156- (void)setReadBufferSize:(unsigned)_size {
157  [self flush];
158
159  if (_size == self->readBufferSize)
160    return;
161
162  if (_size == 0) {
163    if (self->readBuffer != NULL) {
164      free(self->readBuffer);
165      self->readBuffer = NULL;
166    }
167    self->readBufferSize = _size;
168    self->readBufferPos  = NULL;
169  }
170  else {
171    if (self->readBuffer != NULL)
172      self->readBuffer = realloc(self->readBuffer, _size);
173    else
174      self->readBuffer = calloc(_size, 1);
175
176    self->readBufferSize     = _size;
177    self->readBufferPos      = self->readBuffer;
178    self->readBufferFillSize = 0; // no bytes a read from source
179  }
180}
181- (unsigned)readBufferSize {
182  return self->readBufferSize;
183}
184
185- (void)setWriteBufferSize:(unsigned)_size {
186  [self flush];
187
188  if (_size == self->writeBufferSize)
189    return;
190
191  self->writeBuffer = realloc(self->writeBuffer, _size);
192  self->writeBufferSize = _size;
193}
194- (unsigned)writeBufferSize {
195  return self->writeBufferSize;
196}
197
198/* blocking .. */
199
200- (BOOL)wouldBlockInMode:(NGStreamMode)_mode {
201  BOOL canRead, canWrite;
202
203  if (self->readBufferSize == 0)
204    canRead = NO;
205  else
206    canRead = (numberOfAvailableReadBufferBytes(self) > 0);
207
208  canWrite = (self->writeBufferSize == 0)
209    ? NO
210    : (self->writeBufferFillSize > 0);
211
212  if ((_mode == NGStreamMode_readWrite) && canRead && canWrite)
213    return NO;
214  if ((_mode == NGStreamMode_readOnly) && canRead) {
215    return NO;
216  }
217  if ((_mode == NGStreamMode_writeOnly) && canWrite)
218    return NO;
219
220  return ([self->source respondsToSelector:@selector(wouldBlockInMode:)])
221    ? [(id)self->source wouldBlockInMode:_mode]
222    : YES;
223}
224
225/* primitives */
226
227- (unsigned)readBytes:(void *)_buf count:(unsigned)_len {
228  register unsigned availBytes = numberOfAvailableReadBufferBytes(self);
229
230  if (self->readBufferSize == 0) { // no read buffering is done (buffersize==0)
231    return (readBytes != NULL)
232      ? readBytes(source, _cmd, _buf, _len)
233      : [source readBytes:_buf count:_len];
234  }
235
236  if (availBytes >= _len) {
237    // there are enough bytes in the buffer to fulfill the request
238    if (_len == 1) {
239      *(unsigned char *)_buf = *(unsigned char *)self->readBufferPos;
240      self->readBufferPos++;
241    }
242    else {
243      memcpy(_buf, self->readBufferPos, _len);
244      self->readBufferPos += _len; // update read position (consumed-size)
245    }
246    checkReadBufferFillState(self); // check whether all bytes where consumed
247    return _len;
248  }
249  else if (availBytes > 0) {
250    // there are some bytes in the buffer, these are returned
251
252    memcpy(_buf, self->readBufferPos, availBytes);// copy all bytes from buffer
253    self->readBufferPos      = self->readBuffer;  // reset position
254    self->readBufferFillSize = 0;   // no bytes available in buffer anymore
255    return availBytes;
256  }
257  else if (_len > self->readBufferSize) {
258    /*
259      requested _len is bigger than the buffersize, so we can bypass the
260      buffer (which is empty, as guaranteed by the previous 'ifs'
261    */
262
263    NSAssert(self->readBufferPos == self->readBuffer,
264             @"read buffer position is not reset");
265    NSAssert(self->readBufferFillSize == 0, @"there are bytes in the buffer");
266
267    availBytes = (readBytes != NULL)
268      ? (unsigned)readBytes(source, _cmd, _buf, _len)
269      : [source readBytes:_buf count:_len];
270
271    if (availBytes == NGStreamError)
272      return NGStreamError;
273
274    NSAssert(availBytes != 0, @"readBytes:count: may never return zero !");
275
276    return availBytes; // return the number of bytes which could be read
277  }
278  else {
279    /*
280      no bytes are available and the requested _len is smaller than the
281      possible buffer size, we have to read the next block of input from the
282      source
283    */
284
285    NSAssert(self->readBufferPos == self->readBuffer,
286             @"read buffer position is not reset");
287    NSAssert(self->readBufferFillSize == 0, @"there are bytes in the buffer");
288
289    self->readBufferFillSize = (readBytes != NULL)
290      ? (unsigned)readBytes(source,_cmd, self->readBuffer,self->readBufferSize)
291      : [source readBytes:self->readBuffer count:self->readBufferSize];
292
293    if (self->readBufferFillSize == NGStreamError) {
294      self->readBufferFillSize = 0;
295      return NGStreamError;
296    }
297
298    NSAssert(self->readBufferFillSize != 0,
299             @"readBytes:count: may never return zero !");
300
301    /*
302       now comes a section which is roughly the same like the first to
303       conditionals in this method
304    */
305    if (self->readBufferFillSize >= _len) {
306      // there are enough bytes in the buffer to fulfill the request
307
308      memcpy(_buf, self->readBufferPos, _len);
309      self->readBufferPos += _len;    // update read position (consumed-size)
310      checkReadBufferFillState(self); // check whether all bytes where consumed
311      return _len;
312    }
313    else { // (readBufferFillSize > 0) (this is ensured by the above assert)
314      // there are some bytes in the buffer, these are returned
315
316      availBytes = self->readBufferFillSize;
317      // copy all bytes from buffer
318      memcpy(_buf, self->readBufferPos, self->readBufferFillSize);
319      self->readBufferPos      = self->readBuffer; // reset position
320      self->readBufferFillSize = 0; // no bytes available in buffer anymore
321      return availBytes;
322    }
323  }
324}
325
326- (int)readByte {
327  if (self->readBufferSize == 0) // no read buffering is done (buffersize==0)
328    return [super readByte];
329
330  if (numberOfAvailableReadBufferBytes(self) >= 1) {
331    unsigned char byte = *(unsigned char *)self->readBufferPos;
332    self->readBufferPos++;
333    checkReadBufferFillState(self); // check whether all bytes where consumed
334    return byte;
335  }
336  return [super readByte];
337}
338
339- (unsigned)writeBytes:(const void *)_buf count:(unsigned)_len {
340  register unsigned tmp       = 0;
341  register unsigned remaining = _len;
342  register void     *track    = (void *)_buf;
343
344#if DEBUG
345  if (_len > WRITE_WARN_SIZE) {
346    NSLog(@"WARNING(%s): got passed in length %uMB (%u bytes, errcode=%u) ...",
347          __PRETTY_FUNCTION__, (_len / 1024 / 1024), _len, NGStreamError);
348  }
349#endif
350
351  while (remaining > 0) {
352    // how much bytes available in buffer ?
353    tmp = self->writeBufferSize - self->writeBufferFillSize;
354    tmp = (tmp > remaining) ? remaining : tmp;
355
356    memcpy((self->writeBuffer + self->writeBufferFillSize), track, tmp);
357    track += tmp;
358    remaining -= tmp;
359    self->writeBufferFillSize += tmp;
360
361    if (self->writeBufferFillSize == self->writeBufferSize) {
362      BOOL ok;
363
364      ok = [self->source safeWriteBytes:self->writeBuffer
365                         count:self->writeBufferFillSize];
366      if (!ok) return NGStreamError;
367
368      self->writeBufferFillSize = 0;
369    }
370  }
371
372  if (self->flags._flushOnNewline == 1) {
373    // scan buffer for newlines, if one is found, flush buffer
374
375    for (tmp = 0; tmp < _len; tmp++) {
376      if (tmp == NEWLINE_CHAR) {
377        if (![self flush])
378          return NGStreamError;
379        break;
380      }
381    }
382  }
383
384  /* clean up for GC */
385  tmp       = 0;
386  track     = NULL; // clean up for GC
387  remaining = 0;
388
389  return _len;
390}
391
392- (BOOL)close {
393  if (![self flush])
394    return NO;
395
396  if (self->readBuffer) {
397    free(self->readBuffer);
398    self->readBuffer = NULL;
399    self->readBufferPos = NULL;
400  }
401  self->readBufferFillSize = 0;
402  self->readBufferSize = 0;
403
404  if (self->writeBuffer) {
405    free(self->writeBuffer);
406    self->writeBuffer = NULL;
407  }
408  self->writeBufferFillSize = 0;
409  self->writeBufferSize = 0;
410
411  return [super close];
412}
413
414- (BOOL)flush {
415  if (self->writeBufferFillSize > 0) {
416    BOOL ok;
417
418#if DEBUG
419    if (self->writeBufferFillSize > WRITE_WARN_SIZE) {
420      NSLog(@"WARNING(%s): shall flush %uMB (%u bytes, errcode=%u) ...",
421            __PRETTY_FUNCTION__, (self->writeBufferFillSize/1024/1024),
422            self->writeBufferFillSize, NGStreamError);
423      //abort();
424    }
425#endif
426
427    ok = [self->source
428              safeWriteBytes:self->writeBuffer
429              count:self->writeBufferFillSize];
430    if (!ok) {
431      /* should check exception for fill size ? ... */
432      return NO;
433    }
434
435    self->writeBufferFillSize = 0;
436  }
437  return YES;
438}
439
440@end /* NGBufferedStream */
441
442@implementation NGStream(NGBufferedStreamExtensions)
443
444- (NGBufferedStream *)bufferedStream {
445  return [NGBufferedStream filterWithSource:self];
446}
447
448@end /* NGStream(NGBufferedStreamExtensions) */
449
450@implementation NGBufferedStream(NGBufferedStreamExtensions)
451
452- (NGBufferedStream *)bufferedStream {
453  return self;
454}
455
456@end /* NGBufferedStream(NGBufferedStreamExtensions) */
457