1 /*
2 Copyright (c) 2003, 2013, Oracle and/or its affiliates. All rights reserved.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is also distributed with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have included with MySQL.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23 */
24
25 #ifndef MemoryChannel_H
26 #define MemoryChannel_H
27
28 //===========================================================================
29 //
30 // .DESCRIPTION
31 // Pointer based communication channel for communication between two
32 // thread. It does not copy any data in or out the channel so the
33 // item that is put in can not be used untill the other thread has
34 // given it back. There is no support for detecting the return of a
35 // item. The channel is half-duplex.
36 // For comminication between 1 writer and 1 reader use the MemoryChannel
37 // class, for comminication between multiple writer and 1 reader use the
38 // MemoryChannelMultipleWriter. There is no support for multiple readers.
39 //
40 // .TYPICAL USE:
41 // to communicate between threads.
42 //
43 // .EXAMPLE:
44 // See AsyncFile.C
45 //===========================================================================
46 //
47 //
48 // MemoryChannel( int size= 256);
49 // Constuctor
50 // Parameters:
51 // size : amount of pointer it can hold
52 //
53 // void operator ++ ();
54 // increments the index with one, if size is reached it is set to zero
55 //
56 // virtual void write( T *t);
57 // Puts the item in the channel if the channel is full an error is reported.
58 // Parameters:
59 // t: pointer to item to put in the channel, after this the item
60 // is shared with the other thread.
61 // errors
62 // AFS_ERROR_CHANNALFULL, channel is full
63 //
64 // T* read();
65 // Reads a itemn from the channel, if channel is empty it blocks untill
66 // an item can be read.
67 // return
68 // T : item from the channel
69 //
70 // T* tryRead();
71 // Reads a item from the channel, if channel is empty it returns zero.
72 // return
73 // T : item from the channel or zero if channel is empty.
74 //
75
76 #include "NdbMutex.h"
77 #include "NdbCondition.h"
78 #include <NdbOut.hpp>
79
80 #define JAM_FILE_ID 396
81
82
83
84 template <class T>
85 class MemoryChannel
86 {
87 public:
88 MemoryChannel();
89 virtual ~MemoryChannel();
90
91 void writeChannel(T *t);
92 void writeChannelNoSignal(T *t);
93 T* readChannel();
94 T* tryReadChannel();
95
96 /**
97 * Should be made class using MemoryChannel
98 */
99 struct ListMember
100 {
101 T* m_next;
102 };
103
104 private:
105 Uint32 m_occupancy;
106 T* m_head; // First element in list (e.g will be read by readChannel)
107 T* m_tail;
108 NdbMutex* theMutexPtr;
109 NdbCondition* theConditionPtr;
110
111 template<class U>
112 friend NdbOut& operator<<(NdbOut& out, const MemoryChannel<U> & chn);
113 };
114
115 template <class T>
operator <<(NdbOut & out,const MemoryChannel<T> & chn)116 NdbOut& operator<<(NdbOut& out, const MemoryChannel<T> & chn)
117 {
118 NdbMutex_Lock(chn.theMutexPtr);
119 out << "[ occupancy: " << chn.m_occupancy
120 << " ]";
121 NdbMutex_Unlock(chn.theMutexPtr);
122 return out;
123 }
124
MemoryChannel()125 template <class T> MemoryChannel<T>::MemoryChannel() :
126 m_occupancy(0), m_head(0), m_tail(0)
127 {
128 theMutexPtr = NdbMutex_Create();
129 theConditionPtr = NdbCondition_Create();
130 }
131
~MemoryChannel()132 template <class T> MemoryChannel<T>::~MemoryChannel( )
133 {
134 NdbMutex_Destroy(theMutexPtr);
135 NdbCondition_Destroy(theConditionPtr);
136 }
137
writeChannel(T * t)138 template <class T> void MemoryChannel<T>::writeChannel( T *t)
139 {
140 writeChannelNoSignal(t);
141 NdbCondition_Signal(theConditionPtr);
142 }
143
writeChannelNoSignal(T * t)144 template <class T> void MemoryChannel<T>::writeChannelNoSignal( T *t)
145 {
146 NdbMutex_Lock(theMutexPtr);
147 if (m_head == 0)
148 {
149 assert(m_occupancy == 0);
150 m_head = m_tail = t;
151 }
152 else
153 {
154 assert(m_tail != 0);
155 m_tail->m_mem_channel.m_next = t;
156 m_tail = t;
157 }
158 t->m_mem_channel.m_next = 0;
159 m_occupancy++;
160 NdbMutex_Unlock(theMutexPtr);
161 }
162
readChannel()163 template <class T> T* MemoryChannel<T>::readChannel()
164 {
165 NdbMutex_Lock(theMutexPtr);
166 while (m_head == 0)
167 {
168 assert(m_occupancy == 0);
169 NdbCondition_Wait(theConditionPtr,
170 theMutexPtr);
171 }
172 assert(m_occupancy > 0);
173 T* tmp = m_head;
174 if (m_head == m_tail)
175 {
176 assert(m_occupancy == 1);
177 m_head = m_tail = 0;
178 }
179 else
180 {
181 m_head = m_head->m_mem_channel.m_next;
182 }
183 m_occupancy--;
184 NdbMutex_Unlock(theMutexPtr);
185 return tmp;
186 }
187
tryReadChannel()188 template <class T> T* MemoryChannel<T>::tryReadChannel()
189 {
190 NdbMutex_Lock(theMutexPtr);
191 T* tmp = m_head;
192 if (m_head != 0)
193 {
194 assert(m_occupancy > 0);
195 if (m_head == m_tail)
196 {
197 assert(m_occupancy == 1);
198 m_head = m_tail = 0;
199 }
200 else
201 {
202 m_head = m_head->m_mem_channel.m_next;
203 }
204 m_occupancy--;
205 }
206 else
207 {
208 assert(m_occupancy == 0);
209 }
210 NdbMutex_Unlock(theMutexPtr);
211 return tmp;
212 }
213
214
215 #undef JAM_FILE_ID
216
217 #endif // MemoryChannel_H
218
219