1 /*
2  * Copyright (C) 1996-2021 The Squid Software Foundation and contributors
3  *
4  * Squid software is distributed under GPLv2+ license and includes
5  * contributions from numerous individuals and organizations.
6  * Please see the COPYING and CONTRIBUTORS files for details.
7  */
8 
9 /* DEBUG: section 16    Cache Manager API */
10 
11 #include "squid.h"
12 #include "base/TextException.h"
13 #include "comm.h"
14 #include "comm/Connection.h"
15 #include "comm/Write.h"
16 #include "CommCalls.h"
17 #include "errorpage.h"
18 #include "HttpReply.h"
19 #include "HttpRequest.h"
20 #include "ipc/UdsOp.h"
21 #include "mgr/ActionWriter.h"
22 #include "mgr/Command.h"
23 #include "mgr/Inquirer.h"
24 #include "mgr/IntParam.h"
25 #include "mgr/Request.h"
26 #include "mgr/Response.h"
27 #include "SquidTime.h"
28 #include <memory>
29 #include <algorithm>
30 
31 CBDATA_NAMESPACED_CLASS_INIT(Mgr, Inquirer);
32 
Inquirer(Action::Pointer anAction,const Request & aCause,const Ipc::StrandCoords & coords)33 Mgr::Inquirer::Inquirer(Action::Pointer anAction,
34                         const Request &aCause, const Ipc::StrandCoords &coords):
35     Ipc::Inquirer(aCause.clone(), applyQueryParams(coords, aCause.params.queryParams), anAction->atomic() ? 10 : 100),
36     aggrAction(anAction)
37 {
38     conn = aCause.conn;
39     Ipc::ImportFdIntoComm(conn, SOCK_STREAM, IPPROTO_TCP, Ipc::fdnHttpSocket);
40 
41     debugs(16, 5, HERE << conn << " action: " << aggrAction);
42 
43     closer = asyncCall(16, 5, "Mgr::Inquirer::noteCommClosed",
44                        CommCbMemFunT<Inquirer, CommCloseCbParams>(this, &Inquirer::noteCommClosed));
45     comm_add_close_handler(conn->fd, closer);
46 }
47 
48 /// closes our copy of the client HTTP connection socket
49 void
cleanup()50 Mgr::Inquirer::cleanup()
51 {
52     if (Comm::IsConnOpen(conn)) {
53         removeCloseHandler();
54         conn->close();
55     }
56 }
57 
58 void
removeCloseHandler()59 Mgr::Inquirer::removeCloseHandler()
60 {
61     if (closer != NULL) {
62         comm_remove_close_handler(conn->fd, closer);
63         closer = NULL;
64     }
65 }
66 
67 void
start()68 Mgr::Inquirer::start()
69 {
70     debugs(16, 5, HERE);
71     Ipc::Inquirer::start();
72     Must(Comm::IsConnOpen(conn));
73     Must(aggrAction != NULL);
74 
75     std::unique_ptr<MemBuf> replyBuf;
76     if (strands.empty()) {
77         const char *url = aggrAction->command().params.httpUri.termedBuf();
78         const MasterXaction::Pointer mx = new MasterXaction(XactionInitiator::initIpc);
79         auto *req = HttpRequest::FromUrlXXX(url, mx);
80         ErrorState err(ERR_INVALID_URL, Http::scNotFound, req);
81         std::unique_ptr<HttpReply> reply(err.BuildHttpReply());
82         replyBuf.reset(reply->pack());
83     } else {
84         std::unique_ptr<HttpReply> reply(new HttpReply);
85         reply->setHeaders(Http::scOkay, NULL, "text/plain", -1, squid_curtime, squid_curtime);
86         reply->header.putStr(Http::HdrType::CONNECTION, "close"); // until we chunk response
87         replyBuf.reset(reply->pack());
88     }
89     writer = asyncCall(16, 5, "Mgr::Inquirer::noteWroteHeader",
90                        CommCbMemFunT<Inquirer, CommIoCbParams>(this, &Inquirer::noteWroteHeader));
91     Comm::Write(conn, replyBuf.get(), writer);
92 }
93 
94 /// called when we wrote the response header
95 void
noteWroteHeader(const CommIoCbParams & params)96 Mgr::Inquirer::noteWroteHeader(const CommIoCbParams& params)
97 {
98     debugs(16, 5, HERE);
99     writer = NULL;
100     Must(params.flag == Comm::OK);
101     Must(params.conn.getRaw() == conn.getRaw());
102     Must(params.size != 0);
103     // start inquiries at the initial pos
104     inquire();
105 }
106 
107 /// called when the HTTP client or some external force closed our socket
108 void
noteCommClosed(const CommCloseCbParams & params)109 Mgr::Inquirer::noteCommClosed(const CommCloseCbParams& params)
110 {
111     debugs(16, 5, HERE);
112     Must(!Comm::IsConnOpen(conn) && params.conn.getRaw() == conn.getRaw());
113     conn = NULL;
114     mustStop("commClosed");
115 }
116 
117 bool
aggregate(Ipc::Response::Pointer aResponse)118 Mgr::Inquirer::aggregate(Ipc::Response::Pointer aResponse)
119 {
120     Mgr::Response& response = static_cast<Response&>(*aResponse);
121     if (response.hasAction())
122         aggrAction->add(response.getAction());
123     return true;
124 }
125 
126 void
sendResponse()127 Mgr::Inquirer::sendResponse()
128 {
129     if (!strands.empty() && aggrAction->aggregatable()) {
130         removeCloseHandler();
131         AsyncJob::Start(new ActionWriter(aggrAction, conn));
132         conn = NULL; // should not close because we passed it to ActionWriter
133     }
134 }
135 
136 bool
doneAll() const137 Mgr::Inquirer::doneAll() const
138 {
139     return !writer && Ipc::Inquirer::doneAll();
140 }
141 
142 Ipc::StrandCoords
applyQueryParams(const Ipc::StrandCoords & aStrands,const QueryParams & aParams)143 Mgr::Inquirer::applyQueryParams(const Ipc::StrandCoords& aStrands, const QueryParams& aParams)
144 {
145     Ipc::StrandCoords sc;
146 
147     QueryParam::Pointer processesParam = aParams.get("processes");
148     QueryParam::Pointer workersParam = aParams.get("workers");
149 
150     if (processesParam == NULL || workersParam == NULL) {
151         if (processesParam != NULL) {
152             IntParam* param = dynamic_cast<IntParam*>(processesParam.getRaw());
153             if (param != NULL && param->type == QueryParam::ptInt) {
154                 const std::vector<int>& processes = param->value();
155                 for (Ipc::StrandCoords::const_iterator iter = aStrands.begin();
156                         iter != aStrands.end(); ++iter) {
157                     if (std::find(processes.begin(), processes.end(), iter->kidId) != processes.end())
158                         sc.push_back(*iter);
159                 }
160             }
161         } else if (workersParam != NULL) {
162             IntParam* param = dynamic_cast<IntParam*>(workersParam.getRaw());
163             if (param != NULL && param->type == QueryParam::ptInt) {
164                 const std::vector<int>& workers = param->value();
165                 for (int i = 0; i < (int)aStrands.size(); ++i) {
166                     if (std::find(workers.begin(), workers.end(), i + 1) != workers.end())
167                         sc.push_back(aStrands[i]);
168                 }
169             }
170         } else {
171             sc = aStrands;
172         }
173     }
174 
175     debugs(16, 4, HERE << "strands kid IDs = ");
176     for (Ipc::StrandCoords::const_iterator iter = sc.begin(); iter != sc.end(); ++iter) {
177         debugs(16, 4, HERE << iter->kidId);
178     }
179 
180     return sc;
181 }
182 
183