1 {
2  *****************************************************************************
3   This file is part of LazUtils.
4 
5   See the file COPYING.modifiedLGPL.txt, included in this distribution,
6   for details about the license.
7  *****************************************************************************
8 
9   Abstract:
10     A dynamic data queue to push and pop arbitrary data.
11 }
12 unit DynQueue;
13 
14 {$mode objfpc}{$H+}
15 
16 interface
17 
18 uses
19   Classes, SysUtils,
20   // LazUtils
21   LazLoggerBase;
22 
23 type
  { Header of one heap allocated data chunk of the queue.
    AddItem allocates SizeOf(Size)+ItemSize bytes for such a record, so the
    payload bytes start at Data and extend beyond the declared record. }
  TDynamicQueueItem = record
    Size: integer; // number of payload bytes allocated for this chunk
    Data: array[0..0] of integer;// type is irrelevant, the record is open ended
  end;
  PDynamicQueueItem = ^TDynamicQueueItem;
  // used as an array of chunk pointers (indexed like FItems[i])
  ListOfPDynamicQueueItem = ^PDynamicQueueItem;
30 
31   { TDynamicDataQueue
32     A queue for arbitrary data. That means first in first out.
33 
34     Push: put data in the queue
35     Pop:  fetch data from the queue (data is removed from queue)
36     Top:  read data in the queue (data remains in the queue)
37 
38     This queue maintains internally a ring queue for pointers to data chunks of
39     TDynamicQueueItem. It is optimised to reduce the amount of data movement. }
40 
  TDynamicDataQueue =  class
  private
    FItems: ListOfPDynamicQueueItem; // ring queue from FTopIndex to FLastIndex
    FItemCapacity: integer; // length of ListOfPDynamicQueueItem
    FTopIndex: integer; // first item in FItems
    FLastIndex: integer; // last item in FItems
    FMaximumBlockSize: integer; // upper limit for the size of a newly allocated item
    FMinimumBlockSize: integer; // lower limit for the size of a newly allocated item
    FSize: int64; // number of bytes currently stored in the queue
    FTopItemSpace: integer; // bytes at the start of the top item that were already removed
    FLastItemSpace: integer; // remaining space in last item
    procedure SetMaximumBlockSize(const AValue: integer);
    procedure SetMinimumBlockSize(const AValue: integer);
    // replace FItems with a bigger ring (8 slots or twice the old capacity)
    procedure GrowItems;
    // allocate a new item with ItemSize payload bytes behind the last item
    procedure AddItem(ItemSize: integer);
    // clamp ItemSize to MinimumBlockSize..MaximumBlockSize
    function CalculateItemSize(ItemSize: integer): integer;
    // Source<>nil: copy from memory, otherwise read from AStream
    function PushInternal(Source: PByte; AStream: TStream; Count: integer): integer;// add to end of queue
    // Dest<>nil: copy to memory, otherwise write to AStream
    function PopTopInternal(Dest: PByte; AStream: TStream; Count: integer; KeepData: Boolean): integer;// read from start of queue, remove from queue unless KeepData
  public
    constructor Create;
    destructor Destroy; override;
    procedure Clear;
    // raises an Exception if the internal state is inconsistent
    procedure ConsistencyCheck;
    // write the state (and optionally the stored bytes) via DebugLn
    procedure WriteDebugReport(WriteData: Boolean);
    // all Push/Pop/Top functions return the number of bytes transferred
    function Push(const Buffer; Count: integer): integer;// add to end of queue
    function Push(AStream: TStream; Count: integer): integer;// add to end of queue
    function Pop(var Buffer; Count: integer): integer; // read from start of queue, remove from queue
    function Pop(AStream: TStream; Count: integer): integer;// read from start of queue, remove from queue
    function Top(var Buffer; Count: integer): integer; // read from start of queue, keep data
    function Top(AStream: TStream; Count: integer): integer;// read from start of queue, keep data
    property Size: int64 read FSize;
    property MinimumBlockSize: integer read FMinimumBlockSize write SetMinimumBlockSize;
    property MaximumBlockSize: integer read FMaximumBlockSize write SetMaximumBlockSize;
  end;
75 
76 implementation
77 
78 { TDynamicDataQueue }
79 
{ Property setter for MinimumBlockSize.
  Values below 16 are raised to 16. MaximumBlockSize is raised if needed so
  that it never falls below the new minimum. }
