1 /*
2  * Copyright (C) 1996-2021 The Squid Software Foundation and contributors
3  *
4  * Squid software is distributed under GPLv2+ license and includes
5  * contributions from numerous individuals and organizations.
6  * Please see the COPYING and CONTRIBUTORS files for details.
7  */
8 
9 #include "squid.h"
10 #include "base/AsyncJobCalls.h"
11 #include "Debug.h"
12 #include "fs/rock/RockHeaderUpdater.h"
13 #include "fs/rock/RockIoState.h"
14 #include "mime_header.h"
15 #include "Store.h"
16 #include "StoreMetaUnpacker.h"
17 
18 CBDATA_NAMESPACED_CLASS_INIT(Rock, HeaderUpdater);
19 
// Creates an idle updater job for the given store and map update.
// No reader or writer is opened here; counters start at zero and the
// "next" stale splicing point starts at -1.
Rock::HeaderUpdater::HeaderUpdater(const Rock::SwapDir::Pointer &aStore, const Ipc::StoreMapUpdate &anUpdate):
    AsyncJob("Rock::HeaderUpdater"),
    store(aStore), update(anUpdate),
    reader(), writer(),
    bytesRead(0),
    staleSwapHeaderSize(0),
    staleSplicingPointNext(-1)
{
    // TODO: Consider limiting the number of concurrent store updates.
}
32 
33 bool
doneAll() const34 Rock::HeaderUpdater::doneAll() const
35 {
36     return !reader && !writer && AsyncJob::doneAll();
37 }
38 
39 void
swanSong()40 Rock::HeaderUpdater::swanSong()
41 {
42     if (update.stale || update.fresh)
43         store->map->abortUpdating(update);
44 
45     if (reader) {
46         reader->close(StoreIOState::readerDone);
47         reader = nullptr;
48     }
49 
50     if (writer) {
51         writer->close(StoreIOState::writerGone);
52         // Emulate SwapDir::disconnect() that writeCompleted(err) hopes for.
53         // Also required to avoid IoState destructor assertions.
54         // We can do this because we closed update earlier or aborted it above.
55         dynamic_cast<IoState&>(*writer).writeableAnchor_ = nullptr;
56         writer = nullptr;
57     }
58 
59     AsyncJob::swanSong();
60 }
61 
62 void
start()63 Rock::HeaderUpdater::start()
64 {
65     Must(update.entry);
66     Must(update.stale);
67     Must(update.fresh);
68     startReading();
69 }
70 
71 void
startReading()72 Rock::HeaderUpdater::startReading()
73 {
74     reader = store->openStoreIO(
75                  *update.entry,
76                  nullptr, // unused; see StoreIOState::file_callback
77                  &NoteDoneReading,
78                  this);
79     readMore("need swap entry metadata");
80 }
81 
82 void
stopReading(const char * why)83 Rock::HeaderUpdater::stopReading(const char *why)
84 {
85     debugs(47, 7, why);
86 
87     Must(reader);
88     const IoState &rockReader = dynamic_cast<IoState&>(*reader);
89     update.stale.splicingPoint = rockReader.splicingPoint;
90     staleSplicingPointNext = rockReader.staleSplicingPointNext;
91     debugs(47, 5, "stale chain ends at " << update.stale.splicingPoint <<
92            " body continues at " << staleSplicingPointNext);
93 
94     reader->close(StoreIOState::readerDone); // calls noteDoneReading(0)
95     reader = nullptr; // so that swanSong() does not try to close again
96 }
97 
98 void
NoteRead(void * data,const char * buf,ssize_t result,StoreIOState::Pointer)99 Rock::HeaderUpdater::NoteRead(void *data, const char *buf, ssize_t result, StoreIOState::Pointer)
100 {
101     IoCbParams io(buf, result);
102     // TODO: Avoid Rock::StoreIOStateCb for jobs to protect jobs for "free".
103     CallJobHere1(47, 7,
104                  CbcPointer<HeaderUpdater>(static_cast<HeaderUpdater*>(data)),
105                  Rock::HeaderUpdater,
106                  noteRead,
107                  io);
108 }
109 
110 void
noteRead(const Rock::HeaderUpdater::IoCbParams result)111 Rock::HeaderUpdater::noteRead(const Rock::HeaderUpdater::IoCbParams result)
112 {
113     debugs(47, 7, result.size);
114     if (!result.size) { // EOF
115         stopReading("eof");
116     } else {
117         Must(result.size > 0);
118         bytesRead += result.size;
119         readerBuffer.rawAppendFinish(result.buf, result.size);
120         exchangeBuffer.append(readerBuffer);
121         debugs(47, 7, "accumulated " << exchangeBuffer.length());
122     }
123 
124     parseReadBytes();
125 }
126 
127 void
readMore(const char * why)128 Rock::HeaderUpdater::readMore(const char *why)
129 {
130     debugs(47, 7, "from " << bytesRead << " because " << why);
131     Must(reader);
132     readerBuffer.clear();
133     storeRead(reader,
134               readerBuffer.rawAppendStart(store->slotSize),
135               store->slotSize,
136               bytesRead,
137               &NoteRead,
138               this);
139 }
140 
141 void
NoteDoneReading(void * data,int errflag,StoreIOState::Pointer)142 Rock::HeaderUpdater::NoteDoneReading(void *data, int errflag, StoreIOState::Pointer)
143 {
144     // TODO: Avoid Rock::StoreIOStateCb for jobs to protect jobs for "free".
145     CallJobHere1(47, 7,
146                  CbcPointer<HeaderUpdater>(static_cast<HeaderUpdater*>(data)),
147                  Rock::HeaderUpdater,
148                  noteDoneReading,
149                  errflag);
150 }
151 
152 void
noteDoneReading(int errflag)153 Rock::HeaderUpdater::noteDoneReading(int errflag)
154 {
155     debugs(47, 5, errflag << " writer=" << writer);
156     if (!reader) {
157         Must(!errflag); // we only initiate successful closures
158         Must(writer); // otherwise we would be done() and would not be called
159     } else {
160         reader = nullptr; // we are done reading
161         Must(errflag); // any external closures ought to be errors
162         mustStop("read error");
163     }
164 }
165 
166 void
startWriting()167 Rock::HeaderUpdater::startWriting()
168 {
169     writer = store->createUpdateIO(
170                  update,
171                  nullptr, // unused; see StoreIOState::file_callback
172                  &NoteDoneWriting,
173                  this);
174     Must(writer);
175 
176     IoState &rockWriter = dynamic_cast<IoState&>(*writer);
177     rockWriter.staleSplicingPointNext = staleSplicingPointNext;
178 
179     off_t offset = 0; // current writing offset (for debugging)
180 
181     {
182         debugs(20, 7, "fresh store meta for " << *update.entry);
183         const char *freshSwapHeader = update.entry->getSerialisedMetaData();
184         const auto freshSwapHeaderSize = update.entry->mem_obj->swap_hdr_sz;
185         Must(freshSwapHeader);
186         writer->write(freshSwapHeader, freshSwapHeaderSize, 0, nullptr);
187         offset += freshSwapHeaderSize;
188         xfree(freshSwapHeader);
189     }
190 
191     {
192         debugs(20, 7, "fresh HTTP header @ " << offset);
193         MemBuf *httpHeader = update.entry->mem_obj->getReply()->pack();
194         writer->write(httpHeader->content(), httpHeader->contentSize(), -1, nullptr);
195         offset += httpHeader->contentSize();
196         delete httpHeader;
197     }
198 
199     {
200         debugs(20, 7, "moved HTTP body prefix @ " << offset);
201         writer->write(exchangeBuffer.rawContent(), exchangeBuffer.length(), -1, nullptr);
202         offset += exchangeBuffer.length();
203         exchangeBuffer.clear();
204     }
205 
206     debugs(20, 7, "wrote " << offset);
207 
208     writer->close(StoreIOState::wroteAll); // should call noteDoneWriting()
209 }
210 
211 void
NoteDoneWriting(void * data,int errflag,StoreIOState::Pointer)212 Rock::HeaderUpdater::NoteDoneWriting(void *data, int errflag, StoreIOState::Pointer)
213 {
214     CallJobHere1(47, 7,
215                  CbcPointer<HeaderUpdater>(static_cast<HeaderUpdater*>(data)),
216                  Rock::HeaderUpdater,
217                  noteDoneWriting,
218                  errflag);
219 }
220 
221 void
noteDoneWriting(int errflag)222 Rock::HeaderUpdater::noteDoneWriting(int errflag)
223 {
224     debugs(47, 5, errflag << " reader=" << reader);
225     Must(!errflag);
226     Must(!reader); // if we wrote everything, then we must have read everything
227 
228     Must(writer);
229     IoState &rockWriter = dynamic_cast<IoState&>(*writer);
230     update.fresh.splicingPoint = rockWriter.splicingPoint;
231     debugs(47, 5, "fresh chain ends at " << update.fresh.splicingPoint);
232     store->map->closeForUpdating(update);
233     rockWriter.writeableAnchor_ = nullptr;
234     writer = nullptr; // we are done writing
235 
236     Must(doneAll());
237 }
238 
239 void
parseReadBytes()240 Rock::HeaderUpdater::parseReadBytes()
241 {
242     if (!staleSwapHeaderSize) {
243         StoreMetaUnpacker aBuilder(
244             exchangeBuffer.rawContent(),
245             exchangeBuffer.length(),
246             &staleSwapHeaderSize);
247         // Squid assumes that metadata always fits into a single db slot
248         aBuilder.checkBuffer(); // cannot update an entry with invalid metadata
249         debugs(47, 7, "staleSwapHeaderSize=" << staleSwapHeaderSize);
250         Must(staleSwapHeaderSize > 0);
251         exchangeBuffer.consume(staleSwapHeaderSize);
252     }
253 
254     const size_t staleHttpHeaderSize = headersEnd(
255                                            exchangeBuffer.rawContent(),
256                                            exchangeBuffer.length());
257     debugs(47, 7, "staleHttpHeaderSize=" << staleHttpHeaderSize);
258     if (!staleHttpHeaderSize) {
259         readMore("need more stale HTTP reply header data");
260         return;
261     }
262 
263     exchangeBuffer.consume(staleHttpHeaderSize);
264     debugs(47, 7, "httpBodySizePrefix=" << exchangeBuffer.length());
265 
266     stopReading("read the last HTTP header slot");
267     startWriting();
268 }
269 
270