1 /* 2 * Created on Feb 8, 2005 3 * Created by Alon Rohter 4 * Copyright (C) Azureus Software, Inc, All Rights Reserved. 5 * 6 * This program is free software; you can redistribute it and/or 7 * modify it under the terms of the GNU General Public License 8 * as published by the Free Software Foundation; either version 2 9 * of the License, or (at your option) any later version. 10 * This program is distributed in the hope that it will be useful, 11 * but WITHOUT ANY WARRANTY; without even the implied warranty of 12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 13 * GNU General Public License for more details. 14 * You should have received a copy of the GNU General Public License 15 * along with this program; if not, write to the Free Software 16 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. 17 * 18 */ 19 20 package com.aelitis.azureus.core.peermanager.messaging.azureus; 21 22 import java.io.IOException; 23 import java.nio.ByteBuffer; 24 import java.util.ArrayList; 25 26 import org.gudy.azureus2.core3.util.*; 27 28 import com.aelitis.azureus.core.networkmanager.Transport; 29 import com.aelitis.azureus.core.peermanager.messaging.*; 30 31 32 33 /** 34 * Length-prefixed message decoding. 35 */ 36 public class AZMessageDecoder implements MessageStreamDecoder { 37 private static final int MIN_MESSAGE_LENGTH = 6; //4 byte id length + at least 1 byte for id + 1 byte version 38 private static final int MAX_MESSAGE_LENGTH = 131072; //128K arbitrary limit 39 40 private static final byte SS = DirectByteBuffer.SS_MSG; 41 42 43 private DirectByteBuffer payload_buffer = null; 44 private final DirectByteBuffer length_buffer = DirectByteBufferPool.getBuffer( DirectByteBuffer.AL_MSG, 4 ); 45 private final ByteBuffer[] decode_array = new ByteBuffer[] { null, length_buffer.getBuffer( SS ) }; 46 47 private boolean reading_length_mode = true; 48 49 private int message_length; 50 private int pre_read_start_buffer; 51 private int pre_read_start_position; 52 53 private volatile boolean destroyed = false; 54 private volatile boolean is_paused = false; 55 56 private ArrayList messages_last_read = new ArrayList(); 57 private int protocol_bytes_last_read = 0; 58 private int data_bytes_last_read = 0; 59 private int percent_complete = -1; 60 61 62 private byte[] msg_id_bytes = null; 63 private boolean msg_id_read_complete = false; 64 65 private boolean last_read_made_progress; 66 67 private int maximum_message_size = MAX_MESSAGE_LENGTH; 68 AZMessageDecoder()69 public AZMessageDecoder() { 70 /*nothing*/ 71 } 72 73 public void setMaximumMessageSize( int max_bytes )74 setMaximumMessageSize( 75 int max_bytes ) 76 { 77 maximum_message_size = max_bytes; 78 } 79 performStreamDecode( Transport transport, int max_bytes )80 public int performStreamDecode( Transport transport, int max_bytes ) throws IOException { 81 protocol_bytes_last_read = 0; 82 data_bytes_last_read = 0; 83 84 int bytes_remaining = max_bytes; 85 86 while( bytes_remaining > 0 ) { 87 if( destroyed ) { 88 //destruction currently isn't thread safe so one thread can destroy the decoder (e.g. when closing a connection) 89 //while the read-controller is still actively processing the us 90 //Debug.out( "AZ decoder already destroyed: " +transport.getDescription() ); 91 break; 92 } 93 94 if( is_paused ) { 95 Debug.out( "AZ decoder paused" ); 96 break; 97 } 98 99 int bytes_possible = preReadProcess( bytes_remaining ); 100 101 if( bytes_possible < 1 ) { 102 Debug.out( "ERROR AZ: bytes_possible < 1" ); 103 break; 104 } 105 106 long actual_read; 107 108 if( reading_length_mode ) { 109 actual_read = transport.read( decode_array, 1, 1 ); //only read into length buffer 110 } 111 else { 112 actual_read = transport.read( decode_array, 0, 2 ); //read payload buffer, and possibly next message length buffer 113 } 114 115 last_read_made_progress = actual_read > 0; 116 117 int bytes_read = postReadProcess(); 118 119 bytes_remaining -= bytes_read; 120 121 if( bytes_read < bytes_possible ) { 122 break; 123 } 124 } 125 126 return max_bytes - bytes_remaining; 127 } 128 129 130 getPercentDoneOfCurrentMessage()131 public int getPercentDoneOfCurrentMessage() { 132 return percent_complete; 133 } 134 135 136 removeDecodedMessages()137 public Message[] removeDecodedMessages() { 138 if( messages_last_read.isEmpty() ) return null; 139 140 Message[] msgs = (Message[])messages_last_read.toArray( new Message[messages_last_read.size()] ); 141 messages_last_read.clear(); 142 143 return msgs; 144 } 145 146 147 getProtocolBytesDecoded()148 public int getProtocolBytesDecoded() { return protocol_bytes_last_read; } 149 150 151 getDataBytesDecoded()152 public int getDataBytesDecoded() { return data_bytes_last_read; } 153 getLastReadMadeProgress()154 public boolean getLastReadMadeProgress(){ return last_read_made_progress; }; 155 destroy()156 public ByteBuffer destroy() { 157 is_paused = true; 158 destroyed = true; 159 160 /* 161 int lbuff_read = 0; 162 int pbuff_read = 0; 163 length_buffer.limit( SS, 4 ); 164 165 if( reading_length_mode ) { 166 lbuff_read = length_buffer.position( SS ); 167 } 168 else { //reading payload 169 length_buffer.position( SS, 4 ); 170 lbuff_read = 4; 171 pbuff_read = payload_buffer == null ? 0 : payload_buffer.position( SS ); 172 } 173 174 ByteBuffer unused = ByteBuffer.allocate( lbuff_read + pbuff_read ); 175 176 length_buffer.flip( SS ); 177 unused.put( length_buffer.getBuffer( SS ) ); 178 179 if ( payload_buffer != null ) { 180 payload_buffer.flip( SS ); 181 unused.put( payload_buffer.getBuffer( SS ) ); 182 } 183 184 unused.flip(); 185 */ 186 187 length_buffer.returnToPool(); 188 189 if( payload_buffer != null ) { 190 payload_buffer.returnToPool(); 191 payload_buffer = null; 192 } 193 194 try{ 195 for( int i=0; i < messages_last_read.size(); i++ ) { 196 Message msg = (Message)messages_last_read.get( i ); 197 msg.destroy(); 198 } 199 }catch( IndexOutOfBoundsException e ){ 200 // as access to messages_last_read isn't synchronized we can get this error if we destroy the 201 // decoder in parallel with messages being removed. We don't really want to synchronize access 202 // to this so we'll take the hit here 203 } 204 205 messages_last_read.clear(); 206 207 //return unused; 208 return null; //NOTE: we don't bother returning any already-read data 209 } 210 211 212 213 214 preReadProcess( int allowed )215 private int preReadProcess( int allowed ) { 216 if( allowed < 1 ) { 217 Debug.out( "allowed < 1" ); 218 } 219 220 decode_array[ 0 ] = payload_buffer == null ? null : payload_buffer.getBuffer( SS ); //ensure the decode array has the latest payload pointer 221 222 int bytes_available = 0; 223 boolean shrink_remaining_buffers = false; 224 int start_buff = reading_length_mode ? 1 : 0; 225 boolean marked = false; 226 227 for( int i = start_buff; i < 2; i++ ) { //set buffer limits according to bytes allowed 228 ByteBuffer bb = decode_array[ i ]; 229 230 if( bb == null ) { 231 Debug.out( "preReadProcess:: bb["+i+"] == null, decoder destroyed=" +destroyed ); 232 233 throw( new RuntimeException( "decoder destroyed" )); 234 } 235 236 237 if( shrink_remaining_buffers ) { 238 bb.limit( 0 ); //ensure no read into this next buffer is possible 239 } 240 else { 241 int remaining = bb.remaining(); 242 243 if( remaining < 1 ) continue; //skip full buffer 244 245 if( !marked ) { 246 pre_read_start_buffer = i; 247 pre_read_start_position = bb.position(); 248 marked = true; 249 } 250 251 if( remaining > allowed ) { //read only part of this buffer 252 bb.limit( bb.position() + allowed ); //limit current buffer 253 bytes_available += bb.remaining(); 254 shrink_remaining_buffers = true; //shrink any tail buffers 255 } 256 else { //full buffer is allowed to be read 257 bytes_available += remaining; 258 allowed -= remaining; //count this buffer toward allowed and move on to the next 259 } 260 } 261 } 262 263 return bytes_available; 264 } 265 266 267 268 postReadProcess()269 private int postReadProcess() throws IOException { 270 int prot_bytes_read = 0; 271 int data_bytes_read = 0; 272 273 if( !reading_length_mode && !destroyed ) { //reading payload data mode 274 //ensure-restore proper buffer limits 275 payload_buffer.limit( SS, message_length ); 276 length_buffer.limit( SS, 4 ); 277 278 int curr_position = payload_buffer.position( SS ); 279 int read = curr_position - pre_read_start_position; 280 281 if( msg_id_bytes == null && curr_position >= 4 ) { //need to have read the message id length first 4 bytes 282 payload_buffer.position( SS, 0 ); 283 int id_size = payload_buffer.getInt( SS ); 284 payload_buffer.position( SS, curr_position ); //restore 285 if( id_size < 1 || id_size > 1024 ) throw new IOException( "invalid id_size [" +id_size+ "]" ); 286 msg_id_bytes = new byte[ id_size ]; 287 } 288 289 if( msg_id_bytes != null && curr_position >= msg_id_bytes.length + 4 ) { //need to have also read the message id bytes 290 if( !msg_id_read_complete ) { 291 payload_buffer.position( SS, 4 ); 292 payload_buffer.get( SS, msg_id_bytes ); 293 payload_buffer.position( SS, curr_position ); //restore 294 msg_id_read_complete = true; 295 } 296 297 Message message = MessageManager.getSingleton().lookupMessage( msg_id_bytes ); 298 299 if ( message == null ){ 300 301 Debug.out( "Unknown message type '" + new String( msg_id_bytes ) + "'" ); 302 303 throw( new IOException( "Unknown message type" )); 304 } 305 306 if( message.getType() == Message.TYPE_DATA_PAYLOAD ) { 307 data_bytes_read += read; 308 }else{ 309 prot_bytes_read += read; 310 } 311 } 312 else { 313 prot_bytes_read += read; 314 } 315 316 if( !payload_buffer.hasRemaining( SS ) && !is_paused ) { //full message received! 317 payload_buffer.position( SS, 0 ); //prepare for use 318 319 DirectByteBuffer ref_buff = payload_buffer; 320 payload_buffer = null; 321 322 try { 323 Message msg = AZMessageFactory.createAZMessage( ref_buff ); 324 messages_last_read.add( msg ); 325 } 326 catch( Throwable e ) { 327 ref_buff.returnToPoolIfNotFree(); 328 329 // maintain unexpected erorrs as such so they get logged later 330 331 if ( e instanceof RuntimeException ){ 332 333 throw((RuntimeException)e ); 334 } 335 336 throw new IOException( "AZ message decode failed: " + e.getMessage() ); 337 } 338 339 reading_length_mode = true; //see if we've already read the next message's length 340 percent_complete = -1; //reset receive percentage 341 msg_id_bytes = null; 342 msg_id_read_complete = false; 343 } 344 else { //only partial received so far 345 percent_complete = (payload_buffer.position( SS ) * 100) / message_length; //compute receive percentage 346 } 347 } 348 349 350 if( reading_length_mode && !destroyed ) { 351 length_buffer.limit( SS, 4 ); //ensure proper buffer limit 352 353 prot_bytes_read += (pre_read_start_buffer == 1) ? length_buffer.position( SS ) - pre_read_start_position : length_buffer.position( SS ); 354 355 if( !length_buffer.hasRemaining( SS ) ) { //done reading the length 356 reading_length_mode = false; 357 length_buffer.position( SS, 0 ); 358 359 message_length = length_buffer.getInt( SS ); 360 361 length_buffer.position( SS, 0 ); //reset it for next length read 362 363 if( message_length < MIN_MESSAGE_LENGTH || message_length > maximum_message_size ) { 364 throw new IOException( "Invalid message length given for AZ message decode: " + message_length + " (max=" + maximum_message_size + ")" ); 365 } 366 367 payload_buffer = DirectByteBufferPool.getBuffer( DirectByteBuffer.AL_MSG_AZ_PAYLOAD, message_length ); 368 } 369 } 370 371 protocol_bytes_last_read += prot_bytes_read; 372 data_bytes_last_read += data_bytes_read; 373 374 return prot_bytes_read + data_bytes_read; 375 } 376 377 378 pauseDecoding()379 public void pauseDecoding() { 380 is_paused = true; 381 } 382 383 resumeDecoding()384 public void resumeDecoding() { 385 is_paused = false; 386 } 387 388 389 } 390