procedure TDynamicDataQueue.SetMinimumBlockSize(const AValue: integer);
begin
  if AValue<>FMinimumBlockSize then begin
    // enforce the lower limit of 16 bytes
    if AValue<16 then
      FMinimumBlockSize:=16
    else
      FMinimumBlockSize:=AValue;
    // keep Minimum<=Maximum
    if FMinimumBlockSize>FMaximumBlockSize then
      FMaximumBlockSize:=FMinimumBlockSize;
  end;
end;
88 
{ Replaces the ring of item pointers with a bigger, zero filled one.
  The new capacity is 8 for a ring smaller than 8, otherwise twice the old
  capacity. Existing item pointers are copied in queue order, so afterwards
  the top item is at index 0 and the last item directly follows the others. }
procedure TDynamicDataQueue.GrowItems;
var
  Capacity: LongInt;
  ByteCount: Integer;
  Ring: ListOfPDynamicQueueItem;
  Dst: Integer;
  Src: LongInt;
begin
  // allocate the new, empty ring
  if FItemCapacity<8 then
    Capacity:=8
  else
    Capacity:=FItemCapacity*2;
  ByteCount:=Capacity*SizeOf(Pointer);
  GetMem(Ring,ByteCount);
  FillChar(Ring^,ByteCount,0);

  // move the used slots from FTopIndex to FLastIndex (inclusive)
  Dst:=0;
  if FItems<>nil then begin
    Src:=FTopIndex;
    Ring[Dst]:=FItems[Src];
    while Src<>FLastIndex do begin
      inc(Src);
      if Src=FItemCapacity then
        Src:=0; // wrap around in the old ring
      inc(Dst);
      Ring[Dst]:=FItems[Src];
    end;
    FreeMem(FItems);
  end;

  // switch to the new ring
  FItems:=Ring;
  FItemCapacity:=Capacity;
  FTopIndex:=0;
  FLastIndex:=Dst;
end;
126 
{ Appends a new, uninitialised item with ItemSize payload bytes to the ring.
  Grows the ring first if the slot for the new item would collide with the
  top item. Afterwards FLastIndex points to the new item.
  Raises an Exception if no free slot could be determined. }
procedure TDynamicDataQueue.AddItem(ItemSize: integer);
var
  NewIndex: Integer;

  procedure RaiseInconsistency;
  begin
    raise Exception.Create('TDynamicDataQueue.AddItem NewIndex='+IntToStr(NewIndex));
  end;

  // NewIndex := slot of the last item, or the slot behind it if that is used
  procedure FindSlotAfterLast;
  begin
    NewIndex:=FLastIndex;
    if FItems[NewIndex]=nil then exit;
    inc(NewIndex);
    if NewIndex>=FItemCapacity then
      NewIndex:=0;
  end;

begin
  // find the slot for the new item
  if FItems=nil then
    NewIndex:=FLastIndex
  else
    FindSlotAfterLast;
  if NewIndex=FTopIndex then begin
    // ring is full (or not yet allocated) -> enlarge and search again
    GrowItems;
    FindSlotAfterLast;
  end;
  if (FItems=nil) or (FItems[NewIndex]<>nil) then
    RaiseInconsistency;

  // allocate header (Size field) plus payload
  FLastIndex:=NewIndex;
  GetMem(FItems[FLastIndex],SizeOf(TDynamicQueueItem.Size)+ItemSize);
  FItems[FLastIndex]^.Size:=ItemSize;
end;
160 
{ Returns the payload size to allocate for a new item that shall hold
  ItemSize bytes: ItemSize raised to at least MinimumBlockSize and then
  limited to MaximumBlockSize. }
function TDynamicDataQueue.CalculateItemSize(ItemSize: integer): integer;
begin
  if ItemSize<MinimumBlockSize then
    Result:=MinimumBlockSize
  else
    Result:=ItemSize;
  if Result>MaximumBlockSize then
    Result:=MaximumBlockSize;
end;
169 
{ Appends up to Count bytes to the end of the queue.
  If Source<>nil the bytes are copied from Source, otherwise they are read
  from AStream. Returns the number of bytes that were added; this is less
  than Count if the stream delivered fewer bytes. }
function TDynamicDataQueue.PushInternal(Source: PByte; AStream: TStream;
  Count: integer): integer;
var
  Chunk: PtrInt;
  BlockSize: LongInt;
  Tail: PDynamicQueueItem;
  Target: PByte;
