1 {
2 *****************************************************************************
3 This file is part of LazUtils.
4
5 See the file COPYING.modifiedLGPL.txt, included in this distribution,
6 for details about the license.
7 *****************************************************************************
8
9 Abstract:
10 A dynamic data queue to push and pop arbitrary data.
11 }
12 unit DynQueue;
13
14 {$mode objfpc}{$H+}
15
16 interface
17
18 uses
19 Classes, SysUtils,
20 // LazUtils
21 LazLoggerBase;
22
type
  // Pointer types used by the ring buffer of data chunks.
  PDynamicQueueItem = ^TDynamicQueueItem;
  ListOfPDynamicQueueItem = ^PDynamicQueueItem;

  { TDynamicQueueItem
    One data chunk of the queue. The record is open ended: AddItem allocates
    SizeOf(Size)+Size bytes, so Data really spans Size bytes. }
  TDynamicQueueItem = record
    Size: integer;                // number of payload bytes allocated for Data
    Data: array[0..0] of integer; // start of the payload, element type is irrelevant
  end;

  { TDynamicDataQueue
    A first-in-first-out queue for arbitrary bytes.

      Push: append data to the end of the queue
      Pop:  read data from the start of the queue and remove it
      Top:  read data from the start of the queue and leave it there

    Internally the queue keeps a ring of pointers (FItems) to chunks of type
    TDynamicQueueItem, so growing the queue never moves the stored bytes. }

  TDynamicDataQueue = class
  private
    // ring of chunk pointers; the used part runs from FTopIndex to FLastIndex
    FItems: ListOfPDynamicQueueItem;
    // FItemCapacity: number of slots in FItems
    // FTopIndex:     slot of the first (oldest) chunk
    // FLastIndex:    slot of the last (newest) chunk
    FItemCapacity, FTopIndex, FLastIndex: integer;
    FMaximumBlockSize, FMinimumBlockSize: integer;
    FSize: int64; // number of bytes currently stored
    // FTopItemSpace:  bytes already consumed at the start of the top chunk
    // FLastItemSpace: bytes still free at the end of the last chunk
    FTopItemSpace, FLastItemSpace: integer;
    procedure SetMaximumBlockSize(const AValue: integer);
    procedure SetMinimumBlockSize(const AValue: integer);
    procedure GrowItems;
    procedure AddItem(ItemSize: integer);
    function CalculateItemSize(ItemSize: integer): integer;
    // appends up to Count bytes from Source (if not nil) or from AStream
    function PushInternal(Source: PByte; AStream: TStream; Count: integer): integer;
    // reads up to Count bytes from the start of the queue into Dest (if not
    // nil) or into AStream; the data is removed unless KeepData is true
    function PopTopInternal(Dest: PByte; AStream: TStream; Count: integer; KeepData: Boolean): integer;
  public
    constructor Create;
    destructor Destroy; override;
    procedure Clear;
    procedure ConsistencyCheck;
    procedure WriteDebugReport(WriteData: Boolean);
    // Push: add to the end of the queue, returns the number of bytes added
    function Push(const Buffer; Count: integer): integer;
    function Push(AStream: TStream; Count: integer): integer;
    // Pop: read from the start of the queue and remove what was read
    function Pop(var Buffer; Count: integer): integer;
    function Pop(AStream: TStream; Count: integer): integer;
    // Top: read from the start of the queue, the data stays in the queue
    function Top(var Buffer; Count: integer): integer;
    function Top(AStream: TStream; Count: integer): integer;
    property Size: int64 read FSize;
    property MinimumBlockSize: integer read FMinimumBlockSize write SetMinimumBlockSize;
    property MaximumBlockSize: integer read FMaximumBlockSize write SetMaximumBlockSize;
  end;
75
76 implementation
77
78 { TDynamicDataQueue }
79
{ Setter of MinimumBlockSize.
  Values below 16 are raised to 16. MaximumBlockSize is pulled up if it would
  otherwise end up below the new minimum. }
procedure TDynamicDataQueue.SetMinimumBlockSize(const AValue: integer);
var
  Wanted: integer;
