1 /*
2    Copyright (c) 2003, 2013, Oracle and/or its affiliates. All rights reserved.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License, version 2.0,
6    as published by the Free Software Foundation.
7 
8    This program is also distributed with certain software (including
9    but not limited to OpenSSL) that is licensed under separate terms,
10    as designated in a particular file or component or in included license
11    documentation.  The authors of MySQL hereby grant you an additional
12    permission to link the program and your derivative works with the
13    separately licensed software that they have included with MySQL.
14 
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License, version 2.0, for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23 */
24 
25 #ifndef MemoryChannel_H
26 #define MemoryChannel_H
27 
28 //===========================================================================
29 //
30 // .DESCRIPTION
31 //              Pointer based communication channel for communication between two
32 //              thread. It does not copy any data in or out the channel so the
33 //              item that is put in can not be used untill the other thread has
34 //              given it back. There is no support for detecting the return of a
35 //              item. The channel is half-duplex.
36 //              For comminication between 1 writer and 1 reader use the MemoryChannel
37 //              class, for comminication between multiple writer and 1 reader use the
38 //              MemoryChannelMultipleWriter. There is no support for multiple readers.
39 //
40 // .TYPICAL USE:
41 //              to communicate between threads.
42 //
43 // .EXAMPLE:
44 //              See AsyncFile.C
45 //===========================================================================
46 //
47 //
48 // MemoryChannel( int size= 256);
49 //   Constuctor
50 // Parameters:
51 //      size : amount of pointer it can hold
52 //
53 // void operator ++ ();
54 //   increments the index with one, if size is reached it is set to zero
55 //
56 // virtual void write( T *t);
57 //   Puts the item in the channel if the channel is full an error is reported.
58 // Parameters:
59 //      t: pointer to item to put in the channel, after this the item
60 //                        is shared with the other thread.
61 // errors
62 //                      AFS_ERROR_CHANNALFULL, channel is full
63 //
64 // T* read();
65 //      Reads a itemn from the channel, if channel is empty it blocks untill
66 //              an item can be read.
67 // return
68 //                      T : item from the channel
69 //
70 // T* tryRead();
71 //      Reads a item from the channel, if channel is empty it returns zero.
72 // return
73 //                      T : item from the channel or zero if channel is empty.
74 //
75 
76 #include "NdbMutex.h"
77 #include "NdbCondition.h"
78 #include <NdbOut.hpp>
79 
80 #define JAM_FILE_ID 396
81 
82 
83 
84 template <class T>
85 class MemoryChannel
86 {
87 public:
88   MemoryChannel();
89   virtual ~MemoryChannel();
90 
91   void writeChannel(T *t);
92   void writeChannelNoSignal(T *t);
93   T* readChannel();
94   T* tryReadChannel();
95 
96   /**
97    * Should be made class using MemoryChannel
98    */
99   struct ListMember
100   {
101     T* m_next;
102   };
103 
104 private:
105   Uint32 m_occupancy;
106   T* m_head; // First element in list (e.g will be read by readChannel)
107   T* m_tail;
108   NdbMutex* theMutexPtr;
109   NdbCondition* theConditionPtr;
110 
111   template<class U>
112   friend NdbOut& operator<<(NdbOut& out, const MemoryChannel<U> & chn);
113 };
114 
115 template <class T>
operator <<(NdbOut & out,const MemoryChannel<T> & chn)116 NdbOut& operator<<(NdbOut& out, const MemoryChannel<T> & chn)
117 {
118   NdbMutex_Lock(chn.theMutexPtr);
119   out << "[ occupancy: " << chn.m_occupancy
120       << " ]";
121   NdbMutex_Unlock(chn.theMutexPtr);
122   return out;
123 }
124 
MemoryChannel()125 template <class T> MemoryChannel<T>::MemoryChannel() :
126   m_occupancy(0), m_head(0), m_tail(0)
127 {
128   theMutexPtr = NdbMutex_Create();
129   theConditionPtr = NdbCondition_Create();
130 }
131 
~MemoryChannel()132 template <class T> MemoryChannel<T>::~MemoryChannel( )
133 {
134   NdbMutex_Destroy(theMutexPtr);
135   NdbCondition_Destroy(theConditionPtr);
136 }
137 
writeChannel(T * t)138 template <class T> void MemoryChannel<T>::writeChannel( T *t)
139 {
140   writeChannelNoSignal(t);
141   NdbCondition_Signal(theConditionPtr);
142 }
143 
writeChannelNoSignal(T * t)144 template <class T> void MemoryChannel<T>::writeChannelNoSignal( T *t)
145 {
146   NdbMutex_Lock(theMutexPtr);
147   if (m_head == 0)
148   {
149     assert(m_occupancy == 0);
150     m_head = m_tail = t;
151   }
152   else
153   {
154     assert(m_tail != 0);
155     m_tail->m_mem_channel.m_next = t;
156     m_tail = t;
157   }
158   t->m_mem_channel.m_next = 0;
159   m_occupancy++;
160   NdbMutex_Unlock(theMutexPtr);
161 }
162 
readChannel()163 template <class T> T* MemoryChannel<T>::readChannel()
164 {
165   NdbMutex_Lock(theMutexPtr);
166   while (m_head == 0)
167   {
168     assert(m_occupancy == 0);
169     NdbCondition_Wait(theConditionPtr,
170                       theMutexPtr);
171   }
172   assert(m_occupancy > 0);
173   T* tmp = m_head;
174   if (m_head == m_tail)
175   {
176     assert(m_occupancy == 1);
177     m_head = m_tail = 0;
178   }
179   else
180   {
181     m_head = m_head->m_mem_channel.m_next;
182   }
183   m_occupancy--;
184   NdbMutex_Unlock(theMutexPtr);
185   return tmp;
186 }
187 
tryReadChannel()188 template <class T> T* MemoryChannel<T>::tryReadChannel()
189 {
190   NdbMutex_Lock(theMutexPtr);
191   T* tmp = m_head;
192   if (m_head != 0)
193   {
194     assert(m_occupancy > 0);
195     if (m_head == m_tail)
196     {
197       assert(m_occupancy == 1);
198       m_head = m_tail = 0;
199     }
200     else
201     {
202       m_head = m_head->m_mem_channel.m_next;
203     }
204     m_occupancy--;
205   }
206   else
207   {
208     assert(m_occupancy == 0);
209   }
210   NdbMutex_Unlock(theMutexPtr);
211   return tmp;
212 }
213 
214 
215 #undef JAM_FILE_ID
216 
217 #endif // MemoryChannel_H
218 
219