1 /*
2 * Copyright (C) 1996-2021 The Squid Software Foundation and contributors
3 *
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
7 */
8
9 /* DEBUG: section 54 Interprocess Communication */
10
11 #include "squid.h"
12 #include "base/TextException.h"
13 #include "comm.h"
14 #include "comm/Write.h"
15 #include "ipc/Inquirer.h"
16 #include "ipc/Port.h"
17 #include "ipc/TypedMsgHdr.h"
18 #include "MemBuf.h"
19 #include <algorithm>
20
21 CBDATA_NAMESPACED_CLASS_INIT(Ipc, Inquirer);
22
23 Ipc::Inquirer::RequestsMap Ipc::Inquirer::TheRequestsMap;
24 unsigned int Ipc::Inquirer::LastRequestId = 0;
25
26 /// compare Ipc::StrandCoord using kidId, for std::sort() below
27 static bool
LesserStrandByKidId(const Ipc::StrandCoord & c1,const Ipc::StrandCoord & c2)28 LesserStrandByKidId(const Ipc::StrandCoord &c1, const Ipc::StrandCoord &c2)
29 {
30 return c1.kidId < c2.kidId;
31 }
32
Inquirer(Request::Pointer aRequest,const StrandCoords & coords,double aTimeout)33 Ipc::Inquirer::Inquirer(Request::Pointer aRequest, const StrandCoords& coords,
34 double aTimeout):
35 AsyncJob("Ipc::Inquirer"),
36 request(aRequest), strands(coords), pos(strands.begin()), timeout(aTimeout)
37 {
38 debugs(54, 5, HERE);
39
40 // order by ascending kid IDs; useful for non-aggregatable stats
41 std::sort(strands.begin(), strands.end(), LesserStrandByKidId);
42 }
43
~Inquirer()44 Ipc::Inquirer::~Inquirer()
45 {
46 debugs(54, 5, HERE);
47 cleanup();
48 }
49
50 void
cleanup()51 Ipc::Inquirer::cleanup()
52 {
53 }
54
55 void
start()56 Ipc::Inquirer::start()
57 {
58 request->requestId = 0;
59 }
60
61 void
inquire()62 Ipc::Inquirer::inquire()
63 {
64 if (pos == strands.end()) {
65 Must(done());
66 return;
67 }
68
69 Must(request->requestId == 0);
70 AsyncCall::Pointer callback = asyncCall(54, 5, "Mgr::Inquirer::handleRemoteAck",
71 HandleAckDialer(this, &Inquirer::handleRemoteAck, NULL));
72 if (++LastRequestId == 0) // don't use zero value as request->requestId
73 ++LastRequestId;
74 request->requestId = LastRequestId;
75 const int kidId = pos->kidId;
76 debugs(54, 4, HERE << "inquire kid: " << kidId << status());
77 TheRequestsMap[request->requestId] = callback;
78 TypedMsgHdr message;
79 request->pack(message);
80 SendMessage(Port::MakeAddr(strandAddrLabel, kidId), message);
81 eventAdd("Ipc::Inquirer::requestTimedOut", &Inquirer::RequestTimedOut,
82 this, timeout, 0, false);
83 }
84
85 /// called when a strand is done writing its output
86 void
handleRemoteAck(Response::Pointer response)87 Ipc::Inquirer::handleRemoteAck(Response::Pointer response)
88 {
89 debugs(54, 4, HERE << status());
90 request->requestId = 0;
91 removeTimeoutEvent();
92 if (aggregate(response)) {
93 Must(!done()); // or we should not be called
94 ++pos; // advance after a successful inquiry
95 inquire();
96 } else {
97 mustStop("error");
98 }
99 }
100
101 void
swanSong()102 Ipc::Inquirer::swanSong()
103 {
104 debugs(54, 5, HERE);
105 removeTimeoutEvent();
106 if (request->requestId > 0) {
107 DequeueRequest(request->requestId);
108 request->requestId = 0;
109 }
110 sendResponse();
111 cleanup();
112 }
113
114 bool
doneAll() const115 Ipc::Inquirer::doneAll() const
116 {
117 return pos == strands.end();
118 }
119
120 void
handleException(const std::exception & e)121 Ipc::Inquirer::handleException(const std::exception& e)
122 {
123 debugs(54, 3, HERE << e.what());
124 mustStop("exception");
125 }
126
127 void
callException(const std::exception & e)128 Ipc::Inquirer::callException(const std::exception& e)
129 {
130 debugs(54, 3, HERE);
131 try {
132 handleException(e);
133 } catch (const std::exception& ex) {
134 debugs(54, DBG_CRITICAL, HERE << ex.what());
135 }
136 AsyncJob::callException(e);
137 }
138
139 /// returns and forgets the right Inquirer callback for strand request
140 AsyncCall::Pointer
DequeueRequest(unsigned int requestId)141 Ipc::Inquirer::DequeueRequest(unsigned int requestId)
142 {
143 debugs(54, 3, HERE << " requestId " << requestId);
144 Must(requestId != 0);
145 AsyncCall::Pointer call;
146 RequestsMap::iterator request = TheRequestsMap.find(requestId);
147 if (request != TheRequestsMap.end()) {
148 call = request->second;
149 Must(call != NULL);
150 TheRequestsMap.erase(request);
151 }
152 return call;
153 }
154
155 void
HandleRemoteAck(const Response & response)156 Ipc::Inquirer::HandleRemoteAck(const Response& response)
157 {
158 Must(response.requestId != 0);
159 AsyncCall::Pointer call = DequeueRequest(response.requestId);
160 if (call != NULL) {
161 HandleAckDialer* dialer = dynamic_cast<HandleAckDialer*>(call->getDialer());
162 Must(dialer);
163 dialer->arg1 = response.clone();
164 ScheduleCallHere(call);
165 }
166 }
167
168 /// called when we are no longer waiting for the strand to respond
169 void
removeTimeoutEvent()170 Ipc::Inquirer::removeTimeoutEvent()
171 {
172 if (eventFind(&Inquirer::RequestTimedOut, this))
173 eventDelete(&Inquirer::RequestTimedOut, this);
174 }
175
176 /// Ipc::Inquirer::requestTimedOut wrapper
177 void
RequestTimedOut(void * param)178 Ipc::Inquirer::RequestTimedOut(void* param)
179 {
180 debugs(54, 3, HERE);
181 Must(param != NULL);
182 Inquirer* cmi = static_cast<Inquirer*>(param);
183 // use async call to enable job call protection that time events lack
184 CallJobHere(54, 5, cmi, Inquirer, requestTimedOut);
185 }
186
187 /// called when the strand failed to respond (or finish responding) in time
188 void
requestTimedOut()189 Ipc::Inquirer::requestTimedOut()
190 {
191 debugs(54, 3, HERE);
192 if (request->requestId != 0) {
193 DequeueRequest(request->requestId);
194 request->requestId = 0;
195 Must(!done()); // or we should not be called
196 ++pos; // advance after a failed inquiry
197 inquire();
198 }
199 }
200
201 const char*
status() const202 Ipc::Inquirer::status() const
203 {
204 static MemBuf buf;
205 buf.reset();
206 buf.appendf(" [request->requestId %u]", request->requestId);
207 buf.terminate();
208 return buf.content();
209 }
210
211