1 /*
2   SPDX-FileCopyrightText: 2007 Klarälvdalens Datakonsult AB
3 
4   SPDX-License-Identifier: LGPL-2.0-or-later
5  */
6 
7 #include <config-kleopatra.h>
8 
9 #include "kdpipeiodevice.h"
10 
11 #include <QDebug>
12 #include <QMutex>
13 #include <QPointer>
14 #include <QThread>
15 #include <QWaitCondition>
16 #include "kleopatra_debug.h"
17 
18 #include <cstring>
19 #include <memory>
20 #include <algorithm>
21 
22 #ifdef Q_OS_WIN32
23 # ifndef NOMINMAX
24 #  define NOMINMAX
25 # endif
26 # include <windows.h>
27 # include <io.h>
28 #else
29 # include <unistd.h>
30 # include <errno.h>
31 #endif
32 
// Fallbacks for the KDAB_CHECK_* hooks: when KDAB_CHECK_THIS is not already
// defined, all three macros expand to the no-op expression `(void)1`.
#ifndef KDAB_CHECK_THIS
# define KDAB_CHECK_CTOR (void)1
# define KDAB_CHECK_DTOR KDAB_CHECK_CTOR
# define KDAB_CHECK_THIS KDAB_CHECK_CTOR
#endif

// LOCKED(d): declares a QMutexLocker named `locker` on d->mutex that stays
// alive until the end of the enclosing scope.
#define LOCKED( d ) const QMutexLocker locker( &d->mutex )
// synchronized(d) <statement>: runs the statement that follows exactly once
// with a QMutexLocker on d->mutex alive for the duration of that statement.
#define synchronized( d ) if ( int i = 0 ) {} else for ( const QMutexLocker locker( &d->mutex ) ; !i ; ++i )

// Size in bytes used for the Reader and Writer buffers declared in this file
// (the Reader allocates one extra byte on top of this).
const unsigned int BUFFER_SIZE = 4096;
// When true, waitForReadyRead() and readData() consult bytesAvailable() before
// they touch the reader's buffer.
const bool ALLOW_QIODEVICE_BUFFERING = true;

namespace
{
// File-local debug level, read by debugLevel() and written by setDebugLevel().
KDPipeIODevice::DebugLevel s_debugLevel = KDPipeIODevice::NoDebug;
}

// Printf-style debug helper: the call is skipped entirely while s_debugLevel is
// NoDebug, otherwise it is forwarded to qDebug.
// NOTE: from this line on, the token `QDebug` is replaced by the preprocessor
// and no longer names anything else in this file.
#define QDebug if( s_debugLevel == KDPipeIODevice::NoDebug ){}else qDebug
51 
52 namespace
53 {
54 
55 class Reader : public QThread
56 {
57     Q_OBJECT
58 public:
59     Reader(int fd, Qt::HANDLE handle);
60     ~Reader() override;
61 
62     qint64 readData(char *data, qint64 maxSize);
63 
bytesInBuffer() const64     unsigned int bytesInBuffer() const
65     {
66         return (wptr + sizeof buffer - rptr) % sizeof buffer;
67     }
68 
bufferFull() const69     bool bufferFull() const
70     {
71         return bytesInBuffer() == sizeof buffer - 1;
72     }
73 
bufferEmpty() const74     bool bufferEmpty() const
75     {
76         return bytesInBuffer() == 0;
77     }
78 
bufferContains(char ch)79     bool bufferContains(char ch)
80     {
81         const unsigned int bib = bytesInBuffer();
82         for (unsigned int i = rptr; i < rptr + bib; ++i)
83             if (buffer[i % sizeof buffer] == ch) {
84                 return true;
85             }
86         return false;
87     }
88 
89     void notifyReadyRead();
90 
91 Q_SIGNALS:
92     void readyRead();
93 
94 protected:
95     void run() override;
96 
97 private:
98     int fd;
99     Qt::HANDLE handle;
100 public:
101     QMutex mutex;
102     QWaitCondition waitForCancelCondition;
103     QWaitCondition bufferNotFullCondition;
104     QWaitCondition bufferNotEmptyCondition;
105     QWaitCondition hasStarted;
106     QWaitCondition readyReadSentCondition;
107     QWaitCondition blockedConsumerIsDoneCondition;
108     bool cancel;
109     bool eof;
110     bool error;
111     bool eofShortCut;
112     int errorCode;
113     bool isReading;
114     bool consumerBlocksOnUs;
115 
116 private:
117     unsigned int rptr, wptr;
118     char buffer[BUFFER_SIZE + 1]; // need to keep one byte free to detect empty state
119 };
120 
Reader(int fd_,Qt::HANDLE handle_)121 Reader::Reader(int fd_, Qt::HANDLE handle_) : QThread(),
122     fd(fd_),
123     handle(handle_),
124     mutex(),
125     bufferNotFullCondition(),
126     bufferNotEmptyCondition(),
127     hasStarted(),
128     cancel(false),
129     eof(false),
130     error(false),
131     eofShortCut(false),
132     errorCode(0),
133     isReading(false),
134     consumerBlocksOnUs(false),
135     rptr(0),
136     wptr(0)
137 {
138 
139 }
140 
~Reader()141 Reader::~Reader() {}
142 
143 class Writer : public QThread
144 {
145     Q_OBJECT
146 public:
147     Writer(int fd, Qt::HANDLE handle);
148     ~Writer() override;
149 
150     qint64 writeData(const char *data, qint64 size);
151 
bytesInBuffer() const152     unsigned int bytesInBuffer() const
153     {
154         return numBytesInBuffer;
155     }
156 
bufferFull() const157     bool bufferFull() const
158     {
159         return numBytesInBuffer == sizeof buffer;
160     }
161 
bufferEmpty() const162     bool bufferEmpty() const
163     {
164         return numBytesInBuffer == 0;
165     }
166 
167 Q_SIGNALS:
168     void bytesWritten(qint64);
169 
170 protected:
171     void run() override;
172 
173 private:
174     int fd;
175     Qt::HANDLE handle;
176 public:
177     QMutex mutex;
178     QWaitCondition bufferEmptyCondition;
179     QWaitCondition bufferNotEmptyCondition;
180     QWaitCondition hasStarted;
181     bool cancel;
182     bool error;
183     int errorCode;
184 private:
185     unsigned int numBytesInBuffer;
186     char buffer[BUFFER_SIZE];
187 };
188 }
189 
Writer(int fd_,Qt::HANDLE handle_)190 Writer::Writer(int fd_, Qt::HANDLE handle_) : QThread(),
191     fd(fd_),
192     handle(handle_),
193     mutex(),
194     bufferEmptyCondition(),
195     bufferNotEmptyCondition(),
196     hasStarted(),
197     cancel(false),
198     error(false),
199     errorCode(0),
200     numBytesInBuffer(0)
201 {
202 
203 }
204 
~Writer()205 Writer::~Writer() {}
206 
// Private implementation object of KDPipeIODevice. It holds the descriptor /
// handle, the Reader and Writer thread objects, and the slot that re-emits
// the reader's readyRead() signal on behalf of the public object.
class KDPipeIODevice::Private : public QObject
{
    Q_OBJECT
    friend class ::KDPipeIODevice;
    KDPipeIODevice *const q; // back pointer to the public object
public:

