1 /* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
2 
3 #ifndef STREAM_H
4 #define STREAM_H
5 
6 #include "base/i2-base.hpp"
7 #include "base/object.hpp"
8 #include <boost/signals2.hpp>
9 #include <condition_variable>
10 #include <mutex>
11 
12 namespace icinga
13 {
14 
15 class String;
16 class Stream;
17 
18 enum ConnectionRole
19 {
20 	RoleClient,
21 	RoleServer
22 };
23 
24 struct StreamReadContext
25 {
~StreamReadContexticinga::StreamReadContext26 	~StreamReadContext()
27 	{
28 		free(Buffer);
29 	}
30 
31 	bool FillFromStream(const intrusive_ptr<Stream>& stream, bool may_wait);
32 	void DropData(size_t count);
33 
34 	char *Buffer{nullptr};
35 	size_t Size{0};
36 	bool MustRead{true};
37 	bool Eof{false};
38 };
39 
40 enum StreamReadStatus
41 {
42 	StatusNewItem,
43 	StatusNeedData,
44 	StatusEof
45 };
46 
47 /**
48  * A stream.
49  *
50  * @ingroup base
51  */
52 class Stream : public Object
53 {
54 public:
55 	DECLARE_PTR_TYPEDEFS(Stream);
56 
57 	/**
58 	 * Reads data from the stream without removing it from the stream buffer.
59 	 *
60 	 * @param buffer The buffer where data should be stored. May be nullptr if you're
61 	 *		 not actually interested in the data.
62 	 * @param count The number of bytes to read from the queue.
63 	 * @param allow_partial Whether to allow partial reads.
64 	 * @returns The number of bytes actually read.
65 	 */
66 	virtual size_t Peek(void *buffer, size_t count, bool allow_partial = false);
67 
68 	/**
69 	 * Reads data from the stream.
70 	 *
71 	 * @param buffer The buffer where data should be stored. May be nullptr if you're
72 	 *		 not actually interested in the data.
73 	 * @param count The number of bytes to read from the queue.
74 	 * @param allow_partial Whether to allow partial reads.
75 	 * @returns The number of bytes actually read.
76 	 */
77 	virtual size_t Read(void *buffer, size_t count, bool allow_partial = false) = 0;
78 
79 	/**
80 	 * Writes data to the stream.
81 	 *
82 	 * @param buffer The data that is to be written.
83 	 * @param count The number of bytes to write.
84 	 * @returns The number of bytes written
85 	 */
86 	virtual void Write(const void *buffer, size_t count) = 0;
87 
88 	/**
89 	 * Causes the stream to be closed (via Close()) once all pending data has been
90 	 * written.
91 	 */
92 	virtual void Shutdown();
93 
94 	/**
95 	 * Closes the stream and releases resources.
96 	 */
97 	virtual void Close();
98 
99 	/**
100 	 * Checks whether we've reached the end-of-file condition.
101 	 *
102 	 * @returns true if EOF.
103 	 */
104 	virtual bool IsEof() const = 0;
105 
106 	/**
107 	 * Waits until data can be read from the stream.
108 	 * Optionally with a timeout.
109 	 */
110 	bool WaitForData();
111 	bool WaitForData(int timeout);
112 
113 	virtual bool SupportsWaiting() const;
114 
115 	virtual bool IsDataAvailable() const;
116 
117 	void RegisterDataHandler(const std::function<void(const Stream::Ptr&)>& handler);
118 
119 	StreamReadStatus ReadLine(String *line, StreamReadContext& context, bool may_wait = false);
120 
121 protected:
122 	void SignalDataAvailable();
123 
124 private:
125 	boost::signals2::signal<void(const Stream::Ptr&)> OnDataAvailable;
126 
127 	std::mutex m_Mutex;
128 	std::condition_variable m_CV;
129 };
130 
131 }
132 
133 #endif /* STREAM_H */
134