1 /*
2 SPDX-FileCopyrightText: 2007 Klarälvdalens Datakonsult AB
3
4 SPDX-License-Identifier: LGPL-2.0-or-later
5 */
6
7 #include <config-kleopatra.h>
8
9 #include "kdpipeiodevice.h"
10
11 #include <QDebug>
12 #include <QMutex>
13 #include <QPointer>
14 #include <QThread>
15 #include <QWaitCondition>
16 #include "kleopatra_debug.h"
17
18 #include <cstring>
19 #include <memory>
20 #include <algorithm>
21
22 #ifdef Q_OS_WIN32
23 # ifndef NOMINMAX
24 # define NOMINMAX
25 # endif
26 # include <windows.h>
27 # include <io.h>
28 #else
29 # include <unistd.h>
30 # include <errno.h>
31 #endif
32
// Fallback definitions for the KDAB_CHECK_* hooks: when nothing else has
// defined KDAB_CHECK_THIS, all three expand to a no-op expression.
#ifndef KDAB_CHECK_THIS
# define KDAB_CHECK_CTOR (void)1
# define KDAB_CHECK_DTOR KDAB_CHECK_CTOR
# define KDAB_CHECK_THIS KDAB_CHECK_CTOR
#endif

// LOCKED( d ): declares a QMutexLocker named `locker` on d->mutex that lives
// until the end of the enclosing scope (so at most one per scope).
#define LOCKED( d ) const QMutexLocker locker( &d->mutex )
// synchronized( d ) stmt: runs `stmt` exactly once with d->mutex held; the
// for-loop exists only to scope the locker around the statement that follows.
#define synchronized( d ) if ( int i = 0 ) {} else for ( const QMutexLocker locker( &d->mutex ) ; !i ; ++i )

// Payload size of the Reader/Writer buffers declared further down.
const unsigned int BUFFER_SIZE = 4096;
// When true, waitForReadyRead()/readData() consult bytesAvailable() first.
const bool ALLOW_QIODEVICE_BUFFERING = true;

namespace
{
// Process-wide debug switch, read by the QDebug macro below and exposed via
// KDPipeIODevice::debugLevel()/setDebugLevel().
KDPipeIODevice::DebugLevel s_debugLevel = KDPipeIODevice::NoDebug;
}

