1 /*
2  * Copyright (C) 1996-2021 The Squid Software Foundation and contributors
3  *
4  * Squid software is distributed under GPLv2+ license and includes
5  * contributions from numerous individuals and organizations.
6  * Please see the COPYING and CONTRIBUTORS files for details.
7  */
8 
9 /* DEBUG: section 54    Interprocess Communication */
10 
11 #include "squid.h"
12 #include "base/TextException.h"
13 #include "comm.h"
14 #include "comm/Write.h"
15 #include "ipc/Inquirer.h"
16 #include "ipc/Port.h"
17 #include "ipc/TypedMsgHdr.h"
18 #include "MemBuf.h"
19 #include <algorithm>
20 
21 CBDATA_NAMESPACED_CLASS_INIT(Ipc, Inquirer);
22 
23 Ipc::Inquirer::RequestsMap Ipc::Inquirer::TheRequestsMap;
24 unsigned int Ipc::Inquirer::LastRequestId = 0;
25 
26 /// compare Ipc::StrandCoord using kidId, for std::sort() below
27 static bool
LesserStrandByKidId(const Ipc::StrandCoord & c1,const Ipc::StrandCoord & c2)28 LesserStrandByKidId(const Ipc::StrandCoord &c1, const Ipc::StrandCoord &c2)
29 {
30     return c1.kidId < c2.kidId;
31 }
32 
Inquirer(Request::Pointer aRequest,const StrandCoords & coords,double aTimeout)33 Ipc::Inquirer::Inquirer(Request::Pointer aRequest, const StrandCoords& coords,
34                         double aTimeout):
35     AsyncJob("Ipc::Inquirer"),
36     request(aRequest), strands(coords), pos(strands.begin()), timeout(aTimeout)
37 {
38     debugs(54, 5, HERE);
39 
40     // order by ascending kid IDs; useful for non-aggregatable stats
41     std::sort(strands.begin(), strands.end(), LesserStrandByKidId);
42 }
43 
~Inquirer()44 Ipc::Inquirer::~Inquirer()
45 {
46     debugs(54, 5, HERE);
47     cleanup();
48 }
49 
50 void
cleanup()51 Ipc::Inquirer::cleanup()
52 {
53 }
54 
55 void
start()56 Ipc::Inquirer::start()
57 {
58     request->requestId = 0;
59 }
60 
61 void
inquire()62 Ipc::Inquirer::inquire()
63 {
64     if (pos == strands.end()) {
65         Must(done());
66         return;
67     }
68 
69     Must(request->requestId == 0);
70     AsyncCall::Pointer callback = asyncCall(54, 5, "Mgr::Inquirer::handleRemoteAck",
71                                             HandleAckDialer(this, &Inquirer::handleRemoteAck, NULL));
72     if (++LastRequestId == 0) // don't use zero value as request->requestId
73         ++LastRequestId;
74     request->requestId = LastRequestId;
75     const int kidId = pos->kidId;
76     debugs(54, 4, HERE << "inquire kid: " << kidId << status());
77     TheRequestsMap[request->requestId] = callback;
78     TypedMsgHdr message;
79     request->pack(message);
80     SendMessage(Port::MakeAddr(strandAddrLabel, kidId), message);
81     eventAdd("Ipc::Inquirer::requestTimedOut", &Inquirer::RequestTimedOut,
82              this, timeout, 0, false);
83 }
84 
85 /// called when a strand is done writing its output
86 void
handleRemoteAck(Response::Pointer response)87 Ipc::Inquirer::handleRemoteAck(Response::Pointer response)
88 {
89     debugs(54, 4, HERE << status());
90     request->requestId = 0;
91     removeTimeoutEvent();
92     if (aggregate(response)) {
93         Must(!done()); // or we should not be called
94         ++pos; // advance after a successful inquiry
95         inquire();
96     } else {
97         mustStop("error");
98     }
99 }
100 
101 void
swanSong()102 Ipc::Inquirer::swanSong()
103 {
104     debugs(54, 5, HERE);
105     removeTimeoutEvent();
106     if (request->requestId > 0) {
107         DequeueRequest(request->requestId);
108         request->requestId = 0;
109     }
110     sendResponse();
111     cleanup();
112 }
113 
114 bool
doneAll() const115 Ipc::Inquirer::doneAll() const
116 {
117     return pos == strands.end();
118 }
119 
120 void
handleException(const std::exception & e)121 Ipc::Inquirer::handleException(const std::exception& e)
122 {
123     debugs(54, 3, HERE << e.what());
124     mustStop("exception");
125 }
126 
127 void
callException(const std::exception & e)128 Ipc::Inquirer::callException(const std::exception& e)
129 {
130     debugs(54, 3, HERE);
131     try {
132         handleException(e);
133     } catch (const std::exception& ex) {
134         debugs(54, DBG_CRITICAL, HERE << ex.what());
135     }
136     AsyncJob::callException(e);
137 }
138 
139 /// returns and forgets the right Inquirer callback for strand request
140 AsyncCall::Pointer
DequeueRequest(unsigned int requestId)141 Ipc::Inquirer::DequeueRequest(unsigned int requestId)
142 {
143     debugs(54, 3, HERE << " requestId " << requestId);
144     Must(requestId != 0);
145     AsyncCall::Pointer call;
146     RequestsMap::iterator request = TheRequestsMap.find(requestId);
147     if (request != TheRequestsMap.end()) {
148         call = request->second;
149         Must(call != NULL);
150         TheRequestsMap.erase(request);
151     }
152     return call;
153 }
154 
155 void
HandleRemoteAck(const Response & response)156 Ipc::Inquirer::HandleRemoteAck(const Response& response)
157 {
158     Must(response.requestId != 0);
159     AsyncCall::Pointer call = DequeueRequest(response.requestId);
160     if (call != NULL) {
161         HandleAckDialer* dialer = dynamic_cast<HandleAckDialer*>(call->getDialer());
162         Must(dialer);
163         dialer->arg1 = response.clone();
164         ScheduleCallHere(call);
165     }
166 }
167 
168 /// called when we are no longer waiting for the strand to respond
169 void
removeTimeoutEvent()170 Ipc::Inquirer::removeTimeoutEvent()
171 {
172     if (eventFind(&Inquirer::RequestTimedOut, this))
173         eventDelete(&Inquirer::RequestTimedOut, this);
174 }
175 
176 /// Ipc::Inquirer::requestTimedOut wrapper
177 void
RequestTimedOut(void * param)178 Ipc::Inquirer::RequestTimedOut(void* param)
179 {
180     debugs(54, 3, HERE);
181     Must(param != NULL);
182     Inquirer* cmi = static_cast<Inquirer*>(param);
183     // use async call to enable job call protection that time events lack
184     CallJobHere(54, 5, cmi, Inquirer, requestTimedOut);
185 }
186 
187 /// called when the strand failed to respond (or finish responding) in time
188 void
requestTimedOut()189 Ipc::Inquirer::requestTimedOut()
190 {
191     debugs(54, 3, HERE);
192     if (request->requestId != 0) {
193         DequeueRequest(request->requestId);
194         request->requestId = 0;
195         Must(!done()); // or we should not be called
196         ++pos; // advance after a failed inquiry
197         inquire();
198     }
199 }
200 
201 const char*
status() const202 Ipc::Inquirer::status() const
203 {
204     static MemBuf buf;
205     buf.reset();
206     buf.appendf(" [request->requestId %u]", request->requestId);
207     buf.terminate();
208     return buf.content();
209 }
210 
211