1 /*
2 Copyright (c) 2018 Contributors as noted in the AUTHORS file
3 
4 This file is part of libzmq, the ZeroMQ core engine in C++.
5 
6 libzmq is free software; you can redistribute it and/or modify it under
7 the terms of the GNU Lesser General Public License (LGPL) as published
8 by the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
10 
11 As a special exception, the Contributors give you permission to link
12 this library with independent modules to produce an executable,
13 regardless of the license terms of these independent modules, and to
14 copy and distribute the resulting executable under terms of your choice,
15 provided that you also meet, for each linked independent module, the
16 terms and conditions of the license of that module. An independent
17 module is a module which is not derived from or based on this library.
18 If you modify this library, you must extend this exception to your
19 version of the library.
20 
21 libzmq is distributed in the hope that it will be useful, but WITHOUT
22 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 License for more details.
25 
26 You should have received a copy of the GNU Lesser General Public License
27 along with this program.  If not, see <http://www.gnu.org/licenses/>.
28 */
29 
30 #ifndef __ZMQ_GENERIC_MTRIE_IMPL_HPP_INCLUDED__
31 #define __ZMQ_GENERIC_MTRIE_IMPL_HPP_INCLUDED__
32 
33 
34 #include <stdlib.h>
35 
36 #include <new>
37 #include <algorithm>
38 #include <list>
39 
40 #include "err.hpp"
41 #include "macros.hpp"
42 #include "generic_mtrie.hpp"
43 
44 namespace zmq
45 {
46 template <typename T>
generic_mtrie_t()47 generic_mtrie_t<T>::generic_mtrie_t () :
48     _pipes (0),
49     _min (0),
50     _count (0),
51     _live_nodes (0)
52 {
53 }
54 
~generic_mtrie_t()55 template <typename T> generic_mtrie_t<T>::~generic_mtrie_t ()
56 {
57     LIBZMQ_DELETE (_pipes);
58 
59     if (_count == 1) {
60         zmq_assert (_next.node);
61         LIBZMQ_DELETE (_next.node);
62     } else if (_count > 1) {
63         for (unsigned short i = 0; i != _count; ++i) {
64             LIBZMQ_DELETE (_next.table[i]);
65         }
66         free (_next.table);
67     }
68 }
69 
70 template <typename T>
add(prefix_t prefix_,size_t size_,value_t * pipe_)71 bool generic_mtrie_t<T>::add (prefix_t prefix_, size_t size_, value_t *pipe_)
72 {
73     generic_mtrie_t<value_t> *it = this;
74 
75     while (size_) {
76         const unsigned char c = *prefix_;
77 
78         if (c < it->_min || c >= it->_min + it->_count) {
79             //  The character is out of range of currently handled
80             //  characters. We have to extend the table.
81             if (!it->_count) {
82                 it->_min = c;
83                 it->_count = 1;
84                 it->_next.node = NULL;
85             } else if (it->_count == 1) {
86                 const unsigned char oldc = it->_min;
87                 generic_mtrie_t *oldp = it->_next.node;
88                 it->_count = (it->_min < c ? c - it->_min : it->_min - c) + 1;
89                 it->_next.table = static_cast<generic_mtrie_t **> (
90                   malloc (sizeof (generic_mtrie_t *) * it->_count));
91                 alloc_assert (it->_next.table);
92                 for (unsigned short i = 0; i != it->_count; ++i)
93                     it->_next.table[i] = 0;
94                 it->_min = std::min (it->_min, c);
95                 it->_next.table[oldc - it->_min] = oldp;
96             } else if (it->_min < c) {
97                 //  The new character is above the current character range.
98                 const unsigned short old_count = it->_count;
99                 it->_count = c - it->_min + 1;
100                 it->_next.table = static_cast<generic_mtrie_t **> (realloc (
101                   it->_next.table, sizeof (generic_mtrie_t *) * it->_count));
102                 alloc_assert (it->_next.table);
103                 for (unsigned short i = old_count; i != it->_count; i++)
104                     it->_next.table[i] = NULL;
105             } else {
106                 //  The new character is below the current character range.
107                 const unsigned short old_count = it->_count;
108                 it->_count = (it->_min + old_count) - c;
109                 it->_next.table = static_cast<generic_mtrie_t **> (realloc (
110                   it->_next.table, sizeof (generic_mtrie_t *) * it->_count));
111                 alloc_assert (it->_next.table);
112                 memmove (it->_next.table + it->_min - c, it->_next.table,
113                          old_count * sizeof (generic_mtrie_t *));
114                 for (unsigned short i = 0; i != it->_min - c; i++)
115                     it->_next.table[i] = NULL;
116                 it->_min = c;
117             }
118         }
119 
120         //  If next node does not exist, create one.
121         if (it->_count == 1) {
122             if (!it->_next.node) {
123                 it->_next.node = new (std::nothrow) generic_mtrie_t;
124                 alloc_assert (it->_next.node);
125                 ++(it->_live_nodes);
126             }
127 
128             ++prefix_;
129             --size_;
130             it = it->_next.node;
131         } else {
132             if (!it->_next.table[c - it->_min]) {
133                 it->_next.table[c - it->_min] =
134                   new (std::nothrow) generic_mtrie_t;
135                 alloc_assert (it->_next.table[c - it->_min]);
136                 ++(it->_live_nodes);
137             }
138 
139             ++prefix_;
140             --size_;
141             it = it->_next.table[c - it->_min];
142         }
143     }
144 
145     //  We are at the node corresponding to the prefix. We are done.
146     const bool result = !it->_pipes;
147     if (!it->_pipes) {
148         it->_pipes = new (std::nothrow) pipes_t;
149         alloc_assert (it->_pipes);
150     }
151     it->_pipes->insert (pipe_);
152 
153     return result;
154 }
155 
156 template <typename T>
157 template <typename Arg>
rm(value_t * pipe_,void (* func_)(prefix_t data_,size_t size_,Arg arg_),Arg arg_,bool call_on_uniq_)158 void generic_mtrie_t<T>::rm (value_t *pipe_,
159                              void (*func_) (prefix_t data_,
160                                             size_t size_,
161                                             Arg arg_),
162                              Arg arg_,
163                              bool call_on_uniq_)
164 {
165     //  This used to be implemented as a non-tail recursive travesal of the trie,
166     //  which means remote clients controlled the depth of the recursion and the
167     //  stack size.
168     //  To simulate the non-tail recursion, with post-recursion changes depending on
169     //  the result of the recursive call, a stack is used to re-visit the same node
170     //  and operate on it again after children have been visisted.
171     //  A boolean is used to record whether the node had already been visited and to
172     //  determine if the pre- or post- children visit actions have to be taken.
173     //  In the case of a node with (N > 1) children, the node has to be re-visited
174     //  N times, in the correct order after each child visit.
175     std::list<struct iter> stack;
176     unsigned char *buff = NULL;
177     size_t maxbuffsize = 0;
178     struct iter it = {this, NULL, NULL, 0, 0, 0, 0, false};
179     stack.push_back (it);
180 
181     while (!stack.empty ()) {
182         it = stack.back ();
183         stack.pop_back ();
184 
185         if (!it.processed_for_removal) {
186             //  Remove the subscription from this node.
187             if (it.node->_pipes && it.node->_pipes->erase (pipe_)) {
188                 if (!call_on_uniq_ || it.node->_pipes->empty ()) {
189                     func_ (buff, it.size, arg_);
190                 }
191 
192                 if (it.node->_pipes->empty ()) {
193                     LIBZMQ_DELETE (it.node->_pipes);
194                 }
195             }
196 
197             //  Adjust the buffer.
198             if (it.size >= maxbuffsize) {
199                 maxbuffsize = it.size + 256;
200                 buff =
201                   static_cast<unsigned char *> (realloc (buff, maxbuffsize));
202                 alloc_assert (buff);
203             }
204 
205             switch (it.node->_count) {
206                 case 0:
207                     //  If there are no subnodes in the trie, we are done with this node
208                     //  pre-processing.
209                     break;
210                 case 1: {
211                     //  If there's one subnode (optimisation).
212 
213                     buff[it.size] = it.node->_min;
214                     //  Mark this node as pre-processed and push it, so that the next
215                     //  visit after the operation on the child can do the removals.
216                     it.processed_for_removal = true;
217                     stack.push_back (it);
218                     struct iter next = {it.node->_next.node,
219                                         NULL,
220                                         NULL,
221                                         ++it.size,
222                                         0,
223                                         0,
224                                         0,
225                                         false};
226                     stack.push_back (next);
227                     break;
228                 }
229                 default: {
230                     //  If there are multiple subnodes.
231                     //  When first visiting this node, initialize the new_min/max parameters
232                     //  which will then be used after each child has been processed, on the
233                     //  post-children iterations.
234                     if (it.current_child == 0) {
235                         //  New min non-null character in the node table after the removal
236                         it.new_min = it.node->_min + it.node->_count - 1;
237                         //  New max non-null character in the node table after the removal
238                         it.new_max = it.node->_min;
239                     }
240 
241                     //  Mark this node as pre-processed and push it, so that the next
242                     //  visit after the operation on the child can do the removals.
243                     buff[it.size] = it.node->_min + it.current_child;
244                     it.processed_for_removal = true;
245                     stack.push_back (it);
246                     if (it.node->_next.table[it.current_child]) {
247                         struct iter next = {
248                           it.node->_next.table[it.current_child],
249                           NULL,
250                           NULL,
251                           it.size + 1,
252                           0,
253                           0,
254                           0,
255                           false};
256                         stack.push_back (next);
257                     }
258                 }
259             }
260         } else {
261             //  Reset back for the next time, in case this node doesn't get deleted.
262             //  This is done unconditionally, unlike when setting this variable to true.
263             it.processed_for_removal = false;
264 
265             switch (it.node->_count) {
266                 case 0:
267                     //  If there are no subnodes in the trie, we are done with this node
268                     //  post-processing.
269                     break;
270                 case 1:
271                     //  If there's one subnode (optimisation).
272 
273                     //  Prune the node if it was made redundant by the removal
274                     if (it.node->_next.node->is_redundant ()) {
275                         LIBZMQ_DELETE (it.node->_next.node);
276                         it.node->_count = 0;
277                         --it.node->_live_nodes;
278                         zmq_assert (it.node->_live_nodes == 0);
279                     }
280                     break;
281                 default:
282                     //  If there are multiple subnodes.
283                     {
284                         if (it.node->_next.table[it.current_child]) {
285                             //  Prune redundant nodes from the mtrie
286                             if (it.node->_next.table[it.current_child]
287                                   ->is_redundant ()) {
288                                 LIBZMQ_DELETE (
289                                   it.node->_next.table[it.current_child]);
290 
291                                 zmq_assert (it.node->_live_nodes > 0);
292                                 --it.node->_live_nodes;
293                             } else {
294                                 //  The node is not redundant, so it's a candidate for being
295                                 //  the new min/max node.
296                                 //
297                                 //  We loop through the node array from left to right, so the
298                                 //  first non-null, non-redundant node encountered is the new
299                                 //  minimum index. Conversely, the last non-redundant, non-null
300                                 //  node encountered is the new maximum index.
301                                 if (it.current_child + it.node->_min
302                                     < it.new_min)
303                                     it.new_min =
304                                       it.current_child + it.node->_min;
305                                 if (it.current_child + it.node->_min
306                                     > it.new_max)
307                                     it.new_max =
308                                       it.current_child + it.node->_min;
309                             }
310                         }
311 
312                         //  If there are more children to visit, push again the current
313                         //  node, so that pre-processing can happen on the next child.
314                         //  If we are done, reset the child index so that the ::rm is
315                         //  fully idempotent.
316                         ++it.current_child;
317                         if (it.current_child >= it.node->_count)
318                             it.current_child = 0;
319                         else {
320                             stack.push_back (it);
321                             continue;
322                         }
323 
324                         //  All children have been visited and removed if needed, and
325                         //  all pre- and post-visit operations have been carried.
326                         //  Resize/free the node table if needed.
327                         zmq_assert (it.node->_count > 1);
328 
329                         //  Free the node table if it's no longer used.
330                         switch (it.node->_live_nodes) {
331                             case 0:
332                                 free (it.node->_next.table);
333                                 it.node->_next.table = NULL;
334                                 it.node->_count = 0;
335                                 break;
336                             case 1:
337                                 //  Compact the node table if possible
338 
339                                 //  If there's only one live node in the table we can
340                                 //  switch to using the more compact single-node
341                                 //  representation
342                                 zmq_assert (it.new_min == it.new_max);
343                                 zmq_assert (it.new_min >= it.node->_min);
344                                 zmq_assert (it.new_min
345                                             < it.node->_min + it.node->_count);
346                                 {
347                                     generic_mtrie_t *node =
348                                       it.node->_next
349                                         .table[it.new_min - it.node->_min];
350                                     zmq_assert (node);
351                                     free (it.node->_next.table);
352                                     it.node->_next.node = node;
353                                 }
354                                 it.node->_count = 1;
355                                 it.node->_min = it.new_min;
356                                 break;
357                             default:
358                                 if (it.new_min > it.node->_min
359                                     || it.new_max < it.node->_min
360                                                       + it.node->_count - 1) {
361                                     zmq_assert (it.new_max - it.new_min + 1
362                                                 > 1);
363 
364                                     generic_mtrie_t **old_table =
365                                       it.node->_next.table;
366                                     zmq_assert (it.new_min > it.node->_min
367                                                 || it.new_max
368                                                      < it.node->_min
369                                                          + it.node->_count - 1);
370                                     zmq_assert (it.new_min >= it.node->_min);
371                                     zmq_assert (it.new_max
372                                                 <= it.node->_min
373                                                      + it.node->_count - 1);
374                                     zmq_assert (it.new_max - it.new_min + 1
375                                                 < it.node->_count);
376 
377                                     it.node->_count =
378                                       it.new_max - it.new_min + 1;
379                                     it.node->_next.table =
380                                       static_cast<generic_mtrie_t **> (
381                                         malloc (sizeof (generic_mtrie_t *)
382                                                 * it.node->_count));
383                                     alloc_assert (it.node->_next.table);
384 
385                                     memmove (it.node->_next.table,
386                                              old_table
387                                                + (it.new_min - it.node->_min),
388                                              sizeof (generic_mtrie_t *)
389                                                * it.node->_count);
390                                     free (old_table);
391 
392                                     it.node->_min = it.new_min;
393                                 }
394                         }
395                     }
396             }
397         }
398     }
399 
400     free (buff);
401 }
402 
403 template <typename T>
404 typename generic_mtrie_t<T>::rm_result
rm(prefix_t prefix_,size_t size_,value_t * pipe_)405 generic_mtrie_t<T>::rm (prefix_t prefix_, size_t size_, value_t *pipe_)
406 {
407     //  This used to be implemented as a non-tail recursive travesal of the trie,
408     //  which means remote clients controlled the depth of the recursion and the
409     //  stack size.
410     //  To simulate the non-tail recursion, with post-recursion changes depending on
411     //  the result of the recursive call, a stack is used to re-visit the same node
412     //  and operate on it again after children have been visisted.
413     //  A boolean is used to record whether the node had already been visited and to
414     //  determine if the pre- or post- children visit actions have to be taken.
415     rm_result ret = not_found;
416     std::list<struct iter> stack;
417     struct iter it = {this, NULL, prefix_, size_, 0, 0, 0, false};
418     stack.push_back (it);
419 
420     while (!stack.empty ()) {
421         it = stack.back ();
422         stack.pop_back ();
423 
424         if (!it.processed_for_removal) {
425             if (!it.size) {
426                 if (!it.node->_pipes) {
427                     ret = not_found;
428                     continue;
429                 }
430 
431                 typename pipes_t::size_type erased =
432                   it.node->_pipes->erase (pipe_);
433                 if (it.node->_pipes->empty ()) {
434                     zmq_assert (erased == 1);
435                     LIBZMQ_DELETE (it.node->_pipes);
436                     ret = last_value_removed;
437                     continue;
438                 }
439 
440                 ret = (erased == 1) ? values_remain : not_found;
441                 continue;
442             }
443 
444             it.current_child = *it.prefix;
445             if (!it.node->_count || it.current_child < it.node->_min
446                 || it.current_child >= it.node->_min + it.node->_count) {
447                 ret = not_found;
448                 continue;
449             }
450 
451             it.next_node =
452               it.node->_count == 1
453                 ? it.node->_next.node
454                 : it.node->_next.table[it.current_child - it.node->_min];
455             if (!it.next_node) {
456                 ret = not_found;
457                 continue;
458             }
459 
460             it.processed_for_removal = true;
461             stack.push_back (it);
462             struct iter next = {
463               it.next_node, NULL, it.prefix + 1, it.size - 1, 0, 0, 0, false};
464             stack.push_back (next);
465         } else {
466             it.processed_for_removal = false;
467 
468             if (it.next_node->is_redundant ()) {
469                 LIBZMQ_DELETE (it.next_node);
470                 zmq_assert (it.node->_count > 0);
471 
472                 if (it.node->_count == 1) {
473                     it.node->_next.node = NULL;
474                     it.node->_count = 0;
475                     --it.node->_live_nodes;
476                     zmq_assert (it.node->_live_nodes == 0);
477                 } else {
478                     it.node->_next.table[it.current_child - it.node->_min] = 0;
479                     zmq_assert (it.node->_live_nodes > 1);
480                     --it.node->_live_nodes;
481 
482                     //  Compact the table if possible
483                     if (it.node->_live_nodes == 1) {
484                         //  If there's only one live node in the table we can
485                         //  switch to using the more compact single-node
486                         //  representation
487                         unsigned short i;
488                         for (i = 0; i < it.node->_count; ++i)
489                             if (it.node->_next.table[i])
490                                 break;
491 
492                         zmq_assert (i < it.node->_count);
493                         it.node->_min += i;
494                         it.node->_count = 1;
495                         generic_mtrie_t *oldp = it.node->_next.table[i];
496                         free (it.node->_next.table);
497                         it.node->_next.table = NULL;
498                         it.node->_next.node = oldp;
499                     } else if (it.current_child == it.node->_min) {
500                         //  We can compact the table "from the left"
501                         unsigned short i;
502                         for (i = 1; i < it.node->_count; ++i)
503                             if (it.node->_next.table[i])
504                                 break;
505 
506                         zmq_assert (i < it.node->_count);
507                         it.node->_min += i;
508                         it.node->_count -= i;
509                         generic_mtrie_t **old_table = it.node->_next.table;
510                         it.node->_next.table =
511                           static_cast<generic_mtrie_t **> (malloc (
512                             sizeof (generic_mtrie_t *) * it.node->_count));
513                         alloc_assert (it.node->_next.table);
514                         memmove (it.node->_next.table, old_table + i,
515                                  sizeof (generic_mtrie_t *) * it.node->_count);
516                         free (old_table);
517                     } else if (it.current_child
518                                == it.node->_min + it.node->_count - 1) {
519                         //  We can compact the table "from the right"
520                         unsigned short i;
521                         for (i = 1; i < it.node->_count; ++i)
522                             if (it.node->_next.table[it.node->_count - 1 - i])
523                                 break;
524 
525                         zmq_assert (i < it.node->_count);
526                         it.node->_count -= i;
527                         generic_mtrie_t **old_table = it.node->_next.table;
528                         it.node->_next.table =
529                           static_cast<generic_mtrie_t **> (malloc (
530                             sizeof (generic_mtrie_t *) * it.node->_count));
531                         alloc_assert (it.node->_next.table);
532                         memmove (it.node->_next.table, old_table,
533                                  sizeof (generic_mtrie_t *) * it.node->_count);
534                         free (old_table);
535                     }
536                 }
537             }
538         }
539     }
540 
541     return ret;
542 }
543 
544 template <typename T>
545 template <typename Arg>
match(prefix_t data_,size_t size_,void (* func_)(value_t * pipe_,Arg arg_),Arg arg_)546 void generic_mtrie_t<T>::match (prefix_t data_,
547                                 size_t size_,
548                                 void (*func_) (value_t *pipe_, Arg arg_),
549                                 Arg arg_)
550 {
551     for (generic_mtrie_t *current = this; current; data_++, size_--) {
552         //  Signal the pipes attached to this node.
553         if (current->_pipes) {
554             for (typename pipes_t::iterator it = current->_pipes->begin (),
555                                             end = current->_pipes->end ();
556                  it != end; ++it) {
557                 func_ (*it, arg_);
558             }
559         }
560 
561         //  If we are at the end of the message, there's nothing more to match.
562         if (!size_)
563             break;
564 
565         //  If there are no subnodes in the trie, return.
566         if (current->_count == 0)
567             break;
568 
569         if (current->_count == 1) {
570             //  If there's one subnode (optimisation).
571             if (data_[0] != current->_min) {
572                 break;
573             }
574             current = current->_next.node;
575         } else {
576             //  If there are multiple subnodes.
577             if (data_[0] < current->_min
578                 || data_[0] >= current->_min + current->_count) {
579                 break;
580             }
581             current = current->_next.table[data_[0] - current->_min];
582         }
583     }
584 }
585 
is_redundant() const586 template <typename T> bool generic_mtrie_t<T>::is_redundant () const
587 {
588     return !_pipes && _live_nodes == 0;
589 }
590 }
591 
592 
593 #endif
594