// printf-style debug output that is skipped while s_debugLevel is NoDebug.
// Note: from here on the identifier QDebug is this macro, not Qt's QDebug
// class. Because it expands to an if/else, always use it as a full statement.
#define QDebug if( s_debugLevel == KDPipeIODevice::NoDebug ){}else qDebug
51
52 namespace
53 {
54
55 class Reader : public QThread
56 {
57 Q_OBJECT
58 public:
59 Reader(int fd, Qt::HANDLE handle);
60 ~Reader() override;
61
62 qint64 readData(char *data, qint64 maxSize);
63
bytesInBuffer() const64 unsigned int bytesInBuffer() const
65 {
66 return (wptr + sizeof buffer - rptr) % sizeof buffer;
67 }
68
bufferFull() const69 bool bufferFull() const
70 {
71 return bytesInBuffer() == sizeof buffer - 1;
72 }
73
bufferEmpty() const74 bool bufferEmpty() const
75 {
76 return bytesInBuffer() == 0;
77 }
78
bufferContains(char ch)79 bool bufferContains(char ch)
80 {
81 const unsigned int bib = bytesInBuffer();
82 for (unsigned int i = rptr; i < rptr + bib; ++i)
83 if (buffer[i % sizeof buffer] == ch) {
84 return true;
85 }
86 return false;
87 }
88
89 void notifyReadyRead();
90
91 Q_SIGNALS:
92 void readyRead();
93
94 protected:
95 void run() override;
96
97 private:
98 int fd;
99 Qt::HANDLE handle;
100 public:
101 QMutex mutex;
102 QWaitCondition waitForCancelCondition;
103 QWaitCondition bufferNotFullCondition;
104 QWaitCondition bufferNotEmptyCondition;
105 QWaitCondition hasStarted;
106 QWaitCondition readyReadSentCondition;
107 QWaitCondition blockedConsumerIsDoneCondition;
108 bool cancel;
109 bool eof;
110 bool error;
111 bool eofShortCut;
112 int errorCode;
113 bool isReading;
114 bool consumerBlocksOnUs;
115
116 private:
117 unsigned int rptr, wptr;
118 char buffer[BUFFER_SIZE + 1]; // need to keep one byte free to detect empty state
119 };
120
Reader(int fd_,Qt::HANDLE handle_)121 Reader::Reader(int fd_, Qt::HANDLE handle_) : QThread(),
122 fd(fd_),
123 handle(handle_),
124 mutex(),
125 bufferNotFullCondition(),
126 bufferNotEmptyCondition(),
127 hasStarted(),
128 cancel(false),
129 eof(false),
130 error(false),
131 eofShortCut(false),
132 errorCode(0),
133 isReading(false),
134 consumerBlocksOnUs(false),
135 rptr(0),
136 wptr(0)
137 {
138
139 }
140
~Reader()141 Reader::~Reader() {}
142
143 class Writer : public QThread
144 {
145 Q_OBJECT
146 public:
147 Writer(int fd, Qt::HANDLE handle);
148 ~Writer() override;
149
150 qint64 writeData(const char *data, qint64 size);
151
bytesInBuffer() const152 unsigned int bytesInBuffer() const
153 {
154 return numBytesInBuffer;
155 }
156
bufferFull() const157 bool bufferFull() const
158 {
159 return numBytesInBuffer == sizeof buffer;
160 }
161
bufferEmpty() const162 bool bufferEmpty() const
163 {
164 return numBytesInBuffer == 0;
165 }
166
167 Q_SIGNALS:
168 void bytesWritten(qint64);
169
170 protected:
171 void run() override;
172
173 private:
174 int fd;
175 Qt::HANDLE handle;
176 public:
177 QMutex mutex;
178 QWaitCondition bufferEmptyCondition;
179 QWaitCondition bufferNotEmptyCondition;
180 QWaitCondition hasStarted;
181 bool cancel;
182 bool error;
183 int errorCode;
184 private:
185 unsigned int numBytesInBuffer;
186 char buffer[BUFFER_SIZE];
187 };
188 }
189
Writer(int fd_,Qt::HANDLE handle_)190 Writer::Writer(int fd_, Qt::HANDLE handle_) : QThread(),
191 fd(fd_),
192 handle(handle_),
193 mutex(),
194 bufferEmptyCondition(),
195 bufferNotEmptyCondition(),
196 hasStarted(),
197 cancel(false),
198 error(false),
199 errorCode(0),
200 numBytesInBuffer(0)
201 {
202
203 }
204
~Writer()205 Writer::~Writer() {}
206
207 class KDPipeIODevice::Private : public QObject
208 {
209 Q_OBJECT
210 friend class ::KDPipeIODevice;
211 KDPipeIODevice *const q;
212 public:
213
214 explicit Private(KDPipeIODevice *qq);
215 ~Private() override;
216
217 bool doOpen(int, Qt::HANDLE, OpenMode);
218 bool startReaderThread();
219 bool startWriterThread();
220 void stopThreads();
221
222 public Q_SLOTS:
223 void emitReadyRead();
224
225 private:
226 int fd;
227 Qt::HANDLE handle;
228 Reader *reader;
229 Writer *writer;
230 bool triedToStartReader;
231 bool triedToStartWriter;
232 };
233
debugLevel()234 KDPipeIODevice::DebugLevel KDPipeIODevice::debugLevel()
235 {
236 return s_debugLevel;
237 }
238
setDebugLevel(KDPipeIODevice::DebugLevel level)239 void KDPipeIODevice::setDebugLevel(KDPipeIODevice::DebugLevel level)
240 {
241 s_debugLevel = level;
242 }
243
Private(KDPipeIODevice * qq)244 KDPipeIODevice::Private::Private(KDPipeIODevice *qq) : QObject(qq), q(qq),
245 fd(-1),
246 handle(nullptr),
247 reader(nullptr),
248 writer(nullptr),
249 triedToStartReader(false),
250 triedToStartWriter(false)
251 {
252
253 }
254
~Private()255 KDPipeIODevice::Private::~Private()
256 {
257 QDebug("KDPipeIODevice::~Private(): Destroying %p", (void *) q);
258 }
259
KDPipeIODevice(QObject * p)260 KDPipeIODevice::KDPipeIODevice(QObject *p)
261 : QIODevice(p), d(new Private(this))
262 {
263 KDAB_CHECK_CTOR;
264 }
265
KDPipeIODevice(int fd,OpenMode mode,QObject * p)266 KDPipeIODevice::KDPipeIODevice(int fd, OpenMode mode, QObject *p)
267 : QIODevice(p), d(new Private(this))
268 {
269 KDAB_CHECK_CTOR;
270 open(fd, mode);
271 }
272
KDPipeIODevice(Qt::HANDLE handle,OpenMode mode,QObject * p)273 KDPipeIODevice::KDPipeIODevice(Qt::HANDLE handle, OpenMode mode, QObject *p)
274 : QIODevice(p), d(new Private(this))
275 {
276 KDAB_CHECK_CTOR;
277 open(handle, mode);
278 }
279
~KDPipeIODevice()280 KDPipeIODevice::~KDPipeIODevice()
281 {
282 KDAB_CHECK_DTOR;
283 if (isOpen()) {
284 close();
285 }
286 delete d; d = nullptr;
287 }
288
open(int fd,OpenMode mode)289 bool KDPipeIODevice::open(int fd, OpenMode mode)
290 {
291 KDAB_CHECK_THIS;
292 #ifdef Q_OS_WIN32
293 return d->doOpen(fd, (HANDLE)_get_osfhandle(fd), mode);
294 #else
295 return d->doOpen(fd, nullptr, mode);
296 #endif
297 }
298
open(Qt::HANDLE h,OpenMode mode)299 bool KDPipeIODevice::open(Qt::HANDLE h, OpenMode mode)
300 {
301 KDAB_CHECK_THIS;
302 #ifdef Q_OS_WIN32
303 return d->doOpen(-1, h, mode);
304 #else
305 Q_UNUSED(h)
306 Q_UNUSED(mode)
307 Q_ASSERT(!"KDPipeIODevice::open( Qt::HANDLE, OpenMode ) should never be called except on Windows.");
308 return false;
309 #endif
310 }
311
startReaderThread()312 bool KDPipeIODevice::Private::startReaderThread()
313 {
314 if (triedToStartReader) {
315 return true;
316 }
317 triedToStartReader = true;
318 if (reader && !reader->isRunning() && !reader->isFinished()) {
319 QDebug("KDPipeIODevice::Private::startReaderThread(): locking reader (CONSUMER THREAD)");
320 LOCKED(reader);
321 QDebug("KDPipeIODevice::Private::startReaderThread(): locked reader (CONSUMER THREAD)");
322 reader->start(QThread::HighestPriority);
323 QDebug("KDPipeIODevice::Private::startReaderThread(): waiting for hasStarted (CONSUMER THREAD)");
324 const bool hasStarted = reader->hasStarted.wait(&reader->mutex, 1000);
325 QDebug("KDPipeIODevice::Private::startReaderThread(): returned from hasStarted (CONSUMER THREAD)");
326
327 return hasStarted;
328 }
329 return true;
330 }
331
startWriterThread()332 bool KDPipeIODevice::Private::startWriterThread()
333 {
334 if (triedToStartWriter) {
335 return true;
336 }
337 triedToStartWriter = true;
338 if (writer && !writer->isRunning() && !writer->isFinished()) {
339 LOCKED(writer);
340
341 writer->start(QThread::HighestPriority);
342 if (!writer->hasStarted.wait(&writer->mutex, 1000)) {
343 return false;
344 }
345 }
346 return true;
347 }
348
emitReadyRead()349 void KDPipeIODevice::Private::emitReadyRead()
350 {
351 QPointer<Private> thisPointer(this);
352 QDebug("KDPipeIODevice::Private::emitReadyRead %p", (void *) this);
353
354 Q_EMIT q->readyRead();
355
356 if (!thisPointer) {
357 return;
358 }
359 if (reader) {
360 QDebug("KDPipeIODevice::Private::emitReadyRead %p: locking reader (CONSUMER THREAD)", (
361 void *) this);
362 synchronized(reader) {
363 QDebug("KDPipeIODevice::Private::emitReadyRead %p: locked reader (CONSUMER THREAD)", (
364 void *) this);
365 reader->readyReadSentCondition.wakeAll();
366 QDebug("KDPipeIODevice::Private::emitReadyRead %p: buffer empty: %d reader in ReadFile: %d", (void *)this, reader->bufferEmpty(), reader->isReading);
367 }
368 }
369 QDebug("KDPipeIODevice::Private::emitReadyRead %p leaving", (void *) this);
370
371 }
372
doOpen(int fd_,Qt::HANDLE handle_,OpenMode mode_)373 bool KDPipeIODevice::Private::doOpen(int fd_, Qt::HANDLE handle_, OpenMode mode_)
374 {
375
376 if (q->isOpen()) {
377 return false;
378 }
379
380 #ifdef Q_OS_WIN32
381 if (!handle_) {
382 return false;
383 }
384 #else
385 if (fd_ < 0) {
386 return false;
387 }
388 #endif
389
390 if (!(mode_ & ReadWrite)) {
391 return false; // need to have at least read -or- write
392 }
393
394 std::unique_ptr<Reader> reader_;
395 std::unique_ptr<Writer> writer_;
396
397 if (mode_ & ReadOnly) {
398 reader_ = std::make_unique<Reader>(fd_, handle_);
399 QDebug("KDPipeIODevice::doOpen (%p): created reader (%p) for fd %d", (void *)this,
400 (void *)reader_.get(), fd_);
401 connect(reader_.get(), &Reader::readyRead, this, &Private::emitReadyRead,
402 Qt::QueuedConnection);
403 }
404 if (mode_ & WriteOnly) {
405 writer_ = std::make_unique<Writer>(fd_, handle_);
406 QDebug("KDPipeIODevice::doOpen (%p): created writer (%p) for fd %d",
407 (void *)this, (void *)writer_.get(), fd_);
408 connect(writer_.get(), &Writer::bytesWritten, q, &QIODevice::bytesWritten,
409 Qt::QueuedConnection);
410 }
411
412 // commit to *this:
413 fd = fd_;
414 handle = handle_;
415 reader = reader_.release();
416 writer = writer_.release();
417
418 q->setOpenMode(mode_ | Unbuffered);
419 return true;
420 }
421
descriptor() const422 int KDPipeIODevice::descriptor() const
423 {
424 KDAB_CHECK_THIS;
425 return d->fd;
426 }
427
handle() const428 Qt::HANDLE KDPipeIODevice::handle() const
429 {
430 KDAB_CHECK_THIS;
431 return d->handle;
432 }
433
bytesAvailable() const434 qint64 KDPipeIODevice::bytesAvailable() const
435 {
436 KDAB_CHECK_THIS;
437 const qint64 base = QIODevice::bytesAvailable();
438 if (!d->triedToStartReader) {
439 d->startReaderThread();
440 return base;
441 }
442 if (d->reader) {
443 synchronized(d->reader) {
444 const qint64 inBuffer = d->reader->bytesInBuffer();
445 return base + inBuffer;
446 }
447 }
448 return base;
449 }
450
bytesToWrite() const451 qint64 KDPipeIODevice::bytesToWrite() const
452 {
453 KDAB_CHECK_THIS;
454 d->startWriterThread();
455 const qint64 base = QIODevice::bytesToWrite();
456 if (d->writer) {
457 synchronized(d->writer) return base + d->writer->bytesInBuffer();
458 }
459 return base;
460 }
461
canReadLine() const462 bool KDPipeIODevice::canReadLine() const
463 {
464 KDAB_CHECK_THIS;
465 d->startReaderThread();
466 if (QIODevice::canReadLine()) {
467 return true;
468 }
469 if (d->reader) {
470 synchronized(d->reader) return d->reader->bufferContains('\n');
471 }
472 return true;
473 }
474
isSequential() const475 bool KDPipeIODevice::isSequential() const
476 {
477 return true;
478 }
479
atEnd() const480 bool KDPipeIODevice::atEnd() const
481 {
482 KDAB_CHECK_THIS;
483 d->startReaderThread();
484 if (!QIODevice::atEnd()) {
485 QDebug("%p: KDPipeIODevice::atEnd returns false since QIODevice::atEnd does (with bytesAvailable=%ld)", (void *)this, static_cast<long>(bytesAvailable()));
486 return false;
487 }
488 if (!isOpen()) {
489 return true;
490 }
491 if (d->reader->eofShortCut) {
492 return true;
493 }
494 LOCKED(d->reader);
495 const bool eof = (d->reader->error || d->reader->eof) && d->reader->bufferEmpty();
496 if (!eof) {
497 if (!d->reader->error && !d->reader->eof) {
498 QDebug("%p: KDPipeIODevice::atEnd returns false since !reader->error && !reader->eof",
499 (void *)(this));
500 }
501 if (!d->reader->bufferEmpty()) {
502 QDebug("%p: KDPipeIODevice::atEnd returns false since !reader->bufferEmpty()",
503 (void *) this);
504 }
505 }
506 return eof;
507 }
508
waitForBytesWritten(int msecs)509 bool KDPipeIODevice::waitForBytesWritten(int msecs)
510 {
511 KDAB_CHECK_THIS;
512 d->startWriterThread();
513 Writer *const w = d->writer;
514 if (!w) {
515 return true;
516 }
517 LOCKED(w);
518 QDebug("KDPipeIODevice::waitForBytesWritten (%p,w=%p): entered locked area",
519 (void *)this, (void *) w);
520 return w->bufferEmpty() || w->error || w->bufferEmptyCondition.wait(&w->mutex, msecs);
521 }
522
waitForReadyRead(int msecs)523 bool KDPipeIODevice::waitForReadyRead(int msecs)
524 {
525 KDAB_CHECK_THIS;
526 QDebug("KDPipeIODEvice::waitForReadyRead()(%p)", (void *) this);
527 d->startReaderThread();
528 if (ALLOW_QIODEVICE_BUFFERING) {
529 if (bytesAvailable() > 0) {
530 return true;
531 }
532 }
533 Reader *const r = d->reader;
534 if (!r || r->eofShortCut) {
535 return true;
536 }
537 LOCKED(r);
538 if (r->bytesInBuffer() != 0 || r->eof || r->error) {
539 return true;
540 }
541 Q_ASSERT(false); // ### wtf?
542 return r->bufferNotEmptyCondition.wait(&r->mutex, msecs);
543 }
544
// Scope guard that assigns a temporary value to a variable and puts the
// previous value back when the guard is destroyed.
//
// The guard is non-copyable: a copy would restore the saved value a second
// time, possibly after the variable was legitimately changed.
template <typename T>
class TemporaryValue
{
public:
    // Remembers var_'s current value, then assigns tv to it.
    TemporaryValue(T &var_, const T &tv) : var(var_), oldValue(var_)
    {
        var = tv;
    }
    // Restores the value var had when the guard was created.
    ~TemporaryValue()
    {
        var = oldValue;
    }

