1 /**************************************************************************/
2 /*                                                                        */
3 /* Copyright (c) 2001, 2010 NoMachine, http://www.nomachine.com/.         */
4 /*                                                                        */
5 /* NXCOMP, NX protocol compression and NX extensions to this software     */
6 /* are copyright of NoMachine. Redistribution and use of the present      */
7 /* software is allowed according to terms specified in the file LICENSE   */
8 /* which comes in the source distribution.                                */
9 /*                                                                        */
10 /* Check http://www.nomachine.com/licensing.html for applicability.       */
11 /*                                                                        */
12 /* NX and NoMachine are trademarks of Medialogic S.p.A.                   */
13 /*                                                                        */
14 /* All rights reserved.                                                   */
15 /*                                                                        */
16 /**************************************************************************/
17 
18 #include <unistd.h>
19 #include <cstring>
20 #include <sys/stat.h>
21 #include <sys/types.h>
22 #include <utime.h>
23 
24 #include "Misc.h"
25 
26 #include "Split.h"
27 
28 #include "Control.h"
29 #include "Statistics.h"
30 
31 #include "EncodeBuffer.h"
32 #include "DecodeBuffer.h"
33 
34 #include "StaticCompressor.h"
35 
36 #include "Unpack.h"
37 
38 //
39 // Set the verbosity level.
40 //
41 
42 #define PANIC
43 #define WARNING
44 #undef  TEST
45 #undef  DEBUG
46 #undef  DUMP
47 
48 //
49 // Define this to trace elements
50 // allocated and deallocated.
51 //
52 
53 #undef  REFERENCES
54 
55 //
56 // Counters used for store control.
57 //
58 
59 int SplitStore::totalSplitSize_;
60 int SplitStore::totalSplitStorageSize_;
61 
62 //
63 // This is used for reference count.
64 //
65 
66 #ifdef REFERENCES
67 
68 int Split::references_ = 0;
69 
70 #endif
71 
Split()72 Split::Split()
73 {
74   resource_ = nothing;
75   position_ = nothing;
76 
77   store_ = NULL;
78 
79   d_size_ = 0;
80   i_size_ = 0;
81   c_size_ = 0;
82   r_size_ = 0;
83 
84   next_ = 0;
85   load_ = 0;
86   save_ = 0;
87 
88   checksum_ = NULL;
89   state_    = split_undefined;
90   mode_     = split_none;
91   action_   = is_discarded;
92 
93   #ifdef REFERENCES
94 
95   references_++;
96 
97   *logofs << "Split: Created new Split at "
98           << this << " out of " << references_
99           << " allocated references.\n" << logofs_flush;
100   #endif
101 }
102 
~Split()103 Split::~Split()
104 {
105   delete [] checksum_;
106 
107   #ifdef REFERENCES
108 
109   references_--;
110 
111   *logofs << "Split: Deleted Split at "
112           << this << " out of " << references_
113           << " allocated references.\n" << logofs_flush;
114   #endif
115 }
116 
SplitStore(StaticCompressor * compressor,CommitStore * commits,int resource)117 SplitStore::SplitStore(StaticCompressor *compressor, CommitStore *commits, int resource)
118 
119   : compressor_(compressor), commits_(commits), resource_(resource)
120 {
121   splits_ = new T_splits();
122 
123   current_ = splits_ -> end();
124 
125   splitStorageSize_ = 0;
126 
127   #ifdef TEST
128   *logofs << "SplitStore: Created new store [";
129 
130   if (resource_ != nothing)
131   {
132     *logofs << resource_;
133   }
134   else
135   {
136     *logofs << "commit";
137   }
138 
139   *logofs << "].\n" << logofs_flush;
140 
141   *logofs << "SplitStore: Total messages in stores are "
142           << totalSplitSize_ << " with total storage size "
143           << totalSplitStorageSize_ << ".\n"
144           << logofs_flush;
145   #endif
146 }
147 
~SplitStore()148 SplitStore::~SplitStore()
149 {
150   totalSplitSize_ -= splits_ -> size();
151 
152   totalSplitStorageSize_ -= splitStorageSize_;
153 
154   for (T_splits::iterator i = splits_ -> begin();
155            i != splits_ -> end(); i++)
156   {
157     delete *i;
158   }
159 
160   delete splits_;
161 
162   #ifdef TEST
163   *logofs << "SplitStore: Deleted store [";
164 
165   if (resource_ != nothing)
166   {
167     *logofs << resource_;
168   }
169   else
170   {
171     *logofs << "commit";
172   }
173 
174   *logofs << "] with storage size " << splitStorageSize_
175           << ".\n" << logofs_flush;
176 
177   *logofs << "SplitStore: Total messages in stores are "
178           << totalSplitSize_ << " with total storage size "
179           << totalSplitStorageSize_ << ".\n"
180           << logofs_flush;
181   #endif
182 }
183 
184 //
185 // This is called at the encoding side.
186 //
187 
add(MessageStore * store,int resource,T_split_mode mode,int position,T_store_action action,T_checksum checksum,const unsigned char * buffer,const int size)188 Split *SplitStore::add(MessageStore *store, int resource, T_split_mode mode,
189                            int position, T_store_action action, T_checksum checksum,
190                                const unsigned char *buffer, const int size)
191 {
192   #ifdef TEST
193   *logofs << "SplitStore: Adding message [" << (unsigned int) store ->
194              opcode() << "] resource " << resource << " mode " << mode
195           << " position " << position << " action [" << DumpAction(action)
196           << "] and checksum [" << DumpChecksum(checksum) << "]"
197           << ".\n" << logofs_flush;
198   #endif
199 
200   Split *split = new Split();
201 
202   if (split == NULL)
203   {
204     #ifdef PANIC
205     *logofs << "SplitStore: PANIC! Can't allocate "
206             << "memory for the split.\n"
207             << logofs_flush;
208     #endif
209 
210     cerr << "Error" << ": Can't allocate memory "
211          << "for the split.\n";
212 
213     HandleAbort();
214   }
215 
216   split -> store_    = store;
217   split -> resource_ = resource;
218   split -> mode_     = mode;
219   split -> position_ = position;
220   split -> action_   = action;
221 
222   split -> store_ -> validateSize(size);
223 
224   //
225   // The checksum is not provided if the
226   // message is cached.
227   //
228 
229   if (checksum != NULL)
230   {
231     split -> checksum_ = new md5_byte_t[MD5_LENGTH];
232 
233     memcpy(split -> checksum_, checksum, MD5_LENGTH);
234   }
235 
236   //
237   // We don't need the identity data at the
238   // encoding side. This qualifies the split
239   // as a split generated at the encoding
240   // side.
241   //
242 
243   split -> i_size_ = store -> identitySize(buffer, size);
244 
245   split -> d_size_ = size - split -> i_size_;
246 
247   if (action == IS_ADDED || action == is_discarded)
248   {
249     //
250     // If the message was added to message
251     // store or discarded we need to save
252     // the real data so we can transfer it
253     // at later time.
254     //
255 
256     split -> data_.resize(split -> d_size_);
257 
258     memcpy(split -> data_.begin(), buffer + split -> i_size_, split -> d_size_);
259 
260     //
261     // If the message was added, lock it so
262     // it will not be used by the encoding
263     // side until it is recomposed.
264     //
265 
266     if (action == IS_ADDED)
267     {
268       split -> store_ -> lock(split -> position_);
269 
270       #ifdef TEST
271 
272       commits_ -> validate(split);
273 
274       #endif
275     }
276   }
277   #ifdef WARNING
278   else
279   {
280     *logofs << "SplitStore: WARNING! Not copying data for the cached message.\n"
281             << logofs_flush;
282   }
283   #endif
284 
285   push(split);
286 
287   return split;
288 }
289 
290 //
291 // This is called at decoding side. If checksum
292 // is provided, the message can be searched on
293 // disk, then, if message is found, an event is
294 // sent to abort the data transfer.
295 //
296 
add(MessageStore * store,int resource,int position,T_store_action action,T_checksum checksum,unsigned char * buffer,const int size)297 Split *SplitStore::add(MessageStore *store, int resource, int position,
298                            T_store_action action, T_checksum checksum,
299                                unsigned char *buffer, const int size)
300 {
301   #ifdef TEST
302   *logofs << "SplitStore: Adding message ["
303           << (unsigned int) store -> opcode() << "] resource "
304           << resource << " position " << position << " action ["
305           << DumpAction(action) << "] and checksum ["
306           << DumpChecksum(checksum) << "].\n" << logofs_flush;
307   #endif
308 
309   Split *split = new Split();
310 
311   if (split == NULL)
312   {
313     #ifdef PANIC
314     *logofs << "SplitStore: PANIC! Can't allocate "
315             << "memory for the split.\n"
316             << logofs_flush;
317     #endif
318 
319     cerr << "Error" << ": Can't allocate memory "
320          << "for the split.\n";
321 
322     HandleAbort();
323   }
324 
325   split -> store_    = store;
326   split -> resource_ = resource;
327   split -> position_ = position;
328   split -> action_   = action;
329 
330   split -> store_ -> validateSize(size);
331 
332   //
333   // Check if the checksum was provided
334   // by the remote.
335   //
336 
337   if (checksum != NULL)
338   {
339     split -> checksum_ = new md5_byte_t[MD5_LENGTH];
340 
341     memcpy(split -> checksum_, checksum, MD5_LENGTH);
342   }
343 
344   split -> i_size_ = store -> identitySize(buffer, size);
345 
346   //
347   // Copy the identity so we can expand the
348   // message when it is committed.
349   //
350 
351   split -> identity_.resize(split -> i_size_);
352 
353   memcpy(split -> identity_.begin(), buffer, split -> i_size_);
354 
355   split -> d_size_ = size - split -> i_size_;
356 
357   if (action == IS_ADDED || action == is_discarded)
358   {
359     //
360     // The unpack procedure will check if the
361     // first 2 bytes of the buffer contain the
362     // pattern and will not try to expand the
363     // image.
364     //
365 
366     split -> data_.resize(2);
367 
368     unsigned char *data = split -> data_.begin();
369 
370     data[0] = SPLIT_PATTERN;
371     data[1] = SPLIT_PATTERN;
372 
373     //
374     // If the message was added to the store,
375     // we don't have the data part, yet, so
376     // we need to lock the message until it
377     // is recomposed.
378     //
379 
380     if (action == IS_ADDED)
381     {
382       split -> store_ -> lock(split -> position_);
383 
384       #ifdef TEST
385 
386       commits_ -> validate(split);
387 
388       #endif
389     }
390   }
391   else
392   {
393     #ifdef WARNING
394     *logofs << "SplitStore: WARNING! Copying data for the cached message.\n"
395             << logofs_flush;
396     #endif
397 
398     //
399     // We may optionally take the data from the
400     // message store in compressed form, but,
401     // as the data has been decompressed in the
402     // buffer, we save a further decompression.
403     //
404 
405     split -> data_.resize(split -> d_size_);
406 
407     memcpy(split -> data_.begin(), buffer + split -> i_size_, split -> d_size_);
408   }
409 
410   push(split);
411 
412   return split;
413 }
414 
push(Split * split)415 void SplitStore::push(Split *split)
416 {
417   splits_ -> push_back(split);
418 
419   splitStorageSize_ += getNodeSize(split);
420 
421   totalSplitSize_++;
422 
423   totalSplitStorageSize_ += getNodeSize(split);
424 
425   statistics -> addSplit();
426 
427   #ifdef TEST
428   *logofs << "SplitStore: There are " << splits_ -> size()
429           << " messages in store [" << resource_ << "] with "
430           << "storage size " << splitStorageSize_ << ".\n"
431           << logofs_flush;
432 
433   *logofs << "SplitStore: Total messages in stores are "
434           << totalSplitSize_ << " with total storage size "
435           << totalSplitStorageSize_ << ".\n"
436           << logofs_flush;
437   #endif
438 
439   split -> state_ = split_added;
440 }
441 
dump()442 void SplitStore::dump()
443 {
444   #ifdef DUMP
445 
446   int n;
447 
448   Split *split;
449 
450   *logofs << "SplitStore: DUMP! Dumping content of ";
451 
452   if (commits_ == NULL)
453   {
454     *logofs << "[commits]";
455   }
456   else
457   {
458     *logofs << "[splits] for store [" << resource_ << "]";
459   }
460 
461   *logofs << " with [" << getSize() << "] elements "
462           << "in the store.\n" << logofs_flush;
463 
464   n = 0;
465 
466   for (T_splits::iterator i = splits_ -> begin(); i != splits_ -> end(); i++, n++)
467   {
468     split = *i;
469 
470     *logofs << "SplitStore: DUMP! Split [" << n << "] has action ["
471             << DumpAction(split -> action_) << "] state ["
472             << DumpState(split -> state_) << "] ";
473 
474     if (split -> resource_ >= 0)
475     {
476       *logofs << "resource " << split -> resource_;
477     }
478 
479     *logofs << " request " << (unsigned) split -> store_ -> opcode()
480             << " position " << split -> position_ << " size is "
481             << split -> data_.size() << " (" << split -> d_size_
482             << "/" << split -> c_size_ << "/" << split -> r_size_
483             << ") with " << split -> data_.size() - split -> next_
484             << "] bytes to go.\n" << logofs_flush;
485   }
486 
487   #endif
488 }
489 
send(EncodeBuffer & encodeBuffer,int packetSize)490 int SplitStore::send(EncodeBuffer &encodeBuffer, int packetSize)
491 {
492   if (splits_ -> size() == 0)
493   {
494     #ifdef PANIC
495     *logofs << "SplitStore: PANIC! Function send called with no splits available.\n"
496             << logofs_flush;
497     #endif
498 
499     cerr << "Error" << ": Function send called with no splits available.\n";
500 
501     HandleAbort();
502   }
503 
504   //
505   // A start operation must always be executed on
506   // the split, even in the case the split will be
507   // later aborted.
508   //
509 
510   if (current_ == splits_ -> end())
511   {
512     start(encodeBuffer);
513   }
514 
515   //
516   // If we have matched the checksum received from
517   // the remote side then we must abort the current
518   // split, else we can send another block of data
519   // to the remote peer.
520   //
521 
522   Split *split = *current_;
523 
524   unsigned int abort = 0;
525 
526   if (split -> state_ == split_loaded)
527   {
528     abort = 1;
529   }
530 
531   encodeBuffer.encodeBoolValue(abort);
532 
533   if (abort == 1)
534   {
535     #ifdef TEST
536     *logofs << "SplitStore: Aborting split for checksum ["
537             << DumpChecksum(split -> checksum_) << "] position "
538             << split -> position_ << " with " << (split ->
539                data_.size() - split -> next_) << " bytes to go "
540             << "out of " << split -> data_.size()
541             << ".\n" << logofs_flush;
542     #endif
543 
544     statistics -> addSplitAborted();
545 
546     statistics -> addSplitAbortedBytesOut(split -> data_.size() - split -> next_);
547 
548     split -> next_ = split -> data_.size();
549 
550     split -> state_ = split_aborted;
551   }
552   else
553   {
554     int count = (packetSize <= 0 || split -> next_ +
555                      packetSize > (int) split -> data_.size() ?
556                          split -> data_.size() - split -> next_ : packetSize);
557 
558     #ifdef TEST
559     *logofs << "SplitStore: Sending split for checksum ["
560             << DumpChecksum(split -> checksum_) << "] count "
561             << count << " position " << split -> position_
562             << ". Data size is " << split -> data_.size() << " ("
563             << split -> d_size_ << "/" << split -> c_size_ << "), "
564             << split -> data_.size() - (split -> next_ + count)
565             << " to go.\n" << logofs_flush;
566     #endif
567 
568     encodeBuffer.encodeValue(count, 32, 10);
569 
570     encodeBuffer.encodeMemory(split -> data_.begin() + split -> next_, count);
571 
572     split -> next_ += count;
573   }
574 
575   //
576   // Was data completely transferred? We are the
577   // sending side. We must update the message in
578   // store, even if split was aborted.
579   //
580 
581   if (split -> next_ != ((int) split -> data_.size()))
582   {
583     return 0;
584   }
585 
586   //
587   // Move the split at the head of the
588   // list to the commits.
589   //
590 
591   remove(split);
592 
593   //
594   // Reset current position to the
595   // end of repository.
596   //
597 
598   current_ = splits_ -> end();
599 
600   #ifdef TEST
601   *logofs << "SplitStore: Removed split at head of the list. "
602           << "Resource is " << split -> resource_ << " request "
603           << (unsigned) split -> store_ -> opcode() << " position "
604           << split -> position_ << ".\n" << logofs_flush;
605   #endif
606 
607   return 1;
608 }
609 
start(EncodeBuffer & encodeBuffer)610 int SplitStore::start(EncodeBuffer &encodeBuffer)
611 {
612   //
613   // Get the element at the top of the
614   // list.
615   //
616 
617   current_ = splits_ -> begin();
618 
619   Split *split = *current_;
620 
621   #ifdef TEST
622   *logofs << "SplitStore: Starting split for checksum ["
623           << DumpChecksum(split -> checksum_) << "] position "
624           << split -> position_ << " with " << (split ->
625              data_.size() - split -> next_) << " bytes to go "
626           << "out of " << split -> data_.size()
627           << ".\n" << logofs_flush;
628   #endif
629 
630   //
631   // See if compression of the data part is
632   // enabled.
633   //
634 
635   if (split -> store_ -> enableCompress)
636   {
637     //
638     // If the split is going to be aborted don't
639     // compress the data and go straight to the
640     // send. The new data size will be assumed
641     // from the disk cache.
642     //
643 
644     if (split -> state_ != split_loaded)
645     {
646       unsigned int compressedSize = 0;
647       unsigned char *compressedData = NULL;
648 
649       if (control -> LocalDataCompression &&
650               (compressor_ -> compressBuffer(split -> data_.begin(), split -> d_size_,
651                                                  compressedData, compressedSize)))
652       {
653         //
654         // Replace the data with the one in
655         // compressed form.
656         //
657 
658         #ifdef TEST
659         *logofs << "SplitStore: Split data of size " << split -> d_size_
660                 << " has been compressed to " << compressedSize
661                 << " bytes.\n" << logofs_flush;
662         #endif
663 
664         split -> data_.clear();
665 
666         split -> data_.resize(compressedSize);
667 
668         memcpy(split -> data_.begin(), compressedData, compressedSize);
669 
670         split -> c_size_ = compressedSize;
671 
672         //
673         // Inform our peer that the data is
674         // compressed and send the new size.
675         //
676 
677         encodeBuffer.encodeBoolValue(1);
678 
679         encodeBuffer.encodeValue(compressedSize, 32, 14);
680 
681         #ifdef TEST
682         *logofs << "SplitStore: Signaled " << split -> c_size_
683                 << " bytes of compressed data for this message.\n"
684                 << logofs_flush;
685         #endif
686 
687         return 1;
688       }
689     }
690     #ifdef TEST
691     else
692     {
693       *logofs << "SplitStore: Not trying to compress the "
694               << "loaded message.\n" << logofs_flush;
695     }
696     #endif
697 
698     //
699     // Tell to the remote that data will
700     // follow uncompressed.
701     //
702 
703     encodeBuffer.encodeBoolValue(0);
704   }
705 
706   return 1;
707 }
708 
start(DecodeBuffer & decodeBuffer)709 int SplitStore::start(DecodeBuffer &decodeBuffer)
710 {
711   #ifdef TEST
712   *logofs << "SplitStore: Going to receive a new split from the remote side.\n"
713           << logofs_flush;
714   #endif
715 
716   //
717   // Get the element at the head
718   // of the list.
719   //
720 
721   current_ = splits_ -> begin();
722 
723   Split *split = *current_;
724 
725   unsigned int compressedSize = 0;
726 
727   //
728   // Save the data size known by the remote.
729   // This information will be needed if the
730   // remote will not have a chance to abort
731   // the split.
732   //
733 
734   split -> r_size_ = split -> d_size_;
735 
736   //
737   // Find out if data was compressed by the
738   // remote.
739   //
740 
741   if (split -> store_ -> enableCompress)
742   {
743     decodeBuffer.decodeBoolValue(compressedSize);
744 
745     if (compressedSize == 1)
746     {
747       //
748       // Get the compressed size.
749       //
750 
751       if (control -> isProtoStep7() == 1)
752       {
753         decodeBuffer.decodeValue(compressedSize, 32, 14);
754       }
755       else
756       {
757         //
758         // As we can't refuse to handle the decoding
759         // of the split message when connected to an
760         // old proxy version, we need to decode this
761         // in a way that is compatible.
762         //
763 
764         unsigned int diffSize;
765 
766         decodeBuffer.decodeValue(diffSize, 32, 14);
767 
768         split -> store_ -> lastResize += diffSize;
769 
770         compressedSize = split -> store_ -> lastResize;
771       }
772 
773       split -> store_ -> validateSize(split -> d_size_, compressedSize);
774 
775       split -> r_size_ = compressedSize;
776     }
777   }
778 
779   //
780   // Update the size if the split
781   // was not already loaded.
782   //
783 
784   if (split -> state_ != split_loaded)
785   {
786     split -> data_.clear();
787 
788     if (compressedSize > 0)
789     {
790       split -> c_size_ = compressedSize;
791 
792       #ifdef TEST
793       *logofs << "SplitStore: Split data of size "
794               << split -> d_size_ << " was compressed to "
795               << split -> c_size_ << " bytes.\n"
796               << logofs_flush;
797       #endif
798 
799       split -> data_.resize(split -> c_size_);
800     }
801     else
802     {
803       split -> data_.resize(split -> d_size_);
804     }
805 
806     unsigned char *data = split -> data_.begin();
807 
808     data[0] = SPLIT_PATTERN;
809     data[1] = SPLIT_PATTERN;
810   }
811   #ifdef TEST
812   else
813   {
814     //
815     // The message had been already
816     // loaded from disk.
817     //
818 
819     if (compressedSize > 0)
820     {
821       if ((int) compressedSize != split -> c_size_)
822       {
823         *logofs << "SplitStore: WARNING! Compressed data size is "
824                 << "different than the loaded compressed size.\n"
825                 << logofs_flush;
826       }
827 
828       *logofs << "SplitStore: Ignoring the new size with "
829               << "loaded compressed size " << split -> c_size_
830               << ".\n" << logofs_flush;
831     }
832   }
833   #endif
834 
835   return 1;
836 }
837 
receive(DecodeBuffer & decodeBuffer)838 int SplitStore::receive(DecodeBuffer &decodeBuffer)
839 {
840   if (splits_ -> size() == 0)
841   {
842     #ifdef PANIC
843     *logofs << "SplitStore: PANIC! Function receive called with no splits available.\n"
844             << logofs_flush;
845     #endif
846 
847     cerr << "Error" << ": Function receive called with no splits available.\n";
848 
849     HandleAbort();
850   }
851 
852   if (current_ == splits_ -> end())
853   {
854     start(decodeBuffer);
855   }
856 
857   //
858   // Check first if split was aborted, else add
859   // any new data to message being recomposed.
860   //
861 
862   Split *split = *current_;
863 
864   unsigned int abort = 0;
865 
866   decodeBuffer.decodeBoolValue(abort);
867 
868   if (abort == 1)
869   {
870     #ifdef TEST
871     *logofs << "SplitStore: Aborting split for checksum ["
872             << DumpChecksum(split -> checksum_) << "] position "
873             << split -> position_ << " with " << (split ->
874                data_.size() - split -> next_) << " bytes to go "
875             << "out of " << split -> data_.size()
876             << ".\n" << logofs_flush;
877     #endif
878 
879     statistics -> addSplitAborted();
880 
881     statistics -> addSplitAbortedBytesOut(split -> r_size_ - split -> next_);
882 
883     split -> next_ = split -> r_size_;
884 
885     split -> state_ = split_aborted;
886   }
887   else
888   {
889     //
890     // Get the size of the packet.
891     //
892 
893     unsigned int count;
894 
895     decodeBuffer.decodeValue(count, 32, 10);
896 
897     //
898     // If the split was not already loaded from
899     // disk, decode the packet and update our
900     // copy of the data. The encoding side may
901     // have not received the abort event, yet,
902     // and may be unaware that the message is
903     // stored in compressed form at our side.
904     //
905 
906     #ifdef TEST
907     *logofs << "SplitStore: Receiving split for checksum ["
908             << DumpChecksum(split -> checksum_) << "] count "
909             << count << " position " << split -> position_
910             << ". Data size is " << split -> data_.size() << " ("
911             << split -> d_size_ << "/" << split -> c_size_ << "/"
912             << split -> r_size_ << "), " << split -> r_size_ -
913                (split -> next_ + count) << " to go.\n"
914             << logofs_flush;
915     #endif
916 
917     if (split -> next_ + count > (unsigned) split -> r_size_)
918     {
919       #ifdef PANIC
920       *logofs << "SplitStore: PANIC! Invalid data count "
921               << count << "provided in the split.\n"
922               << logofs_flush;
923 
924       *logofs << "SplitStore: PANIC! While receiving split for "
925               << "checksum [" << DumpChecksum(split -> checksum_)
926               << "] with count " << count << " action ["
927               << DumpAction(split -> action_) << "] state ["
928               << DumpState(split -> state_) << "]. Data size is "
929               << split -> data_.size() << " (" << split -> d_size_
930               << "/" << split -> c_size_ << "), " << split ->
931                  data_.size() - (split -> next_ + count)
932               << " to go.\n" << logofs_flush;
933       #endif
934 
935       cerr << "Error" << ": Invalid data count "
936            << count << "provided in the split.\n";
937 
938       HandleAbort();
939     }
940 
941     if (split -> state_ != split_loaded)
942     {
943       #ifdef TEST
944 
945       if (split -> next_ + count > split -> data_.size())
946       {
947         #ifdef PANIC
948         *logofs << "SplitStore: PANIC! Inconsistent split data size "
949                 << split -> data_.size() << " with expected size "
950                 << split -> r_size_ << ".\n"
951                << logofs_flush;
952         #endif
953 
954         HandleAbort();
955       }
956 
957       #endif
958 
959       memcpy(split -> data_.begin() + split -> next_,
960                  decodeBuffer.decodeMemory(count), count);
961     }
962     else
963     {
964       #ifdef TEST
965       *logofs << "SplitStore: WARNING! Data discarded with split "
966               << "loaded from disk.\n" << logofs_flush;
967       #endif
968 
969       decodeBuffer.decodeMemory(count);
970     }
971 
972     split -> next_ += count;
973   }
974 
975   //
976   // Is unsplit complete?
977   //
978 
979   if (split -> next_ != split -> r_size_)
980   {
981     return 0;
982   }
983 
984   //
985   // If the persistent cache is enabled,
986   // we have a valid checksum and the
987   // split was not originally retrieved
988   // from disk, save the message on disk.
989   //
990 
991   if (split -> state_ != split_loaded &&
992           split -> state_ != split_aborted)
993   {
994     save(split);
995   }
996 
997   //
998   // Move the split at the head of the
999   // list to the commits.
1000   //
1001 
1002   remove(split);
1003 
1004   //
1005   // Reset the current position to the
1006   // end of the repository.
1007   //
1008 
1009   current_ = splits_ -> end();
1010 
1011   #ifdef TEST
1012   *logofs << "SplitStore: Removed split at head of the list. "
1013           << "Resource is " << split -> resource_ << " request "
1014           << (unsigned) split -> store_ -> opcode() << " position "
1015           << split -> position_ << ".\n" << logofs_flush;
1016   #endif
1017 
1018   return 1;
1019 }
1020 
pop()1021 Split *SplitStore::pop()
1022 {
1023   if (splits_ -> size() == 0)
1024   {
1025     #ifdef TEST
1026     *logofs << "SplitStore: The split store is empty.\n"
1027             << logofs_flush;
1028     #endif
1029 
1030     return NULL;
1031   }
1032 
1033   //
1034   // Move the pointer at the end of the list.
1035   // The next send operation will eventually
1036   // start a new split.
1037   //
1038 
1039   current_ = splits_ -> end();
1040 
1041   Split *split = *(splits_ -> begin());
1042 
1043   splits_ -> pop_front();
1044 
1045   #ifdef TEST
1046   *logofs << "SplitStore: Removed split at the head of the "
1047           << "list with resource " << split -> resource_
1048           << " request " << (unsigned) split -> store_ ->
1049              opcode() << " position " << split -> position_
1050           << ".\n" << logofs_flush;
1051   #endif
1052 
1053   splitStorageSize_ -= getNodeSize(split);
1054 
1055   totalSplitSize_--;
1056 
1057   totalSplitStorageSize_ -= getNodeSize(split);
1058 
1059   #ifdef TEST
1060   *logofs << "SplitStore: There are " << splits_ -> size()
1061           << " messages in store [" << resource_ << "] with "
1062           << "storage size " << splitStorageSize_ << ".\n"
1063           << logofs_flush;
1064 
1065   *logofs << "SplitStore: Total messages in stores are "
1066           << totalSplitSize_ << " with total storage size "
1067           << totalSplitStorageSize_ << ".\n"
1068           << logofs_flush;
1069   #endif
1070 
1071   return split;
1072 }
1073 
remove(Split * split)1074 void SplitStore::remove(Split *split)
1075 {
1076   #ifdef TEST
1077   *logofs << "SplitStore: Going to remove the split from the list.\n"
1078           << logofs_flush;
1079   #endif
1080 
1081   #ifdef TEST
1082 
1083   if (split != getFirstSplit())
1084   {
1085     #ifdef PANIC
1086     *logofs << "SplitStore: PANIC! Trying to remove a split "
1087             << "not at the head of the list.\n"
1088             << logofs_flush;
1089     #endif
1090 
1091     cerr << "Error" << ": Trying to remove a split "
1092          << "not at the head of the list.\n";
1093 
1094     HandleAbort();
1095   }
1096 
1097   #endif
1098 
1099   //
1100   // Move the split to the commit store.
1101   //
1102 
1103   splits_ -> pop_front();
1104 
1105   commits_ -> splits_ -> push_back(split);
1106 
1107   splitStorageSize_ -= getNodeSize(split);
1108 
1109   totalSplitSize_--;
1110 
1111   totalSplitStorageSize_ -= getNodeSize(split);
1112 
1113   #ifdef TEST
1114   *logofs << "SplitStore: There are " << splits_ -> size()
1115           << " messages in store [" << resource_ << "] with "
1116           << "storage size " << splitStorageSize_ << ".\n"
1117           << logofs_flush;
1118 
1119   *logofs << "SplitStore: Total messages in stores are "
1120           << totalSplitSize_ << " with total storage size "
1121           << totalSplitStorageSize_ << ".\n"
1122           << logofs_flush;
1123   #endif
1124 
1125   #ifdef TEST
1126 
1127   if (splits_ -> size() == 0)
1128   {
1129     if (splitStorageSize_ != 0)
1130     {
1131       #ifdef PANIC
1132       *logofs << "SplitStore: PANIC! Internal error calculating "
1133               << "split data size. It is " << splitStorageSize_
1134               << " while should be 0.\n" << logofs_flush;
1135       #endif
1136 
1137       cerr << "Error" << ": Internal error calculating "
1138            << "split data size. It is " << splitStorageSize_
1139            << " while should be 0.\n";
1140 
1141       HandleAbort();
1142     }
1143   }
1144 
1145   #endif
1146 }
1147 
name(const T_checksum checksum)1148 const char *SplitStore::name(const T_checksum checksum)
1149 {
1150   if (checksum == NULL)
1151   {
1152     return NULL;
1153   }
1154 
1155   char *pathName = control -> ImageCachePath;
1156 
1157   if (pathName == NULL)
1158   {
1159     #ifdef PANIC
1160     *logofs << "SplitStore: PANIC! Cannot determine directory of "
1161             << "NX image files.\n" << logofs_flush;
1162     #endif
1163 
1164     return NULL;
1165   }
1166 
1167   int pathSize = strlen(pathName);
1168 
1169   //
1170   // File name is "[path][/I-c/I-][checksum][\0]",
1171   // where c is the first hex digit of checksum.
1172   //
1173 
1174   int nameSize = pathSize + 7 + MD5_LENGTH * 2 + 1;
1175 
1176   char *fileName = new char[nameSize];
1177 
1178   if (fileName == NULL)
1179   {
1180     #ifdef PANIC
1181     *logofs << "SplitStore: PANIC! Cannot allocate space for "
1182             << "NX image file name.\n" << logofs_flush;
1183     #endif
1184 
1185     return NULL;
1186   }
1187 
1188   strcpy(fileName, pathName);
1189 
1190   sprintf(fileName + pathSize, "/I-%1X/I-",
1191               *((unsigned char *) checksum) >> 4);
1192 
1193   for (unsigned int i = 0; i < MD5_LENGTH; i++)
1194   {
1195     sprintf(fileName + pathSize + 7 + (i * 2), "%02X",
1196                 ((unsigned char *) checksum)[i]);
1197   }
1198 
1199   return fileName;
1200 }
1201 
save(Split * split)1202 int SplitStore::save(Split *split)
1203 {
1204   //
1205   // Check if saving the message on the
1206   // persistent cache is enabled.
1207   //
1208 
1209   if (split -> save_ == 0)
1210   {
1211     return 0;
1212   }
1213 
1214   T_checksum checksum = split -> checksum_;
1215 
1216   const char *fileName = name(checksum);
1217 
1218   if (fileName == NULL)
1219   {
1220     return 0;
1221   }
1222 
1223   unsigned int splitSize;
1224 
1225   ostream *fileStream = NULL;
1226 
1227   unsigned char *fileHeader = NULL;
1228 
1229   //
1230   // Get the other data from the split.
1231   //
1232 
1233   unsigned char opcode = split -> store_ -> opcode();
1234 
1235   unsigned char *data = split -> data_.begin();
1236 
1237   int dataSize = split -> d_size_;
1238   int compressedSize = split -> c_size_;
1239 
1240   #ifdef DEBUG
1241   *logofs << "SplitStore: Going to save split OPCODE#"
1242           << (unsigned int) opcode << " to file '" << fileName
1243           << "' with size " << dataSize << " and compressed size "
1244           << compressedSize << ".\n" << logofs_flush;
1245   #endif
1246 
1247   DisableSignals();
1248 
1249   //
1250   // Change the mask to make the file only
1251   // readable by the user, then restore the
1252   // old mask.
1253   //
1254 
1255   mode_t fileMode;
1256 
1257   //
1258   // Check if the file already exists. We try to
1259   // load the message when the split is started
1260   // and save it only if it is not found. Still
1261   // the remote side may send the same image mul-
1262   // tiple time and we may not have the time to
1263   // notify the abort.
1264   //
1265 
1266   struct stat fileStat;
1267 
1268   if (stat(fileName, &fileStat) == 0)
1269   {
1270     #ifdef TEST
1271     *logofs << "SplitStore: Image file '" << fileName
1272             << "' already present on disk.\n"
1273             << logofs_flush;
1274     #endif
1275 
1276     goto SplitStoreSaveError;
1277   }
1278 
1279   fileMode = umask(0077);
1280 
1281   fileStream = new ofstream(fileName, ios::out | ios::binary);
1282 
1283   umask(fileMode);
1284 
1285   if (CheckData(fileStream) < 0)
1286   {
1287     #ifdef PANIC
1288     *logofs << "SplitStore: PANIC! Cannot open file '" << fileName
1289             << "' for output.\n" << logofs_flush;
1290     #endif
1291 
1292     goto SplitStoreSaveError;
1293   }
1294 
1295   fileHeader = new unsigned char[SPLIT_HEADER_SIZE];
1296 
1297   if (fileHeader == NULL)
1298   {
1299     #ifdef PANIC
1300     *logofs << "SplitStore: PANIC! Cannot allocate space for "
1301             << "NX image header.\n" << logofs_flush;
1302     #endif
1303 
1304     goto SplitStoreSaveError;
1305   }
1306 
1307   //
1308   // Leave 3 bytes for future use. Please note
1309   // that, on some CPUs, we can't use PutULONG()
1310   // to write integers that are not aligned to
1311   // the word boundary.
1312   //
1313 
1314   *fileHeader = opcode;
1315 
1316   *(fileHeader + 1) = 0;
1317   *(fileHeader + 2) = 0;
1318   *(fileHeader + 3) = 0;
1319 
1320   PutULONG(dataSize, fileHeader + 4, false);
1321   PutULONG(compressedSize, fileHeader + 8, false);
1322 
1323   splitSize = (compressedSize > 0 ? compressedSize : dataSize);
1324 
1325   if (PutData(fileStream, fileHeader, SPLIT_HEADER_SIZE) < 0 ||
1326           PutData(fileStream, data, splitSize) < 0)
1327   {
1328     #ifdef PANIC
1329     *logofs << "SplitStore: PANIC! Cannot write to NX "
1330             << "image file '" << fileName << "'.\n"
1331             << logofs_flush;
1332     #endif
1333 
1334     goto SplitStoreSaveError;
1335   }
1336 
1337   //
1338   // Check if all the data was written on the
1339   // disk and, if not, remove the faulty copy.
1340   //
1341 
1342   FlushData(fileStream);
1343 
1344   if (CheckData(fileStream) < 0)
1345   {
1346     #ifdef PANIC
1347     *logofs << "SplitStore: PANIC! Failed to write NX "
1348             << "image file '" << fileName << "'.\n"
1349             << logofs_flush;
1350     #endif
1351 
1352     cerr << "Warning" << ": Failed to write NX "
1353          << "image file '" << fileName << "'.\n";
1354 
1355     goto SplitStoreSaveError;
1356   }
1357 
1358   #ifdef TEST
1359   *logofs << "SplitStore: Saved split to file '" << fileName
1360           << "' with data size " << dataSize << " and "
1361           << "compressed data size " << compressedSize
1362           << ".\n" << logofs_flush;
1363   #endif
1364 
1365   delete fileStream;
1366 
1367   delete [] fileName;
1368   delete [] fileHeader;
1369 
1370   EnableSignals();
1371 
1372   //
1373   // Update the timestamp as the operation
1374   // may have taken some time.
1375   //
1376 
1377   getNewTimestamp();
1378 
1379   return 1;
1380 
1381 SplitStoreSaveError:
1382 
1383   delete fileStream;
1384 
1385   if (fileName != NULL)
1386   {
1387     unlink(fileName);
1388   }
1389 
1390   delete [] fileName;
1391   delete [] fileHeader;
1392 
1393   EnableSignals();
1394 
1395   return -1;
1396 }
1397 
find(Split * split)1398 int SplitStore::find(Split *split)
1399 {
1400   const char *fileName = name(split -> checksum_);
1401 
1402   if (fileName == NULL)
1403   {
1404     return 0;
1405   }
1406 
1407   #ifdef DEBUG
1408   *logofs << "SplitStore: Going to find split OPCODE#"
1409           << (unsigned) split -> store_ -> opcode()
1410           << " in file '" << fileName << "'.\n"
1411           << logofs_flush;
1412   #endif
1413 
1414   //
1415   // Check if the file exists and, at the
1416   // same time, update the modification
1417   // time to prevent its deletion.
1418   //
1419 
1420   if (utime(fileName, NULL) == 0)
1421   {
1422     #ifdef TEST
1423     *logofs << "SplitStore: Found split OPCODE#"
1424             << (unsigned) split -> store_ -> opcode()
1425             << " in file '" << fileName << "'.\n"
1426             << logofs_flush;
1427     #endif
1428 
1429     delete [] fileName;
1430 
1431     return 1;
1432   }
1433 
1434   #ifdef TEST
1435   *logofs << "SplitStore: WARNING! Can't find split "
1436           << "OPCODE#" << (unsigned) split -> store_ ->
1437              opcode() << " in file '" << fileName
1438           << "'.\n" << logofs_flush;
1439   #endif
1440 
1441   delete [] fileName;
1442 
1443   return 0;
1444 }
1445 
load(Split * split)1446 int SplitStore::load(Split *split)
1447 {
1448   //
1449   // Check if loading the image is enabled.
1450   //
1451 
1452   if (split -> load_ == 0)
1453   {
1454     return 0;
1455   }
1456 
1457   const char *fileName = name(split -> checksum_);
1458 
1459   if (fileName == NULL)
1460   {
1461     return 0;
1462   }
1463 
1464   unsigned char fileOpcode;
1465 
1466   int fileSize;
1467   int fileCSize;
1468 
1469   istream *fileStream = NULL;
1470 
1471   unsigned char *fileHeader = NULL;
1472 
1473   DisableSignals();
1474 
1475   #ifdef DEBUG
1476   *logofs << "SplitStore: Going to load split OPCODE#"
1477           << (unsigned int) split -> store_ -> opcode()
1478           << " from file '" << fileName << "'.\n"
1479           << logofs_flush;
1480   #endif
1481 
1482   fileStream = new ifstream(fileName, ios::in | ios::binary);
1483 
1484   if (CheckData(fileStream) < 0)
1485   {
1486     #ifdef TEST
1487     *logofs << "SplitStore: WARNING! Can't open image file '"
1488             << fileName  << "' on disk.\n" << logofs_flush;
1489     #endif
1490 
1491     goto SplitStoreLoadError;
1492   }
1493 
1494   fileHeader = new unsigned char[SPLIT_HEADER_SIZE];
1495 
1496   if (fileHeader == NULL)
1497   {
1498     #ifdef PANIC
1499     *logofs << "SplitStore: PANIC! Cannot allocate space for "
1500             << "NX image header.\n" << logofs_flush;
1501     #endif
1502 
1503     cerr << "Error" << ": Cannot allocate space for "
1504          << "NX image header.\n";
1505 
1506     goto SplitStoreLoadError;
1507   }
1508 
1509   if (GetData(fileStream, fileHeader, SPLIT_HEADER_SIZE) < 0)
1510   {
1511     #ifdef PANIC
1512     *logofs << "SplitStore: PANIC! Cannot read header from "
1513             << "NX image file '" << fileName << "'.\n"
1514             << logofs_flush;
1515     #endif
1516 
1517     cerr << "Warning" << ": Cannot read header from "
1518          << "NX image file '" << fileName << "'.\n";
1519 
1520     goto SplitStoreLoadError;
1521   }
1522 
1523   fileOpcode = *fileHeader;
1524 
1525   fileSize  = GetULONG(fileHeader + 4, false);
1526   fileCSize = GetULONG(fileHeader + 8, false);
1527 
1528   //
1529   // Don't complain if we find that data was saved
1530   // in compressed form even if we were not aware
1531   // of the compressed data size. The remote side
1532   // compresses the data only at the time it starts
1533   // the transferral of the split. We replace our
1534   // copy of the data with whatever we find on the
1535   // disk.
1536   //
1537 
1538   if (fileOpcode != split -> store_ -> opcode() ||
1539           fileSize != split -> d_size_ ||
1540               fileSize > control -> MaximumRequestSize ||
1541                   fileCSize > control -> MaximumRequestSize)
1542 
1543   {
1544     #ifdef TEST
1545     *logofs << "SplitStore: PANIC! Corrupted image file '" << fileName
1546             << "'. Expected " << (unsigned int) split -> store_ -> opcode()
1547             << "/" << split -> d_size_ << "/" << split -> c_size_ << " found "
1548             << (unsigned int) fileOpcode << "/" << fileSize << "/"
1549             << fileCSize << ".\n" << logofs_flush;
1550     #endif
1551 
1552     cerr << "Warning" << ": Corrupted image file '" << fileName
1553          << "'. Expected " << (unsigned int) split -> store_ -> opcode()
1554          << "/" << split -> d_size_ << "/" << split -> c_size_ << " found "
1555          << (unsigned int) fileOpcode << "/" << fileSize << "/"
1556          << fileCSize << ".\n";
1557 
1558     goto SplitStoreLoadError;
1559   }
1560 
1561   //
1562   // Update the data size with the size
1563   // we got from the disk record.
1564   //
1565 
1566   split -> d_size_ = fileSize;
1567   split -> c_size_ = fileCSize;
1568 
1569   unsigned int splitSize;
1570 
1571   if (fileCSize > 0)
1572   {
1573     splitSize = fileCSize;
1574   }
1575   else
1576   {
1577     splitSize = fileSize;
1578   }
1579 
1580   //
1581   // Allocate a new buffer if we didn't
1582   // do that already or if the size is
1583   // different.
1584   //
1585 
1586   if (split -> data_.size() != splitSize)
1587   {
1588     split -> data_.clear();
1589 
1590     split -> data_.resize(splitSize);
1591   }
1592 
1593   if (GetData(fileStream, split -> data_.begin(), splitSize) < 0)
1594   {
1595     #ifdef PANIC
1596     *logofs << "SplitStore: PANIC! Cannot read data from "
1597             << "NX image file '" << fileName << "'.\n"
1598             << logofs_flush;
1599     #endif
1600 
1601     cerr << "Warning" << ": Cannot read data from "
1602          << "NX image file '" << fileName << "'.\n";
1603 
1604     goto SplitStoreLoadError;
1605   }
1606 
1607   delete fileStream;
1608 
1609   delete [] fileHeader;
1610   delete [] fileName;
1611 
1612   EnableSignals();
1613 
1614   //
1615   // Update the timestamp as the operation
1616   // may have taken some time.
1617   //
1618 
1619   getNewTimestamp();
1620 
1621   return 1;
1622 
1623 SplitStoreLoadError:
1624 
1625   delete fileStream;
1626 
1627   unlink(fileName);
1628 
1629   delete [] fileName;
1630   delete [] fileHeader;
1631 
1632   EnableSignals();
1633 
1634   return -1;
1635 }
1636 
pop()1637 Split *CommitStore::pop()
1638 {
1639   if (splits_ -> size() == 0)
1640   {
1641     #ifdef TEST
1642     *logofs << "CommitStore: The commit store is empty.\n"
1643             << logofs_flush;
1644     #endif
1645 
1646     return NULL;
1647   }
1648 
1649   Split *split = *(splits_ -> begin());
1650 
1651   splits_ -> pop_front();
1652 
1653   #ifdef TEST
1654   *logofs << "CommitStore: Removed commit split at the head "
1655           << "of the list with resource " << split -> resource_
1656           << " request " << (unsigned) split -> store_ ->
1657              opcode() << " position " << split -> position_
1658           << ".\n" << logofs_flush;
1659   #endif
1660 
1661   return split;
1662 }
1663 
expand(Split * split,unsigned char * buffer,const int size)1664 int CommitStore::expand(Split *split, unsigned char *buffer, const int size)
1665 {
1666   #ifdef TEST
1667   *logofs << "CommitStore: Expanding split data with "
1668           << size << " bytes to write.\n"
1669           << logofs_flush;
1670   #endif
1671 
1672   #ifdef TEST
1673 
1674   if (size < split -> i_size_ + split -> d_size_)
1675   {
1676     #ifdef PANIC
1677     *logofs << "CommitStore: PANIC! Wrong size of the provided "
1678             << "buffer. It should be " << split -> i_size_ +
1679                split -> d_size_ << " instead of " << size
1680             << ".\n" << logofs_flush;
1681     #endif
1682 
1683     cerr << "Error" << ": Wrong size of the provided "
1684          << "buffer. It should be " << split -> i_size_ +
1685             split -> d_size_ << " instead of " << size
1686          << ".\n";
1687 
1688     HandleAbort();
1689   }
1690 
1691   #endif
1692 
1693   #ifdef DEBUG
1694   *logofs << "CommitStore: Copying " << split -> i_size_
1695           << " bytes of identity.\n" << logofs_flush;
1696   #endif
1697 
1698   memcpy(buffer, split -> identity_.begin(), split -> i_size_);
1699 
1700   //
1701   // Copy data, if any, to the buffer.
1702   //
1703 
1704   if (size > split -> i_size_)
1705   {
1706     //
1707     // Check if message has been stored
1708     // in compressed format.
1709     //
1710 
1711     if (split -> c_size_ == 0)
1712     {
1713       #ifdef DEBUG
1714       *logofs << "CommitStore: Copying " << split -> d_size_
1715               << " bytes of plain data.\n" << logofs_flush;
1716       #endif
1717 
1718       memcpy(buffer + split -> i_size_, split -> data_.begin(), split -> d_size_);
1719     }
1720     else
1721     {
1722       #ifdef DEBUG
1723       *logofs << "CommitStore: Decompressing " << split -> c_size_
1724               << " bytes and copying " << split -> d_size_
1725               << " bytes of data.\n" << logofs_flush;
1726       #endif
1727 
1728       if (compressor_ ->
1729               decompressBuffer(buffer + split -> i_size_,
1730                                    split -> d_size_, split -> data_.begin(),
1731                                        split -> c_size_) < 0)
1732       {
1733         #ifdef PANIC
1734         *logofs << "CommitStore: PANIC! Split data decompression failed.\n"
1735                 << logofs_flush;
1736         #endif
1737 
1738         cerr << "Error" << ": Split data decompression failed.\n";
1739 
1740         return -1;
1741       }
1742     }
1743   }
1744 
1745   return 1;
1746 }
1747 
update(Split * split)1748 int CommitStore::update(Split *split)
1749 {
1750   if (split -> action_ != IS_ADDED)
1751   {
1752     return 0;
1753   }
1754 
1755   //
1756   // We don't need the identity data at
1757   // the encoding side.
1758   //
1759 
1760   if (split -> identity_.size() == 0)
1761   {
1762     #ifdef TEST
1763     *logofs << "SplitStore: Going to update the size "
1764             << "for object at position " << split -> position_
1765             << " with data size " << split -> d_size_
1766             << " and compressed data size " << split ->
1767                c_size_ << ".\n" << logofs_flush;
1768     #endif
1769 
1770     split -> store_ -> updateData(split -> position_, split -> d_size_,
1771                                       split -> c_size_);
1772   }
1773   else
1774   {
1775     #ifdef TEST
1776     *logofs << "SplitStore: Going to update data and size "
1777             << "for object at position " << split -> position_
1778             << " with data size " << split -> d_size_
1779             << " and compressed data size " << split ->
1780                c_size_ << ".\n" << logofs_flush;
1781     #endif
1782 
1783     split -> store_ -> updateData(split -> position_, split -> data_.begin(),
1784                                       split -> d_size_, split -> c_size_);
1785   }
1786 
1787   //
1788   // Unlock message so that we can remove
1789   // or save it on disk at shutdown.
1790   //
1791 
1792   if (split -> action_ == IS_ADDED)
1793   {
1794     split -> store_ -> unlock(split -> position_);
1795 
1796     #ifdef TEST
1797 
1798     validate(split);
1799 
1800     #endif
1801   }
1802 
1803   return 1;
1804 }
1805 
validate(Split * split)1806 int CommitStore::validate(Split *split)
1807 {
1808   MessageStore *store = split -> store_;
1809 
1810   int p, n, s;
1811 
1812   s = store -> cacheSlots;
1813 
1814   for (p = 0, n = 0; p < s; p++)
1815   {
1816     if (store -> getLocks(p) == 1)
1817     {
1818       n++;
1819     }
1820     else if (store -> getLocks(p) != 0)
1821     {
1822       #ifdef PANIC
1823       *logofs << "CommitStore: PANIC! Repository for OPCODE#"
1824               << (unsigned int) store -> opcode() << " has "
1825               << store -> getLocks(p) << " locks for message "
1826               << "at position " << p << ".\n" << logofs_flush;
1827       #endif
1828 
1829       cerr << "Error" << ": Repository for OPCODE#"
1830            << (unsigned int) store -> opcode() << " has "
1831            << store -> getLocks(p) << " locks for message "
1832            << "at position " << p << ".\n";
1833 
1834       HandleAbort();
1835     }
1836   }
1837 
1838   #ifdef TEST
1839   *logofs << "CommitStore: Repository for OPCODE#"
1840           << (unsigned int) store -> opcode()
1841           << " has " << n << " locked messages.\n"
1842           << logofs_flush;
1843   #endif
1844 
1845   return 1;
1846 }
1847