begin
  Result:=0;
  if Count<=0 then exit;
  repeat
    if FLastItemSpace>0 then begin
      // there is room in the last item -> fill it
      Chunk:=Count;
      if Chunk>FLastItemSpace then
        Chunk:=FLastItemSpace;
      Tail:=FItems[FLastIndex];
      // first free byte = payload start + used bytes
      Target:=PByte(@Tail^.Data)+(Tail^.Size-FLastItemSpace);

      // beware: read from a stream can raise an exception
      if Source=nil then
        Chunk:=AStream.Read(Target^,Chunk)
      else
        System.Move(Source[Result],Target^,Chunk);
      if Chunk<=0 then exit;

      // transfer succeeded -> update the bookkeeping
      inc(Result,Chunk);          // bytes transferred
      inc(fSize,Chunk);           // queue increased
      dec(FLastItemSpace,Chunk);  // space decreased
      dec(Count,Chunk);           // less to transfer
      if Count=0 then exit;
    end else begin
      // last item is full (or there is none) -> append a new item
      BlockSize:=CalculateItemSize(Count);
      AddItem(BlockSize);
      FLastItemSpace:=BlockSize;
    end;
  until false;
end;
209 
{ Reads up to Count bytes from the start of the queue.
  If Dest<>nil the bytes are copied to Dest, otherwise they are written to
  AStream. With KeepData=false the bytes are removed from the queue (Pop),
  with KeepData=true the queue is left unchanged (Top).
  Returns the number of bytes transferred. This is less than Count if the
  queue holds fewer bytes or if the stream accepted fewer bytes.
  Raises an Exception if an internal inconsistency is detected. }
function TDynamicDataQueue.PopTopInternal(Dest: PByte; AStream: TStream;
  Count: integer; KeepData: Boolean): integer;

  procedure RaiseInconsistencySizeNot0;
  begin
    raise Exception.Create('TDynamicDataQueue.PopTopInternal inconsistency size<>0');
  end;

  procedure RaiseInconsistencyEmptyItem;
  begin
    raise Exception.Create('TDynamicDataQueue.PopTopInternal inconsistency empty item');
  end;

  procedure RaiseInconsistencySizeNegative;
  begin
    raise Exception.Create('TDynamicDataQueue.PopTopInternal inconsistency size<0');
  end;

var
  Item: PDynamicQueueItem;
  CurCount: Integer;
  Source: PByte;
  CurItemSize: LongInt;
  ReadIndex: LongInt;
  TransferredCount: LongInt;
begin
  Result:=0;
  if Count<=0 then exit;
  ReadIndex:=FTopIndex;

  while Count>0 do begin
    if FItems=nil then exit; // no data

    // compute the number of readable bytes in the current item
    Item:=FItems[ReadIndex];
    CurItemSize:=Item^.Size;
    if ReadIndex=FLastIndex then
      dec(CurItemSize,FLastItemSpace); // the last item is only partially filled
    CurCount:=CurItemSize;
    if ReadIndex=FTopIndex then
      dec(CurCount,FTopItemSpace);     // the top item was partially read already
    if CurCount<=0 then
      RaiseInconsistencyEmptyItem;

    // copy data from the current item
    if CurCount>Count then
      CurCount:=Count;
    Source:=PByte(@Item^.Data);
    if ReadIndex=FTopIndex then
      inc(Source,FTopItemSpace);

    // beware: writing to a stream can raise an exception
    if Dest<>nil then begin
      System.Move(Source^,Dest[Result],CurCount);
      TransferredCount:=CurCount;
    end else
      // Dest is nil in this branch, the data has to come from the item
      TransferredCount:=AStream.Write(Source^,CurCount);
    if TransferredCount<=0 then
      exit;

    // transfer succeeded (at least partially)
    inc(Result,TransferredCount); // bytes transferred
    dec(Count,TransferredCount);  // less to transfer
    if (not KeepData) then begin
      dec(FSize,TransferredCount);  // Queue decreased
      if FSize<0 then RaiseInconsistencySizeNegative;

      if (ReadIndex=FTopIndex) then begin
        inc(FTopItemSpace,TransferredCount); // space in top item increased

        if (FTopItemSpace=CurItemSize) then begin
          // item complete -> remove item
          FreeMem(Item);
          FItems[FTopIndex]:=nil;
          if FTopIndex=FLastIndex then begin
            // complete queue read
            if Size<>0 then RaiseInconsistencySizeNot0;
            Clear;
            exit;
          end;

          FTopItemSpace:=0;
          inc(FTopIndex);
          if FTopIndex=FItemCapacity then FTopIndex:=0;
        end;
      end;
    end;
    if (Count=0) or (TransferredCount<CurCount) then exit;

    // When keeping the data the last item stays in the ring. Stop here
    // instead of reading behind the end of the stored data.
    if ReadIndex=FLastIndex then exit;

    if TransferredCount=CurCount then begin
      // next item
      inc(ReadIndex);
      if ReadIndex=FItemCapacity then ReadIndex:=0;
    end;
  end;
