1#!/opt/python-2.7/bin/python 2# 3# Authors: Sergey Satskiy 4# 5# $Id: ut.py 482916 2015-10-27 14:49:14Z satskyse $ 6# 7 8""" 9A very basic NetStorage server test 10""" 11 12import sys, datetime, socket, os 13from optparse import OptionParser 14from ncbi_grid_1_1.ncbi import json_over_uttp, uttp 15import pprint 16import random 17 18 19CLIENT_NAME = "NST Basic UT" 20APPLICATION = os.path.basename( os.path.realpath( sys.argv[ 0 ] ) ) 21TEST_OBJ_CONTENT = "NST Basic UT data" 22TEST_ATTR_NAME = "basic_ut_attr_name" 23TEST_ATTR_VALUE = "basic_ut_attr_value" 24 25VERBOSE = False 26 27 28def generateSessionID(): 29 " Generates the session ID in an appropriate format " 30 # It sould be like 1111111111111111_0000SID 31 return str( random.randint( 1111111111111111, 32 9999999999999999 ) ) + "_0000SID" 33 34SESSIONID = generateSessionID() 35NCBI_PHID = 'NST_BASIC_UT_PHID' 36 37try: 38 hostIP = socket.gethostbyname( socket.gethostname() ) 39except: 40 hostIP = "127.0.0.1" 41 42 43 44class NSTProtocolError( Exception ): 45 " NetStorage response does not fit the protocol " 46 def __init__( self, message ): 47 Exception.__init__( self, message ) 48 return 49 50class NSTResponseError( Exception ): 51 " NetStorage response has an error in it " 52 def __init__( self, message ): 53 Exception.__init__( self, message ) 54 return 55 56class NSTObjectContentError( Exception ): 57 " NetStorage read object content does not match written " 58 def __init__( self, message ): 59 Exception.__init__( self, message ) 60 return 61 62class NSTAttrValueError( Exception ): 63 " NetStorage read attribute value does not match written " 64 def __init__( self, message ): 65 Exception.__init__( self, message ) 66 return 67 68 69def printVerbose( msg ): 70 " Prints stdout message conditionally " 71 if VERBOSE: 72 timestamp = datetime.datetime.now().strftime( '%m-%d-%y %H:%M:%S' ) 73 print timestamp + " " + msg 74 return 75 76def printStderr( msg ): 77 " Prints onto stderr with a prefix " 78 timestamp = datetime.datetime.now().strftime( '%m-%d-%y %H:%M:%S' ) 79 print >> sys.stderr, timestamp + " NetStorage check script. " + msg 80 printVerbose( msg ) 81 return 82 83 84 85class NetStorage: 86 " Implements communications with a NetStorage server " 87 88 def __init__( self, host_, port_ ): 89 self.__commandSN = 0 90 self.__sock = None 91 self.__host = host_ 92 self.__port = port_ 93 self.__nst = None 94 return 95 96 def connect( self, timeout ): 97 " Establishes a connection to the server " 98 socket.socket.write = socket.socket.send 99 socket.socket.flush = lambda ignore: ignore 100 101 self.__sock = socket.socket( socket.AF_INET, socket.SOCK_STREAM ) 102 self.__sock.settimeout( timeout ) 103 self.__sock.connect( ( self.__host, self.__port ) ) 104 105 self.__nst = json_over_uttp.MessageExchange( self.__sock, self.__sock ) 106 return 107 108 def exchange( self, message ): 109 " Does the basic exchange " 110 self.__commandSN += 1 111 112 message[ "SN" ] = self.__commandSN 113 self.printMessage( "Message to server", message ) 114 response = self.__nst.exchange( message ) 115 self.printMessage( "Message from server", response ) 116 self.checkResponse( response ) 117 return response 118 119 def receive( self ): 120 " Receives a single server message " 121 response = self.__nst.receive() 122 self.printMessage( "Message from server", response ) 123 return response 124 125 @staticmethod 126 def printMessage( prefix, msg ): 127 " Prints the message " 128 if VERBOSE: 129 print prefix + ":" 130 prettyPrinter = pprint.PrettyPrinter( indent = 4 ) 131 prettyPrinter.pprint( msg ) 132 return 133 134 def sendHello( self, service, metadataOption ): 135 " Sends the HELLO message " 136 message = { 'Type': 'HELLO', 137 'SessionID': SESSIONID, 138 'ncbi_phid': NCBI_PHID, 139 'ClientIP': hostIP, 140 'Application': APPLICATION, 141 'Ticket': 'None', 142 'Metadata': metadataOption, 143 'Client': CLIENT_NAME } 144 if service: 145 message[ "Service" ] = service 146 response = self.exchange( message ) 147 return response 148 149 def createInNetCache( self ): 150 " Creates an object in NetCache " 151 # Flags for NetCache 152 storageFlags = { "Fast": True, "Movable": True } 153 message = { 'Type': 'CREATE', 154 'SessionID': SESSIONID, 155 'ncbi_phid': NCBI_PHID, 156 'ClientIP': hostIP, 157 'StorageFlags': storageFlags } 158 159 response = self.exchange( message ) 160 return response 161 162 def writeTestObj( self ): 163 " Writes test object content " 164 uttp_writer = self.__nst.get_uttp_writer() 165 data = uttp_writer.send_chunk( TEST_OBJ_CONTENT ) 166 if data: 167 self.__sock.send(data) 168 data = uttp_writer.send_control_symbol('\n') 169 if data: 170 self.__sock.send(data) 171 data = uttp_writer.flush_buf() 172 if data: 173 self.__sock.send(data) 174 175 response = self.receive() 176 self.checkResponse( response ) 177 return response 178 179 def setAttr( self, objectLoc, attrName, attrValue ): 180 " Sends SETATTR message " 181 message = { 'Type': 'SETATTR', 182 'SessionID': SESSIONID, 183 'ncbi_phid': NCBI_PHID, 184 'ClientIP': hostIP, 185 'ObjectLoc': objectLoc, 186 'AttrName': attrName, 187 'AttrValue': attrValue } 188 189 response = self.exchange( message ) 190 return response 191 192 def getAttr( self, objectLoc, attrName ): 193 " Sends GETATTR message " 194 message = { 'Type': 'GETATTR', 195 'SessionID': SESSIONID, 196 'ncbi_phid': NCBI_PHID, 197 'ClientIP': hostIP, 198 'ObjectLoc': objectLoc, 199 'AttrName': attrName } 200 201 response = self.exchange( message ) 202 return response 203 204 def readPrologue( self, objectLoc ): 205 " Reads the given object " 206 message = { 'Type': 'READ', 207 'SessionID': SESSIONID, 208 'ncbi_phid': NCBI_PHID, 209 'ClientIP': hostIP, 210 'ObjectLoc': objectLoc } 211 212 response = self.exchange( message ) 213 return response 214 215 def readTestObj( self ): 216 " Reads the test object " 217 content = "" 218 uttp_reader = self.__nst.get_uttp_reader() 219 220 while True: 221 buf = self.__sock.recv( 1024 ) 222 uttp_reader.set_new_buf(buf) 223 while True: 224 event = uttp_reader.next_event() 225 if event == uttp.Reader.END_OF_BUFFER: 226 break 227 228 if event == uttp.Reader.CHUNK_PART or \ 229 event == uttp.Reader.CHUNK: 230 content += uttp_reader.get_chunk() 231 elif event == uttp.Reader.CONTROL_SYMBOL: 232 if uttp_reader.get_control_symbol() != '\n': 233 raise NSTProtocolError( "Invalid data stream terminator" ) 234 response = self.receive() 235 self.checkResponse( response ) 236 return response, content 237 else: 238 raise NSTProtocolError( "Unexpected UTTP packet type" ) 239 240 def delete( self, objectLoc ): 241 " Deletes the given object " 242 message = { 'Type': 'DELETE', 243 'SessionID': SESSIONID, 244 'ncbi_phid': NCBI_PHID, 245 'ClientIP': hostIP, 246 'ObjectLoc': objectLoc } 247 response = self.exchange( message ) 248 return response 249 250 @staticmethod 251 def checkResponse( response ): 252 " Checks the server response message and throws an exception if errors " 253 if "Status" not in response: 254 raise NSTResponseError( "Server response does not have 'Status'" ) 255 if response[ "Status" ] != "OK": 256 if "Errors" in response: 257 # It is a list of errors 258 errors = [] 259 for item in response[ "Errors" ]: 260 code = "???" 261 msg = "???" 262 if "Code" in item: 263 code = str( item[ "Code" ] ) 264 if "Message" in item: 265 msg = str( item[ "Message" ] ) 266 errors.append( "Code: " + code + " Message: " + msg ) 267 raise NSTResponseError( "\n".join( errors ) ) 268 else: 269 raise NSTResponseError( "Unknown server error response" ) 270 271 272 273def writeReadObject( nst ): 274 " Creates, writes, reads an object and compares the content " 275 response = nst.createInNetCache() 276 if "ObjectLoc" not in response: 277 raise NSTResponseError( "Cannot find object " 278 "locator in CREATE response" ) 279 objectLoc = response[ "ObjectLoc" ] 280 nst.writeTestObj() 281 nst.readPrologue( objectLoc ) 282 response, content = nst.readTestObj() 283 if content != TEST_OBJ_CONTENT: 284 raise NSTObjectContentError( "Read object content does not " 285 "match written" ) 286 return objectLoc 287 288 289def safeDelete( nst, objectLoc ): 290 " Suppress exceptions while deleting an object " 291 if nst is None: 292 return 293 if objectLoc is None: 294 return 295 try: 296 nst.delete( objectLoc ) 297 except: 298 pass 299 300 301def main(): 302 " main function for the netstorage health check " 303 304 try: 305 parser = OptionParser( 306 """ 307 %prog <connection point> 308 """ ) 309 parser.add_option( "-v", "--verbose", 310 action="store_true", dest="verbose", default=False, 311 help="be verbose (default: False)" ) 312 parser.add_option( "--service", dest="service", default=None, 313 help="service name for the HELLO message" ) 314 parser.add_option( "--loops", dest="loops", default=1, 315 help="number of loops" ) 316 parser.add_option( "--timeout", dest="timeout", default=5, 317 help="communication timeout" ) 318 parser.add_option( "--no-db", action="store_true", dest="no_db", default=False, 319 help="without metadata (default: with metadata)" ) 320 321 # parse the command line options 322 options, args = parser.parse_args() 323 global VERBOSE 324 VERBOSE = options.verbose 325 326 if len( args ) != 1: 327 printStderr( "Incorrect number of arguments" ) 328 return 1 329 330 # These arguments are always available 331 connectionPoint = args[ 0 ] 332 if len( connectionPoint.split( ":" ) ) != 2: 333 printStderr( "invalid connection point format. Expected host:port" ) 334 return 1 335 336 options.loops = int( options.loops ) 337 if options.loops <= 0: 338 printStderr( "Number of loops must be an integer > 0" ) 339 return 1 340 341 options.timeout = int( options.timeout ) 342 if options.timeout <= 0: 343 printStderr( "Communication timeout must be an integer > 0" ) 344 return 1 345 346 printVerbose( "Connection point: " + connectionPoint ) 347 printVerbose( "Service name: " + str( options.service ) ) 348 printVerbose( "Number of loops: " + str( options.loops ) ) 349 printVerbose( "Communication timeout: " + str( options.timeout ) ) 350 printVerbose( "With metadata: " + str( not options.no_db ) ) 351 except Exception, exc: 352 printStderr( "Error processing command line arguments: " + str( exc ) ) 353 return 1 354 355 356 # First stage - connection 357 try: 358 parts = connectionPoint.split( ":" ) 359 nst = NetStorage( parts[ 0 ], int( parts[ 1 ] ) ) 360 nst.connect( options.timeout ) 361 except socket.timeout, exc: 362 printStderr( "Error connecting to server: socket timeout" ) 363 return 2 364 except Exception, exc: 365 printStderr( "Error connecting to server: " + str( exc ) ) 366 return 2 367 except: 368 printStderr( "Unknown check script error at the connecting stage" ) 369 return 2 370 371 count = 0 372 while options.loops > 0: 373 count += 1 374 printVerbose( "Loop #" + str( count ) ) 375 if options.no_db: 376 withoutMetadata( nst, options.service ) 377 else: 378 withMetadata( nst, options.service ) 379 options.loops -= 1 380 381 return 0 382 383 384 385def withMetadata( nst, service ): 386 387 # Second stage - using metadata 388 objectLoc = None 389 try: 390 nst.sendHello( service, 'required' ) 391 objectLoc = writeReadObject( nst ) 392 nst.setAttr( objectLoc, TEST_ATTR_NAME, TEST_ATTR_VALUE ) 393 response = nst.getAttr( objectLoc, TEST_ATTR_NAME ) 394 if "AttrValue" not in response: 395 raise NSTResponseError( "Cannot find attribute value " 396 "in GETATTR response" ) 397 if response[ "AttrValue" ] != TEST_ATTR_VALUE: 398 raise NSTAttrValueError( "Read attribute value does not " 399 "match written" ) 400 nst.delete( objectLoc ) 401 except socket.timeout, exc: 402 raise Exception( "Error communicating to server (with metadata): socket timeout" ) 403 404 except NSTProtocolError, exc: 405 safeDelete( nst, objectLoc ) 406 raise Exception( "NetStorage protocol error (with metadata): " + str( exc ) ) 407 except NSTResponseError, exc: 408 safeDelete( nst, objectLoc ) 409 raise Exception("NetStorage response error (with metadata): " + str( exc ) ) 410 except NSTObjectContentError, exc: 411 safeDelete( nst, objectLoc ) 412 raise Exception( "NetStorage object read/write error (with metadata): " + str( exc ) ) 413 except NSTAttrValueError, exc: 414 safeDelete( nst, objectLoc ) 415 raise Exception( "NetStorage attribute read/write error (with metadata): " + str( exc ) ) 416 except Exception, exc: 417 safeDelete( nst, objectLoc ) 418 raise Exception( "Object life cycle error (with metadata): " + str( exc ) ) 419 except: 420 safeDelete( nst, objectLoc ) 421 raise Exception( "Unknown object life cycle error (with metadata)" ) 422 return 0 423 424 425def withoutMetadata( nst, service ): 426 427 # Here: there was a problem in a meta data involving cycle 428 # try without metadata 429 objectLoc = None 430 try: 431 nst.sendHello( service, 'disabled' ) 432 objectLoc = writeReadObject( nst ) 433 nst.delete( objectLoc ) 434 except socket.timeout, exc: 435 raise Exception( "Error communicating to server (without metadata): socket timeout" ) 436 except NSTProtocolError, exc: 437 safeDelete( nst, objectLoc ) 438 raise Exception( "NetStorage protocol error (without metadata): " + str( exc ) ) 439 except NSTResponseError, exc: 440 safeDelete( nst, objectLoc ) 441 raise Exception( "NetStorage response error (without metadata): " + str( exc ) ) 442 except NSTObjectContentError, exc: 443 safeDelete( nst, objectLoc ) 444 raise Exception( "NetStorage object read/write error (without metadata): " + str( exc ) ) 445 except Exception, exc: 446 safeDelete( nst, objectLoc ) 447 raise Exception( "NetStorage object life cycle error (without metadata): " + str( exc ) ) 448 except: 449 safeDelete( nst, objectLoc ) 450 raise Exception( "Unknown NetStorage object life cycle error (without metadata)" ) 451 return 0 452 453 454# The script execution entry point 455if __name__ == "__main__": 456 printVerbose( "---------- Start ----------" ) 457 try: 458 returnValue = main() 459 except KeyboardInterrupt: 460 # Ctrl+C 461 printStderr( "Ctrl + C received" ) 462 returnValue = 4 463 except Exception, excpt: 464 printStderr( str( excpt ) ) 465 returnValue = 5 466 except: 467 printStderr( "Generic unknown script error" ) 468 returnValue = 6 469 470 printVerbose( "Return code: " + str( returnValue ) ) 471 printVerbose( "---------- Finish ---------" ) 472 sys.exit( returnValue ) 473