begin
  if AValue=FMinimumBlockSize then
    exit; // nothing to do

  // never go below the hard lower limit of 16 bytes
  Wanted:=AValue;
  if Wanted<16 then
    Wanted:=16;
  FMinimumBlockSize:=Wanted;

  // keep the invariant MinimumBlockSize <= MaximumBlockSize
  if FMaximumBlockSize<Wanted then
    FMaximumBlockSize:=Wanted;
end;
88
{ Replaces the ring of chunk pointers with a bigger one.
  The new ring has 8 slots, or twice the old capacity once the old capacity
  has reached 8. The used slots are copied in queue order to the start of the
  new ring, so afterwards FTopIndex is 0 and FLastIndex is the number of copied
  slots minus one (0 if there was no ring before). Unused slots are nil. }
procedure TDynamicDataQueue.GrowItems;
var
  Capacity: LongInt;
  ByteCount: Integer;
  Ring: ListOfPDynamicQueueItem;
  Src: LongInt;
  Dst: Integer;
begin
  // create the new, zeroed ring
  if FItemCapacity<8 then
    Capacity:=8
  else
    Capacity:=FItemCapacity*2;
  ByteCount:=Capacity*SizeOf(Pointer);
  GetMem(Ring,ByteCount);
  FillChar(Ring^,ByteCount,0);

  // move the chunk pointers over, unrolling the old ring
  Dst:=0;
  if FItems<>nil then begin
    Src:=FTopIndex;
    Ring[Dst]:=FItems[Src];
    while Src<>FLastIndex do begin
      inc(Src);
      if Src=FItemCapacity then
        Src:=0;
      inc(Dst);
      Ring[Dst]:=FItems[Src];
    end;
    FreeMem(FItems);
  end;

  // switch to the new ring
  FItems:=Ring;
  FItemCapacity:=Capacity;
  FTopIndex:=0;
  FLastIndex:=Dst;
end;
126
{ Appends a new, uninitialised chunk with room for ItemSize payload bytes
  after the current last chunk and makes it the new last chunk.
  The ring of chunk pointers is grown when it has no free slot.
  Raises Exception if the slot chosen for the new chunk is not free. }
procedure TDynamicDataQueue.AddItem(ItemSize: integer);
var
  NewIndex: Integer;

  procedure RaiseInconsistency;
  begin
    raise Exception.Create('TDynamicDataQueue.AddItem NewIndex='+IntToStr(NewIndex));
  end;

  // Sets NewIndex to the slot for the next chunk: FLastIndex itself while that
  // slot is still unused, otherwise the slot following it in the ring.
  procedure LocateSlotAfterLast;
  begin
    NewIndex:=FLastIndex;
    if (FItems=nil) or (FItems[NewIndex]=nil) then
      exit;
    inc(NewIndex);
    if NewIndex>=FItemCapacity then
      NewIndex:=0;
  end;

begin
  LocateSlotAfterLast;
  if NewIndex=FTopIndex then begin
    // no ring yet or ring is full -> enlarge it and search again
    GrowItems;
    LocateSlotAfterLast;
  end;
  if (FItems=nil) or (FItems[NewIndex]<>nil) then
    RaiseInconsistency;

  // allocate the chunk: the Size field plus the payload
  FLastIndex:=NewIndex;
  GetMem(FItems[FLastIndex],SizeOf(TDynamicQueueItem.Size)+ItemSize);
  FItems[FLastIndex]^.Size:=ItemSize;
end;
160
{ Returns the payload size to use for a new chunk that should take ItemSize
  bytes: ItemSize raised to MinimumBlockSize if smaller, and then limited to
  MaximumBlockSize. }
function TDynamicDataQueue.CalculateItemSize(ItemSize: integer): integer;
begin
  // lower bound first ...
  if ItemSize<MinimumBlockSize then
    Result:=MinimumBlockSize
  else
    Result:=ItemSize;
  // ... then the upper bound
  if Result>MaximumBlockSize then
    Result:=MaximumBlockSize;