end;
305 
{ Property setter for MaximumBlockSize.
  Values below the current minimum block size are raised to that minimum. }
procedure TDynamicDataQueue.SetMaximumBlockSize(const AValue: integer);
begin
  if AValue=FMaximumBlockSize then exit;
  if AValue<FMinimumBlockSize then
    FMaximumBlockSize:=FMinimumBlockSize
  else
    FMaximumBlockSize:=AValue;
end;
313 
{ Creates an empty queue with the default block size limits
  (512 to 4096 bytes per item). }
constructor TDynamicDataQueue.Create;
begin
  FMaximumBlockSize:=4096;
  FMinimumBlockSize:=512;
end;
319 
{ Empties the queue via Clear and then destroys the object. }
destructor TDynamicDataQueue.Destroy;
begin
  Clear;
  inherited;
end;
325 
{ Appends Count bytes starting at Buffer to the end of the queue.
  Returns the number of bytes added. }
function TDynamicDataQueue.Push(const Buffer; Count: integer): integer;
var
  Src: PByte;
begin
  Src:=PByte(@Buffer);
  Result:=PushInternal(Src,nil,Count);
end;
330 
{ Reads up to Count bytes from AStream and appends them to the end of the
  queue. Returns the number of bytes added. }
function TDynamicDataQueue.Push(AStream: TStream; Count: integer): integer;
var
  NoMemorySource: PByte;
begin
  NoMemorySource:=nil; // nil selects the stream as source
  Result:=PushInternal(NoMemorySource,AStream,Count);
end;
335 
{ Copies up to Count bytes from the start of the queue to Buffer and removes
  them from the queue. Returns the number of bytes copied. }
function TDynamicDataQueue.Pop(var Buffer; Count: integer): integer;
var
  Target: PByte;
begin
  Target:=PByte(@Buffer);
  Result:=PopTopInternal(Target,nil,Count,false);
end;
340 
{ Writes up to Count bytes from the start of the queue to AStream and removes
  the written bytes from the queue. Returns the number of bytes written. }
function TDynamicDataQueue.Pop(AStream: TStream; Count: integer): integer;
var
  NoMemoryDest: PByte;
begin
  NoMemoryDest:=nil; // nil selects the stream as destination
  Result:=PopTopInternal(NoMemoryDest,AStream,Count,false);
end;
345 
{ Copies up to Count bytes from the start of the queue to Buffer. The data
  remains in the queue. Returns the number of bytes copied. }
function TDynamicDataQueue.Top(var Buffer; Count: integer): integer;
var
  Target: PByte;
begin
  Target:=PByte(@Buffer);
  Result:=PopTopInternal(Target,nil,Count,true);
end;
350 
{ Writes up to Count bytes from the start of the queue to AStream. The data
  remains in the queue. Returns the number of bytes written. }
function TDynamicDataQueue.Top(AStream: TStream; Count: integer): integer;
var
  NoMemoryDest: PByte;
begin
  NoMemoryDest:=nil; // nil selects the stream as destination
  Result:=PopTopInternal(NoMemoryDest,AStream,Count,true);
end;
355 
{ Removes all data from the queue and releases all memory: every data chunk
  from the top item up to and including the last item, and the ring of item
  pointers itself. Afterwards the queue is in the same state as directly
  after Create (block size limits are kept). }
procedure TDynamicDataQueue.Clear;
begin
  if FItems<>nil then begin
    // Free all chunks from FTopIndex to FLastIndex inclusive. The slot of
    // the last item must be freed too, otherwise it would leak whenever a
    // non empty queue is cleared or destroyed.
    repeat
      // a slot can already be nil, e.g. when PopTopInternal freed the item
      if FItems[FTopIndex]<>nil then begin
        FreeMem(FItems[FTopIndex]);
        FItems[FTopIndex]:=nil;
      end;
      if FTopIndex=FLastIndex then break;
      inc(FTopIndex);
      if FTopIndex=FItemCapacity then
        FTopIndex:=0;
    until false;
    FreeMem(FItems);
    FItems:=nil;
  end;
  FTopIndex:=0;
  FLastIndex:=0;
  FSize:=0;
  FItemCapacity:=0;
  FTopItemSpace:=0;
  FLastItemSpace:=0;