    TemporaryValue(const TemporaryValue &) = delete;
    TemporaryValue &operator=(const TemporaryValue &) = delete;

private:
    T &var;
    const T oldValue;
};
561
readWouldBlock() const562 bool KDPipeIODevice::readWouldBlock() const
563 {
564 d->startReaderThread();
565 LOCKED(d->reader);
566 return d->reader->bufferEmpty() && !d->reader->eof && !d->reader->error;
567 }
568
writeWouldBlock() const569 bool KDPipeIODevice::writeWouldBlock() const
570 {
571 d->startWriterThread();
572 LOCKED(d->writer);
573 return !d->writer->bufferEmpty() && !d->writer->error;
574 }
575
readData(char * data,qint64 maxSize)576 qint64 KDPipeIODevice::readData(char *data, qint64 maxSize)
577 {
578 KDAB_CHECK_THIS;
579 QDebug("%p: KDPipeIODevice::readData: data=%s, maxSize=%lld", (void *)this, data, maxSize);
580 d->startReaderThread();
581 Reader *const r = d->reader;
582
583 Q_ASSERT(r);
584
585 //assert( r->isRunning() ); // wrong (might be eof, error)
586 Q_ASSERT(data || maxSize == 0);
587 Q_ASSERT(maxSize >= 0);
588
589 if (r->eofShortCut) {
590 QDebug("%p: KDPipeIODevice::readData: hit eofShortCut, returning 0", (void *)this);
591 return 0;
592 }
593
594 if (maxSize < 0) {
595 maxSize = 0;
596 }
597
598 if (ALLOW_QIODEVICE_BUFFERING) {
599 if (bytesAvailable() > 0) {
600 maxSize = std::min(maxSize, bytesAvailable()); // don't block
601 }
602 }
603 QDebug("%p: KDPipeIODevice::readData: try to lock reader (CONSUMER THREAD)", (void *) this);
604 LOCKED(r);
605 QDebug("%p: KDPipeIODevice::readData: locked reader (CONSUMER THREAD)", (void *) this);
606
607 r->readyReadSentCondition.wakeAll();
608 if (/* maxSize > 0 && */ r->bufferEmpty() && !r->error && !r->eof) { // ### block on maxSize == 0?
609 QDebug("%p: KDPipeIODevice::readData: waiting for bufferNotEmptyCondition (CONSUMER THREAD)", (void *) this);
610 const TemporaryValue<bool> tmp(d->reader->consumerBlocksOnUs, true);
611 r->bufferNotEmptyCondition.wait(&r->mutex);
612 r->blockedConsumerIsDoneCondition.wakeAll();
613 QDebug("%p: KDPipeIODevice::readData: woke up from bufferNotEmptyCondition (CONSUMER THREAD)",
614 (void *) this);
615 }
616
617 if (r->bufferEmpty()) {
618 QDebug("%p: KDPipeIODevice::readData: got empty buffer, signal eof", (void *) this);
619 // woken with an empty buffer must mean either EOF or error:
620 Q_ASSERT(r->eof || r->error);
621 r->eofShortCut = true;
622 return r->eof ? 0 : -1;
623 }
624
625 QDebug("%p: KDPipeIODevice::readData: got bufferNotEmptyCondition, trying to read %lld bytes",
626 (void *)this, maxSize);
627 const qint64 bytesRead = r->readData(data, maxSize);
628 QDebug("%p: KDPipeIODevice::readData: read %lld bytes", (void *)this, bytesRead);
629 QDebug("%p (fd=%d): KDPipeIODevice::readData: %s", (void *)this, d->fd, data);
630
631 return bytesRead;
632 }
633
readData(char * data,qint64 maxSize)634 qint64 Reader::readData(char *data, qint64 maxSize)
635 {
636 qint64 numRead = rptr < wptr ? wptr - rptr : sizeof buffer - rptr;
637 if (numRead > maxSize) {
638 numRead = maxSize;
639 }
640
641 QDebug("%p: KDPipeIODevice::readData: data=%s, maxSize=%lld; rptr=%u, wptr=%u (bytesInBuffer=%u); -> numRead=%lld",
642 (void *)this, data, maxSize, rptr, wptr, bytesInBuffer(), numRead);
643
644 memcpy(data, buffer + rptr, numRead);
645
646 rptr = (rptr + numRead) % sizeof buffer;
647
648 if (!bufferFull()) {
649 QDebug("%p: KDPipeIODevice::readData: signal bufferNotFullCondition", (void *) this);
650 bufferNotFullCondition.wakeAll();
651 }
652
653 return numRead;
654 }
655
writeData(const char * data,qint64 size)656 qint64 KDPipeIODevice::writeData(const char *data, qint64 size)
657 {
658 KDAB_CHECK_THIS;
659 d->startWriterThread();
660 Writer *const w = d->writer;
661
662 Q_ASSERT(w);
663 Q_ASSERT(w->error || w->isRunning());
664 Q_ASSERT(data || size == 0);
665 Q_ASSERT(size >= 0);
666
667 LOCKED(w);
668
669 while (!w->error && !w->bufferEmpty()) {
670 QDebug("%p: KDPipeIODevice::writeData: wait for empty buffer", (void *) this);
671 w->bufferEmptyCondition.wait(&w->mutex);
672 QDebug("%p: KDPipeIODevice::writeData: empty buffer signaled", (void *) this);
673
674 }
675 if (w->error) {
676 return -1;
677 }
678
679 Q_ASSERT(w->bufferEmpty());
680
681 return w->writeData(data, size);
682 }
683
writeData(const char * data,qint64 size)684 qint64 Writer::writeData(const char *data, qint64 size)
685 {
686 Q_ASSERT(bufferEmpty());
687
688 if (size > static_cast<qint64>(sizeof buffer)) {
689 size = sizeof buffer;
690 }
691
692 memcpy(buffer, data, size);
693
694 numBytesInBuffer = size;
695
696 if (!bufferEmpty()) {
697 bufferNotEmptyCondition.wakeAll();
698 }
699 return size;
700 }
701
stopThreads()702 void KDPipeIODevice::Private::stopThreads()
703 {
704 if (triedToStartWriter) {
705 if (writer && q->bytesToWrite() > 0) {
706 q->waitForBytesWritten(-1);
707 }
708
709 Q_ASSERT(q->bytesToWrite() == 0);
710 }
711 if (Reader *&r = reader) {
712 disconnect(r, &Reader::readyRead, this, &Private::emitReadyRead);
713 synchronized(r) {
714 // tell thread to cancel:
715 r->cancel = true;
716 // and wake it, so it can terminate:
717 r->waitForCancelCondition.wakeAll();
718 r->bufferNotFullCondition.wakeAll();
719 r->readyReadSentCondition.wakeAll();
720 }
721 }
722 if (Writer *&w = writer) {
723 synchronized(w) {
724 // tell thread to cancel:
725 w->cancel = true;
726 // and wake it, so it can terminate:
727 w->bufferNotEmptyCondition.wakeAll();
728 }
729 }
730 }
731
close()732 void KDPipeIODevice::close()
733 {
734 KDAB_CHECK_THIS;
735 QDebug("KDPipeIODevice::close(%p)", (void *) this);
736 if (!isOpen()) {
737 return;
738 }
739
740 // tell clients we're about to close:
741 Q_EMIT aboutToClose();
742 d->stopThreads();
743
744 #define waitAndDelete( t ) if ( t ) { t->wait(); QThread* const t2 = t; t = nullptr; delete t2; }
745 QDebug("KPipeIODevice::close(%p): wait and closing writer %p", (void *)this, (void *) d->writer);
746 waitAndDelete(d->writer);
747 QDebug("KPipeIODevice::close(%p): wait and closing reader %p", (void *)this, (void *) d->reader);
748 if (d->reader) {
749 LOCKED(d->reader);
750 d->reader->readyReadSentCondition.wakeAll();
751 }
752 waitAndDelete(d->reader);
753 #undef waitAndDelete
754 #ifdef Q_OS_WIN32
755 if (d->fd != -1) {
756 _close(d->fd);
757 } else {
758 CloseHandle(d->handle);
759 }
760 #else
761 ::close(d->fd);
762 #endif
763
764 setOpenMode(NotOpen);
765 d->fd = -1;
766 d->handle = nullptr;
767 }
768
run()769 void Reader::run()
770 {
771
772 LOCKED(this);
773
774 // too bad QThread doesn't have that itself; a signal isn't enough
775 hasStarted.wakeAll();
776
777 QDebug("%p: Reader::run: started", (void *) this);
778
779 while (true) {
780 if (!cancel && (eof || error)) {
781 //notify the client until the buffer is empty and then once
782 //again so he receives eof/error. After that, wait for him
783 //to cancel
784 const bool wasEmpty = bufferEmpty();
785 QDebug("%p: Reader::run: received eof(%d) or error(%d), waking everyone", (void *)this, eof, error);
786 notifyReadyRead();
787 if (!cancel && wasEmpty) {
788 waitForCancelCondition.wait(&mutex);
789 }
790 } else if (!cancel && !bufferFull() && !bufferEmpty()) {
791 QDebug("%p: Reader::run: buffer no longer empty, waking everyone", (void *) this);
792 notifyReadyRead();
793 }
794
795 while (!cancel && !error && bufferFull()) {
796 notifyReadyRead();
797 if (!cancel && bufferFull()) {
798 QDebug("%p: Reader::run: buffer is full, going to sleep", (void *)this);
799 bufferNotFullCondition.wait(&mutex);
800 }
801 }
802
803 if (cancel) {
804 QDebug("%p: Reader::run: detected cancel", (void *)this);
805 goto leave;
806 }
807
808 if (!eof && !error) {
809 if (rptr == wptr) { // optimize for larger chunks in case the buffer is empty
810 rptr = wptr = 0;
811 }
812
813 unsigned int numBytes = (rptr + sizeof buffer - wptr - 1) % sizeof buffer;
814 if (numBytes > sizeof buffer - wptr) {
815 numBytes = sizeof buffer - wptr;
816 }
817
818 QDebug("%p: Reader::run: rptr=%d, wptr=%d -> numBytes=%d", (void *)this, rptr, wptr, numBytes);
819
820 Q_ASSERT(numBytes > 0);
821
822 QDebug("%p: Reader::run: trying to read %d bytes from fd %d", (void *)this, numBytes, fd);
823 #ifdef Q_OS_WIN32
824 isReading = true;
825 mutex.unlock();
826 DWORD numRead;
827 const bool ok = ReadFile(handle, buffer + wptr, numBytes, &numRead, 0);
828 mutex.lock();
829 isReading = false;
830 if (ok) {
831 if (numRead == 0) {
832 QDebug("%p: Reader::run: got eof (numRead==0)", (void *) this);
833 eof = true;
834 }
835 } else { // !ok
836 errorCode = static_cast<int>(GetLastError());
837 if (errorCode == ERROR_BROKEN_PIPE) {
838 Q_ASSERT(numRead == 0);
839 QDebug("%p: Reader::run: got eof (broken pipe)", (void *) this);
840 eof = true;
841 } else {
842 Q_ASSERT(numRead == 0);
843 QDebug("%p: Reader::run: got error: %s (%d)", (void *) this, strerror(errorCode), errorCode);
844 error = true;
845 }
846 }
847 #else
848 qint64 numRead;
849 mutex.unlock();
850 do {
851 numRead = ::read(fd, buffer + wptr, numBytes);
852 } while (numRead == -1 && errno == EINTR);
853 mutex.lock();
854
855 if (numRead < 0) {
856 errorCode = errno;
857 error = true;
858 QDebug("%p: Reader::run: got error: %d", (void *)this, errorCode);
859 } else if (numRead == 0) {
860 QDebug("%p: Reader::run: eof detected", (void *)this);
861 eof = true;
862 }
863 #endif
864 QDebug("%p (fd=%d): Reader::run: read %ld bytes", (void *) this, fd, static_cast<long>(numRead));
865 QDebug("%p (fd=%d): Reader::run: %s", (void *)this, fd, buffer);
866
867 if (numRead > 0) {
868 QDebug("%p: Reader::run: buffer before: rptr=%4d, wptr=%4d", (void *)this, rptr, wptr);
869 wptr = (wptr + numRead) % sizeof buffer;
870 QDebug("%p: Reader::run: buffer after: rptr=%4d, wptr=%4d", (void *)this, rptr, wptr);
871 }
872 }
873 }
874 leave:
875 QDebug("%p: Reader::run: terminated", (void *)this);
876 }
877
notifyReadyRead()878 void Reader::notifyReadyRead()
879 {
880 QDebug("notifyReadyRead: %d bytes available", bytesInBuffer());
881 Q_ASSERT(!cancel);
882
883 if (consumerBlocksOnUs) {
884 bufferNotEmptyCondition.wakeAll();
885 blockedConsumerIsDoneCondition.wait(&mutex);
886 return;
887 }
888 QDebug("notifyReadyRead: Q_EMIT signal");
889 Q_EMIT readyRead();
890 readyReadSentCondition.wait(&mutex);
891 QDebug("notifyReadyRead: returning from waiting, leave");
892 }
893
run()894 void Writer::run()
895 {
896
897 LOCKED(this);
898
899 // too bad QThread doesn't have that itself; a signal isn't enough
900 hasStarted.wakeAll();
901
902 qCDebug(KLEOPATRA_LOG) << this << "Writer::run: started";
903
904 while (true) {
905
906 while (!cancel && bufferEmpty()) {
907 qCDebug(KLEOPATRA_LOG) << this << "Writer::run: buffer is empty, wake bufferEmptyCond listeners";
908 bufferEmptyCondition.wakeAll();
909 Q_EMIT bytesWritten(0);
910 qCDebug(KLEOPATRA_LOG) << this << "Writer::run: buffer is empty, going to sleep";
911 bufferNotEmptyCondition.wait(&mutex);
912 qCDebug(KLEOPATRA_LOG) << this << "Writer::run: woke up";
913 }
914
915 if (cancel) {
916 qCDebug(KLEOPATRA_LOG) << this << "Writer::run: detected cancel";
917 goto leave;
918 }
919
920 Q_ASSERT(numBytesInBuffer > 0);
921
922 qCDebug(KLEOPATRA_LOG) << this << "Writer::run: Trying to write " << numBytesInBuffer << "bytes";
923 qint64 totalWritten = 0;
924 do {
925 mutex.unlock();
926 #ifdef Q_OS_WIN32
927 DWORD numWritten;
928 QDebug("%p (fd=%d): Writer::run: buffer before WriteFile (numBytes=%lld): %s:",
929 (void *) this, fd, numBytesInBuffer, buffer);
930 QDebug("%p (fd=%d): Writer::run: Going into WriteFile", (void *) this, fd);
931 if (!WriteFile(handle, buffer + totalWritten, numBytesInBuffer - totalWritten, &numWritten, 0)) {
932 mutex.lock();
933 errorCode = static_cast<int>(GetLastError());
934 QDebug("%p: Writer::run: got error code: %d", (void *) this, errorCode);
935 error = true;
936 goto leave;
937 }
938 #else
939 qint64 numWritten;
940 do {
941 numWritten = ::write(fd, buffer + totalWritten, numBytesInBuffer - totalWritten);
942 } while (numWritten == -1 && errno == EINTR);
943
944 if (numWritten < 0) {
945 mutex.lock();
946 errorCode = errno;
947 QDebug("%p: Writer::run: got error code: %s (%d)", (void *)this, strerror(errorCode), errorCode);
948 error = true;
949 goto leave;
950 }
951 #endif
952 QDebug("%p (fd=%d): Writer::run: buffer after WriteFile (numBytes=%u): %s:", (void *)this, fd, numBytesInBuffer, buffer);
953 totalWritten += numWritten;
954 mutex.lock();
955 } while (totalWritten < numBytesInBuffer);
956
957 qCDebug(KLEOPATRA_LOG) << this << "Writer::run: wrote " << totalWritten << "bytes";
958 numBytesInBuffer = 0;
959 qCDebug(KLEOPATRA_LOG) << this << "Writer::run: buffer is empty, wake bufferEmptyCond listeners";
960 bufferEmptyCondition.wakeAll();
961 Q_EMIT bytesWritten(totalWritten);
962 }
963 leave:
964 qCDebug(KLEOPATRA_LOG) << this << "Writer::run: terminating";
965 numBytesInBuffer = 0;
966 qCDebug(KLEOPATRA_LOG) << this << "Writer::run: buffer is empty, wake bufferEmptyCond listeners";
967 bufferEmptyCondition.wakeAll();
968 Q_EMIT bytesWritten(0);
969 }
970
971 // static
makePairOfConnectedPipes()972 std::pair<KDPipeIODevice *, KDPipeIODevice *> KDPipeIODevice::makePairOfConnectedPipes()
973 {
974 KDPipeIODevice *read = nullptr;
975 KDPipeIODevice *write = nullptr;
976 #ifdef Q_OS_WIN32
977 HANDLE rh;
978 HANDLE wh;
979 SECURITY_ATTRIBUTES sa;
980 memset(&sa, 0, sizeof(sa));
981 sa.nLength = sizeof(sa);
982 sa.bInheritHandle = TRUE;
983 if (CreatePipe(&rh, &wh, &sa, BUFFER_SIZE)) {
984 read = new KDPipeIODevice;
985 read->open(rh, ReadOnly);
986 write = new KDPipeIODevice;
987 write->open(wh, WriteOnly);
988 }
989 #else
990 int fds[2];
991 if (pipe(fds) == 0) {
992 read = new KDPipeIODevice;
993 read->open(fds[0], ReadOnly);
994 write = new KDPipeIODevice;
995 write->open(fds[1], WriteOnly);
996 }
997 #endif
998 return std::make_pair(read, write);
999 }
1000
#ifdef KDAB_DEFINE_CHECKS
// Invariant checks, compiled only when the KDAB check machinery is present.
// Open device: a thread exists for each direction in the open mode and is
// running unless it hit eof/error; the descriptor/handle is valid.
// Closed device: no threads and no descriptor/handle.
KDAB_DEFINE_CHECKS(KDPipeIODevice)
{
    if (isOpen()) {
        Q_ASSERT(openMode() != NotOpen);
        Q_ASSERT(openMode() & ReadWrite);
        if (openMode() & ReadOnly) {
            Q_ASSERT(d->reader);
            {
                LOCKED(d->reader);
                Q_ASSERT(d->reader->eof || d->reader->error || d->reader->isRunning());
            }
        }
        if (openMode() & WriteOnly) {
            Q_ASSERT(d->writer);
            {
                LOCKED(d->writer);
                Q_ASSERT(d->writer->error || d->writer->isRunning());
            }
        }
#ifdef Q_OS_WIN32
        Q_ASSERT(d->handle);
#else
        Q_ASSERT(d->fd >= 0);
#endif
    } else {
        Q_ASSERT(openMode() == NotOpen);
        Q_ASSERT(!d->reader);
        Q_ASSERT(!d->writer);
#ifdef Q_OS_WIN32
        Q_ASSERT(!d->handle);
#else
        Q_ASSERT(d->fd < 0);
#endif
    }
}
#endif // KDAB_DEFINE_CHECKS
1034
1035 #include "kdpipeiodevice.moc"
1036