end;
169
{ Appends up to Count bytes to the end of the queue.
  The bytes are taken from Source if it is not nil, otherwise they are read
  from AStream. Returns the number of bytes that were actually added; this is
  less than Count when the stream delivers fewer bytes.

  A chunk that was appended but did not receive a single byte (the stream was
  exhausted or raised an exception) is removed again before leaving, so the
  queue never keeps an empty chunk that would later make PopTopInternal raise
  its "empty item" inconsistency. Exceptions of AStream.Read are re-raised. }
function TDynamicDataQueue.PushInternal(Source: PByte; AStream: TStream;
  Count: integer): integer;

  // Frees the last chunk if it contains no data at all.
  procedure DiscardEmptyLastItem;
  var
    Tail: PDynamicQueueItem;
  begin
    if FItems=nil then exit;
    Tail:=FItems[FLastIndex];
    if (Tail=nil) or (FLastItemSpace<Tail^.Size) then
      exit; // chunk holds data -> keep it
    FreeMem(Tail);
    FItems[FLastIndex]:=nil;
    if FLastIndex=FTopIndex then begin
      // it was the only chunk -> back to the completely empty state
      Clear;
    end else begin
      // step back to the previous chunk; a new chunk is only appended when
      // the previous one was completely filled, so it has no space left
      if FLastIndex=0 then
        FLastIndex:=FItemCapacity;
      dec(FLastIndex);
      FLastItemSpace:=0;
    end;
  end;

var
  CurCount: PtrInt;
  NewItemSize: LongInt;
  LastItem: PDynamicQueueItem;
  Dest: Pointer;
begin
  Result:=0;
  if Count<=0 then exit;
  while true do begin
    while FLastItemSpace>0 do begin
      // fill the last item
      CurCount:=Count;
      if CurCount>FLastItemSpace then
        CurCount:=FLastItemSpace;
      LastItem:=FItems[FLastIndex];
      // first free byte = start of payload + used bytes
      Dest:=Pointer(@(LastItem^.Data))+LastItem^.Size-FLastItemSpace;

      if Source<>nil then
        System.Move(Source[Result],Dest^,CurCount)
      else begin
        // beware: read from a stream can raise an exception
        try
          CurCount:=AStream.Read(Dest^,CurCount);
        except
          DiscardEmptyLastItem;
          raise;
        end;
      end;
      if CurCount<=0 then begin
        // stream delivered nothing -> do not leave an empty chunk behind
        DiscardEmptyLastItem;
        exit;
      end;

      // transfer succeeded
      dec(FLastItemSpace,CurCount); // space decreased
      inc(FSize,CurCount);          // queue increased
      inc(Result,CurCount);         // bytes transferred
      dec(Count,CurCount);          // less to transfer
      if Count=0 then exit;
    end;
    // last chunk is full (or there is none) -> append a new one
    NewItemSize:=CalculateItemSize(Count);
    AddItem(NewItemSize);
    FLastItemSpace:=NewItemSize;
  end;
end;
209
{ Reads up to Count bytes from the start of the queue.
  The bytes are copied to Dest if it is not nil, otherwise they are written to
  AStream. With KeepData=false the bytes are removed from the queue (Pop),
  with KeepData=true the queue is left untouched (Top).
  Returns the number of bytes transferred; this is less than Count when the
  queue holds fewer bytes or the stream accepts fewer bytes.
  Raises Exception when the internal bookkeeping is found to be inconsistent. }
function TDynamicDataQueue.PopTopInternal(Dest: PByte; AStream: TStream;
  Count: integer; KeepData: Boolean): integer;

  procedure RaiseInconsistencySizeNot0;
  begin
    raise Exception.Create('TDynamicDataQueue.PopTopInternal inconsistency size<>0');
  end;

  procedure RaiseInconsistencyEmptyItem;
  begin
    raise Exception.Create('TDynamicDataQueue.PopTopInternal inconsistency empty item');
  end;

  procedure RaiseInconsistencySizeNegative;
  begin
    raise Exception.Create('TDynamicDataQueue.PopTopInternal inconsistency size<0');
  end;