end;
373 
{ Verifies the internal state of the queue and raises an Exception on the
  first violated rule: block size limits, index ranges, every used slot holds
  a non empty item, the sum of used bytes equals Size, every unused slot is
  nil, and the space counters of the top and last item are in range. }
procedure TDynamicDataQueue.ConsistencyCheck;

  procedure Error(const Msg: string);
  begin
    raise Exception.Create('TDynamicDataQueue.ConsistencyCheck '+Msg);
  end;

  // index following Index in the ring
  function NextIndex(Index: LongInt): LongInt;
  begin
    Result:=Index+1;
    if Result=FItemCapacity then Result:=0;
  end;

var
  Idx: LongInt;
  Counted: int64;
  Used: LongInt;
  Chunk: PDynamicQueueItem;
begin
  // checks that are independent of the ring
  if (Size<0)
  or (FMinimumBlockSize>FMaximumBlockSize)
  or (FMinimumBlockSize<16) then Error('');

  if FItems=nil then begin
    // no ring -> the queue must be empty
    if Size<>0 then Error('');
    exit;
  end;

  // a ring exists -> it must hold data and the indices must be valid
  if (FItemCapacity<=0)
  or (Size=0)
  or (FTopIndex<0)
  or (FLastIndex<0)
  or (FTopIndex>=FItemCapacity)
  or (FLastIndex>=FItemCapacity) then Error('');

  // check used items and sum up the stored bytes
  Counted:=0;
  Idx:=FTopIndex;
  while true do begin
    Chunk:=FItems[Idx];
    if (Chunk=nil) or (Chunk^.Size<=0) then Error('');
    Used:=Chunk^.Size;
    if Idx=FTopIndex then
      dec(Used,FTopItemSpace);
    if Idx=FLastIndex then
      dec(Used,FLastItemSpace);
    inc(Counted,Used);
    if Idx=FLastIndex then break;
    Idx:=NextIndex(Idx);
  end;
  if Counted<>Size then Error('');

  // check unused items: all slots behind the last item up to the top item
  Idx:=NextIndex(Idx);
  while Idx<>FTopIndex do begin
    if FItems[Idx]<>nil then Error('');
    Idx:=NextIndex(Idx);
  end;

  // check space
  if (FLastItemSpace<0)
  or (FItems[FLastIndex]^.Size<=FLastItemSpace)
  or (FTopItemSpace<0)
  or (FItems[FTopIndex]^.Size<=FTopItemSpace) then Error('');
  if (FTopIndex=FLastIndex)
  and (FTopItemSpace>=FItems[FTopIndex]^.Size-FLastItemSpace) then Error('');
end;
435 
{ Writes the state of the queue via DebugLn: first one line with the
  counters, then one line per used item. If WriteData is true the stored
  bytes of every item are dumped as well. }
procedure TDynamicDataQueue.WriteDebugReport(WriteData: Boolean);
var
  Idx: LongInt;
  Len: LongInt;
  Offset: Integer;
  Chunk: PDynamicQueueItem;
begin
  DebugLn(['TDynamicDataQueue.WriteDebugReport FItemCapacity=',FItemCapacity,
    ' FTopIndex=',FTopIndex,' FTopItemSpace=',FTopItemSpace,
    ' FLastIndex=',FLastIndex,' FLastItemSpace=',FLastItemSpace,
    ' Size=',Size,
    ' MinimumBlockSize=',MinimumBlockSize,
    ' MaximumBlockSize=',MaximumBlockSize]);
  if FItems=nil then exit;

  // walk the used slots from the top item to the last item
  Idx:=FTopIndex;
  while true do begin
    Chunk:=FItems[Idx];
    Len:=Chunk^.Size;
    Offset:=0;
    if Idx=FTopIndex then begin
      // skip the bytes that were already read from the top item
      Offset:=FTopItemSpace;
      dec(Len,FTopItemSpace);
    end;
    if Idx=FLastIndex then
      dec(Len,FLastItemSpace); // the last item is only partially filled
    DebugLn([Idx,' Item=',HexStr({%H-}PtrUInt(Chunk),8),' Size=',Chunk^.Size,' Start=',Offset,' Count=',Len]);
    if WriteData then
      DebugLn(dbgMemRange(PByte(@Chunk^.Data)+Offset,Len));

    if Idx=FLastIndex then break;
    inc(Idx);
    if Idx=FItemCapacity then Idx:=0;
  end;
end;
470 
471 end.
472