1 /*
2  * Copyright (C) 2008-2020 Codership Oy <info@codership.com>
3  *
4  * $Id$
5  */
6 
7 #include "gcs_defrag.hpp"
8 
9 #include <errno.h>
10 #include <unistd.h>
11 #include <string.h>
12 
/* Allocate a buffer of df->size bytes for the action being defragmented
 * (via gcs_gcache_malloc() on df->cache) and point df->tail at its start.
 * Expands inside a function that has a local `df` and returns a signed error
 * code: on allocation failure it logs the error and makes that *enclosing*
 * function return -ENOMEM, leaving df->head == NULL and df->tail untouched. */
#define DF_ALLOC()                                                          \
    do {                                                                    \
        df->head = static_cast<uint8_t*>(                                   \
            gcs_gcache_malloc(df->cache, df->size));                        \
                                                                            \
        if (gu_unlikely(NULL == df->head)) {                                \
            gu_error ("Could not allocate memory for new "                  \
                      "action of size: %zd", df->size);                     \
            return -ENOMEM;                                                 \
        }                                                                   \
                                                                            \
        df->tail = df->head;                                                \
    } while (0)
25 
26 /*!
27  * Handle action fragment
28  *
29  * Unless a whole action is returned, contents of act is undefined
30  *
31  * In order to optimize branch prediction used gu_likely macros and odered and
32  * nested if/else blocks according to branch probability.
33  *
34  * @return 0              - success,
35  *         size of action - success, full action received,
36  *         negative       - error.
37  *
38  * TODO: this function is too long, figure out a way to factor it into several
39  *       smaller ones. Note that it is called for every GCS_MSG_ACTION message
40  *       so it should be optimal.
41  */
42 ssize_t
gcs_defrag_handle_frag(gcs_defrag_t * df,const gcs_act_frag_t * frg,struct gcs_act * act,bool local)43 gcs_defrag_handle_frag (gcs_defrag_t*         df,
44                         const gcs_act_frag_t* frg,
45                         struct gcs_act*       act,
46                         bool                  local)
47 {
48     if (df->received) {
49         /* another fragment of existing action */
50 
51         df->frag_no++;
52 
53         /* detect possible error condition */
54         if (gu_unlikely((df->sent_id != frg->act_id) ||
55                         (df->frag_no != frg->frag_no))) {
56             if (local && df->reset &&
57                 (df->sent_id == frg->act_id) && (0 == frg->frag_no)) {
58                 /* df->sent_id was aborted halfway and is being taken care of
59                  * by the sender thread. Forget about it.
60                  * Reinit counters and continue with the new action. */
61                 gu_debug ("Local action %lld, size %ld reset.",
62                           frg->act_id, frg->act_size);
63                 df->frag_no  = 0;
64                 df->received = 0;
65                 df->tail     = df->head;
66                 df->reset    = false;
67 
68                 if (df->size != frg->act_size) {
69 
70                     df->size = frg->act_size;
71 
72 #ifndef GCS_FOR_GARB
73                     if (df->cache !=NULL) {
74                         gcache_free (df->cache, df->head);
75                     }
76                     else {
77                         free ((void*)df->head);
78                     }
79 
80                     DF_ALLOC();
81 #endif /* GCS_FOR_GARB */
82                 }
83             }
84             else if (frg->act_id == df->sent_id && frg->frag_no < df->frag_no) {
85                 /* gh172: tolerate duplicate fragments in production. */
86                 gu_warn ("Duplicate fragment %lld:%ld, expected %lld:%ld. "
87                          "Skipping.",
88                          frg->act_id, frg->frag_no, df->sent_id, df->frag_no);
89                 df->frag_no--; // revert counter in hope that we get good frag
90                 assert(0);
91                 return 0;
92             }
93             else {
94                 gu_error ("Unordered fragment received. Protocol error.");
95                 gu_error ("Expected: %llu:%ld, received: %llu:%ld",
96                           df->sent_id, df->frag_no, frg->act_id, frg->frag_no);
97                 gu_error ("Contents: '%.*s'", frg->frag_len, (char*)frg->frag);
98                 df->frag_no--; // revert counter in hope that we get good frag
99                 assert(0);
100                 return -EPROTO;
101             }
102         }
103     }
104     else {
105         /* new action */
106         if (gu_likely(0 == frg->frag_no)) {
107 
108             df->size    = frg->act_size;
109             df->sent_id = frg->act_id;
110             df->reset   = false;
111 
112 #ifndef GCS_FOR_GARB
113             DF_ALLOC();
114 #else
115             /* we don't store actions locally at all */
116             df->head = NULL;
117             df->tail = df->head;
118 #endif
119         }
120         else {
121             /* not a first fragment */
122             if (!local && df->reset) {
123                 /* can happen after configuration change,
124                    just ignore this message calmly */
125                 gu_debug ("Ignoring fragment %lld:%ld (size %d) after reset",
126                           frg->act_id, frg->frag_no, frg->act_size);
127                 return 0;
128             }
129             else {
130                 ((char*)frg->frag)[frg->frag_len - 1] = '\0';
131                 gu_error ("Unordered fragment received. Protocol error.");
132                 gu_error ("Expected: any:0(first), received: %lld:%ld",
133                           frg->act_id, frg->frag_no);
134                 gu_error ("Contents: '%s', local: %s, reset: %s",
135                           (char*)frg->frag, local ? "yes" : "no",
136                           df->reset ? "yes" : "no");
137                 assert(0);
138                 return -EPROTO;
139             }
140         }
141     }
142 
143     df->received += frg->frag_len;
144     assert (df->received <= df->size);
145 
146 #ifndef GCS_FOR_GARB
147     assert (df->tail);
148     memcpy (df->tail, frg->frag, frg->frag_len);
149     df->tail += frg->frag_len;
150 #else
151     /* we skip memcpy since have not allocated any buffer */
152     assert (NULL == df->tail);
153     assert (NULL == df->head);
154 #endif
155 
156 #if 1
157     if (df->received == df->size) {
158         act->buf     = df->head;
159         act->buf_len = df->received;
160         gcs_defrag_init (df, df->cache);
161         return act->buf_len;
162     }
163     else {
164         return 0;
165     }
166 #else
167     /* Refs gh185. Above original logic is preserved which relies on resetting
168      * group->frag_reset when local action needs to be resent. However a proper
169      * solution seems to be to use reset flag of own defrag channel (at least
170      * it is per channel, not global like group->frag_reset). This proper logic
171      * is shown below. Note that for it to work gcs_group_handle_act_msg()
172      * must be able to handle -ERESTART return code. */
173     int ret;
174 
175     if (df->received == df->size) {
176         act->buf     = df->head;
177         act->buf_len = df->received;
178         if (gu_likely(!df->reset))
179         {
180             ret = act->buf_len;
181         }
182         else
183         {
184             /* foreign action should simply never get here, only local actions
185              * are allowed to complete in reset state (to return -ERESTART) to
186              * a sending thread. */
187             assert(local);
188             ret = -ERESTART;
189         }
190         gcs_defrag_init (df, df->cache); // this also clears df->reset flag
191         assert(!df->reset);
192     }
193     else {
194         ret = 0;
195     }
196 
197     return ret;
198 #endif
199 }
200