    explicit Private(KDPipeIODevice *qq);
    ~Private() override;

    // Validates the arguments, creates the reader and/or writer objects for the
    // requested mode, and marks the public device as open.
    bool doOpen(int, Qt::HANDLE, OpenMode);
    // Start the respective thread on first call; later calls return true at once.
    bool startReaderThread();
    bool startWriterThread();
    // Sets the cancel flag on both thread objects and wakes them.
    void stopThreads();

public Q_SLOTS:
    // Emits q->readyRead() and then wakes the reader's readyReadSentCondition.
    void emitReadyRead();

private:
    int fd;                  // -1 while no descriptor is set
    Qt::HANDLE handle;       // nullptr while no handle is set
    Reader *reader;          // nullptr unless created by doOpen()
    Writer *writer;          // nullptr unless created by doOpen()
    bool triedToStartReader; // set by the first startReaderThread() call
    bool triedToStartWriter; // set by the first startWriterThread() call
};
233 
debugLevel()234 KDPipeIODevice::DebugLevel KDPipeIODevice::debugLevel()
235 {
236     return s_debugLevel;
237 }
238 
setDebugLevel(KDPipeIODevice::DebugLevel level)239 void KDPipeIODevice::setDebugLevel(KDPipeIODevice::DebugLevel level)
240 {
241     s_debugLevel = level;
242 }
243 
Private(KDPipeIODevice * qq)244 KDPipeIODevice::Private::Private(KDPipeIODevice *qq) : QObject(qq), q(qq),
245     fd(-1),
246     handle(nullptr),
247     reader(nullptr),
248     writer(nullptr),
249     triedToStartReader(false),
250     triedToStartWriter(false)
251 {
252 
253 }
254 
~Private()255 KDPipeIODevice::Private::~Private()
256 {
257     QDebug("KDPipeIODevice::~Private(): Destroying %p", (void *) q);
258 }
259 
KDPipeIODevice(QObject * p)260 KDPipeIODevice::KDPipeIODevice(QObject *p)
261     : QIODevice(p), d(new Private(this))
262 {
263     KDAB_CHECK_CTOR;
264 }
265 
KDPipeIODevice(int fd,OpenMode mode,QObject * p)266 KDPipeIODevice::KDPipeIODevice(int fd, OpenMode mode, QObject *p)
267     : QIODevice(p), d(new Private(this))
268 {
269     KDAB_CHECK_CTOR;
270     open(fd, mode);
271 }
272 
KDPipeIODevice(Qt::HANDLE handle,OpenMode mode,QObject * p)273 KDPipeIODevice::KDPipeIODevice(Qt::HANDLE handle, OpenMode mode, QObject *p)
274     : QIODevice(p), d(new Private(this))
275 {
276     KDAB_CHECK_CTOR;
277     open(handle, mode);
278 }
279 
~KDPipeIODevice()280 KDPipeIODevice::~KDPipeIODevice()
281 {
282     KDAB_CHECK_DTOR;
283     if (isOpen()) {
284         close();
285     }
286     delete d; d = nullptr;
287 }
288 
open(int fd,OpenMode mode)289 bool KDPipeIODevice::open(int fd, OpenMode mode)
290 {
291     KDAB_CHECK_THIS;
292 #ifdef Q_OS_WIN32
293     return d->doOpen(fd, (HANDLE)_get_osfhandle(fd), mode);
294 #else
295     return d->doOpen(fd, nullptr, mode);
296 #endif
297 }
298 
open(Qt::HANDLE h,OpenMode mode)299 bool KDPipeIODevice::open(Qt::HANDLE h, OpenMode mode)
300 {
301     KDAB_CHECK_THIS;
302 #ifdef Q_OS_WIN32
303     return d->doOpen(-1, h, mode);
304 #else
305     Q_UNUSED(h)
306     Q_UNUSED(mode)
307     Q_ASSERT(!"KDPipeIODevice::open( Qt::HANDLE, OpenMode ) should never be called except on Windows.");
308     return false;
309 #endif
310 }
311 
startReaderThread()312 bool KDPipeIODevice::Private::startReaderThread()
313 {
314     if (triedToStartReader) {
315         return true;
316     }
317     triedToStartReader = true;
318     if (reader && !reader->isRunning() && !reader->isFinished()) {
319         QDebug("KDPipeIODevice::Private::startReaderThread(): locking reader (CONSUMER THREAD)");
320         LOCKED(reader);
321         QDebug("KDPipeIODevice::Private::startReaderThread(): locked reader (CONSUMER THREAD)");
322         reader->start(QThread::HighestPriority);
323         QDebug("KDPipeIODevice::Private::startReaderThread(): waiting for hasStarted (CONSUMER THREAD)");
324         const bool hasStarted = reader->hasStarted.wait(&reader->mutex, 1000);
325         QDebug("KDPipeIODevice::Private::startReaderThread(): returned from hasStarted (CONSUMER THREAD)");
326 
327         return hasStarted;
328     }
329     return true;
330 }
331 
startWriterThread()332 bool KDPipeIODevice::Private::startWriterThread()
333 {
334     if (triedToStartWriter) {
335         return true;
336     }
337     triedToStartWriter = true;
338     if (writer && !writer->isRunning() && !writer->isFinished()) {
339         LOCKED(writer);
340 
341         writer->start(QThread::HighestPriority);
342         if (!writer->hasStarted.wait(&writer->mutex, 1000)) {
343             return false;
344         }
345     }
346     return true;
347 }
348 
emitReadyRead()349 void KDPipeIODevice::Private::emitReadyRead()
350 {
351     QPointer<Private> thisPointer(this);
352     QDebug("KDPipeIODevice::Private::emitReadyRead %p", (void *) this);
353 
354     Q_EMIT q->readyRead();
355 
356     if (!thisPointer) {
357         return;
358     }
359     if (reader) {
360         QDebug("KDPipeIODevice::Private::emitReadyRead %p: locking reader (CONSUMER THREAD)", (
361                    void *) this);
362         synchronized(reader) {
363             QDebug("KDPipeIODevice::Private::emitReadyRead %p: locked reader (CONSUMER THREAD)", (
364                        void *) this);
365             reader->readyReadSentCondition.wakeAll();
366             QDebug("KDPipeIODevice::Private::emitReadyRead %p: buffer empty: %d reader in ReadFile: %d", (void *)this, reader->bufferEmpty(), reader->isReading);
367         }
368     }
369     QDebug("KDPipeIODevice::Private::emitReadyRead %p leaving", (void *) this);
370 
371 }
372 
doOpen(int fd_,Qt::HANDLE handle_,OpenMode mode_)373 bool KDPipeIODevice::Private::doOpen(int fd_, Qt::HANDLE handle_, OpenMode mode_)
374 {
375 
376     if (q->isOpen()) {
377         return false;
378     }
379 
380 #ifdef Q_OS_WIN32
381     if (!handle_) {
382         return false;
383     }
384 #else
385     if (fd_ < 0) {
386         return false;
387     }
388 #endif
389 
390     if (!(mode_ & ReadWrite)) {
391         return false;    // need to have at least read -or- write
392     }
393 
394     std::unique_ptr<Reader> reader_;
395     std::unique_ptr<Writer> writer_;
396 
397     if (mode_ & ReadOnly) {
398         reader_ = std::make_unique<Reader>(fd_, handle_);
399         QDebug("KDPipeIODevice::doOpen (%p): created reader (%p) for fd %d", (void *)this,
400                (void *)reader_.get(), fd_);
401         connect(reader_.get(), &Reader::readyRead, this, &Private::emitReadyRead,
402                 Qt::QueuedConnection);
403     }
404     if (mode_ & WriteOnly) {
405         writer_ = std::make_unique<Writer>(fd_, handle_);
406         QDebug("KDPipeIODevice::doOpen (%p): created writer (%p) for fd %d",
407                (void *)this, (void *)writer_.get(), fd_);
408         connect(writer_.get(), &Writer::bytesWritten, q, &QIODevice::bytesWritten,
409                 Qt::QueuedConnection);
410     }
411 
412     // commit to *this:
413     fd = fd_;
414     handle = handle_;
415     reader = reader_.release();
416     writer = writer_.release();
417 
418     q->setOpenMode(mode_ | Unbuffered);
419     return true;
420 }
421 
descriptor() const422 int KDPipeIODevice::descriptor() const
423 {
424     KDAB_CHECK_THIS;
425     return d->fd;
426 }
427 
handle() const428 Qt::HANDLE KDPipeIODevice::handle() const
429 {
430     KDAB_CHECK_THIS;
431     return d->handle;
432 }
433 
bytesAvailable() const434 qint64 KDPipeIODevice::bytesAvailable() const
435 {
436     KDAB_CHECK_THIS;
437     const qint64 base = QIODevice::bytesAvailable();
438     if (!d->triedToStartReader) {
439         d->startReaderThread();
440         return base;
441     }
442     if (d->reader) {
443         synchronized(d->reader) {
444             const qint64 inBuffer = d->reader->bytesInBuffer();
445             return base + inBuffer;
446         }
447     }
448     return base;
449 }
450 
bytesToWrite() const451 qint64 KDPipeIODevice::bytesToWrite() const
452 {
453     KDAB_CHECK_THIS;
454     d->startWriterThread();
455     const qint64 base = QIODevice::bytesToWrite();
456     if (d->writer) {
457         synchronized(d->writer) return base + d->writer->bytesInBuffer();
458     }
459     return base;
460 }
461 
canReadLine() const462 bool KDPipeIODevice::canReadLine() const
463 {
464     KDAB_CHECK_THIS;
465     d->startReaderThread();
466     if (QIODevice::canReadLine()) {
467         return true;
468     }
469     if (d->reader) {
470         synchronized(d->reader) return d->reader->bufferContains('\n');
471     }
472     return true;
473 }
474 
isSequential() const475 bool KDPipeIODevice::isSequential() const
476 {
477     return true;
478 }
479 
atEnd() const480 bool KDPipeIODevice::atEnd() const
481 {
482     KDAB_CHECK_THIS;
483     d->startReaderThread();
484     if (!QIODevice::atEnd()) {
485         QDebug("%p: KDPipeIODevice::atEnd returns false since QIODevice::atEnd does (with bytesAvailable=%ld)", (void *)this, static_cast<long>(bytesAvailable()));
486         return false;
487     }
488     if (!isOpen()) {
489         return true;
490     }
491     if (d->reader->eofShortCut) {
492         return true;
493     }
494     LOCKED(d->reader);
495     const bool eof = (d->reader->error || d->reader->eof) && d->reader->bufferEmpty();
496     if (!eof) {
497         if (!d->reader->error && !d->reader->eof) {
498             QDebug("%p: KDPipeIODevice::atEnd returns false since !reader->error && !reader->eof",
499                    (void *)(this));
500         }
501         if (!d->reader->bufferEmpty()) {
502             QDebug("%p: KDPipeIODevice::atEnd returns false since !reader->bufferEmpty()",
503                    (void *)  this);
504         }
505     }
506     return eof;
507 }
508 
waitForBytesWritten(int msecs)509 bool KDPipeIODevice::waitForBytesWritten(int msecs)
510 {
511     KDAB_CHECK_THIS;
512     d->startWriterThread();
513     Writer *const w = d->writer;
514     if (!w) {
515         return true;
516     }
517     LOCKED(w);
518     QDebug("KDPipeIODevice::waitForBytesWritten (%p,w=%p): entered locked area",
519            (void *)this, (void *) w);
520     return w->bufferEmpty() || w->error || w->bufferEmptyCondition.wait(&w->mutex, msecs);
521 }
522 
waitForReadyRead(int msecs)523 bool KDPipeIODevice::waitForReadyRead(int msecs)
524 {
525     KDAB_CHECK_THIS;
526     QDebug("KDPipeIODEvice::waitForReadyRead()(%p)", (void *) this);
527     d->startReaderThread();
528     if (ALLOW_QIODEVICE_BUFFERING) {
529         if (bytesAvailable() > 0) {
530             return true;
531         }
532     }
533     Reader *const r = d->reader;
534     if (!r || r->eofShortCut) {
535         return true;
536     }
537     LOCKED(r);
538     if (r->bytesInBuffer() != 0 || r->eof || r->error) {
539         return true;
540     }
541     Q_ASSERT(false);   // ### wtf?
542     return r->bufferNotEmptyCondition.wait(&r->mutex, msecs);
543 }
544 
/**
 * Scope guard that assigns a temporary value to a variable and puts the
 * previous value back when the guard is destroyed.
 *
 * The guard keeps a reference to the variable, so the variable must outlive it.
 * Copying is disabled: a copy would restore the saved value a second time,
 * possibly after the variable had legitimately changed again.
 */