var
  Item: PDynamicQueueItem;
  CurCount: Integer;
  Source: PByte;
  CurItemSize: LongInt;
  ReadIndex: LongInt;
  TransferredCount: LongInt;
begin
  Result:=0;
  if Count<=0 then exit;
  ReadIndex:=FTopIndex;

  while Count>0 do begin
    if FItems=nil then exit; // no data

    // number of data bytes in the current item
    Item:=FItems[ReadIndex];
    CurItemSize:=Item^.Size;
    if ReadIndex=FLastIndex then
      dec(CurItemSize,FLastItemSpace); // the last item is only partly filled
    CurCount:=CurItemSize;
    if ReadIndex=FTopIndex then
      dec(CurCount,FTopItemSpace);     // the start of the top item was already read
    if CurCount<=0 then
      RaiseInconsistencyEmptyItem;

    // copy data from the current item
    if CurCount>Count then
      CurCount:=Count;
    Source:=PByte(@Item^.Data);
    if ReadIndex=FTopIndex then
      inc(Source,FTopItemSpace);

    // beware: writing to a stream can raise an exception
    if Dest<>nil then begin
      System.Move(Source^,Dest[Result],CurCount);
      TransferredCount:=CurCount;
    end else
      // Dest is nil in this branch, the bytes come from the queue item
      TransferredCount:=AStream.Write(Source^,CurCount);
    if TransferredCount<=0 then
      exit;

    // transfer succeeded (at least partially)
    inc(Result,TransferredCount); // bytes transferred
    dec(Count,TransferredCount);  // less to transfer
    if (not KeepData) then begin
      dec(FSize,TransferredCount); // queue decreased
      if FSize<0 then RaiseInconsistencySizeNegative;

      if (ReadIndex=FTopIndex) then begin
        inc(FTopItemSpace,TransferredCount); // space in top item increased

        if (FTopItemSpace=CurItemSize) then begin
          // item complete -> remove item
          FreeMem(Item);
          FItems[FTopIndex]:=nil;
          if FTopIndex=FLastIndex then begin
            // complete queue read
            if Size<>0 then RaiseInconsistencySizeNot0;
            Clear;
            exit;
          end;

          FTopItemSpace:=0;
          inc(FTopIndex);
          if FTopIndex=FItemCapacity then FTopIndex:=0;
        end;
      end;
    end;
    if (Count=0) or (TransferredCount<CurCount) then exit;

    // When only peeking (KeepData) the last item stays in the queue, so the
    // end of the data must be detected here. Without this check the loop
    // would run on into an unused (nil) slot, or wrap around to the top item
    // when the ring is full.
    if ReadIndex=FLastIndex then exit;

    // current item was read completely -> next item
    inc(ReadIndex);
    if ReadIndex=FItemCapacity then ReadIndex:=0;
  end;
end;
305
{ Setter of MaximumBlockSize.
  A value below MinimumBlockSize is replaced by MinimumBlockSize. }
procedure TDynamicDataQueue.SetMaximumBlockSize(const AValue: integer);
begin
  if AValue=FMaximumBlockSize then
    exit; // unchanged

  // keep the invariant MinimumBlockSize <= MaximumBlockSize
  if AValue<FMinimumBlockSize then
    FMaximumBlockSize:=FMinimumBlockSize
  else
    FMaximumBlockSize:=AValue;
end;
313
{ Creates an empty queue with the default block sizes:
  chunks are allocated with at least 512 and at most 4096 payload bytes. }
constructor TDynamicDataQueue.Create;
begin
  // keep the constructor chain intact
  inherited Create;
  FMinimumBlockSize:=512;
  FMaximumBlockSize:=4096;
end;
319
{ Releases the queue contents via Clear and then runs the inherited
  destructor. }
destructor TDynamicDataQueue.Destroy;
begin
  Clear;
  inherited;
end;
325
{ Appends Count bytes starting at Buffer to the end of the queue.
  Returns the result of PushInternal, the number of bytes added. }
