1 /*
2  * Created on Feb 8, 2005
3  * Created by Alon Rohter
4  * Copyright (C) Azureus Software, Inc, All Rights Reserved.
5  *
6  * This program is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License
8  * as published by the Free Software Foundation; either version 2
9  * of the License, or (at your option) any later version.
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License for more details.
14  * You should have received a copy of the GNU General Public License
15  * along with this program; if not, write to the Free Software
16  * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
17  *
18  */
19 
20 package com.aelitis.azureus.core.peermanager.messaging.azureus;
21 
22 import java.io.IOException;
23 import java.nio.ByteBuffer;
24 import java.util.ArrayList;
25 
26 import org.gudy.azureus2.core3.util.*;
27 
28 import com.aelitis.azureus.core.networkmanager.Transport;
29 import com.aelitis.azureus.core.peermanager.messaging.*;
30 
31 
32 
33 /**
34  * Length-prefixed message decoding.
35  */
36 public class AZMessageDecoder implements MessageStreamDecoder {
37   private static final int MIN_MESSAGE_LENGTH = 6;  //4 byte id length + at least 1 byte for id + 1 byte version
38   private static final int MAX_MESSAGE_LENGTH = 131072;  //128K arbitrary limit
39 
40   private static final byte SS = DirectByteBuffer.SS_MSG;
41 
42 
43   private DirectByteBuffer payload_buffer = null;
44   private final DirectByteBuffer length_buffer = DirectByteBufferPool.getBuffer( DirectByteBuffer.AL_MSG, 4 );
45   private final ByteBuffer[] decode_array = new ByteBuffer[] { null, length_buffer.getBuffer( SS ) };
46 
47   private boolean reading_length_mode = true;
48 
49   private int message_length;
50   private int pre_read_start_buffer;
51   private int pre_read_start_position;
52 
53   private volatile boolean destroyed = false;
54   private volatile boolean is_paused = false;
55 
56   private ArrayList messages_last_read = new ArrayList();
57   private int protocol_bytes_last_read = 0;
58   private int data_bytes_last_read = 0;
59   private int percent_complete = -1;
60 
61 
62   private byte[] msg_id_bytes = null;
63   private boolean msg_id_read_complete = false;
64 
65   private boolean last_read_made_progress;
66 
67   private int	maximum_message_size = MAX_MESSAGE_LENGTH;
68 
AZMessageDecoder()69   public AZMessageDecoder() {
70     /*nothing*/
71   }
72 
73   public void
setMaximumMessageSize( int max_bytes )74   setMaximumMessageSize(
75 	int	max_bytes )
76   {
77 	  maximum_message_size	= max_bytes;
78   }
79 
performStreamDecode( Transport transport, int max_bytes )80   public int performStreamDecode( Transport transport, int max_bytes ) throws IOException {
81     protocol_bytes_last_read = 0;
82     data_bytes_last_read = 0;
83 
84     int bytes_remaining = max_bytes;
85 
86     while( bytes_remaining > 0 ) {
87       if( destroyed ) {
88          //destruction currently isn't thread safe so one thread can destroy the decoder (e.g. when closing a connection)
89          //while the read-controller is still actively processing the us
90         //Debug.out( "AZ decoder already destroyed: " +transport.getDescription() );
91         break;
92       }
93 
94       if( is_paused ) {
95         Debug.out( "AZ decoder paused" );
96         break;
97       }
98 
99       int bytes_possible = preReadProcess( bytes_remaining );
100 
101       if( bytes_possible < 1 ) {
102         Debug.out( "ERROR AZ: bytes_possible < 1" );
103         break;
104       }
105 
106       long	actual_read;
107 
108       if( reading_length_mode ) {
109     	  actual_read = transport.read( decode_array, 1, 1 );  //only read into length buffer
110       }
111       else {
112     	  actual_read = transport.read( decode_array, 0, 2 );  //read payload buffer, and possibly next message length buffer
113       }
114 
115       last_read_made_progress = actual_read > 0;
116 
117       int bytes_read = postReadProcess();
118 
119       bytes_remaining -= bytes_read;
120 
121       if( bytes_read < bytes_possible ) {
122         break;
123       }
124     }
125 
126     return max_bytes - bytes_remaining;
127   }
128 
129 
130 
getPercentDoneOfCurrentMessage()131   public int getPercentDoneOfCurrentMessage() {
132     return percent_complete;
133   }
134 
135 
136 
removeDecodedMessages()137   public Message[] removeDecodedMessages() {
138     if( messages_last_read.isEmpty() )  return null;
139 
140     Message[] msgs = (Message[])messages_last_read.toArray( new Message[messages_last_read.size()] );
141     messages_last_read.clear();
142 
143     return msgs;
144   }
145 
146 
147 
getProtocolBytesDecoded()148   public int getProtocolBytesDecoded() {  return protocol_bytes_last_read;  }
149 
150 
151 
getDataBytesDecoded()152   public int getDataBytesDecoded() {  return data_bytes_last_read;  }
153 
getLastReadMadeProgress()154   public boolean getLastReadMadeProgress(){ return last_read_made_progress; };
155 
destroy()156   public ByteBuffer destroy() {
157     is_paused = true;
158     destroyed = true;
159 
160     /*
161     int lbuff_read = 0;
162     int pbuff_read = 0;
163     length_buffer.limit( SS, 4 );
164 
165     if( reading_length_mode ) {
166       lbuff_read = length_buffer.position( SS );
167     }
168     else { //reading payload
169       length_buffer.position( SS, 4 );
170       lbuff_read = 4;
171       pbuff_read = payload_buffer == null ? 0 : payload_buffer.position( SS );
172     }
173 
174     ByteBuffer unused = ByteBuffer.allocate( lbuff_read + pbuff_read );
175 
176     length_buffer.flip( SS );
177     unused.put( length_buffer.getBuffer( SS ) );
178 
179     if ( payload_buffer != null ) {
180       payload_buffer.flip( SS );
181       unused.put( payload_buffer.getBuffer( SS ) );
182     }
183 
184     unused.flip();
185     */
186 
187     length_buffer.returnToPool();
188 
189     if( payload_buffer != null ) {
190       payload_buffer.returnToPool();
191       payload_buffer = null;
192     }
193 
194     try{
195 	    for( int i=0; i < messages_last_read.size(); i++ ) {
196 	      Message msg = (Message)messages_last_read.get( i );
197 	      msg.destroy();
198 	    }
199     }catch( IndexOutOfBoundsException e ){
200     	// as access to messages_last_read isn't synchronized we can get this error if we destroy the
201     	// decoder in parallel with messages being removed. We don't really want to synchronize access
202     	// to this so we'll take the hit here
203     }
204 
205     messages_last_read.clear();
206 
207     //return unused;
208     return null;  //NOTE: we don't bother returning any already-read data
209   }
210 
211 
212 
213 
214 
preReadProcess( int allowed )215   private int preReadProcess( int allowed ) {
216     if( allowed < 1 ) {
217       Debug.out( "allowed < 1" );
218     }
219 
220     decode_array[ 0 ] = payload_buffer == null ? null : payload_buffer.getBuffer( SS );  //ensure the decode array has the latest payload pointer
221 
222     int bytes_available = 0;
223     boolean shrink_remaining_buffers = false;
224     int start_buff = reading_length_mode ? 1 : 0;
225     boolean marked = false;
226 
227     for( int i = start_buff; i < 2; i++ ) {  //set buffer limits according to bytes allowed
228       ByteBuffer bb = decode_array[ i ];
229 
230       if( bb == null ) {
231         Debug.out( "preReadProcess:: bb["+i+"] == null, decoder destroyed=" +destroyed );
232 
233         throw( new RuntimeException( "decoder destroyed" ));
234       }
235 
236 
237       if( shrink_remaining_buffers ) {
238         bb.limit( 0 );  //ensure no read into this next buffer is possible
239       }
240       else {
241         int remaining = bb.remaining();
242 
243         if( remaining < 1 )  continue;  //skip full buffer
244 
245         if( !marked ) {
246           pre_read_start_buffer = i;
247           pre_read_start_position = bb.position();
248           marked = true;
249         }
250 
251         if( remaining > allowed ) {  //read only part of this buffer
252           bb.limit( bb.position() + allowed );  //limit current buffer
253           bytes_available += bb.remaining();
254           shrink_remaining_buffers = true;  //shrink any tail buffers
255         }
256         else {  //full buffer is allowed to be read
257           bytes_available += remaining;
258           allowed -= remaining;  //count this buffer toward allowed and move on to the next
259         }
260       }
261     }
262 
263     return bytes_available;
264   }
265 
266 
267 
268 
postReadProcess()269   private int postReadProcess() throws IOException {
270   	int prot_bytes_read = 0;
271     int data_bytes_read = 0;
272 
273     if( !reading_length_mode && !destroyed ) {  //reading payload data mode
274       //ensure-restore proper buffer limits
275       payload_buffer.limit( SS, message_length );
276       length_buffer.limit( SS, 4 );
277 
278       int curr_position = payload_buffer.position( SS );
279       int read = curr_position - pre_read_start_position;
280 
281       if( msg_id_bytes == null && curr_position >= 4 ) {  //need to have read the message id length first 4 bytes
282       	payload_buffer.position( SS, 0 );
283       	int id_size = payload_buffer.getInt( SS );
284       	payload_buffer.position( SS, curr_position );  //restore
285       	if( id_size < 1 || id_size > 1024 )  throw new IOException( "invalid id_size [" +id_size+ "]" );
286       	msg_id_bytes = new byte[ id_size ];
287       }
288 
289       if( msg_id_bytes != null && curr_position >= msg_id_bytes.length + 4 ) {  //need to have also read the message id bytes
290       	if( !msg_id_read_complete ) {
291       		payload_buffer.position( SS, 4 );
292       		payload_buffer.get( SS, msg_id_bytes );
293       		payload_buffer.position( SS, curr_position );  //restore
294       		msg_id_read_complete = true;
295       	}
296 
297       	Message message = MessageManager.getSingleton().lookupMessage( msg_id_bytes );
298 
299       	if ( message == null ){
300 
301       		Debug.out( "Unknown message type '" + new String( msg_id_bytes ) + "'" );
302 
303       		throw( new IOException( "Unknown message type" ));
304       	}
305 
306       	if( message.getType() == Message.TYPE_DATA_PAYLOAD ) {
307       		data_bytes_read += read;
308       	}else{
309       		prot_bytes_read += read;
310       	}
311       }
312       else {
313       	prot_bytes_read += read;
314       }
315 
316       if( !payload_buffer.hasRemaining( SS ) && !is_paused ) {  //full message received!
317         payload_buffer.position( SS, 0 );  //prepare for use
318 
319         DirectByteBuffer ref_buff = payload_buffer;
320         payload_buffer = null;
321 
322         try {
323           Message msg = AZMessageFactory.createAZMessage( ref_buff );
324           messages_last_read.add( msg );
325         }
326         catch( Throwable e ) {
327           ref_buff.returnToPoolIfNotFree();
328 
329           	// maintain unexpected erorrs as such so they get logged later
330 
331           if ( e instanceof RuntimeException ){
332 
333         	  throw((RuntimeException)e );
334           }
335 
336           throw new IOException( "AZ message decode failed: " + e.getMessage() );
337         }
338 
339         reading_length_mode = true;  //see if we've already read the next message's length
340         percent_complete = -1;  //reset receive percentage
341         msg_id_bytes = null;
342         msg_id_read_complete = false;
343       }
344       else {  //only partial received so far
345         percent_complete = (payload_buffer.position( SS ) * 100) / message_length;  //compute receive percentage
346       }
347     }
348 
349 
350     if( reading_length_mode && !destroyed ) {
351       length_buffer.limit( SS, 4 );  //ensure proper buffer limit
352 
353       prot_bytes_read += (pre_read_start_buffer == 1) ? length_buffer.position( SS ) - pre_read_start_position : length_buffer.position( SS );
354 
355       if( !length_buffer.hasRemaining( SS ) ) {  //done reading the length
356         reading_length_mode = false;
357         length_buffer.position( SS, 0 );
358 
359         message_length = length_buffer.getInt( SS );
360 
361         length_buffer.position( SS, 0 );  //reset it for next length read
362 
363         if( message_length < MIN_MESSAGE_LENGTH || message_length > maximum_message_size ) {
364           throw new IOException( "Invalid message length given for AZ message decode: " + message_length + " (max=" + maximum_message_size + ")" );
365         }
366 
367         payload_buffer = DirectByteBufferPool.getBuffer( DirectByteBuffer.AL_MSG_AZ_PAYLOAD, message_length );
368       }
369     }
370 
371     protocol_bytes_last_read += prot_bytes_read;
372     data_bytes_last_read += data_bytes_read;
373 
374     return prot_bytes_read + data_bytes_read;
375   }
376 
377 
378 
pauseDecoding()379   public void pauseDecoding() {
380     is_paused = true;
381   }
382 
383 
resumeDecoding()384   public void resumeDecoding() {
385     is_paused = false;
386   }
387 
388 
389 }
390