1 // Copyright Maciej Sobczak 2008-2019.
2 // This file is part of YAMI4.
3 //
4 // YAMI4 is free software: you can redistribute it and/or modify
5 // it under the terms of the GNU General Public License as published by
6 // the Free Software Foundation, either version 3 of the License, or
7 // (at your option) any later version.
8 //
9 // YAMI4 is distributed in the hope that it will be useful,
10 // but WITHOUT ANY WARRANTY; without even the implied warranty of
11 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 // GNU General Public License for more details.
13 //
14 // You should have received a copy of the GNU General Public License
15 // along with YAMI4.  If not, see <http://www.gnu.org/licenses/>.
16 
17 #include "outgoing_message_manager.h"
18 #include "mutex_lock.h"
19 #include "outgoing_message_info.h"
20 #include <yami4-core/fatal_errors.h>
21 
22 using namespace yami;
23 using namespace details;
24 
outgoing_message_manager()25 outgoing_message_manager::outgoing_message_manager()
26 {
27     mtx_.init();
28 }
29 
~outgoing_message_manager()30 outgoing_message_manager::~outgoing_message_manager()
31 {
32     map_type::iterator it = map_.begin();
33     map_type::iterator end = map_.end();
34     for ( ; it != end; ++it)
35     {
36         outgoing_message_info * outgoing = it->second;
37         it->second = NULL;
38 
39         outgoing->dec_ref_count();
40     }
41 
42     mtx_.clean();
43 }
44 
put(long long message_id,outgoing_message_info * outgoing)45 void outgoing_message_manager::put(long long message_id,
46     outgoing_message_info * outgoing)
47 {
48     mutex_lock lock(mtx_);
49 
50     map_type::const_iterator it = map_.find(message_id);
51 
52     // it is not possible to create the same outgoing message twice
53     if (it != map_.end())
54     {
55         fatal_failure(__FILE__, __LINE__);
56     }
57 
58     map_[message_id] = outgoing;
59     ++(outgoing->ref_count);
60 }
61 
remove(long long message_id)62 bool outgoing_message_manager::remove(long long message_id)
63 {
64     mutex_lock lock(mtx_);
65 
66     map_type::iterator it = map_.find(message_id);
67 
68     // it is acceptable to have a spurious remove - this can be a result
69     // of combination of exception handling and regular cleanup
70     if (it != map_.end())
71     {
72         return do_remove(it);
73     }
74     else
75     {
76         // consider the remove operation to be successful,
77         // as the final effect is as the user expected
78 
79         return true;
80     }
81 }
82 
do_remove(map_type::iterator it)83 bool outgoing_message_manager::do_remove(map_type::iterator it)
84 {
85     outgoing_message_info * outgoing = it->second;
86 
87     if (outgoing->callback_pending)
88     {
89         // do not remove the object if the user callback is pending
90         // -> instead allow the user to recognize this situation
91         // and allow him to handle that at the application level
92         // (perhaps by retrying later) or letting the manager to
93         // remove the object automatically after the last interaction)
94 
95         return false;
96     }
97 
98     it->second = NULL;
99 
100     outgoing->dec_ref_count(/* disown_callback = */ true);
101 
102     map_.erase(it);
103 
104     return true;
105 }
106 
report_replied(long long message_id,std::unique_ptr<parameters> & body)107 void outgoing_message_manager::report_replied(long long message_id,
108     std::unique_ptr<parameters> & body)
109 {
110     mutex_lock lock(mtx_);
111 
112     map_type::iterator it = map_.find(message_id);
113 
114     // it is not a bug if the user tries to operate on non-existing message
115     // (it might be a network junk that should be ignored)
116 
117     if (it != map_.end())
118     {
119         outgoing_message_info & outgoing = *(it->second);
120         bool should_remove = false;
121         {
122             mutex_lock lock_outgoing(outgoing.mtx);
123 
124             // it is possible to get several replies to the same message
125             // (this might be a junk network content),
126             // but only the first reply is taken into account
127 
128             if (outgoing.state == posted || outgoing.state == transmitted)
129             {
130                 message_state previous_state = outgoing.state;
131 
132                 outgoing.state = replied;
133 
134                 if (outgoing.reply_body != NULL)
135                 {
136                     fatal_failure(__FILE__, __LINE__);
137                 }
138 
139                 outgoing.reply_body = body.release();
140 
141                 if (previous_state == posted)
142                 {
143                     outgoing.transmitted.notify();
144                 }
145                 outgoing.completed.notify();
146 
147                 outgoing.process_callback(&mtx_);
148 
149                 // there will be no more interaction with this message
150                 should_remove = true;
151             }
152         }
153 
154         if (should_remove)
155         {
156             do_remove(it);
157         }
158     }
159 }
160 
report_replied(long long message_id,std::unique_ptr<std::vector<char>> & raw_buffer)161 void outgoing_message_manager::report_replied(long long message_id,
162     std::unique_ptr<std::vector<char> > & raw_buffer)
163 {
164     mutex_lock lock(mtx_);
165 
166     map_type::iterator it = map_.find(message_id);
167 
168     // it is not a bug if the user tries to operate on non-existing message
169     // (it might be a network junk that should be ignored)
170 
171     if (it != map_.end())
172     {
173         outgoing_message_info & outgoing = *(it->second);
174         bool should_remove = false;
175         {
176             mutex_lock lock_outgoing(outgoing.mtx);
177 
178             // it is possible to get several replies to the same message
179             // (this might be a junk network content),
180             // but only the first reply is taken into account
181 
182             if (outgoing.state == posted || outgoing.state == transmitted)
183             {
184                 message_state previous_state = outgoing.state;
185 
186                 outgoing.state = replied;
187 
188                 if (outgoing.reply_body != NULL)
189                 {
190                     fatal_failure(__FILE__, __LINE__);
191                 }
192 
193                 outgoing.reply_raw_buffer = raw_buffer.release();
194 
195                 if (previous_state == posted)
196                 {
197                     outgoing.transmitted.notify();
198                 }
199                 outgoing.completed.notify();
200 
201                 outgoing.process_callback(&mtx_);
202 
203                 // there will be no more interaction with this message
204                 should_remove = true;
205             }
206         }
207 
208         if (should_remove)
209         {
210             do_remove(it);
211         }
212     }
213 }
214 
report_rejected(long long message_id,const std::string & reason)215 void outgoing_message_manager::report_rejected(
216     long long message_id, const std::string & reason)
217 {
218     mutex_lock lock(mtx_);
219 
220     map_type::iterator it = map_.find(message_id);
221 
222     // it is not a bug if the user tries to operate on non-existing message
223     // (it might be a network junk that should be ignored)
224 
225     if (it != map_.end())
226     {
227         outgoing_message_info & outgoing = *(it->second);
228         bool should_remove = false;
229         {
230             mutex_lock lock_outgoing(outgoing.mtx);
231 
232             // it is possible to get several rejections to the same message
233             // (this might be a junk network content),
234             // but only the first one is taken into account
235 
236             if (outgoing.state == posted || outgoing.state == transmitted)
237             {
238                 message_state previous_state = outgoing.state;
239 
240                 outgoing.state = rejected;
241                 outgoing.exception_msg = reason;
242 
243                 if (previous_state == posted)
244                 {
245                     outgoing.transmitted.notify();
246                 }
247                 outgoing.completed.notify();
248 
249                 outgoing.process_callback(&mtx_);
250 
251                 // there will be no more interaction with this message
252                 should_remove = true;
253            }
254         }
255 
256         if (should_remove)
257         {
258             do_remove(it);
259         }
260     }
261 }
262