1 // Utilities for io redirection.
2 #include "config.h"  // IWYU pragma: keep
3 
4 #include "io.h"
5 
6 #include <errno.h>
7 #include <fcntl.h>
8 #include <stddef.h>
9 #include <stdio.h>
10 #include <unistd.h>
11 
12 #include <cstring>
13 #include <cwchar>
14 
15 #include "common.h"
16 #include "exec.h"
17 #include "fallback.h"  // IWYU pragma: keep
18 #include "fd_monitor.h"
19 #include "iothread.h"
20 #include "path.h"
21 #include "redirection.h"
22 #include "wutil.h"  // IWYU pragma: keep
23 
24 /// File redirection error message.
25 #define FILE_ERROR _(L"An error occurred while redirecting file '%ls'")
26 #define NOCLOB_ERROR _(L"The file '%ls' already exists")
27 
28 /// Base open mode to pass to calls to open.
29 #define OPEN_MASK 0666
30 
31 /// Provide the fd monitor used for background fillthread operations.
fd_monitor()32 static fd_monitor_t &fd_monitor() {
33     // Deliberately leaked to avoid shutdown dtors.
34     static auto fdm = new fd_monitor_t();
35     return *fdm;
36 }
37 
38 io_data_t::~io_data_t() = default;
39 io_pipe_t::~io_pipe_t() = default;
40 io_fd_t::~io_fd_t() = default;
41 io_close_t::~io_close_t() = default;
42 io_file_t::~io_file_t() = default;
43 io_bufferfill_t::~io_bufferfill_t() = default;
44 
print() const45 void io_close_t::print() const { std::fwprintf(stderr, L"close %d\n", fd); }
46 
print() const47 void io_fd_t::print() const { std::fwprintf(stderr, L"FD map %d -> %d\n", source_fd, fd); }
48 
print() const49 void io_file_t::print() const { std::fwprintf(stderr, L"file %d -> %d\n", file_fd_.fd(), fd); }
50 
print() const51 void io_pipe_t::print() const {
52     std::fwprintf(stderr, L"pipe {%d} (input: %s) -> %d\n", source_fd, is_input_ ? "yes" : "no",
53                   fd);
54 }
55 
print() const56 void io_bufferfill_t::print() const {
57     std::fwprintf(stderr, L"bufferfill %d -> %d\n", write_fd_.fd(), fd);
58 }
59 
read_once(int fd,acquired_lock<separated_buffer_t> & buffer)60 ssize_t io_buffer_t::read_once(int fd, acquired_lock<separated_buffer_t> &buffer) {
61     assert(fd >= 0 && "Invalid fd");
62     errno = 0;
63     char bytes[4096 * 4];
64 
65     // We want to swallow EINTR only; in particular EAGAIN needs to be returned back to the caller.
66     ssize_t amt;
67     do {
68         amt = read(fd, bytes, sizeof bytes);
69     } while (amt < 0 && errno == EINTR);
70     if (amt < 0 && errno != EAGAIN) {
71         wperror(L"read");
72     } else if (amt > 0) {
73         buffer->append(bytes, static_cast<size_t>(amt));
74     }
75     return amt;
76 }
77 
begin_filling(autoclose_fd_t fd)78 void io_buffer_t::begin_filling(autoclose_fd_t fd) {
79     ASSERT_IS_MAIN_THREAD();
80     assert(!fillthread_running() && "Already have a fillthread");
81 
82     // We want to fill buffer_ by reading from fd. fd is the read end of a pipe; the write end is
83     // owned by another process, or something else writing in fish.
84     // Pass fd to an fd_monitor. It will add fd to its select() loop, and give us a callback when
85     // the fd is readable, or when our item is poked. The usual path is that we will get called
86     // back, read a bit from the fd, and append it to the buffer. Eventually the write end of the
87     // pipe will be closed - probably the other process exited - and fd will be widowed; read() will
88     // then return 0 and we will stop reading.
89     // In exotic circumstances the write end of the pipe will not be closed; this may happen in
90     // e.g.:
91     //   cmd ( background & ; echo hi )
92     // Here the background process will inherit the write end of the pipe and hold onto it forever.
93     // In this case, when complete_background_fillthread() is called, the callback will be invoked
94     // with item_wake_reason_t::poke, and we will notice that the shutdown flag is set (this
95     // indicates that the command substitution is done); in this case we will read until we get
96     // EAGAIN and then give up.
97 
98     // Construct a promise. We will fulfill it in our fill thread, and wait for it in
99     // complete_background_fillthread(). Note that TSan complains if the promise's dtor races with
100     // the future's call to wait(), so we store the promise, not just its future (#7681).
101     auto promise = std::make_shared<std::promise<void>>();
102     this->fill_waiter_ = promise;
103 
104     // Run our function to read until the receiver is closed.
105     // It's OK to capture 'this' by value because 'this' waits for the promise in its dtor.
106     fd_monitor_item_t item;
107     item.fd = std::move(fd);
108     item.callback = [this, promise](autoclose_fd_t &fd, item_wake_reason_t reason) {
109         ASSERT_IS_BACKGROUND_THREAD();
110         // Only check the shutdown flag if we timed out or were poked.
111         // It's important that if select() indicated we were readable, that we call select() again
112         // allowing it to time out. Note the typical case is that the fd will be closed, in which
113         // case select will return immediately.
114         bool done = false;
115         if (reason == item_wake_reason_t::readable) {
116             // select() reported us as readable; read a bit.
117             auto buffer = buffer_.acquire();
118             ssize_t ret = read_once(fd.fd(), buffer);
119             done = (ret == 0 || (ret < 0 && errno != EAGAIN));
120         } else if (shutdown_fillthread_) {
121             // Here our caller asked us to shut down; read while we keep getting data.
122             // This will stop when the fd is closed or if we get EAGAIN.
123             auto buffer = buffer_.acquire();
124             ssize_t ret;
125             do {
126                 ret = read_once(fd.fd(), buffer);
127             } while (ret > 0);
128             done = true;
129         }
130         if (done) {
131             fd.close();
132             promise->set_value();
133         }
134     };
135     this->item_id_ = fd_monitor().add(std::move(item));
136 }
137 
complete_background_fillthread_and_take_buffer()138 separated_buffer_t io_buffer_t::complete_background_fillthread_and_take_buffer() {
139     // Mark that our fillthread is done, then wake it up.
140     ASSERT_IS_MAIN_THREAD();
141     assert(fillthread_running() && "Should have a fillthread");
142     assert(this->item_id_ > 0 && "Should have a valid item ID");
143     shutdown_fillthread_ = true;
144     fd_monitor().poke_item(this->item_id_);
145 
146     // Wait for the fillthread to fulfill its promise, and then clear the future so we know we no
147     // longer have one.
148     fill_waiter_->get_future().wait();
149     fill_waiter_.reset();
150 
151     // Return our buffer, transferring ownership.
152     auto locked_buff = buffer_.acquire();
153     separated_buffer_t result = std::move(*locked_buff);
154     locked_buff->clear();
155     return result;
156 }
157 
create(size_t buffer_limit,int target)158 shared_ptr<io_bufferfill_t> io_bufferfill_t::create(size_t buffer_limit, int target) {
159     assert(target >= 0 && "Invalid target fd");
160 
161     // Construct our pipes.
162     auto pipes = make_autoclose_pipes();
163     if (!pipes) {
164         return nullptr;
165     }
166     // Our buffer will read from the read end of the pipe. This end must be non-blocking. This is
167     // because our fillthread needs to poll to decide if it should shut down, and also accept input
168     // from direct buffer transfers.
169     if (make_fd_nonblocking(pipes->read.fd())) {
170         FLOGF(warning, PIPE_ERROR);
171         wperror(L"fcntl");
172         return nullptr;
173     }
174     // Our fillthread gets the read end of the pipe; out_pipe gets the write end.
175     auto buffer = std::make_shared<io_buffer_t>(buffer_limit);
176     buffer->begin_filling(std::move(pipes->read));
177     return std::make_shared<io_bufferfill_t>(target, std::move(pipes->write), buffer);
178 }
179 
finish(std::shared_ptr<io_bufferfill_t> && filler)180 separated_buffer_t io_bufferfill_t::finish(std::shared_ptr<io_bufferfill_t> &&filler) {
181     // The io filler is passed in. This typically holds the only instance of the write side of the
182     // pipe used by the buffer's fillthread (except for that side held by other processes). Get the
183     // buffer out of the bufferfill and clear the shared_ptr; this will typically widow the pipe.
184     // Then allow the buffer to finish.
185     assert(filler && "Null pointer in finish");
186     auto buffer = filler->buffer();
187     filler.reset();
188     return buffer->complete_background_fillthread_and_take_buffer();
189 }
190 
~io_buffer_t()191 io_buffer_t::~io_buffer_t() {
192     assert(!fillthread_running() && "io_buffer_t destroyed with outstanding fillthread");
193 }
194 
remove(const shared_ptr<const io_data_t> & element)195 void io_chain_t::remove(const shared_ptr<const io_data_t> &element) {
196     // See if you can guess why std::find doesn't work here.
197     for (auto iter = this->begin(); iter != this->end(); ++iter) {
198         if (*iter == element) {
199             this->erase(iter);
200             break;
201         }
202     }
203 }
204 
push_back(io_data_ref_t element)205 void io_chain_t::push_back(io_data_ref_t element) {
206     // Ensure we never push back NULL.
207     assert(element.get() != nullptr);
208     std::vector<io_data_ref_t>::push_back(std::move(element));
209 }
210 
append(const io_chain_t & chain)211 void io_chain_t::append(const io_chain_t &chain) {
212     assert(&chain != this && "Cannot append self to self");
213     this->insert(this->end(), chain.begin(), chain.end());
214 }
215 
append_from_specs(const redirection_spec_list_t & specs,const wcstring & pwd)216 bool io_chain_t::append_from_specs(const redirection_spec_list_t &specs, const wcstring &pwd) {
217     bool have_error = false;
218     for (const auto &spec : specs) {
219         switch (spec.mode) {
220             case redirection_mode_t::fd: {
221                 if (spec.is_close()) {
222                     this->push_back(make_unique<io_close_t>(spec.fd));
223                 } else {
224                     auto target_fd = spec.get_target_as_fd();
225                     assert(target_fd.has_value() &&
226                            "fd redirection should have been validated already");
227                     this->push_back(make_unique<io_fd_t>(spec.fd, *target_fd));
228                 }
229                 break;
230             }
231             default: {
232                 // We have a path-based redireciton. Resolve it to a file.
233                 // Mark it as CLO_EXEC because we don't want it to be open in any child.
234                 wcstring path = path_apply_working_directory(spec.target, pwd);
235                 int oflags = spec.oflags();
236                 autoclose_fd_t file{wopen_cloexec(path, oflags, OPEN_MASK)};
237                 if (!file.valid()) {
238                     if ((oflags & O_EXCL) && (errno == EEXIST)) {
239                         FLOGF(warning, NOCLOB_ERROR, spec.target.c_str());
240                     } else {
241                         FLOGF(warning, FILE_ERROR, spec.target.c_str());
242                         if (should_flog(warning)) wperror(L"open");
243                     }
244                     // If opening a file fails, insert a closed FD instead of the file redirection
245                     // and return false. This lets execution potentially recover and at least gives
246                     // the shell a chance to gracefully regain control of the shell (see #7038).
247                     this->push_back(make_unique<io_close_t>(spec.fd));
248                     have_error = true;
249                     break;
250                 }
251                 this->push_back(std::make_shared<io_file_t>(spec.fd, std::move(file)));
252                 break;
253             }
254         }
255     }
256     return !have_error;
257 }
258 
print() const259 void io_chain_t::print() const {
260     if (this->empty()) {
261         std::fwprintf(stderr, L"Empty chain %p\n", this);
262         return;
263     }
264 
265     std::fwprintf(stderr, L"Chain %p (%ld items):\n", this, static_cast<long>(this->size()));
266     for (size_t i = 0; i < this->size(); i++) {
267         const auto &io = this->at(i);
268         if (io == nullptr) {
269             std::fwprintf(stderr, L"\t(null)\n");
270         } else {
271             std::fwprintf(stderr, L"\t%lu: fd:%d, ", static_cast<unsigned long>(i), io->fd);
272             io->print();
273         }
274     }
275 }
276 
io_for_fd(int fd) const277 shared_ptr<const io_data_t> io_chain_t::io_for_fd(int fd) const {
278     for (auto iter = rbegin(); iter != rend(); ++iter) {
279         const auto &data = *iter;
280         if (data->fd == fd) {
281             return data;
282         }
283     }
284     return nullptr;
285 }
286 
append_narrow_buffer(const separated_buffer_t & buffer)287 void output_stream_t::append_narrow_buffer(const separated_buffer_t &buffer) {
288     for (const auto &rhs_elem : buffer.elements()) {
289         append_with_separation(str2wcstring(rhs_elem.contents), rhs_elem.separation);
290     }
291 }
292 
append_with_separation(const wchar_t * s,size_t len,separation_type_t type)293 void output_stream_t::append_with_separation(const wchar_t *s, size_t len, separation_type_t type) {
294     append(s, len);
295     if (type == separation_type_t::explicitly) {
296         append(L'\n');
297     }
298 }
299 
contents() const300 const wcstring &output_stream_t::contents() const { return g_empty_string; }
301 
flush_and_check_error()302 int output_stream_t::flush_and_check_error() { return STATUS_CMD_OK; }
303 
append(const wchar_t * s,size_t amt)304 void fd_output_stream_t::append(const wchar_t *s, size_t amt) {
305     if (errored_) return;
306     int res = wwrite_to_fd(s, amt, this->fd_);
307     if (res < 0) {
308         // TODO: this error is too aggressive, e.g. if we got SIGINT we should not complain.
309         if (errno != EPIPE) {
310             wperror(L"write");
311         }
312         errored_ = true;
313     }
314 }
315 
flush_and_check_error()316 int fd_output_stream_t::flush_and_check_error() {
317     // Return a generic 1 on any write failure.
318     return errored_ ? STATUS_CMD_ERROR : STATUS_CMD_OK;
319 }
320 
append(const wchar_t *,size_t)321 void null_output_stream_t::append(const wchar_t *, size_t) {}
322 
append(const wchar_t * s,size_t amt)323 void string_output_stream_t::append(const wchar_t *s, size_t amt) { contents_.append(s, amt); }
324 
contents() const325 const wcstring &string_output_stream_t::contents() const { return contents_; }
326 
append(const wchar_t * s,size_t amt)327 void buffered_output_stream_t::append(const wchar_t *s, size_t amt) {
328     buffer_->append(wcs2string(s, amt));
329 }
330 
append_with_separation(const wchar_t * s,size_t len,separation_type_t type)331 void buffered_output_stream_t::append_with_separation(const wchar_t *s, size_t len,
332                                                       separation_type_t type) {
333     buffer_->append(wcs2string(s, len), type);
334 }
335 
flush_and_check_error()336 int buffered_output_stream_t::flush_and_check_error() {
337     if (buffer_->discarded()) {
338         return STATUS_READ_TOO_MUCH;
339     }
340     return 0;
341 }
342