function TDynamicDataQueue.Push(const Buffer; Count: integer): integer;
var
  Bytes: PByte;
begin
  Bytes:=PByte(@Buffer);
  Result:=PushInternal(Bytes,nil,Count);
end;

{ Reads up to Count bytes from AStream and appends them to the end of the
  queue. Returns the result of PushInternal, the number of bytes added. }
function TDynamicDataQueue.Push(AStream: TStream; Count: integer): integer;
var
  NoBuffer: PByte;
begin
  NoBuffer:=nil; // nil tells PushInternal to read from the stream
  Result:=PushInternal(NoBuffer,AStream,Count);
end;
335
{ Copies up to Count bytes from the start of the queue into Buffer and
  removes them from the queue. Returns the result of PopTopInternal, the
  number of bytes transferred. }
function TDynamicDataQueue.Pop(var Buffer; Count: integer): integer;
var
  Bytes: PByte;
begin
  Bytes:=PByte(@Buffer);
  Result:=PopTopInternal(Bytes,nil,Count,false);
end;

{ Writes up to Count bytes from the start of the queue to AStream and removes
  them from the queue. Returns the result of PopTopInternal, the number of
  bytes transferred. }
function TDynamicDataQueue.Pop(AStream: TStream; Count: integer): integer;
var
  NoBuffer: PByte;
begin
  NoBuffer:=nil; // nil tells PopTopInternal to write to the stream
  Result:=PopTopInternal(NoBuffer,AStream,Count,false);
end;
345
{ Copies up to Count bytes from the start of the queue into Buffer; the data
  is requested to stay in the queue (KeepData=true). Returns the result of
  PopTopInternal, the number of bytes transferred. }
function TDynamicDataQueue.Top(var Buffer; Count: integer): integer;
var
  Bytes: PByte;
begin
  Bytes:=PByte(@Buffer);
  Result:=PopTopInternal(Bytes,nil,Count,true);
end;

{ Writes up to Count bytes from the start of the queue to AStream; the data
  is requested to stay in the queue (KeepData=true). Returns the result of
  PopTopInternal, the number of bytes transferred. }
function TDynamicDataQueue.Top(AStream: TStream; Count: integer): integer;
var
  NoBuffer: PByte;
begin
  NoBuffer:=nil; // nil tells PopTopInternal to write to the stream
  Result:=PopTopInternal(NoBuffer,AStream,Count,true);
end;
355
{ Removes all data from the queue and frees all memory held by it:
  every chunk from FTopIndex up to and including FLastIndex, and the ring of
  chunk pointers itself. Afterwards the queue is in the same state as a newly
  created one (the block size settings are kept). }
procedure TDynamicDataQueue.Clear;
var
  Slot: integer;
begin
  if FItems<>nil then begin
    // free all chunks, including the one at FLastIndex
    Slot:=FTopIndex;
    repeat
      // a slot can already be nil: PopTopInternal frees the final chunk
      // itself before it calls Clear
      if FItems[Slot]<>nil then begin
        FreeMem(FItems[Slot]);
        FItems[Slot]:=nil;
      end;
      if Slot=FLastIndex then break;
      inc(Slot);
      if Slot=FItemCapacity then
        Slot:=0;
    until false;
    FreeMem(FItems);
    FItems:=nil;
  end;
  FTopIndex:=0;
  FLastIndex:=0;
  FSize:=0;
  FItemCapacity:=0;
  FTopItemSpace:=0;
  FLastItemSpace:=0;
end;
373
{ Verifies the internal bookkeeping of the queue and raises Exception on the
  first violated rule. Checked are: the block size limits, the state of an
  empty queue, the index ranges, that every used slot holds a chunk and that
  the chunk payloads add up to Size, that every unused slot is nil, and the
  free space counters of the top and the last chunk. }
procedure TDynamicDataQueue.ConsistencyCheck;

  procedure Fail(const Msg: string);
  begin
    raise Exception.Create('TDynamicDataQueue.ConsistencyCheck '+Msg);
  end;

  // slot following Slot in the ring
  function Following(Slot: LongInt): LongInt;
  begin
    Result:=Slot+1;
    if Result=FItemCapacity then
      Result:=0;
  end;

