1 // Copyright Maciej Sobczak 2008-2019.
2 // This file is part of YAMI4.
3 //
4 // YAMI4 is free software: you can redistribute it and/or modify
5 // it under the terms of the GNU General Public License as published by
6 // the Free Software Foundation, either version 3 of the License, or
7 // (at your option) any later version.
8 //
9 // YAMI4 is distributed in the hope that it will be useful,
10 // but WITHOUT ANY WARRANTY; without even the implied warranty of
11 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 // GNU General Public License for more details.
13 //
14 // You should have received a copy of the GNU General Public License
15 // along with YAMI4. If not, see <http://www.gnu.org/licenses/>.
16
17 #include "outgoing_message_manager.h"
18 #include "mutex_lock.h"
19 #include "outgoing_message_info.h"
20 #include <yami4-core/fatal_errors.h>
21
22 using namespace yami;
23 using namespace details;
24
outgoing_message_manager()25 outgoing_message_manager::outgoing_message_manager()
26 {
27 mtx_.init();
28 }
29
~outgoing_message_manager()30 outgoing_message_manager::~outgoing_message_manager()
31 {
32 map_type::iterator it = map_.begin();
33 map_type::iterator end = map_.end();
34 for ( ; it != end; ++it)
35 {
36 outgoing_message_info * outgoing = it->second;
37 it->second = NULL;
38
39 outgoing->dec_ref_count();
40 }
41
42 mtx_.clean();
43 }
44
put(long long message_id,outgoing_message_info * outgoing)45 void outgoing_message_manager::put(long long message_id,
46 outgoing_message_info * outgoing)
47 {
48 mutex_lock lock(mtx_);
49
50 map_type::const_iterator it = map_.find(message_id);
51
52 // it is not possible to create the same outgoing message twice
53 if (it != map_.end())
54 {
55 fatal_failure(__FILE__, __LINE__);
56 }
57
58 map_[message_id] = outgoing;
59 ++(outgoing->ref_count);
60 }
61
remove(long long message_id)62 bool outgoing_message_manager::remove(long long message_id)
63 {
64 mutex_lock lock(mtx_);
65
66 map_type::iterator it = map_.find(message_id);
67
68 // it is acceptable to have a spurious remove - this can be a result
69 // of combination of exception handling and regular cleanup
70 if (it != map_.end())
71 {
72 return do_remove(it);
73 }
74 else
75 {
76 // consider the remove operation to be successful,
77 // as the final effect is as the user expected
78
79 return true;
80 }
81 }
82
do_remove(map_type::iterator it)83 bool outgoing_message_manager::do_remove(map_type::iterator it)
84 {
85 outgoing_message_info * outgoing = it->second;
86
87 if (outgoing->callback_pending)
88 {
89 // do not remove the object if the user callback is pending
90 // -> instead allow the user to recognize this situation
91 // and allow him to handle that at the application level
92 // (perhaps by retrying later) or letting the manager to
93 // remove the object automatically after the last interaction)
94
95 return false;
96 }
97
98 it->second = NULL;
99
100 outgoing->dec_ref_count(/* disown_callback = */ true);
101
102 map_.erase(it);
103
104 return true;
105 }
106
report_replied(long long message_id,std::unique_ptr<parameters> & body)107 void outgoing_message_manager::report_replied(long long message_id,
108 std::unique_ptr<parameters> & body)
109 {
110 mutex_lock lock(mtx_);
111
112 map_type::iterator it = map_.find(message_id);
113
114 // it is not a bug if the user tries to operate on non-existing message
115 // (it might be a network junk that should be ignored)
116
117 if (it != map_.end())
118 {
119 outgoing_message_info & outgoing = *(it->second);
120 bool should_remove = false;
121 {
122 mutex_lock lock_outgoing(outgoing.mtx);
123
124 // it is possible to get several replies to the same message
125 // (this might be a junk network content),
126 // but only the first reply is taken into account
127
128 if (outgoing.state == posted || outgoing.state == transmitted)
129 {
130 message_state previous_state = outgoing.state;
131
132 outgoing.state = replied;
133
134 if (outgoing.reply_body != NULL)
135 {
136 fatal_failure(__FILE__, __LINE__);
137 }
138
139 outgoing.reply_body = body.release();
140
141 if (previous_state == posted)
142 {
143 outgoing.transmitted.notify();
144 }
145 outgoing.completed.notify();
146
147 outgoing.process_callback(&mtx_);
148
149 // there will be no more interaction with this message
150 should_remove = true;
151 }
152 }
153
154 if (should_remove)
155 {
156 do_remove(it);
157 }
158 }
159 }
160
report_replied(long long message_id,std::unique_ptr<std::vector<char>> & raw_buffer)161 void outgoing_message_manager::report_replied(long long message_id,
162 std::unique_ptr<std::vector<char> > & raw_buffer)
163 {
164 mutex_lock lock(mtx_);
165
166 map_type::iterator it = map_.find(message_id);
167
168 // it is not a bug if the user tries to operate on non-existing message
169 // (it might be a network junk that should be ignored)
170
171 if (it != map_.end())
172 {
173 outgoing_message_info & outgoing = *(it->second);
174 bool should_remove = false;
175 {
176 mutex_lock lock_outgoing(outgoing.mtx);
177
178 // it is possible to get several replies to the same message
179 // (this might be a junk network content),
180 // but only the first reply is taken into account
181
182 if (outgoing.state == posted || outgoing.state == transmitted)
183 {
184 message_state previous_state = outgoing.state;
185
186 outgoing.state = replied;
187
188 if (outgoing.reply_body != NULL)
189 {
190 fatal_failure(__FILE__, __LINE__);
191 }
192
193 outgoing.reply_raw_buffer = raw_buffer.release();
194
195 if (previous_state == posted)
196 {
197 outgoing.transmitted.notify();
198 }
199 outgoing.completed.notify();
200
201 outgoing.process_callback(&mtx_);
202
203 // there will be no more interaction with this message
204 should_remove = true;
205 }
206 }
207
208 if (should_remove)
209 {
210 do_remove(it);
211 }
212 }
213 }
214
report_rejected(long long message_id,const std::string & reason)215 void outgoing_message_manager::report_rejected(
216 long long message_id, const std::string & reason)
217 {
218 mutex_lock lock(mtx_);
219
220 map_type::iterator it = map_.find(message_id);
221
222 // it is not a bug if the user tries to operate on non-existing message
223 // (it might be a network junk that should be ignored)
224
225 if (it != map_.end())
226 {
227 outgoing_message_info & outgoing = *(it->second);
228 bool should_remove = false;
229 {
230 mutex_lock lock_outgoing(outgoing.mtx);
231
232 // it is possible to get several rejections to the same message
233 // (this might be a junk network content),
234 // but only the first one is taken into account
235
236 if (outgoing.state == posted || outgoing.state == transmitted)
237 {
238 message_state previous_state = outgoing.state;
239
240 outgoing.state = rejected;
241 outgoing.exception_msg = reason;
242
243 if (previous_state == posted)
244 {
245 outgoing.transmitted.notify();
246 }
247 outgoing.completed.notify();
248
249 outgoing.process_callback(&mtx_);
250
251 // there will be no more interaction with this message
252 should_remove = true;
253 }
254 }
255
256 if (should_remove)
257 {
258 do_remove(it);
259 }
260 }
261 }
262