1 /**************************************************************************/
2 /* */
3 /* Copyright (c) 2001, 2010 NoMachine, http://www.nomachine.com/. */
4 /* */
5 /* NXCOMP, NX protocol compression and NX extensions to this software */
6 /* are copyright of NoMachine. Redistribution and use of the present */
7 /* software is allowed according to terms specified in the file LICENSE */
8 /* which comes in the source distribution. */
9 /* */
10 /* Check http://www.nomachine.com/licensing.html for applicability. */
11 /* */
12 /* NX and NoMachine are trademarks of Medialogic S.p.A. */
13 /* */
14 /* All rights reserved. */
15 /* */
16 /**************************************************************************/
17
18 #include <unistd.h>
19 #include <cstring>
20 #include <sys/stat.h>
21 #include <sys/types.h>
22 #include <utime.h>
23
24 #include "Misc.h"
25
26 #include "Split.h"
27
28 #include "Control.h"
29 #include "Statistics.h"
30
31 #include "EncodeBuffer.h"
32 #include "DecodeBuffer.h"
33
34 #include "StaticCompressor.h"
35
36 #include "Unpack.h"
37
38 //
39 // Set the verbosity level.
40 //
41
42 #define PANIC
43 #define WARNING
44 #undef TEST
45 #undef DEBUG
46 #undef DUMP
47
48 //
49 // Define this to trace elements
50 // allocated and deallocated.
51 //
52
53 #undef REFERENCES
54
55 //
56 // Counters used for store control.
57 //
58
59 int SplitStore::totalSplitSize_;
60 int SplitStore::totalSplitStorageSize_;
61
62 //
63 // This is used for reference count.
64 //
65
66 #ifdef REFERENCES
67
68 int Split::references_ = 0;
69
70 #endif
71
Split()72 Split::Split()
73 {
74 resource_ = nothing;
75 position_ = nothing;
76
77 store_ = NULL;
78
79 d_size_ = 0;
80 i_size_ = 0;
81 c_size_ = 0;
82 r_size_ = 0;
83
84 next_ = 0;
85 load_ = 0;
86 save_ = 0;
87
88 checksum_ = NULL;
89 state_ = split_undefined;
90 mode_ = split_none;
91 action_ = is_discarded;
92
93 #ifdef REFERENCES
94
95 references_++;
96
97 *logofs << "Split: Created new Split at "
98 << this << " out of " << references_
99 << " allocated references.\n" << logofs_flush;
100 #endif
101 }
102
~Split()103 Split::~Split()
104 {
105 delete [] checksum_;
106
107 #ifdef REFERENCES
108
109 references_--;
110
111 *logofs << "Split: Deleted Split at "
112 << this << " out of " << references_
113 << " allocated references.\n" << logofs_flush;
114 #endif
115 }
116
SplitStore(StaticCompressor * compressor,CommitStore * commits,int resource)117 SplitStore::SplitStore(StaticCompressor *compressor, CommitStore *commits, int resource)
118
119 : compressor_(compressor), commits_(commits), resource_(resource)
120 {
121 splits_ = new T_splits();
122
123 current_ = splits_ -> end();
124
125 splitStorageSize_ = 0;
126
127 #ifdef TEST
128 *logofs << "SplitStore: Created new store [";
129
130 if (resource_ != nothing)
131 {
132 *logofs << resource_;
133 }
134 else
135 {
136 *logofs << "commit";
137 }
138
139 *logofs << "].\n" << logofs_flush;
140
141 *logofs << "SplitStore: Total messages in stores are "
142 << totalSplitSize_ << " with total storage size "
143 << totalSplitStorageSize_ << ".\n"
144 << logofs_flush;
145 #endif
146 }
147
~SplitStore()148 SplitStore::~SplitStore()
149 {
150 totalSplitSize_ -= splits_ -> size();
151
152 totalSplitStorageSize_ -= splitStorageSize_;
153
154 for (T_splits::iterator i = splits_ -> begin();
155 i != splits_ -> end(); i++)
156 {
157 delete *i;
158 }
159
160 delete splits_;
161
162 #ifdef TEST
163 *logofs << "SplitStore: Deleted store [";
164
165 if (resource_ != nothing)
166 {
167 *logofs << resource_;
168 }
169 else
170 {
171 *logofs << "commit";
172 }
173
174 *logofs << "] with storage size " << splitStorageSize_
175 << ".\n" << logofs_flush;
176
177 *logofs << "SplitStore: Total messages in stores are "
178 << totalSplitSize_ << " with total storage size "
179 << totalSplitStorageSize_ << ".\n"
180 << logofs_flush;
181 #endif
182 }
183
184 //
185 // This is called at the encoding side.
186 //
187
add(MessageStore * store,int resource,T_split_mode mode,int position,T_store_action action,T_checksum checksum,const unsigned char * buffer,const int size)188 Split *SplitStore::add(MessageStore *store, int resource, T_split_mode mode,
189 int position, T_store_action action, T_checksum checksum,
190 const unsigned char *buffer, const int size)
191 {
192 #ifdef TEST
193 *logofs << "SplitStore: Adding message [" << (unsigned int) store ->
194 opcode() << "] resource " << resource << " mode " << mode
195 << " position " << position << " action [" << DumpAction(action)
196 << "] and checksum [" << DumpChecksum(checksum) << "]"
197 << ".\n" << logofs_flush;
198 #endif
199
200 Split *split = new Split();
201
202 if (split == NULL)
203 {
204 #ifdef PANIC
205 *logofs << "SplitStore: PANIC! Can't allocate "
206 << "memory for the split.\n"
207 << logofs_flush;
208 #endif
209
210 cerr << "Error" << ": Can't allocate memory "
211 << "for the split.\n";
212
213 HandleAbort();
214 }
215
216 split -> store_ = store;
217 split -> resource_ = resource;
218 split -> mode_ = mode;
219 split -> position_ = position;
220 split -> action_ = action;
221
222 split -> store_ -> validateSize(size);
223
224 //
225 // The checksum is not provided if the
226 // message is cached.
227 //
228
229 if (checksum != NULL)
230 {
231 split -> checksum_ = new md5_byte_t[MD5_LENGTH];
232
233 memcpy(split -> checksum_, checksum, MD5_LENGTH);
234 }
235
236 //
237 // We don't need the identity data at the
238 // encoding side. This qualifies the split
239 // as a split generated at the encoding
240 // side.
241 //
242
243 split -> i_size_ = store -> identitySize(buffer, size);
244
245 split -> d_size_ = size - split -> i_size_;
246
247 if (action == IS_ADDED || action == is_discarded)
248 {
249 //
250 // If the message was added to message
251 // store or discarded we need to save
252 // the real data so we can transfer it
253 // at later time.
254 //
255
256 split -> data_.resize(split -> d_size_);
257
258 memcpy(split -> data_.begin(), buffer + split -> i_size_, split -> d_size_);
259
260 //
261 // If the message was added, lock it so
262 // it will not be used by the encoding
263 // side until it is recomposed.
264 //
265
266 if (action == IS_ADDED)
267 {
268 split -> store_ -> lock(split -> position_);
269
270 #ifdef TEST
271
272 commits_ -> validate(split);
273
274 #endif
275 }
276 }
277 #ifdef WARNING
278 else
279 {
280 *logofs << "SplitStore: WARNING! Not copying data for the cached message.\n"
281 << logofs_flush;
282 }
283 #endif
284
285 push(split);
286
287 return split;
288 }
289
290 //
291 // This is called at decoding side. If checksum
292 // is provided, the message can be searched on
293 // disk, then, if message is found, an event is
294 // sent to abort the data transfer.
295 //
296
add(MessageStore * store,int resource,int position,T_store_action action,T_checksum checksum,unsigned char * buffer,const int size)297 Split *SplitStore::add(MessageStore *store, int resource, int position,
298 T_store_action action, T_checksum checksum,
299 unsigned char *buffer, const int size)
300 {
301 #ifdef TEST
302 *logofs << "SplitStore: Adding message ["
303 << (unsigned int) store -> opcode() << "] resource "
304 << resource << " position " << position << " action ["
305 << DumpAction(action) << "] and checksum ["
306 << DumpChecksum(checksum) << "].\n" << logofs_flush;
307 #endif
308
309 Split *split = new Split();
310
311 if (split == NULL)
312 {
313 #ifdef PANIC
314 *logofs << "SplitStore: PANIC! Can't allocate "
315 << "memory for the split.\n"
316 << logofs_flush;
317 #endif
318
319 cerr << "Error" << ": Can't allocate memory "
320 << "for the split.\n";
321
322 HandleAbort();
323 }
324
325 split -> store_ = store;
326 split -> resource_ = resource;
327 split -> position_ = position;
328 split -> action_ = action;
329
330 split -> store_ -> validateSize(size);
331
332 //
333 // Check if the checksum was provided
334 // by the remote.
335 //
336
337 if (checksum != NULL)
338 {
339 split -> checksum_ = new md5_byte_t[MD5_LENGTH];
340
341 memcpy(split -> checksum_, checksum, MD5_LENGTH);
342 }
343
344 split -> i_size_ = store -> identitySize(buffer, size);
345
346 //
347 // Copy the identity so we can expand the
348 // message when it is committed.
349 //
350
351 split -> identity_.resize(split -> i_size_);
352
353 memcpy(split -> identity_.begin(), buffer, split -> i_size_);
354
355 split -> d_size_ = size - split -> i_size_;
356
357 if (action == IS_ADDED || action == is_discarded)
358 {
359 //
360 // The unpack procedure will check if the
361 // first 2 bytes of the buffer contain the
362 // pattern and will not try to expand the
363 // image.
364 //
365
366 split -> data_.resize(2);
367
368 unsigned char *data = split -> data_.begin();
369
370 data[0] = SPLIT_PATTERN;
371 data[1] = SPLIT_PATTERN;
372
373 //
374 // If the message was added to the store,
375 // we don't have the data part, yet, so
376 // we need to lock the message until it
377 // is recomposed.
378 //
379
380 if (action == IS_ADDED)
381 {
382 split -> store_ -> lock(split -> position_);
383
384 #ifdef TEST
385
386 commits_ -> validate(split);
387
388 #endif
389 }
390 }
391 else
392 {
393 #ifdef WARNING
394 *logofs << "SplitStore: WARNING! Copying data for the cached message.\n"
395 << logofs_flush;
396 #endif
397
398 //
399 // We may optionally take the data from the
400 // message store in compressed form, but,
401 // as the data has been decompressed in the
402 // buffer, we save a further decompression.
403 //
404
405 split -> data_.resize(split -> d_size_);
406
407 memcpy(split -> data_.begin(), buffer + split -> i_size_, split -> d_size_);
408 }
409
410 push(split);
411
412 return split;
413 }
414
push(Split * split)415 void SplitStore::push(Split *split)
416 {
417 splits_ -> push_back(split);
418
419 splitStorageSize_ += getNodeSize(split);
420
421 totalSplitSize_++;
422
423 totalSplitStorageSize_ += getNodeSize(split);
424
425 statistics -> addSplit();
426
427 #ifdef TEST
428 *logofs << "SplitStore: There are " << splits_ -> size()
429 << " messages in store [" << resource_ << "] with "
430 << "storage size " << splitStorageSize_ << ".\n"
431 << logofs_flush;
432
433 *logofs << "SplitStore: Total messages in stores are "
434 << totalSplitSize_ << " with total storage size "
435 << totalSplitStorageSize_ << ".\n"
436 << logofs_flush;
437 #endif
438
439 split -> state_ = split_added;
440 }
441
dump()442 void SplitStore::dump()
443 {
444 #ifdef DUMP
445
446 int n;
447
448 Split *split;
449
450 *logofs << "SplitStore: DUMP! Dumping content of ";
451
452 if (commits_ == NULL)
453 {
454 *logofs << "[commits]";
455 }
456 else
457 {
458 *logofs << "[splits] for store [" << resource_ << "]";
459 }
460
461 *logofs << " with [" << getSize() << "] elements "
462 << "in the store.\n" << logofs_flush;
463
464 n = 0;
465
466 for (T_splits::iterator i = splits_ -> begin(); i != splits_ -> end(); i++, n++)
467 {
468 split = *i;
469
470 *logofs << "SplitStore: DUMP! Split [" << n << "] has action ["
471 << DumpAction(split -> action_) << "] state ["
472 << DumpState(split -> state_) << "] ";
473
474 if (split -> resource_ >= 0)
475 {
476 *logofs << "resource " << split -> resource_;
477 }
478
479 *logofs << " request " << (unsigned) split -> store_ -> opcode()
480 << " position " << split -> position_ << " size is "
481 << split -> data_.size() << " (" << split -> d_size_
482 << "/" << split -> c_size_ << "/" << split -> r_size_
483 << ") with " << split -> data_.size() - split -> next_
484 << "] bytes to go.\n" << logofs_flush;
485 }
486
487 #endif
488 }
489
send(EncodeBuffer & encodeBuffer,int packetSize)490 int SplitStore::send(EncodeBuffer &encodeBuffer, int packetSize)
491 {
492 if (splits_ -> size() == 0)
493 {
494 #ifdef PANIC
495 *logofs << "SplitStore: PANIC! Function send called with no splits available.\n"
496 << logofs_flush;
497 #endif
498
499 cerr << "Error" << ": Function send called with no splits available.\n";
500
501 HandleAbort();
502 }
503
504 //
505 // A start operation must always be executed on
506 // the split, even in the case the split will be
507 // later aborted.
508 //
509
510 if (current_ == splits_ -> end())
511 {
512 start(encodeBuffer);
513 }
514
515 //
516 // If we have matched the checksum received from
517 // the remote side then we must abort the current
518 // split, else we can send another block of data
519 // to the remote peer.
520 //
521
522 Split *split = *current_;
523
524 unsigned int abort = 0;
525
526 if (split -> state_ == split_loaded)
527 {
528 abort = 1;
529 }
530
531 encodeBuffer.encodeBoolValue(abort);
532
533 if (abort == 1)
534 {
535 #ifdef TEST
536 *logofs << "SplitStore: Aborting split for checksum ["
537 << DumpChecksum(split -> checksum_) << "] position "
538 << split -> position_ << " with " << (split ->
539 data_.size() - split -> next_) << " bytes to go "
540 << "out of " << split -> data_.size()
541 << ".\n" << logofs_flush;
542 #endif
543
544 statistics -> addSplitAborted();
545
546 statistics -> addSplitAbortedBytesOut(split -> data_.size() - split -> next_);
547
548 split -> next_ = split -> data_.size();
549
550 split -> state_ = split_aborted;
551 }
552 else
553 {
554 int count = (packetSize <= 0 || split -> next_ +
555 packetSize > (int) split -> data_.size() ?
556 split -> data_.size() - split -> next_ : packetSize);
557
558 #ifdef TEST
559 *logofs << "SplitStore: Sending split for checksum ["
560 << DumpChecksum(split -> checksum_) << "] count "
561 << count << " position " << split -> position_
562 << ". Data size is " << split -> data_.size() << " ("
563 << split -> d_size_ << "/" << split -> c_size_ << "), "
564 << split -> data_.size() - (split -> next_ + count)
565 << " to go.\n" << logofs_flush;
566 #endif
567
568 encodeBuffer.encodeValue(count, 32, 10);
569
570 encodeBuffer.encodeMemory(split -> data_.begin() + split -> next_, count);
571
572 split -> next_ += count;
573 }
574
575 //
576 // Was data completely transferred? We are the
577 // sending side. We must update the message in
578 // store, even if split was aborted.
579 //
580
581 if (split -> next_ != ((int) split -> data_.size()))
582 {
583 return 0;
584 }
585
586 //
587 // Move the split at the head of the
588 // list to the commits.
589 //
590
591 remove(split);
592
593 //
594 // Reset current position to the
595 // end of repository.
596 //
597
598 current_ = splits_ -> end();
599
600 #ifdef TEST
601 *logofs << "SplitStore: Removed split at head of the list. "
602 << "Resource is " << split -> resource_ << " request "
603 << (unsigned) split -> store_ -> opcode() << " position "
604 << split -> position_ << ".\n" << logofs_flush;
605 #endif
606
607 return 1;
608 }
609
start(EncodeBuffer & encodeBuffer)610 int SplitStore::start(EncodeBuffer &encodeBuffer)
611 {
612 //
613 // Get the element at the top of the
614 // list.
615 //
616
617 current_ = splits_ -> begin();
618
619 Split *split = *current_;
620
621 #ifdef TEST
622 *logofs << "SplitStore: Starting split for checksum ["
623 << DumpChecksum(split -> checksum_) << "] position "
624 << split -> position_ << " with " << (split ->
625 data_.size() - split -> next_) << " bytes to go "
626 << "out of " << split -> data_.size()
627 << ".\n" << logofs_flush;
628 #endif
629
630 //
631 // See if compression of the data part is
632 // enabled.
633 //
634
635 if (split -> store_ -> enableCompress)
636 {
637 //
638 // If the split is going to be aborted don't
639 // compress the data and go straight to the
640 // send. The new data size will be assumed
641 // from the disk cache.
642 //
643
644 if (split -> state_ != split_loaded)
645 {
646 unsigned int compressedSize = 0;
647 unsigned char *compressedData = NULL;
648
649 if (control -> LocalDataCompression &&
650 (compressor_ -> compressBuffer(split -> data_.begin(), split -> d_size_,
651 compressedData, compressedSize)))
652 {
653 //
654 // Replace the data with the one in
655 // compressed form.
656 //
657
658 #ifdef TEST
659 *logofs << "SplitStore: Split data of size " << split -> d_size_
660 << " has been compressed to " << compressedSize
661 << " bytes.\n" << logofs_flush;
662 #endif
663
664 split -> data_.clear();
665
666 split -> data_.resize(compressedSize);
667
668 memcpy(split -> data_.begin(), compressedData, compressedSize);
669
670 split -> c_size_ = compressedSize;
671
672 //
673 // Inform our peer that the data is
674 // compressed and send the new size.
675 //
676
677 encodeBuffer.encodeBoolValue(1);
678
679 encodeBuffer.encodeValue(compressedSize, 32, 14);
680
681 #ifdef TEST
682 *logofs << "SplitStore: Signaled " << split -> c_size_
683 << " bytes of compressed data for this message.\n"
684 << logofs_flush;
685 #endif
686
687 return 1;
688 }
689 }
690 #ifdef TEST
691 else
692 {
693 *logofs << "SplitStore: Not trying to compress the "
694 << "loaded message.\n" << logofs_flush;
695 }
696 #endif
697
698 //
699 // Tell to the remote that data will
700 // follow uncompressed.
701 //
702
703 encodeBuffer.encodeBoolValue(0);
704 }
705
706 return 1;
707 }
708
start(DecodeBuffer & decodeBuffer)709 int SplitStore::start(DecodeBuffer &decodeBuffer)
710 {
711 #ifdef TEST
712 *logofs << "SplitStore: Going to receive a new split from the remote side.\n"
713 << logofs_flush;
714 #endif
715
716 //
717 // Get the element at the head
718 // of the list.
719 //
720
721 current_ = splits_ -> begin();
722
723 Split *split = *current_;
724
725 unsigned int compressedSize = 0;
726
727 //
728 // Save the data size known by the remote.
729 // This information will be needed if the
730 // remote will not have a chance to abort
731 // the split.
732 //
733
734 split -> r_size_ = split -> d_size_;
735
736 //
737 // Find out if data was compressed by the
738 // remote.
739 //
740
741 if (split -> store_ -> enableCompress)
742 {
743 decodeBuffer.decodeBoolValue(compressedSize);
744
745 if (compressedSize == 1)
746 {
747 //
748 // Get the compressed size.
749 //
750
751 if (control -> isProtoStep7() == 1)
752 {
753 decodeBuffer.decodeValue(compressedSize, 32, 14);
754 }
755 else
756 {
757 //
758 // As we can't refuse to handle the decoding
759 // of the split message when connected to an
760 // old proxy version, we need to decode this
761 // in a way that is compatible.
762 //
763
764 unsigned int diffSize;
765
766 decodeBuffer.decodeValue(diffSize, 32, 14);
767
768 split -> store_ -> lastResize += diffSize;
769
770 compressedSize = split -> store_ -> lastResize;
771 }
772
773 split -> store_ -> validateSize(split -> d_size_, compressedSize);
774
775 split -> r_size_ = compressedSize;
776 }
777 }
778
779 //
780 // Update the size if the split
781 // was not already loaded.
782 //
783
784 if (split -> state_ != split_loaded)
785 {
786 split -> data_.clear();
787
788 if (compressedSize > 0)
789 {
790 split -> c_size_ = compressedSize;
791
792 #ifdef TEST
793 *logofs << "SplitStore: Split data of size "
794 << split -> d_size_ << " was compressed to "
795 << split -> c_size_ << " bytes.\n"
796 << logofs_flush;
797 #endif
798
799 split -> data_.resize(split -> c_size_);
800 }
801 else
802 {
803 split -> data_.resize(split -> d_size_);
804 }
805
806 unsigned char *data = split -> data_.begin();
807
808 data[0] = SPLIT_PATTERN;
809 data[1] = SPLIT_PATTERN;
810 }
811 #ifdef TEST
812 else
813 {
814 //
815 // The message had been already
816 // loaded from disk.
817 //
818
819 if (compressedSize > 0)
820 {
821 if ((int) compressedSize != split -> c_size_)
822 {
823 *logofs << "SplitStore: WARNING! Compressed data size is "
824 << "different than the loaded compressed size.\n"
825 << logofs_flush;
826 }
827
828 *logofs << "SplitStore: Ignoring the new size with "
829 << "loaded compressed size " << split -> c_size_
830 << ".\n" << logofs_flush;
831 }
832 }
833 #endif
834
835 return 1;
836 }
837
receive(DecodeBuffer & decodeBuffer)838 int SplitStore::receive(DecodeBuffer &decodeBuffer)
839 {
840 if (splits_ -> size() == 0)
841 {
842 #ifdef PANIC
843 *logofs << "SplitStore: PANIC! Function receive called with no splits available.\n"
844 << logofs_flush;
845 #endif
846
847 cerr << "Error" << ": Function receive called with no splits available.\n";
848
849 HandleAbort();
850 }
851
852 if (current_ == splits_ -> end())
853 {
854 start(decodeBuffer);
855 }
856
857 //
858 // Check first if split was aborted, else add
859 // any new data to message being recomposed.
860 //
861
862 Split *split = *current_;
863
864 unsigned int abort = 0;
865
866 decodeBuffer.decodeBoolValue(abort);
867
868 if (abort == 1)
869 {
870 #ifdef TEST
871 *logofs << "SplitStore: Aborting split for checksum ["
872 << DumpChecksum(split -> checksum_) << "] position "
873 << split -> position_ << " with " << (split ->
874 data_.size() - split -> next_) << " bytes to go "
875 << "out of " << split -> data_.size()
876 << ".\n" << logofs_flush;
877 #endif
878
879 statistics -> addSplitAborted();
880
881 statistics -> addSplitAbortedBytesOut(split -> r_size_ - split -> next_);
882
883 split -> next_ = split -> r_size_;
884
885 split -> state_ = split_aborted;
886 }
887 else
888 {
889 //
890 // Get the size of the packet.
891 //
892
893 unsigned int count;
894
895 decodeBuffer.decodeValue(count, 32, 10);
896
897 //
898 // If the split was not already loaded from
899 // disk, decode the packet and update our
900 // copy of the data. The encoding side may
901 // have not received the abort event, yet,
902 // and may be unaware that the message is
903 // stored in compressed form at our side.
904 //
905
906 #ifdef TEST
907 *logofs << "SplitStore: Receiving split for checksum ["
908 << DumpChecksum(split -> checksum_) << "] count "
909 << count << " position " << split -> position_
910 << ". Data size is " << split -> data_.size() << " ("
911 << split -> d_size_ << "/" << split -> c_size_ << "/"
912 << split -> r_size_ << "), " << split -> r_size_ -
913 (split -> next_ + count) << " to go.\n"
914 << logofs_flush;
915 #endif
916
917 if (split -> next_ + count > (unsigned) split -> r_size_)
918 {
919 #ifdef PANIC
920 *logofs << "SplitStore: PANIC! Invalid data count "
921 << count << "provided in the split.\n"
922 << logofs_flush;
923
924 *logofs << "SplitStore: PANIC! While receiving split for "
925 << "checksum [" << DumpChecksum(split -> checksum_)
926 << "] with count " << count << " action ["
927 << DumpAction(split -> action_) << "] state ["
928 << DumpState(split -> state_) << "]. Data size is "
929 << split -> data_.size() << " (" << split -> d_size_
930 << "/" << split -> c_size_ << "), " << split ->
931 data_.size() - (split -> next_ + count)
932 << " to go.\n" << logofs_flush;
933 #endif
934
935 cerr << "Error" << ": Invalid data count "
936 << count << "provided in the split.\n";
937
938 HandleAbort();
939 }
940
941 if (split -> state_ != split_loaded)
942 {
943 #ifdef TEST
944
945 if (split -> next_ + count > split -> data_.size())
946 {
947 #ifdef PANIC
948 *logofs << "SplitStore: PANIC! Inconsistent split data size "
949 << split -> data_.size() << " with expected size "
950 << split -> r_size_ << ".\n"
951 << logofs_flush;
952 #endif
953
954 HandleAbort();
955 }
956
957 #endif
958
959 memcpy(split -> data_.begin() + split -> next_,
960 decodeBuffer.decodeMemory(count), count);
961 }
962 else
963 {
964 #ifdef TEST
965 *logofs << "SplitStore: WARNING! Data discarded with split "
966 << "loaded from disk.\n" << logofs_flush;
967 #endif
968
969 decodeBuffer.decodeMemory(count);
970 }
971
972 split -> next_ += count;
973 }
974
975 //
976 // Is unsplit complete?
977 //
978
979 if (split -> next_ != split -> r_size_)
980 {
981 return 0;
982 }
983
984 //
985 // If the persistent cache is enabled,
986 // we have a valid checksum and the
987 // split was not originally retrieved
988 // from disk, save the message on disk.
989 //
990
991 if (split -> state_ != split_loaded &&
992 split -> state_ != split_aborted)
993 {
994 save(split);
995 }
996
997 //
998 // Move the split at the head of the
999 // list to the commits.
1000 //
1001
1002 remove(split);
1003
1004 //
1005 // Reset the current position to the
1006 // end of the repository.
1007 //
1008
1009 current_ = splits_ -> end();
1010
1011 #ifdef TEST
1012 *logofs << "SplitStore: Removed split at head of the list. "
1013 << "Resource is " << split -> resource_ << " request "
1014 << (unsigned) split -> store_ -> opcode() << " position "
1015 << split -> position_ << ".\n" << logofs_flush;
1016 #endif
1017
1018 return 1;
1019 }
1020
pop()1021 Split *SplitStore::pop()
1022 {
1023 if (splits_ -> size() == 0)
1024 {
1025 #ifdef TEST
1026 *logofs << "SplitStore: The split store is empty.\n"
1027 << logofs_flush;
1028 #endif
1029
1030 return NULL;
1031 }
1032
1033 //
1034 // Move the pointer at the end of the list.
1035 // The next send operation will eventually
1036 // start a new split.
1037 //
1038
1039 current_ = splits_ -> end();
1040
1041 Split *split = *(splits_ -> begin());
1042
1043 splits_ -> pop_front();
1044
1045 #ifdef TEST
1046 *logofs << "SplitStore: Removed split at the head of the "
1047 << "list with resource " << split -> resource_
1048 << " request " << (unsigned) split -> store_ ->
1049 opcode() << " position " << split -> position_
1050 << ".\n" << logofs_flush;
1051 #endif
1052
1053 splitStorageSize_ -= getNodeSize(split);
1054
1055 totalSplitSize_--;
1056
1057 totalSplitStorageSize_ -= getNodeSize(split);
1058
1059 #ifdef TEST
1060 *logofs << "SplitStore: There are " << splits_ -> size()
1061 << " messages in store [" << resource_ << "] with "
1062 << "storage size " << splitStorageSize_ << ".\n"
1063 << logofs_flush;
1064
1065 *logofs << "SplitStore: Total messages in stores are "
1066 << totalSplitSize_ << " with total storage size "
1067 << totalSplitStorageSize_ << ".\n"
1068 << logofs_flush;
1069 #endif
1070
1071 return split;
1072 }
1073
remove(Split * split)1074 void SplitStore::remove(Split *split)
1075 {
1076 #ifdef TEST
1077 *logofs << "SplitStore: Going to remove the split from the list.\n"
1078 << logofs_flush;
1079 #endif
1080
1081 #ifdef TEST
1082
1083 if (split != getFirstSplit())
1084 {
1085 #ifdef PANIC
1086 *logofs << "SplitStore: PANIC! Trying to remove a split "
1087 << "not at the head of the list.\n"
1088 << logofs_flush;
1089 #endif
1090
1091 cerr << "Error" << ": Trying to remove a split "
1092 << "not at the head of the list.\n";
1093
1094 HandleAbort();
1095 }
1096
1097 #endif
1098
1099 //
1100 // Move the split to the commit store.
1101 //
1102
1103 splits_ -> pop_front();
1104
1105 commits_ -> splits_ -> push_back(split);
1106
1107 splitStorageSize_ -= getNodeSize(split);
1108
1109 totalSplitSize_--;
1110
1111 totalSplitStorageSize_ -= getNodeSize(split);
1112
1113 #ifdef TEST
1114 *logofs << "SplitStore: There are " << splits_ -> size()
1115 << " messages in store [" << resource_ << "] with "
1116 << "storage size " << splitStorageSize_ << ".\n"
1117 << logofs_flush;
1118
1119 *logofs << "SplitStore: Total messages in stores are "
1120 << totalSplitSize_ << " with total storage size "
1121 << totalSplitStorageSize_ << ".\n"
1122 << logofs_flush;
1123 #endif
1124
1125 #ifdef TEST
1126
1127 if (splits_ -> size() == 0)
1128 {
1129 if (splitStorageSize_ != 0)
1130 {
1131 #ifdef PANIC
1132 *logofs << "SplitStore: PANIC! Internal error calculating "
1133 << "split data size. It is " << splitStorageSize_
1134 << " while should be 0.\n" << logofs_flush;
1135 #endif
1136
1137 cerr << "Error" << ": Internal error calculating "
1138 << "split data size. It is " << splitStorageSize_
1139 << " while should be 0.\n";
1140
1141 HandleAbort();
1142 }
1143 }
1144
1145 #endif
1146 }
1147
name(const T_checksum checksum)1148 const char *SplitStore::name(const T_checksum checksum)
1149 {
1150 if (checksum == NULL)
1151 {
1152 return NULL;
1153 }
1154
1155 char *pathName = control -> ImageCachePath;
1156
1157 if (pathName == NULL)
1158 {
1159 #ifdef PANIC
1160 *logofs << "SplitStore: PANIC! Cannot determine directory of "
1161 << "NX image files.\n" << logofs_flush;
1162 #endif
1163
1164 return NULL;
1165 }
1166
1167 int pathSize = strlen(pathName);
1168
1169 //
1170 // File name is "[path][/I-c/I-][checksum][\0]",
1171 // where c is the first hex digit of checksum.
1172 //
1173
1174 int nameSize = pathSize + 7 + MD5_LENGTH * 2 + 1;
1175
1176 char *fileName = new char[nameSize];
1177
1178 if (fileName == NULL)
1179 {
1180 #ifdef PANIC
1181 *logofs << "SplitStore: PANIC! Cannot allocate space for "
1182 << "NX image file name.\n" << logofs_flush;
1183 #endif
1184
1185 return NULL;
1186 }
1187
1188 strcpy(fileName, pathName);
1189
1190 sprintf(fileName + pathSize, "/I-%1X/I-",
1191 *((unsigned char *) checksum) >> 4);
1192
1193 for (unsigned int i = 0; i < MD5_LENGTH; i++)
1194 {
1195 sprintf(fileName + pathSize + 7 + (i * 2), "%02X",
1196 ((unsigned char *) checksum)[i]);
1197 }
1198
1199 return fileName;
1200 }
1201
save(Split * split)1202 int SplitStore::save(Split *split)
1203 {
1204 //
1205 // Check if saving the message on the
1206 // persistent cache is enabled.
1207 //
1208
1209 if (split -> save_ == 0)
1210 {
1211 return 0;
1212 }
1213
1214 T_checksum checksum = split -> checksum_;
1215
1216 const char *fileName = name(checksum);
1217
1218 if (fileName == NULL)
1219 {
1220 return 0;
1221 }
1222
1223 unsigned int splitSize;
1224
1225 ostream *fileStream = NULL;
1226
1227 unsigned char *fileHeader = NULL;
1228
1229 //
1230 // Get the other data from the split.
1231 //
1232
1233 unsigned char opcode = split -> store_ -> opcode();
1234
1235 unsigned char *data = split -> data_.begin();
1236
1237 int dataSize = split -> d_size_;
1238 int compressedSize = split -> c_size_;
1239
1240 #ifdef DEBUG
1241 *logofs << "SplitStore: Going to save split OPCODE#"
1242 << (unsigned int) opcode << " to file '" << fileName
1243 << "' with size " << dataSize << " and compressed size "
1244 << compressedSize << ".\n" << logofs_flush;
1245 #endif
1246
1247 DisableSignals();
1248
1249 //
1250 // Change the mask to make the file only
1251 // readable by the user, then restore the
1252 // old mask.
1253 //
1254
1255 mode_t fileMode;
1256
1257 //
1258 // Check if the file already exists. We try to
1259 // load the message when the split is started
1260 // and save it only if it is not found. Still
1261 // the remote side may send the same image mul-
1262 // tiple time and we may not have the time to
1263 // notify the abort.
1264 //
1265
1266 struct stat fileStat;
1267
1268 if (stat(fileName, &fileStat) == 0)
1269 {
1270 #ifdef TEST
1271 *logofs << "SplitStore: Image file '" << fileName
1272 << "' already present on disk.\n"
1273 << logofs_flush;
1274 #endif
1275
1276 goto SplitStoreSaveError;
1277 }
1278
1279 fileMode = umask(0077);
1280
1281 fileStream = new ofstream(fileName, ios::out | ios::binary);
1282
1283 umask(fileMode);
1284
1285 if (CheckData(fileStream) < 0)
1286 {
1287 #ifdef PANIC
1288 *logofs << "SplitStore: PANIC! Cannot open file '" << fileName
1289 << "' for output.\n" << logofs_flush;
1290 #endif
1291
1292 goto SplitStoreSaveError;
1293 }
1294
1295 fileHeader = new unsigned char[SPLIT_HEADER_SIZE];
1296
1297 if (fileHeader == NULL)
1298 {
1299 #ifdef PANIC
1300 *logofs << "SplitStore: PANIC! Cannot allocate space for "
1301 << "NX image header.\n" << logofs_flush;
1302 #endif
1303
1304 goto SplitStoreSaveError;
1305 }
1306
1307 //
1308 // Leave 3 bytes for future use. Please note
1309 // that, on some CPUs, we can't use PutULONG()
1310 // to write integers that are not aligned to
1311 // the word boundary.
1312 //
1313
1314 *fileHeader = opcode;
1315
1316 *(fileHeader + 1) = 0;
1317 *(fileHeader + 2) = 0;
1318 *(fileHeader + 3) = 0;
1319
1320 PutULONG(dataSize, fileHeader + 4, false);
1321 PutULONG(compressedSize, fileHeader + 8, false);
1322
1323 splitSize = (compressedSize > 0 ? compressedSize : dataSize);
1324
1325 if (PutData(fileStream, fileHeader, SPLIT_HEADER_SIZE) < 0 ||
1326 PutData(fileStream, data, splitSize) < 0)
1327 {
1328 #ifdef PANIC
1329 *logofs << "SplitStore: PANIC! Cannot write to NX "
1330 << "image file '" << fileName << "'.\n"
1331 << logofs_flush;
1332 #endif
1333
1334 goto SplitStoreSaveError;
1335 }
1336
1337 //
1338 // Check if all the data was written on the
1339 // disk and, if not, remove the faulty copy.
1340 //
1341
1342 FlushData(fileStream);
1343
1344 if (CheckData(fileStream) < 0)
1345 {
1346 #ifdef PANIC
1347 *logofs << "SplitStore: PANIC! Failed to write NX "
1348 << "image file '" << fileName << "'.\n"
1349 << logofs_flush;
1350 #endif
1351
1352 cerr << "Warning" << ": Failed to write NX "
1353 << "image file '" << fileName << "'.\n";
1354
1355 goto SplitStoreSaveError;
1356 }
1357
1358 #ifdef TEST
1359 *logofs << "SplitStore: Saved split to file '" << fileName
1360 << "' with data size " << dataSize << " and "
1361 << "compressed data size " << compressedSize
1362 << ".\n" << logofs_flush;
1363 #endif
1364
1365 delete fileStream;
1366
1367 delete [] fileName;
1368 delete [] fileHeader;
1369
1370 EnableSignals();
1371
1372 //
1373 // Update the timestamp as the operation
1374 // may have taken some time.
1375 //
1376
1377 getNewTimestamp();
1378
1379 return 1;
1380
1381 SplitStoreSaveError:
1382
1383 delete fileStream;
1384
1385 if (fileName != NULL)
1386 {
1387 unlink(fileName);
1388 }
1389
1390 delete [] fileName;
1391 delete [] fileHeader;
1392
1393 EnableSignals();
1394
1395 return -1;
1396 }
1397
find(Split * split)1398 int SplitStore::find(Split *split)
1399 {
1400 const char *fileName = name(split -> checksum_);
1401
1402 if (fileName == NULL)
1403 {
1404 return 0;
1405 }
1406
1407 #ifdef DEBUG
1408 *logofs << "SplitStore: Going to find split OPCODE#"
1409 << (unsigned) split -> store_ -> opcode()
1410 << " in file '" << fileName << "'.\n"
1411 << logofs_flush;
1412 #endif
1413
1414 //
1415 // Check if the file exists and, at the
1416 // same time, update the modification
1417 // time to prevent its deletion.
1418 //
1419
1420 if (utime(fileName, NULL) == 0)
1421 {
1422 #ifdef TEST
1423 *logofs << "SplitStore: Found split OPCODE#"
1424 << (unsigned) split -> store_ -> opcode()
1425 << " in file '" << fileName << "'.\n"
1426 << logofs_flush;
1427 #endif
1428
1429 delete [] fileName;
1430
1431 return 1;
1432 }
1433
1434 #ifdef TEST
1435 *logofs << "SplitStore: WARNING! Can't find split "
1436 << "OPCODE#" << (unsigned) split -> store_ ->
1437 opcode() << " in file '" << fileName
1438 << "'.\n" << logofs_flush;
1439 #endif
1440
1441 delete [] fileName;
1442
1443 return 0;
1444 }
1445
load(Split * split)1446 int SplitStore::load(Split *split)
1447 {
1448 //
1449 // Check if loading the image is enabled.
1450 //
1451
1452 if (split -> load_ == 0)
1453 {
1454 return 0;
1455 }
1456
1457 const char *fileName = name(split -> checksum_);
1458
1459 if (fileName == NULL)
1460 {
1461 return 0;
1462 }
1463
1464 unsigned char fileOpcode;
1465
1466 int fileSize;
1467 int fileCSize;
1468
1469 istream *fileStream = NULL;
1470
1471 unsigned char *fileHeader = NULL;
1472
1473 DisableSignals();
1474
1475 #ifdef DEBUG
1476 *logofs << "SplitStore: Going to load split OPCODE#"
1477 << (unsigned int) split -> store_ -> opcode()
1478 << " from file '" << fileName << "'.\n"
1479 << logofs_flush;
1480 #endif
1481
1482 fileStream = new ifstream(fileName, ios::in | ios::binary);
1483
1484 if (CheckData(fileStream) < 0)
1485 {
1486 #ifdef TEST
1487 *logofs << "SplitStore: WARNING! Can't open image file '"
1488 << fileName << "' on disk.\n" << logofs_flush;
1489 #endif
1490
1491 goto SplitStoreLoadError;
1492 }
1493
1494 fileHeader = new unsigned char[SPLIT_HEADER_SIZE];
1495
1496 if (fileHeader == NULL)
1497 {
1498 #ifdef PANIC
1499 *logofs << "SplitStore: PANIC! Cannot allocate space for "
1500 << "NX image header.\n" << logofs_flush;
1501 #endif
1502
1503 cerr << "Error" << ": Cannot allocate space for "
1504 << "NX image header.\n";
1505
1506 goto SplitStoreLoadError;
1507 }
1508
1509 if (GetData(fileStream, fileHeader, SPLIT_HEADER_SIZE) < 0)
1510 {
1511 #ifdef PANIC
1512 *logofs << "SplitStore: PANIC! Cannot read header from "
1513 << "NX image file '" << fileName << "'.\n"
1514 << logofs_flush;
1515 #endif
1516
1517 cerr << "Warning" << ": Cannot read header from "
1518 << "NX image file '" << fileName << "'.\n";
1519
1520 goto SplitStoreLoadError;
1521 }
1522
1523 fileOpcode = *fileHeader;
1524
1525 fileSize = GetULONG(fileHeader + 4, false);
1526 fileCSize = GetULONG(fileHeader + 8, false);
1527
1528 //
1529 // Don't complain if we find that data was saved
1530 // in compressed form even if we were not aware
1531 // of the compressed data size. The remote side
1532 // compresses the data only at the time it starts
1533 // the transferral of the split. We replace our
1534 // copy of the data with whatever we find on the
1535 // disk.
1536 //
1537
1538 if (fileOpcode != split -> store_ -> opcode() ||
1539 fileSize != split -> d_size_ ||
1540 fileSize > control -> MaximumRequestSize ||
1541 fileCSize > control -> MaximumRequestSize)
1542
1543 {
1544 #ifdef TEST
1545 *logofs << "SplitStore: PANIC! Corrupted image file '" << fileName
1546 << "'. Expected " << (unsigned int) split -> store_ -> opcode()
1547 << "/" << split -> d_size_ << "/" << split -> c_size_ << " found "
1548 << (unsigned int) fileOpcode << "/" << fileSize << "/"
1549 << fileCSize << ".\n" << logofs_flush;
1550 #endif
1551
1552 cerr << "Warning" << ": Corrupted image file '" << fileName
1553 << "'. Expected " << (unsigned int) split -> store_ -> opcode()
1554 << "/" << split -> d_size_ << "/" << split -> c_size_ << " found "
1555 << (unsigned int) fileOpcode << "/" << fileSize << "/"
1556 << fileCSize << ".\n";
1557
1558 goto SplitStoreLoadError;
1559 }
1560
1561 //
1562 // Update the data size with the size
1563 // we got from the disk record.
1564 //
1565
1566 split -> d_size_ = fileSize;
1567 split -> c_size_ = fileCSize;
1568
1569 unsigned int splitSize;
1570
1571 if (fileCSize > 0)
1572 {
1573 splitSize = fileCSize;
1574 }
1575 else
1576 {
1577 splitSize = fileSize;
1578 }
1579
1580 //
1581 // Allocate a new buffer if we didn't
1582 // do that already or if the size is
1583 // different.
1584 //
1585
1586 if (split -> data_.size() != splitSize)
1587 {
1588 split -> data_.clear();
1589
1590 split -> data_.resize(splitSize);
1591 }
1592
1593 if (GetData(fileStream, split -> data_.begin(), splitSize) < 0)
1594 {
1595 #ifdef PANIC
1596 *logofs << "SplitStore: PANIC! Cannot read data from "
1597 << "NX image file '" << fileName << "'.\n"
1598 << logofs_flush;
1599 #endif
1600
1601 cerr << "Warning" << ": Cannot read data from "
1602 << "NX image file '" << fileName << "'.\n";
1603
1604 goto SplitStoreLoadError;
1605 }
1606
1607 delete fileStream;
1608
1609 delete [] fileHeader;
1610 delete [] fileName;
1611
1612 EnableSignals();
1613
1614 //
1615 // Update the timestamp as the operation
1616 // may have taken some time.
1617 //
1618
1619 getNewTimestamp();
1620
1621 return 1;
1622
1623 SplitStoreLoadError:
1624
1625 delete fileStream;
1626
1627 unlink(fileName);
1628
1629 delete [] fileName;
1630 delete [] fileHeader;
1631
1632 EnableSignals();
1633
1634 return -1;
1635 }
1636
pop()1637 Split *CommitStore::pop()
1638 {
1639 if (splits_ -> size() == 0)
1640 {
1641 #ifdef TEST
1642 *logofs << "CommitStore: The commit store is empty.\n"
1643 << logofs_flush;
1644 #endif
1645
1646 return NULL;
1647 }
1648
1649 Split *split = *(splits_ -> begin());
1650
1651 splits_ -> pop_front();
1652
1653 #ifdef TEST
1654 *logofs << "CommitStore: Removed commit split at the head "
1655 << "of the list with resource " << split -> resource_
1656 << " request " << (unsigned) split -> store_ ->
1657 opcode() << " position " << split -> position_
1658 << ".\n" << logofs_flush;
1659 #endif
1660
1661 return split;
1662 }
1663
expand(Split * split,unsigned char * buffer,const int size)1664 int CommitStore::expand(Split *split, unsigned char *buffer, const int size)
1665 {
1666 #ifdef TEST
1667 *logofs << "CommitStore: Expanding split data with "
1668 << size << " bytes to write.\n"
1669 << logofs_flush;
1670 #endif
1671
1672 #ifdef TEST
1673
1674 if (size < split -> i_size_ + split -> d_size_)
1675 {
1676 #ifdef PANIC
1677 *logofs << "CommitStore: PANIC! Wrong size of the provided "
1678 << "buffer. It should be " << split -> i_size_ +
1679 split -> d_size_ << " instead of " << size
1680 << ".\n" << logofs_flush;
1681 #endif
1682
1683 cerr << "Error" << ": Wrong size of the provided "
1684 << "buffer. It should be " << split -> i_size_ +
1685 split -> d_size_ << " instead of " << size
1686 << ".\n";
1687
1688 HandleAbort();
1689 }
1690
1691 #endif
1692
1693 #ifdef DEBUG
1694 *logofs << "CommitStore: Copying " << split -> i_size_
1695 << " bytes of identity.\n" << logofs_flush;
1696 #endif
1697
1698 memcpy(buffer, split -> identity_.begin(), split -> i_size_);
1699
1700 //
1701 // Copy data, if any, to the buffer.
1702 //
1703
1704 if (size > split -> i_size_)
1705 {
1706 //
1707 // Check if message has been stored
1708 // in compressed format.
1709 //
1710
1711 if (split -> c_size_ == 0)
1712 {
1713 #ifdef DEBUG
1714 *logofs << "CommitStore: Copying " << split -> d_size_
1715 << " bytes of plain data.\n" << logofs_flush;
1716 #endif
1717
1718 memcpy(buffer + split -> i_size_, split -> data_.begin(), split -> d_size_);
1719 }
1720 else
1721 {
1722 #ifdef DEBUG
1723 *logofs << "CommitStore: Decompressing " << split -> c_size_
1724 << " bytes and copying " << split -> d_size_
1725 << " bytes of data.\n" << logofs_flush;
1726 #endif
1727
1728 if (compressor_ ->
1729 decompressBuffer(buffer + split -> i_size_,
1730 split -> d_size_, split -> data_.begin(),
1731 split -> c_size_) < 0)
1732 {
1733 #ifdef PANIC
1734 *logofs << "CommitStore: PANIC! Split data decompression failed.\n"
1735 << logofs_flush;
1736 #endif
1737
1738 cerr << "Error" << ": Split data decompression failed.\n";
1739
1740 return -1;
1741 }
1742 }
1743 }
1744
1745 return 1;
1746 }
1747
update(Split * split)1748 int CommitStore::update(Split *split)
1749 {
1750 if (split -> action_ != IS_ADDED)
1751 {
1752 return 0;
1753 }
1754
1755 //
1756 // We don't need the identity data at
1757 // the encoding side.
1758 //
1759
1760 if (split -> identity_.size() == 0)
1761 {
1762 #ifdef TEST
1763 *logofs << "SplitStore: Going to update the size "
1764 << "for object at position " << split -> position_
1765 << " with data size " << split -> d_size_
1766 << " and compressed data size " << split ->
1767 c_size_ << ".\n" << logofs_flush;
1768 #endif
1769
1770 split -> store_ -> updateData(split -> position_, split -> d_size_,
1771 split -> c_size_);
1772 }
1773 else
1774 {
1775 #ifdef TEST
1776 *logofs << "SplitStore: Going to update data and size "
1777 << "for object at position " << split -> position_
1778 << " with data size " << split -> d_size_
1779 << " and compressed data size " << split ->
1780 c_size_ << ".\n" << logofs_flush;
1781 #endif
1782
1783 split -> store_ -> updateData(split -> position_, split -> data_.begin(),
1784 split -> d_size_, split -> c_size_);
1785 }
1786
1787 //
1788 // Unlock message so that we can remove
1789 // or save it on disk at shutdown.
1790 //
1791
1792 if (split -> action_ == IS_ADDED)
1793 {
1794 split -> store_ -> unlock(split -> position_);
1795
1796 #ifdef TEST
1797
1798 validate(split);
1799
1800 #endif
1801 }
1802
1803 return 1;
1804 }
1805
validate(Split * split)1806 int CommitStore::validate(Split *split)
1807 {
1808 MessageStore *store = split -> store_;
1809
1810 int p, n, s;
1811
1812 s = store -> cacheSlots;
1813
1814 for (p = 0, n = 0; p < s; p++)
1815 {
1816 if (store -> getLocks(p) == 1)
1817 {
1818 n++;
1819 }
1820 else if (store -> getLocks(p) != 0)
1821 {
1822 #ifdef PANIC
1823 *logofs << "CommitStore: PANIC! Repository for OPCODE#"
1824 << (unsigned int) store -> opcode() << " has "
1825 << store -> getLocks(p) << " locks for message "
1826 << "at position " << p << ".\n" << logofs_flush;
1827 #endif
1828
1829 cerr << "Error" << ": Repository for OPCODE#"
1830 << (unsigned int) store -> opcode() << " has "
1831 << store -> getLocks(p) << " locks for message "
1832 << "at position " << p << ".\n";
1833
1834 HandleAbort();
1835 }
1836 }
1837
1838 #ifdef TEST
1839 *logofs << "CommitStore: Repository for OPCODE#"
1840 << (unsigned int) store -> opcode()
1841 << " has " << n << " locked messages.\n"
1842 << logofs_flush;
1843 #endif
1844
1845 return 1;
1846 }
1847