1 /*
2 * Copyright (C) 2008-2020 Codership Oy <info@codership.com>
3 *
4 * $Id$
5 */
6
7 #include "gcs_defrag.hpp"
8
9 #include <errno.h>
10 #include <unistd.h>
11 #include <string.h>
12
/*
 * Allocate the action buffer for the defragmenter `df` that must be in scope
 * at the expansion site: df->size bytes obtained via
 * gcs_gcache_malloc(df->cache, df->size).
 *
 * On success df->head and df->tail both point at the start of the new buffer.
 * On failure df->head is NULL (df->tail is left untouched), an error is logged
 * and the macro RETURNS -ENOMEM FROM THE ENCLOSING FUNCTION - note the hidden
 * control flow when reading call sites.
 */
#define DF_ALLOC()                                                          \
    do {                                                                    \
        df->head = static_cast<uint8_t*>(                                   \
            gcs_gcache_malloc(df->cache, df->size));                        \
                                                                            \
        if (gu_unlikely(NULL == df->head)) {                                \
            gu_error ("Could not allocate memory for new "                  \
                      "action of size: %zd", df->size);                     \
            return -ENOMEM;                                                 \
        }                                                                   \
                                                                            \
        df->tail = df->head;                                                \
    } while (0)
25
26 /*!
27 * Handle action fragment
28 *
29 * Unless a whole action is returned, contents of act is undefined
30 *
31 * In order to optimize branch prediction used gu_likely macros and odered and
32 * nested if/else blocks according to branch probability.
33 *
34 * @return 0 - success,
35 * size of action - success, full action received,
36 * negative - error.
37 *
38 * TODO: this function is too long, figure out a way to factor it into several
39 * smaller ones. Note that it is called for every GCS_MSG_ACTION message
40 * so it should be optimal.
41 */
42 ssize_t
gcs_defrag_handle_frag(gcs_defrag_t * df,const gcs_act_frag_t * frg,struct gcs_act * act,bool local)43 gcs_defrag_handle_frag (gcs_defrag_t* df,
44 const gcs_act_frag_t* frg,
45 struct gcs_act* act,
46 bool local)
47 {
48 if (df->received) {
49 /* another fragment of existing action */
50
51 df->frag_no++;
52
53 /* detect possible error condition */
54 if (gu_unlikely((df->sent_id != frg->act_id) ||
55 (df->frag_no != frg->frag_no))) {
56 if (local && df->reset &&
57 (df->sent_id == frg->act_id) && (0 == frg->frag_no)) {
58 /* df->sent_id was aborted halfway and is being taken care of
59 * by the sender thread. Forget about it.
60 * Reinit counters and continue with the new action. */
61 gu_debug ("Local action %lld, size %ld reset.",
62 frg->act_id, frg->act_size);
63 df->frag_no = 0;
64 df->received = 0;
65 df->tail = df->head;
66 df->reset = false;
67
68 if (df->size != frg->act_size) {
69
70 df->size = frg->act_size;
71
72 #ifndef GCS_FOR_GARB
73 if (df->cache !=NULL) {
74 gcache_free (df->cache, df->head);
75 }
76 else {
77 free ((void*)df->head);
78 }
79
80 DF_ALLOC();
81 #endif /* GCS_FOR_GARB */
82 }
83 }
84 else if (frg->act_id == df->sent_id && frg->frag_no < df->frag_no) {
85 /* gh172: tolerate duplicate fragments in production. */
86 gu_warn ("Duplicate fragment %lld:%ld, expected %lld:%ld. "
87 "Skipping.",
88 frg->act_id, frg->frag_no, df->sent_id, df->frag_no);
89 df->frag_no--; // revert counter in hope that we get good frag
90 assert(0);
91 return 0;
92 }
93 else {
94 gu_error ("Unordered fragment received. Protocol error.");
95 gu_error ("Expected: %llu:%ld, received: %llu:%ld",
96 df->sent_id, df->frag_no, frg->act_id, frg->frag_no);
97 gu_error ("Contents: '%.*s'", frg->frag_len, (char*)frg->frag);
98 df->frag_no--; // revert counter in hope that we get good frag
99 assert(0);
100 return -EPROTO;
101 }
102 }
103 }
104 else {
105 /* new action */
106 if (gu_likely(0 == frg->frag_no)) {
107
108 df->size = frg->act_size;
109 df->sent_id = frg->act_id;
110 df->reset = false;
111
112 #ifndef GCS_FOR_GARB
113 DF_ALLOC();
114 #else
115 /* we don't store actions locally at all */
116 df->head = NULL;
117 df->tail = df->head;
118 #endif
119 }
120 else {
121 /* not a first fragment */
122 if (!local && df->reset) {
123 /* can happen after configuration change,
124 just ignore this message calmly */
125 gu_debug ("Ignoring fragment %lld:%ld (size %d) after reset",
126 frg->act_id, frg->frag_no, frg->act_size);
127 return 0;
128 }
129 else {
130 ((char*)frg->frag)[frg->frag_len - 1] = '\0';
131 gu_error ("Unordered fragment received. Protocol error.");
132 gu_error ("Expected: any:0(first), received: %lld:%ld",
133 frg->act_id, frg->frag_no);
134 gu_error ("Contents: '%s', local: %s, reset: %s",
135 (char*)frg->frag, local ? "yes" : "no",
136 df->reset ? "yes" : "no");
137 assert(0);
138 return -EPROTO;
139 }
140 }
141 }
142
143 df->received += frg->frag_len;
144 assert (df->received <= df->size);
145
146 #ifndef GCS_FOR_GARB
147 assert (df->tail);
148 memcpy (df->tail, frg->frag, frg->frag_len);
149 df->tail += frg->frag_len;
150 #else
151 /* we skip memcpy since have not allocated any buffer */
152 assert (NULL == df->tail);
153 assert (NULL == df->head);
154 #endif
155
156 #if 1
157 if (df->received == df->size) {
158 act->buf = df->head;
159 act->buf_len = df->received;
160 gcs_defrag_init (df, df->cache);
161 return act->buf_len;
162 }
163 else {
164 return 0;
165 }
166 #else
167 /* Refs gh185. Above original logic is preserved which relies on resetting
168 * group->frag_reset when local action needs to be resent. However a proper
169 * solution seems to be to use reset flag of own defrag channel (at least
170 * it is per channel, not global like group->frag_reset). This proper logic
171 * is shown below. Note that for it to work gcs_group_handle_act_msg()
172 * must be able to handle -ERESTART return code. */
173 int ret;
174
175 if (df->received == df->size) {
176 act->buf = df->head;
177 act->buf_len = df->received;
178 if (gu_likely(!df->reset))
179 {
180 ret = act->buf_len;
181 }
182 else
183 {
184 /* foreign action should simply never get here, only local actions
185 * are allowed to complete in reset state (to return -ERESTART) to
186 * a sending thread. */
187 assert(local);
188 ret = -ERESTART;
189 }
190 gcs_defrag_init (df, df->cache); // this also clears df->reset flag
191 assert(!df->reset);
192 }
193 else {
194 ret = 0;
195 }
196
197 return ret;
198 #endif
199 }
200