1 // 2 // C++ Implementation: %{MODULE} 3 // 4 // Description: 5 // 6 // 7 // Author: %{AUTHOR} <%{EMAIL}>, (C) %{YEAR} 8 // 9 // Copyright: See COPYING file that comes with this distribution 10 // 11 // 12 /*************************************************************************** 13 * * 14 * This program is free software; you can redistribute it and/or modify * 15 * it under the terms of the GNU General Public License as published by * 16 * the Free Software Foundation; either version 2 of the License, or * 17 * (at your option) any later version. * 18 * * 19 ***************************************************************************/ 20 21 #include "ADM_default.h" 22 #include "ADM_threads.h" 23 #include "ADM_transfert.h" 24 25 #define HIGH_LVL ((TRANSFERT_BUFFER*2)/3) 26 #define LOW_LVL (TRANSFERT_BUFFER/3) 27 // minimum amount of audio buffer we need 28 #define MIN_REQUIRED (1024*1024) 29 //#define MPLEX_D 30 #define threadFailure(x) if(!(x)){printf("Condition "#x" failed at line %d\n",__LINE__);dumpStatus();ADM_assert(0);} 31 32 //**************** Transfert ******************* 33 // *** Lot of race here : FIXME 34 #if 0 35 Transfert::Transfert(uint32_t minBuffer) 36 { 37 cond=new admCond(&mutex); 38 clientCond=new admCond(&mutex); 39 _minRequired=minBuffer; 40 buffer=new uint8_t[TRANSFERT_BUFFER]; 41 42 aborted=0; 43 transfered_r=0; 44 transfered_w=0; 45 46 head=tail=0; 47 48 } 49 Transfert::~Transfert() 50 { 51 delete cond; 52 delete clientCond; 53 delete [] buffer; 54 55 } 56 uint8_t Transfert::dumpStatus(void) 57 { 58 printf("Mplex threading status : \n"); 59 printf("Aborted :%d\n",aborted); 60 printf("head :%u\n",head); 61 printf("tail :%u\n",tail); 62 printf("mutex :%u\n",mutex.isLocked()); 63 printf("cond :%u\n",cond->iswaiting()); 64 printf("Clientcond :%u\n",clientCond->iswaiting()); 65 printf("Written : %u bytes\n",transfered_w); 66 printf("Read : %u bytes\n",transfered_r); 67 68 printf("cond waiting :%u\n",cond->waiting); 69 printf("cond aborted :%u\n",cond->aborted); 70 71 printf("clientCond waiting :%u\n",clientCond->waiting); 72 printf("clientCond aborted :%u\n",clientCond->aborted); 73 74 75 return 1; 76 } 77 uint32_t Transfert:: read(uint8_t *buf, uint32_t nb ) 78 { 79 uint32_t r=0; 80 uint32_t fill=0; 81 #ifdef MPLEX_D 82 printf("Reading %lu\n",nb); 83 #endif 84 85 while(1) 86 { 87 mutex.lock(); 88 fill=tail-head; 89 if(fill>=nb) 90 { 91 92 memcpy(buf,buffer+head,nb); 93 head+=nb; 94 r+=nb; 95 transfered_r+=nb; 96 goto endit; 97 } 98 99 // Purge 100 memcpy(buf,buffer+head,fill); 101 buf+=fill; 102 nb-=fill; 103 r+=fill; 104 head=tail=0; 105 if(aborted) 106 { 107 108 goto endit; 109 } 110 if(clientCond->iswaiting()) printf("Client : %d\n",clientCond->waiting); 111 threadFailure(!clientCond->iswaiting()); 112 113 #ifdef MPLEX_D 114 printf("Wanted : %lu , left :%lu\n",nb,fill); 115 printf("Slave sleeping\n"); 116 #endif 117 118 cond->wait(); 119 120 if(aborted) 121 { 122 mutex.lock(); 123 goto endit; 124 } 125 } 126 endit: 127 if(clientCond->iswaiting()) // No need to protect as the client is locked 128 { 129 fill=tail-head; 130 if(fill<LOW_LVL) 131 { 132 printf("Waking..\n"); 133 clientCond->wakeup(); 134 } 135 } 136 mutex.unlock(); 137 transfered_w+=r; 138 return r; 139 } 140 //********************************* 141 uint8_t Transfert::fillingUp( void) 142 { 143 uint8_t r=0; 144 145 mutex.lock(); 146 if((tail-head)>HIGH_LVL) 147 r=1; 148 else r=0; 149 mutex.unlock(); 150 return r; 151 152 153 } 154 uint8_t Transfert:: write(uint8_t *buf, uint32_t nb ) 155 { 156 if(aborted) return 0; 157 158 #ifdef MPLEX_D 159 printf("Writing %lu\n",nb); 160 #endif 161 mutex.lock(); 162 // Need to pack ? 163 if(nb+tail>=TRANSFERT_BUFFER) 164 { 165 memmove(buffer,buffer+head,tail-head); 166 tail-=head; 167 head=0; 168 } 169 // Overflow ? 170 if(nb+tail>=TRANSFERT_BUFFER) 171 { 172 printf("\n When writting %lu bytes, we overflow the existing %lu bytes\n",nb,tail-head); 173 threadFailure(0); 174 175 } 176 memcpy(buffer+tail,buf,nb); 177 transfered_w+=nb; 178 tail+=nb; 179 180 if(cond->iswaiting()) 181 { 182 #ifdef MPLEX_D 183 184 printf("Slave waking\n"); 185 #endif 186 cond->wakeup(); 187 } 188 mutex.unlock(); 189 return 1; 190 } 191 uint8_t Transfert::needData( void ) 192 { 193 int32_t l; 194 uint8_t r=0; 195 mutex.lock(); 196 l=tail-head; 197 threadFailure(l>=0); 198 mutex.unlock(); 199 if(l<_minRequired) r=1; 200 if(cond->iswaiting()) r=1; 201 // return cond->iswaiting(); 202 return r; 203 204 } 205 uint8_t Transfert::abort( void ) 206 { 207 aborted=1; 208 if(cond->iswaiting()) 209 cond->abort(); 210 return 1; 211 212 } 213 214 uint8_t Transfert::clientLock( void ) 215 { 216 #ifdef MPLEX_D 217 printf("Slave sleeping (%lu)\n",tail-=head); 218 #endif 219 220 mutex.lock(); 221 // Redo check under same mutex lock 222 if((tail-head)<HIGH_LVL) 223 { 224 mutex.unlock(); 225 return 1; 226 } 227 clientCond->wait(); 228 return 1; 229 230 } 231 #endif 232