1 /*
2 * Copyright (C) 1996-2021 The Squid Software Foundation and contributors
3 *
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
7 */
8
9 #include "squid.h"
10 #include "base/AsyncJobCalls.h"
11 #include "Debug.h"
12 #include "fs/rock/RockHeaderUpdater.h"
13 #include "fs/rock/RockIoState.h"
14 #include "mime_header.h"
15 #include "Store.h"
16 #include "StoreMetaUnpacker.h"
17
18 CBDATA_NAMESPACED_CLASS_INIT(Rock, HeaderUpdater);
19
/// Creates an idle updater job for the entry described by anUpdate and kept
/// in aStore. The constructor only records its arguments and resets the
/// reading/writing state; it starts no I/O itself.
Rock::HeaderUpdater::HeaderUpdater(const Rock::SwapDir::Pointer &aStore, const Ipc::StoreMapUpdate &anUpdate):
    AsyncJob("Rock::HeaderUpdater"),
    store(aStore), update(anUpdate),
    reader(), writer(), // no I/O handles yet
    bytesRead(0),
    staleSwapHeaderSize(0), // zero means "stale swap metadata not parsed yet"
    staleSplicingPointNext(-1)
{
    // TODO: Consider limiting the number of concurrent store updates.
}
32
33 bool
doneAll() const34 Rock::HeaderUpdater::doneAll() const
35 {
36 return !reader && !writer && AsyncJob::doneAll();
37 }
38
39 void
swanSong()40 Rock::HeaderUpdater::swanSong()
41 {
42 if (update.stale || update.fresh)
43 store->map->abortUpdating(update);
44
45 if (reader) {
46 reader->close(StoreIOState::readerDone);
47 reader = nullptr;
48 }
49
50 if (writer) {
51 writer->close(StoreIOState::writerGone);
52 // Emulate SwapDir::disconnect() that writeCompleted(err) hopes for.
53 // Also required to avoid IoState destructor assertions.
54 // We can do this because we closed update earlier or aborted it above.
55 dynamic_cast<IoState&>(*writer).writeableAnchor_ = nullptr;
56 writer = nullptr;
57 }
58
59 AsyncJob::swanSong();
60 }
61
62 void
start()63 Rock::HeaderUpdater::start()
64 {
65 Must(update.entry);
66 Must(update.stale);
67 Must(update.fresh);
68 startReading();
69 }
70
71 void
startReading()72 Rock::HeaderUpdater::startReading()
73 {
74 reader = store->openStoreIO(
75 *update.entry,
76 nullptr, // unused; see StoreIOState::file_callback
77 &NoteDoneReading,
78 this);
79 readMore("need swap entry metadata");
80 }
81
82 void
stopReading(const char * why)83 Rock::HeaderUpdater::stopReading(const char *why)
84 {
85 debugs(47, 7, why);
86
87 Must(reader);
88 const IoState &rockReader = dynamic_cast<IoState&>(*reader);
89 update.stale.splicingPoint = rockReader.splicingPoint;
90 staleSplicingPointNext = rockReader.staleSplicingPointNext;
91 debugs(47, 5, "stale chain ends at " << update.stale.splicingPoint <<
92 " body continues at " << staleSplicingPointNext);
93
94 reader->close(StoreIOState::readerDone); // calls noteDoneReading(0)
95 reader = nullptr; // so that swanSong() does not try to close again
96 }
97
98 void
NoteRead(void * data,const char * buf,ssize_t result,StoreIOState::Pointer)99 Rock::HeaderUpdater::NoteRead(void *data, const char *buf, ssize_t result, StoreIOState::Pointer)
100 {
101 IoCbParams io(buf, result);
102 // TODO: Avoid Rock::StoreIOStateCb for jobs to protect jobs for "free".
103 CallJobHere1(47, 7,
104 CbcPointer<HeaderUpdater>(static_cast<HeaderUpdater*>(data)),
105 Rock::HeaderUpdater,
106 noteRead,
107 io);
108 }
109
110 void
noteRead(const Rock::HeaderUpdater::IoCbParams result)111 Rock::HeaderUpdater::noteRead(const Rock::HeaderUpdater::IoCbParams result)
112 {
113 debugs(47, 7, result.size);
114 if (!result.size) { // EOF
115 stopReading("eof");
116 } else {
117 Must(result.size > 0);
118 bytesRead += result.size;
119 readerBuffer.rawAppendFinish(result.buf, result.size);
120 exchangeBuffer.append(readerBuffer);
121 debugs(47, 7, "accumulated " << exchangeBuffer.length());
122 }
123
124 parseReadBytes();
125 }
126
127 void
readMore(const char * why)128 Rock::HeaderUpdater::readMore(const char *why)
129 {
130 debugs(47, 7, "from " << bytesRead << " because " << why);
131 Must(reader);
132 readerBuffer.clear();
133 storeRead(reader,
134 readerBuffer.rawAppendStart(store->slotSize),
135 store->slotSize,
136 bytesRead,
137 &NoteRead,
138 this);
139 }
140
141 void
NoteDoneReading(void * data,int errflag,StoreIOState::Pointer)142 Rock::HeaderUpdater::NoteDoneReading(void *data, int errflag, StoreIOState::Pointer)
143 {
144 // TODO: Avoid Rock::StoreIOStateCb for jobs to protect jobs for "free".
145 CallJobHere1(47, 7,
146 CbcPointer<HeaderUpdater>(static_cast<HeaderUpdater*>(data)),
147 Rock::HeaderUpdater,
148 noteDoneReading,
149 errflag);
150 }
151
152 void
noteDoneReading(int errflag)153 Rock::HeaderUpdater::noteDoneReading(int errflag)
154 {
155 debugs(47, 5, errflag << " writer=" << writer);
156 if (!reader) {
157 Must(!errflag); // we only initiate successful closures
158 Must(writer); // otherwise we would be done() and would not be called
159 } else {
160 reader = nullptr; // we are done reading
161 Must(errflag); // any external closures ought to be errors
162 mustStop("read error");
163 }
164 }
165
166 void
startWriting()167 Rock::HeaderUpdater::startWriting()
168 {
169 writer = store->createUpdateIO(
170 update,
171 nullptr, // unused; see StoreIOState::file_callback
172 &NoteDoneWriting,
173 this);
174 Must(writer);
175
176 IoState &rockWriter = dynamic_cast<IoState&>(*writer);
177 rockWriter.staleSplicingPointNext = staleSplicingPointNext;
178
179 off_t offset = 0; // current writing offset (for debugging)
180
181 {
182 debugs(20, 7, "fresh store meta for " << *update.entry);
183 const char *freshSwapHeader = update.entry->getSerialisedMetaData();
184 const auto freshSwapHeaderSize = update.entry->mem_obj->swap_hdr_sz;
185 Must(freshSwapHeader);
186 writer->write(freshSwapHeader, freshSwapHeaderSize, 0, nullptr);
187 offset += freshSwapHeaderSize;
188 xfree(freshSwapHeader);
189 }
190
191 {
192 debugs(20, 7, "fresh HTTP header @ " << offset);
193 MemBuf *httpHeader = update.entry->mem_obj->getReply()->pack();
194 writer->write(httpHeader->content(), httpHeader->contentSize(), -1, nullptr);
195 offset += httpHeader->contentSize();
196 delete httpHeader;
197 }
198
199 {
200 debugs(20, 7, "moved HTTP body prefix @ " << offset);
201 writer->write(exchangeBuffer.rawContent(), exchangeBuffer.length(), -1, nullptr);
202 offset += exchangeBuffer.length();
203 exchangeBuffer.clear();
204 }
205
206 debugs(20, 7, "wrote " << offset);
207
208 writer->close(StoreIOState::wroteAll); // should call noteDoneWriting()
209 }
210
211 void
NoteDoneWriting(void * data,int errflag,StoreIOState::Pointer)212 Rock::HeaderUpdater::NoteDoneWriting(void *data, int errflag, StoreIOState::Pointer)
213 {
214 CallJobHere1(47, 7,
215 CbcPointer<HeaderUpdater>(static_cast<HeaderUpdater*>(data)),
216 Rock::HeaderUpdater,
217 noteDoneWriting,
218 errflag);
219 }
220
221 void
noteDoneWriting(int errflag)222 Rock::HeaderUpdater::noteDoneWriting(int errflag)
223 {
224 debugs(47, 5, errflag << " reader=" << reader);
225 Must(!errflag);
226 Must(!reader); // if we wrote everything, then we must have read everything
227
228 Must(writer);
229 IoState &rockWriter = dynamic_cast<IoState&>(*writer);
230 update.fresh.splicingPoint = rockWriter.splicingPoint;
231 debugs(47, 5, "fresh chain ends at " << update.fresh.splicingPoint);
232 store->map->closeForUpdating(update);
233 rockWriter.writeableAnchor_ = nullptr;
234 writer = nullptr; // we are done writing
235
236 Must(doneAll());
237 }
238
239 void
parseReadBytes()240 Rock::HeaderUpdater::parseReadBytes()
241 {
242 if (!staleSwapHeaderSize) {
243 StoreMetaUnpacker aBuilder(
244 exchangeBuffer.rawContent(),
245 exchangeBuffer.length(),
246 &staleSwapHeaderSize);
247 // Squid assumes that metadata always fits into a single db slot
248 aBuilder.checkBuffer(); // cannot update an entry with invalid metadata
249 debugs(47, 7, "staleSwapHeaderSize=" << staleSwapHeaderSize);
250 Must(staleSwapHeaderSize > 0);
251 exchangeBuffer.consume(staleSwapHeaderSize);
252 }
253
254 const size_t staleHttpHeaderSize = headersEnd(
255 exchangeBuffer.rawContent(),
256 exchangeBuffer.length());
257 debugs(47, 7, "staleHttpHeaderSize=" << staleHttpHeaderSize);
258 if (!staleHttpHeaderSize) {
259 readMore("need more stale HTTP reply header data");
260 return;
261 }
262
263 exchangeBuffer.consume(staleHttpHeaderSize);
264 debugs(47, 7, "httpBodySizePrefix=" << exchangeBuffer.length());
265
266 stopReading("read the last HTTP header slot");
267 startWriting();
268 }
269
270