var
  Slot: LongInt;
  Counted: int64;
  Payload: LongInt;
begin
  // general settings
  if Size<0 then Fail('');
  if FMinimumBlockSize>FMaximumBlockSize then Fail('');
  if FMinimumBlockSize<16 then Fail('');

  if FItems=nil then begin
    // without a ring the queue must be empty
    if Size<>0 then Fail('');
    exit;
  end;

  // a ring only exists while there is data
  if FItemCapacity<=0 then Fail('');
  if Size=0 then Fail('');
  if (FTopIndex<0) or (FLastIndex<0) then Fail('');
  if (FTopIndex>=FItemCapacity) or (FLastIndex>=FItemCapacity) then Fail('');

  // used slots: walk from the top to the last chunk and sum up the data
  Counted:=0;
  Slot:=FTopIndex;
  while true do begin
    if FItems[Slot]=nil then Fail('');
    if FItems[Slot]^.Size<=0 then Fail('');
    Payload:=FItems[Slot]^.Size;
    if Slot=FTopIndex then
      dec(Payload,FTopItemSpace);
    if Slot=FLastIndex then
      dec(Payload,FLastItemSpace);
    inc(Counted,Payload);
    if Slot=FLastIndex then break;
    Slot:=Following(Slot);
  end;
  if Counted<>Size then Fail('');

  // unused slots: everything behind the last chunk up to the top chunk
  Slot:=Following(Slot);
  while Slot<>FTopIndex do begin
    if FItems[Slot]<>nil then Fail('');
    Slot:=Following(Slot);
  end;

  // free space counters
  if FLastItemSpace<0 then Fail('');
  if FItems[FLastIndex]^.Size<=FLastItemSpace then Fail('');
  if FTopItemSpace<0 then Fail('');
  if FItems[FTopIndex]^.Size<=FTopItemSpace then Fail('');
  if (FTopIndex=FLastIndex)
  and (FTopItemSpace>=FItems[FTopIndex]^.Size-FLastItemSpace) then Fail('');
end;
435
{ Writes the state of the queue with DebugLn: first one line with the
  bookkeeping fields, then one line per chunk from the top to the last chunk.
  If WriteData is true the data bytes of every chunk are dumped as well. }
procedure TDynamicDataQueue.WriteDebugReport(WriteData: Boolean);
var
  Slot: LongInt;
  Item: PDynamicQueueItem;
  Used: LongInt;
  Skipped: Integer;
begin
  DebugLn(['TDynamicDataQueue.WriteDebugReport FItemCapacity=',FItemCapacity,
    ' FTopIndex=',FTopIndex,' FTopItemSpace=',FTopItemSpace,
    ' FLastIndex=',FLastIndex,' FLastItemSpace=',FLastItemSpace,
    ' Size=',Size,
    ' MinimumBlockSize=',MinimumBlockSize,
    ' MaximumBlockSize=',MaximumBlockSize]);
  if FItems=nil then
    exit; // no chunks to report

  Slot:=FTopIndex;
  while true do begin
    Item:=FItems[Slot];

    // data range inside this chunk
    Used:=Item^.Size;
    Skipped:=0;
    if Slot=FTopIndex then begin
      // the start of the top chunk was already read
      Skipped:=FTopItemSpace;
      dec(Used,FTopItemSpace);
    end;
    if Slot=FLastIndex then
      dec(Used,FLastItemSpace); // the end of the last chunk is not yet filled

    debugln([Slot,' Item=',HexStr({%H-}PtrUInt(Item),8),' Size=',Item^.Size,' Start=',Skipped,' Count=',Used]);
    if WriteData then
      debugln(dbgMemRange(PByte(@Item^.Data)+Skipped,Used));

    if Slot=FLastIndex then break;
    inc(Slot);
    if Slot=FItemCapacity then
      Slot:=0;
  end;
end;
470
471 end.
472