template <typename T>
class TemporaryValue
{
public:
    /// Remembers the current value of \a var_ and then sets it to \a tv.
    TemporaryValue(T &var_, const T &tv) : var(var_), oldValue(var_)
    {
        var = tv;
    }
    /// Restores the value the variable had at construction time.
    ~TemporaryValue()
    {
        var = oldValue;
    }

    TemporaryValue(const TemporaryValue &) = delete;
    TemporaryValue &operator=(const TemporaryValue &) = delete;

private:
    T &var;
    const T oldValue;
};
561 
readWouldBlock() const562 bool KDPipeIODevice::readWouldBlock() const
563 {
564     d->startReaderThread();
565     LOCKED(d->reader);
566     return d->reader->bufferEmpty() && !d->reader->eof && !d->reader->error;
567 }
568 
writeWouldBlock() const569 bool KDPipeIODevice::writeWouldBlock() const
570 {
571     d->startWriterThread();
572     LOCKED(d->writer);
573     return !d->writer->bufferEmpty() && !d->writer->error;
574 }
575 
readData(char * data,qint64 maxSize)576 qint64 KDPipeIODevice::readData(char *data, qint64 maxSize)
577 {
578     KDAB_CHECK_THIS;
579     QDebug("%p: KDPipeIODevice::readData: data=%s, maxSize=%lld", (void *)this, data, maxSize);
580     d->startReaderThread();
581     Reader *const r = d->reader;
582 
583     Q_ASSERT(r);
584 
585     //assert( r->isRunning() ); // wrong (might be eof, error)
586     Q_ASSERT(data || maxSize == 0);
587     Q_ASSERT(maxSize >= 0);
588 
589     if (r->eofShortCut) {
590         QDebug("%p: KDPipeIODevice::readData: hit eofShortCut, returning 0", (void *)this);
591         return 0;
592     }
593 
594     if (maxSize < 0) {
595         maxSize = 0;
596     }
597 
598     if (ALLOW_QIODEVICE_BUFFERING) {
599         if (bytesAvailable() > 0) {
600             maxSize = std::min(maxSize, bytesAvailable());    // don't block
601         }
602     }
603     QDebug("%p: KDPipeIODevice::readData: try to lock reader (CONSUMER THREAD)", (void *) this);
604     LOCKED(r);
605     QDebug("%p: KDPipeIODevice::readData: locked reader (CONSUMER THREAD)", (void *) this);
606 
607     r->readyReadSentCondition.wakeAll();
608     if (/* maxSize > 0 && */ r->bufferEmpty() &&  !r->error && !r->eof) {   // ### block on maxSize == 0?
609         QDebug("%p: KDPipeIODevice::readData: waiting for bufferNotEmptyCondition (CONSUMER THREAD)", (void *) this);
610         const TemporaryValue<bool> tmp(d->reader->consumerBlocksOnUs, true);
611         r->bufferNotEmptyCondition.wait(&r->mutex);
612         r->blockedConsumerIsDoneCondition.wakeAll();
613         QDebug("%p: KDPipeIODevice::readData: woke up from bufferNotEmptyCondition (CONSUMER THREAD)",
614                (void *) this);
615     }
616 
617     if (r->bufferEmpty()) {
618         QDebug("%p: KDPipeIODevice::readData: got empty buffer, signal eof", (void *) this);
619         // woken with an empty buffer must mean either EOF or error:
620         Q_ASSERT(r->eof || r->error);
621         r->eofShortCut = true;
622         return r->eof ? 0 : -1;
623     }
624 
625     QDebug("%p: KDPipeIODevice::readData: got bufferNotEmptyCondition, trying to read %lld bytes",
626            (void *)this, maxSize);
627     const qint64 bytesRead = r->readData(data, maxSize);
628     QDebug("%p: KDPipeIODevice::readData: read %lld bytes", (void *)this, bytesRead);
629     QDebug("%p (fd=%d): KDPipeIODevice::readData: %s", (void *)this, d->fd, data);
630 
631     return bytesRead;
632 }
633 
readData(char * data,qint64 maxSize)634 qint64 Reader::readData(char *data, qint64 maxSize)
635 {
636     qint64 numRead = rptr < wptr ? wptr - rptr : sizeof buffer - rptr;
637     if (numRead > maxSize) {
638         numRead = maxSize;
639     }
640 
641     QDebug("%p: KDPipeIODevice::readData: data=%s, maxSize=%lld; rptr=%u, wptr=%u (bytesInBuffer=%u); -> numRead=%lld",
642            (void *)this, data, maxSize, rptr, wptr, bytesInBuffer(), numRead);
643 
644     memcpy(data, buffer + rptr, numRead);
645 
646     rptr = (rptr + numRead) % sizeof buffer;
647 
648     if (!bufferFull()) {
649         QDebug("%p: KDPipeIODevice::readData: signal bufferNotFullCondition", (void *) this);
650         bufferNotFullCondition.wakeAll();
651     }
652 
653     return numRead;
654 }
655 
writeData(const char * data,qint64 size)656 qint64 KDPipeIODevice::writeData(const char *data, qint64 size)
657 {
658     KDAB_CHECK_THIS;
659     d->startWriterThread();
660     Writer *const w = d->writer;
661 
662     Q_ASSERT(w);
663     Q_ASSERT(w->error || w->isRunning());
664     Q_ASSERT(data || size == 0);
665     Q_ASSERT(size >= 0);
666 
667     LOCKED(w);
668 
669     while (!w->error && !w->bufferEmpty()) {
670         QDebug("%p: KDPipeIODevice::writeData: wait for empty buffer", (void *) this);
671         w->bufferEmptyCondition.wait(&w->mutex);
672         QDebug("%p: KDPipeIODevice::writeData: empty buffer signaled", (void *) this);
673 
674     }
675     if (w->error) {
676         return -1;
677     }
678 
679     Q_ASSERT(w->bufferEmpty());
680 
681     return w->writeData(data, size);
682 }
683 
writeData(const char * data,qint64 size)684 qint64 Writer::writeData(const char *data, qint64 size)
685 {
686     Q_ASSERT(bufferEmpty());
687 
688     if (size > static_cast<qint64>(sizeof buffer)) {
689         size = sizeof buffer;
690     }
691 
692     memcpy(buffer, data, size);
693 
694     numBytesInBuffer = size;
695 
696     if (!bufferEmpty()) {
697         bufferNotEmptyCondition.wakeAll();
698     }
699     return size;
700 }
701 
stopThreads()702 void KDPipeIODevice::Private::stopThreads()
703 {
704     if (triedToStartWriter) {
705         if (writer && q->bytesToWrite() > 0) {
706             q->waitForBytesWritten(-1);
707         }
708 
709         Q_ASSERT(q->bytesToWrite() == 0);
710     }
711     if (Reader *&r = reader) {
712         disconnect(r, &Reader::readyRead, this, &Private::emitReadyRead);
713         synchronized(r) {
714             // tell thread to cancel:
715             r->cancel = true;
716             // and wake it, so it can terminate:
717             r->waitForCancelCondition.wakeAll();
718             r->bufferNotFullCondition.wakeAll();
719             r->readyReadSentCondition.wakeAll();
720         }
721     }
722     if (Writer *&w = writer) {
723         synchronized(w) {
724             // tell thread to cancel:
725             w->cancel = true;
726             // and wake it, so it can terminate:
727             w->bufferNotEmptyCondition.wakeAll();
728         }
729     }
730 }
731 
close()732 void KDPipeIODevice::close()
733 {
734     KDAB_CHECK_THIS;
735     QDebug("KDPipeIODevice::close(%p)", (void *) this);
736     if (!isOpen()) {
737         return;
738     }
739 
740     // tell clients we're about to close:
741     Q_EMIT aboutToClose();
742     d->stopThreads();
743 
744 #define waitAndDelete( t ) if ( t ) { t->wait(); QThread* const t2 = t; t = nullptr; delete t2; }
745     QDebug("KPipeIODevice::close(%p): wait and closing writer %p", (void *)this, (void *) d->writer);
746     waitAndDelete(d->writer);
747     QDebug("KPipeIODevice::close(%p): wait and closing reader %p", (void *)this, (void *) d->reader);
748     if (d->reader) {
749         LOCKED(d->reader);
750         d->reader->readyReadSentCondition.wakeAll();
751     }
752     waitAndDelete(d->reader);
753 #undef waitAndDelete
754 #ifdef Q_OS_WIN32
755     if (d->fd != -1) {
756         _close(d->fd);
757     } else {
758         CloseHandle(d->handle);
759     }
760 #else
761     ::close(d->fd);
762 #endif
763 
764     setOpenMode(NotOpen);
765     d->fd = -1;
766     d->handle = nullptr;
767 }
768 
run()769 void Reader::run()
770 {
771 
772     LOCKED(this);
773 
774     // too bad QThread doesn't have that itself; a signal isn't enough
775     hasStarted.wakeAll();
776 
777     QDebug("%p: Reader::run: started", (void *) this);
778 
779     while (true) {
780         if (!cancel && (eof || error)) {
781             //notify the client until the buffer is empty and then once
782             //again so he receives eof/error. After that, wait for him
783             //to cancel
784             const bool wasEmpty = bufferEmpty();
785             QDebug("%p: Reader::run: received eof(%d) or error(%d), waking everyone", (void *)this, eof, error);
786             notifyReadyRead();
787             if (!cancel && wasEmpty) {
788                 waitForCancelCondition.wait(&mutex);
789             }
790         } else if (!cancel && !bufferFull() && !bufferEmpty()) {
791             QDebug("%p: Reader::run: buffer no longer empty, waking everyone", (void *) this);
792             notifyReadyRead();
793         }
794 
795         while (!cancel && !error && bufferFull()) {
796             notifyReadyRead();
797             if (!cancel && bufferFull()) {
798                 QDebug("%p: Reader::run: buffer is full, going to sleep", (void *)this);
799                 bufferNotFullCondition.wait(&mutex);
800             }
801         }
802 
803         if (cancel) {
804             QDebug("%p: Reader::run: detected cancel", (void *)this);
805             goto leave;
806         }
807 
808         if (!eof && !error) {
809             if (rptr == wptr) { // optimize for larger chunks in case the buffer is empty
810                 rptr = wptr = 0;
811             }
812 
813             unsigned int numBytes = (rptr + sizeof buffer - wptr - 1) % sizeof buffer;
814             if (numBytes > sizeof buffer - wptr) {
815                 numBytes = sizeof buffer - wptr;
816             }
817 
818             QDebug("%p: Reader::run: rptr=%d, wptr=%d -> numBytes=%d", (void *)this, rptr, wptr, numBytes);
819 
820             Q_ASSERT(numBytes > 0);
821 
822             QDebug("%p: Reader::run: trying to read %d bytes from fd %d", (void *)this, numBytes, fd);
823 #ifdef Q_OS_WIN32
824             isReading = true;
825             mutex.unlock();
826             DWORD numRead;
827             const bool ok = ReadFile(handle, buffer + wptr, numBytes, &numRead, 0);
828             mutex.lock();
829             isReading = false;
830             if (ok) {
831                 if (numRead == 0) {
832                     QDebug("%p: Reader::run: got eof (numRead==0)", (void *) this);
833                     eof = true;
834                 }
835             } else { // !ok
836                 errorCode = static_cast<int>(GetLastError());
837                 if (errorCode == ERROR_BROKEN_PIPE) {
838                     Q_ASSERT(numRead == 0);
839                     QDebug("%p: Reader::run: got eof (broken pipe)", (void *) this);
840                     eof = true;
841                 } else {
842                     Q_ASSERT(numRead == 0);
843                     QDebug("%p: Reader::run: got error: %s (%d)", (void *) this, strerror(errorCode), errorCode);
844                     error = true;
845                 }
846             }
847 #else
848             qint64 numRead;
849             mutex.unlock();
850             do {
851                 numRead = ::read(fd, buffer + wptr, numBytes);
852             } while (numRead == -1 && errno == EINTR);
853             mutex.lock();
854 
855             if (numRead < 0) {
856                 errorCode = errno;
857                 error = true;
858                 QDebug("%p: Reader::run: got error: %d", (void *)this, errorCode);
859             } else if (numRead == 0) {
860                 QDebug("%p: Reader::run: eof detected", (void *)this);
861                 eof = true;
862             }
863 #endif
864             QDebug("%p (fd=%d): Reader::run: read %ld bytes", (void *) this, fd, static_cast<long>(numRead));
865             QDebug("%p (fd=%d): Reader::run: %s", (void *)this, fd, buffer);
866 
867             if (numRead > 0) {
868                 QDebug("%p: Reader::run: buffer before: rptr=%4d, wptr=%4d", (void *)this, rptr, wptr);
869                 wptr = (wptr + numRead) % sizeof buffer;
870                 QDebug("%p: Reader::run: buffer after:  rptr=%4d, wptr=%4d", (void *)this, rptr, wptr);
871             }
872         }
873     }
874 leave:
875     QDebug("%p: Reader::run: terminated", (void *)this);
876 }
877 
notifyReadyRead()878 void Reader::notifyReadyRead()
879 {
880     QDebug("notifyReadyRead: %d bytes available", bytesInBuffer());
881     Q_ASSERT(!cancel);
882 
883     if (consumerBlocksOnUs) {
884         bufferNotEmptyCondition.wakeAll();
885         blockedConsumerIsDoneCondition.wait(&mutex);
886         return;
887     }
888     QDebug("notifyReadyRead: Q_EMIT signal");
889     Q_EMIT readyRead();
890     readyReadSentCondition.wait(&mutex);
891     QDebug("notifyReadyRead: returning from waiting, leave");
892 }
893 
run()894 void Writer::run()
895 {
896 
897     LOCKED(this);
898 
899     // too bad QThread doesn't have that itself; a signal isn't enough
900     hasStarted.wakeAll();
901 
902     qCDebug(KLEOPATRA_LOG) << this << "Writer::run: started";
903 
904     while (true) {
905 
906         while (!cancel && bufferEmpty()) {
907             qCDebug(KLEOPATRA_LOG) << this << "Writer::run: buffer is empty, wake bufferEmptyCond listeners";
908             bufferEmptyCondition.wakeAll();
909             Q_EMIT bytesWritten(0);
910             qCDebug(KLEOPATRA_LOG) << this << "Writer::run: buffer is empty, going to sleep";
911             bufferNotEmptyCondition.wait(&mutex);
912             qCDebug(KLEOPATRA_LOG) << this << "Writer::run: woke up";
913         }
914 
915         if (cancel) {
916             qCDebug(KLEOPATRA_LOG) << this <<  "Writer::run: detected cancel";
917             goto leave;
918         }
919 
920         Q_ASSERT(numBytesInBuffer > 0);
921 
922         qCDebug(KLEOPATRA_LOG) << this << "Writer::run: Trying to write " << numBytesInBuffer << "bytes";
923         qint64 totalWritten = 0;
924         do {
925             mutex.unlock();
926 #ifdef Q_OS_WIN32
927             DWORD numWritten;
928             QDebug("%p (fd=%d): Writer::run: buffer before WriteFile (numBytes=%lld): %s:",
929                    (void *) this, fd, numBytesInBuffer, buffer);
930             QDebug("%p (fd=%d): Writer::run: Going into WriteFile", (void *) this, fd);
931             if (!WriteFile(handle, buffer + totalWritten, numBytesInBuffer - totalWritten, &numWritten, 0)) {
932                 mutex.lock();
933                 errorCode = static_cast<int>(GetLastError());
934                 QDebug("%p: Writer::run: got error code: %d", (void *) this, errorCode);
935                 error = true;
936                 goto leave;
937             }
938 #else
939             qint64 numWritten;
940             do {
941                 numWritten = ::write(fd, buffer + totalWritten, numBytesInBuffer - totalWritten);
942             } while (numWritten == -1 && errno == EINTR);
943 
944             if (numWritten < 0) {
945                 mutex.lock();
946                 errorCode = errno;
947                 QDebug("%p: Writer::run: got error code: %s (%d)", (void *)this, strerror(errorCode), errorCode);
948                 error = true;
949                 goto leave;
950             }
951 #endif
952             QDebug("%p (fd=%d): Writer::run: buffer after WriteFile (numBytes=%u): %s:", (void *)this, fd, numBytesInBuffer, buffer);
953             totalWritten += numWritten;
954             mutex.lock();
955         } while (totalWritten < numBytesInBuffer);
956 
957         qCDebug(KLEOPATRA_LOG) << this << "Writer::run: wrote " << totalWritten << "bytes";
958         numBytesInBuffer = 0;
959         qCDebug(KLEOPATRA_LOG) << this << "Writer::run: buffer is empty, wake bufferEmptyCond listeners";
960         bufferEmptyCondition.wakeAll();
961         Q_EMIT bytesWritten(totalWritten);
962     }
963 leave:
964     qCDebug(KLEOPATRA_LOG) << this << "Writer::run: terminating";
965     numBytesInBuffer = 0;
966     qCDebug(KLEOPATRA_LOG) << this << "Writer::run: buffer is empty, wake bufferEmptyCond listeners";
967     bufferEmptyCondition.wakeAll();
968     Q_EMIT bytesWritten(0);
969 }
970 
971 // static
makePairOfConnectedPipes()972 std::pair<KDPipeIODevice *, KDPipeIODevice *> KDPipeIODevice::makePairOfConnectedPipes()
973 {
974     KDPipeIODevice *read = nullptr;
975     KDPipeIODevice *write = nullptr;
976 #ifdef Q_OS_WIN32
977     HANDLE rh;
978     HANDLE wh;
979     SECURITY_ATTRIBUTES sa;
980     memset(&sa, 0, sizeof(sa));
981     sa.nLength = sizeof(sa);
982     sa.bInheritHandle = TRUE;
983     if (CreatePipe(&rh, &wh, &sa, BUFFER_SIZE)) {
984         read = new KDPipeIODevice;
985         read->open(rh, ReadOnly);
986         write = new KDPipeIODevice;
987         write->open(wh, WriteOnly);
988     }
989 #else
990     int fds[2];
991     if (pipe(fds) == 0) {
992         read = new KDPipeIODevice;
993         read->open(fds[0], ReadOnly);
994         write = new KDPipeIODevice;
995         write->open(fds[1], WriteOnly);
996     }
997 #endif
998     return std::make_pair(read, write);
999 }
1000 
// Invariant checks for KDPipeIODevice, compiled only when the build defines
// KDAB_DEFINE_CHECKS. The macro supplies the function header; its expansion is
// not visible in this file.
#ifdef KDAB_DEFINE_CHECKS
KDAB_DEFINE_CHECKS(KDPipeIODevice)
{
    if (!isOpen()) {
        // A closed device owns no thread objects and no descriptor/handle.
        Q_ASSERT(openMode() == NotOpen);
        Q_ASSERT(!d->reader);
        Q_ASSERT(!d->writer);
#ifdef Q_OS_WIN32
        Q_ASSERT(!d->handle);
#else
        Q_ASSERT(d->fd < 0);
#endif
    } else {
        // An open device is readable and/or writable ...
        Q_ASSERT(openMode() != NotOpen);
        Q_ASSERT(openMode() & ReadWrite);
        if (openMode() & ReadOnly) {
            // ... has a reader that is running unless it hit EOF or an error ...
            Q_ASSERT(d->reader);
            synchronized(d->reader)
            Q_ASSERT(d->reader->eof || d->reader->error || d->reader->isRunning());
        }
        if (openMode() & WriteOnly) {
            // ... and a writer that is running unless it hit an error.
            Q_ASSERT(d->writer);
            synchronized(d->writer)
            Q_ASSERT(d->writer->error || d->writer->isRunning());
        }
#ifdef Q_OS_WIN32
        Q_ASSERT(d->handle);
#else
        Q_ASSERT(d->fd >= 0);
#endif
    }
}
#endif // KDAB_DEFINE_CHECKS
1034 
1035 #include "kdpipeiodevice.moc"
1036