1 /*
2 Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is also distributed with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have included with MySQL.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23 */
24
25 #ifndef MemoryChannel_H
26 #define MemoryChannel_H
27
28 //===========================================================================
29 //
30 // .DESCRIPTION
31 // Pointer-based communication channel for communication between two
32 // threads. It does not copy any data into or out of the channel, so an
33 // item that is put in cannot be used until the other thread has
34 // given it back. There is no support for detecting the return of an
35 // item. The channel is half-duplex.
36 // For communication between 1 writer and 1 reader use the MemoryChannel
37 // class; for communication between multiple writers and 1 reader use the
38 // MemoryChannelMultipleWriter. There is no support for multiple readers.
39 //
40 // .TYPICAL USE:
41 // to communicate between threads.
42 //
43 // .EXAMPLE:
44 // See AsyncFile.C
45 //===========================================================================
46 //
47 //
48 // MemoryChannel( int size= 256);
49 // Constructor
50 // Parameters:
51 // size : number of pointers it can hold
52 //
53 // void operator ++ ();
54 // increments the index with one, if size is reached it is set to zero
55 //
56 // virtual void write( T *t);
57 // Puts the item in the channel; if the channel is full an error is reported.
58 // Parameters:
59 // t: pointer to item to put in the channel, after this the item
60 // is shared with the other thread.
61 // errors
62 // AFS_ERROR_CHANNALFULL, channel is full
63 //
64 // T* read();
65 // Reads an item from the channel; if the channel is empty it blocks until
66 // an item can be read.
67 // return
68 // T : item from the channel
69 //
70 // T* tryRead();
71 // Reads an item from the channel; if the channel is empty it returns zero.
72 // return
73 // T : item from the channel or zero if channel is empty.
74 //
75
76 #include "NdbMutex.h"
77 #include "NdbCondition.h"
78 #include <NdbOut.hpp>
79
80
81 template <class T>
82 class MemoryChannel
83 {
84 public:
85 MemoryChannel();
86 virtual ~MemoryChannel();
87
88 void writeChannel(T *t);
89 void writeChannelNoSignal(T *t);
90 T* readChannel();
91 T* tryReadChannel();
92
93 /**
94 * Should be made class using MemoryChannel
95 */
96 struct ListMember
97 {
98 T* m_next;
99 };
100
101 private:
102 Uint32 m_occupancy;
103 T* m_head; // First element in list (e.g will be read by readChannel)
104 T* m_tail;
105 NdbMutex* theMutexPtr;
106 NdbCondition* theConditionPtr;
107
108 template<class U>
109 friend NdbOut& operator<<(NdbOut& out, const MemoryChannel<U> & chn);
110 };
111
112 template <class T>
operator <<(NdbOut & out,const MemoryChannel<T> & chn)113 NdbOut& operator<<(NdbOut& out, const MemoryChannel<T> & chn)
114 {
115 NdbMutex_Lock(chn.theMutexPtr);
116 out << "[ occupancy: " << chn.m_occupancy
117 << " ]";
118 NdbMutex_Unlock(chn.theMutexPtr);
119 return out;
120 }
121
MemoryChannel()122 template <class T> MemoryChannel<T>::MemoryChannel() :
123 m_occupancy(0), m_head(0), m_tail(0)
124 {
125 theMutexPtr = NdbMutex_Create();
126 theConditionPtr = NdbCondition_Create();
127 }
128
~MemoryChannel()129 template <class T> MemoryChannel<T>::~MemoryChannel( )
130 {
131 NdbMutex_Destroy(theMutexPtr);
132 NdbCondition_Destroy(theConditionPtr);
133 }
134
writeChannel(T * t)135 template <class T> void MemoryChannel<T>::writeChannel( T *t)
136 {
137 writeChannelNoSignal(t);
138 NdbCondition_Signal(theConditionPtr);
139 }
140
writeChannelNoSignal(T * t)141 template <class T> void MemoryChannel<T>::writeChannelNoSignal( T *t)
142 {
143 NdbMutex_Lock(theMutexPtr);
144 if (m_head == 0)
145 {
146 assert(m_occupancy == 0);
147 m_head = m_tail = t;
148 }
149 else
150 {
151 assert(m_tail != 0);
152 m_tail->m_mem_channel.m_next = t;
153 m_tail = t;
154 }
155 t->m_mem_channel.m_next = 0;
156 m_occupancy++;
157 NdbMutex_Unlock(theMutexPtr);
158 }
159
readChannel()160 template <class T> T* MemoryChannel<T>::readChannel()
161 {
162 NdbMutex_Lock(theMutexPtr);
163 while (m_head == 0)
164 {
165 assert(m_occupancy == 0);
166 NdbCondition_Wait(theConditionPtr,
167 theMutexPtr);
168 }
169 assert(m_occupancy > 0);
170 T* tmp = m_head;
171 if (m_head == m_tail)
172 {
173 assert(m_occupancy == 1);
174 m_head = m_tail = 0;
175 }
176 else
177 {
178 m_head = m_head->m_mem_channel.m_next;
179 }
180 m_occupancy--;
181 NdbMutex_Unlock(theMutexPtr);
182 return tmp;
183 }
184
tryReadChannel()185 template <class T> T* MemoryChannel<T>::tryReadChannel()
186 {
187 NdbMutex_Lock(theMutexPtr);
188 T* tmp = m_head;
189 if (m_head != 0)
190 {
191 assert(m_occupancy > 0);
192 if (m_head == m_tail)
193 {
194 assert(m_occupancy == 1);
195 m_head = m_tail = 0;
196 }
197 else
198 {
199 m_head = m_head->m_mem_channel.m_next;
200 }
201 m_occupancy--;
202 }
203 else
204 {
205 assert(m_occupancy == 0);
206 }
207 NdbMutex_Unlock(theMutexPtr);
208 return tmp;
209 }
210
211 #endif // MemoryChannel_H
212
213