1#!/opt/python-2.7/bin/python
2#
3# Authors: Sergey Satskiy
4#
5# $Id: ut.py 482916 2015-10-27 14:49:14Z satskyse $
6#
7
8"""
9A very basic NetStorage server test
10"""
11
12import sys, datetime, socket, os
13from optparse import OptionParser
14from ncbi_grid_1_1.ncbi import json_over_uttp, uttp
15import pprint
16import random
17
18
# Values placed into the HELLO message to identify this client
CLIENT_NAME = "NST Basic UT"
APPLICATION = os.path.basename(os.path.realpath(sys.argv[0]))

# Content and attribute written to the server and then read back
TEST_OBJ_CONTENT = "NST Basic UT data"
TEST_ATTR_NAME = "basic_ut_attr_name"
TEST_ATTR_VALUE = "basic_ut_attr_value"

# Switched on by the -v/--verbose command line option (see main())
VERBOSE = False
26
27
def generateSessionID():
    """
    Generates the session ID in an appropriate format.

    It should look like 1111111111111111_0000SID: a random 16-digit number
    followed by the fixed '_0000SID' suffix.
    """
    number = random.randint(1111111111111111, 9999999999999999)
    return "%d_0000SID" % number


# Sent in the 'SessionID' and 'ncbi_phid' fields of every request
SESSIONID = generateSessionID()
NCBI_PHID = "NST_BASIC_UT_PHID"
36
# Address of this host; sent in the 'ClientIP' field of every request
try:
    hostIP = socket.gethostbyname(socket.gethostname())
except Exception:
    # Best effort: fall back to the loopback address if the host name cannot
    # be resolved. 'except Exception' (rather than a bare except) keeps
    # KeyboardInterrupt and SystemExit from being swallowed here.
    hostIP = "127.0.0.1"
41
42
43
class NSTProtocolError(Exception):
    """Raised when a NetStorage response does not fit the protocol."""

    def __init__(self, message):
        super(NSTProtocolError, self).__init__(message)
49
class NSTResponseError(Exception):
    """Raised when a NetStorage response has an error in it."""

    def __init__(self, message):
        super(NSTResponseError, self).__init__(message)
55
class NSTObjectContentError(Exception):
    """Raised when the object content read back does not match written."""

    def __init__(self, message):
        super(NSTObjectContentError, self).__init__(message)
61
class NSTAttrValueError(Exception):
    """Raised when the attribute value read back does not match written."""

    def __init__(self, message):
        super(NSTAttrValueError, self).__init__(message)
67
68
def printVerbose(msg):
    """Prints msg on stdout with a timestamp prefix; verbose mode only."""
    if not VERBOSE:
        return
    now = datetime.datetime.now()
    print(now.strftime('%m-%d-%y %H:%M:%S') + " " + msg)
75
def printStderr(msg):
    """
    Prints msg onto stderr with a timestamp and a script prefix.

    The bare message is also passed to printVerbose(), so in verbose mode
    it appears on stdout too.
    """
    stamp = datetime.datetime.now().strftime('%m-%d-%y %H:%M:%S')
    line = stamp + " NetStorage check script. " + msg
    sys.stderr.write(line + "\n")
    printVerbose(msg)
82
83
84
class NetStorage(object):
    """
    Implements communications with a NetStorage server.

    Holds one TCP connection plus the json_over_uttp.MessageExchange object
    built on top of it, and offers one method per protocol message used by
    the test together with the raw UTTP transfer of the test object content.
    """

    def __init__(self, host_, port_):
        """Stores the server address; the connection is made by connect()."""
        self.__commandSN = 0    # serial number of the last message sent
        self.__sock = None      # TCP socket, set by connect()
        self.__host = host_
        self.__port = port_
        self.__nst = None       # MessageExchange, set by connect()

    def connect(self, timeout):
        """
        Establishes a connection to the server.

        timeout is given to socket.settimeout(), so it applies to the
        connect call and to every later operation on the socket.
        Whatever the socket calls raise is propagated; the socket is closed
        first so a failed attempt does not leave it open.
        """
        # The socket object is handed to MessageExchange as both arguments;
        # write/flush are added to the socket class for that purpose.
        # NOTE(review): this patches socket.socket for the whole process, and
        # send() may transmit only a part of the data. Confirm what
        # MessageExchange expects from write() before switching to sendall.
        socket.socket.write = socket.socket.send
        socket.socket.flush = lambda ignore: ignore

        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.settimeout(timeout)
            sock.connect((self.__host, self.__port))
        except Exception:
            sock.close()
            raise

        self.__sock = sock
        self.__nst = json_over_uttp.MessageExchange(sock, sock)

    def exchange(self, message):
        """
        Does the basic exchange: sends message and returns the response.

        The message dictionary is modified in place: its "SN" item is set to
        the next serial number. The response is validated by
        checkResponse(), which raises NSTResponseError on a bad response.
        """
        self.__commandSN += 1
        message["SN"] = self.__commandSN

        self.printMessage("Message to server", message)
        response = self.__nst.exchange(message)
        self.printMessage("Message from server", response)
        self.checkResponse(response)
        return response

    def receive(self):
        """Receives a single server message; the message is not checked."""
        response = self.__nst.receive()
        self.printMessage("Message from server", response)
        return response

    @staticmethod
    def printMessage(prefix, msg):
        """Pretty prints the message after a 'prefix:' line (verbose only)."""
        if VERBOSE:
            print(prefix + ":")
            pprint.PrettyPrinter(indent=4).pprint(msg)

    @staticmethod
    def _newMessage(messageType, **fields):
        """Builds a request with the items common to all messages + fields."""
        message = {'Type':      messageType,
                   'SessionID': SESSIONID,
                   'ncbi_phid': NCBI_PHID,
                   'ClientIP':  hostIP}
        message.update(fields)
        return message

    def __sendData(self, data):
        """Sends data to the server unless it is empty."""
        if data:
            # sendall: plain send() may transmit only a part of the buffer
            self.__sock.sendall(data)

    def sendHello(self, service, metadataOption):
        """
        Sends the HELLO message and returns the server response.

        service is added to the message as "Service" only if it is not
        empty; metadataOption becomes the "Metadata" item.
        """
        message = self._newMessage('HELLO',
                                   Application=APPLICATION,
                                   Ticket='None',
                                   Metadata=metadataOption,
                                   Client=CLIENT_NAME)
        if service:
            message["Service"] = service
        return self.exchange(message)

    def createInNetCache(self):
        """Sends CREATE with the NetCache storage flags; returns response."""
        # Flags for NetCache
        storageFlags = {"Fast": True, "Movable": True}
        return self.exchange(
            self._newMessage('CREATE', StorageFlags=storageFlags))

    def writeTestObj(self):
        """
        Writes the test object content followed by the '\\n' control symbol.

        Returns the server message received afterwards; raises
        NSTResponseError if that message reports an error.
        """
        uttp_writer = self.__nst.get_uttp_writer()
        # Each piece is sent before the next writer call is made
        self.__sendData(uttp_writer.send_chunk(TEST_OBJ_CONTENT))
        self.__sendData(uttp_writer.send_control_symbol('\n'))
        self.__sendData(uttp_writer.flush_buf())

        response = self.receive()
        self.checkResponse(response)
        return response

    def setAttr(self, objectLoc, attrName, attrValue):
        """Sends the SETATTR message and returns the server response."""
        return self.exchange(self._newMessage('SETATTR',
                                              ObjectLoc=objectLoc,
                                              AttrName=attrName,
                                              AttrValue=attrValue))

    def getAttr(self, objectLoc, attrName):
        """Sends the GETATTR message and returns the server response."""
        return self.exchange(self._newMessage('GETATTR',
                                              ObjectLoc=objectLoc,
                                              AttrName=attrName))

    def readPrologue(self, objectLoc):
        """
        Sends the READ message for the given object.

        The object data are then collected with readTestObj().
        """
        return self.exchange(self._newMessage('READ', ObjectLoc=objectLoc))

    def readTestObj(self):
        """
        Reads the test object data stream.

        Collects the UTTP chunks until the '\\n' control symbol, then
        receives and checks the closing server message.

        Returns a (response, content) tuple.
        Raises NSTProtocolError on an unexpected UTTP event, on a wrong
        stream terminator, or when the connection is closed before the
        terminator is seen; NSTResponseError on a bad closing message.
        """
        content = ""
        uttp_reader = self.__nst.get_uttp_reader()

        while True:
            buf = self.__sock.recv(1024)
            if not buf:
                # recv() returns an empty string once the peer has closed
                # the connection; without this check the loop never ends.
                raise NSTProtocolError("Connection closed before "
                                       "the end of the data stream")
            uttp_reader.set_new_buf(buf)
            while True:
                event = uttp_reader.next_event()
                if event == uttp.Reader.END_OF_BUFFER:
                    # All of buf is consumed, get more from the socket
                    break

                if event == uttp.Reader.CHUNK_PART or \
                        event == uttp.Reader.CHUNK:
                    content += uttp_reader.get_chunk()
                elif event == uttp.Reader.CONTROL_SYMBOL:
                    if uttp_reader.get_control_symbol() != '\n':
                        raise NSTProtocolError(
                            "Invalid data stream terminator")
                    response = self.receive()
                    self.checkResponse(response)
                    return response, content
                else:
                    raise NSTProtocolError("Unexpected UTTP packet type")

    def delete(self, objectLoc):
        """Sends the DELETE message for the given object."""
        return self.exchange(self._newMessage('DELETE', ObjectLoc=objectLoc))

    @staticmethod
    def checkResponse(response):
        """
        Checks the server response message.

        Returns None if the "Status" item is "OK". Raises NSTResponseError
        if "Status" is missing or is not "OK"; for the latter the exception
        text lists the code and the message of every item in "Errors".
        """
        if "Status" not in response:
            raise NSTResponseError("Server response does not have 'Status'")
        if response["Status"] == "OK":
            return
        if "Errors" not in response:
            raise NSTResponseError("Unknown server error response")

        # It is a list of errors; "???" stands for a missing item
        errors = []
        for item in response["Errors"]:
            code = str(item["Code"]) if "Code" in item else "???"
            msg = str(item["Message"]) if "Message" in item else "???"
            errors.append("Code: " + code + " Message: " + msg)
        raise NSTResponseError("\n".join(errors))
270
271
272
def writeReadObject(nst):
    """
    Creates, writes, reads an object and compares the content.

    Returns the locator of the created object.
    Raises NSTResponseError if the CREATE response has no "ObjectLoc" and
    NSTObjectContentError if the content read back differs from
    TEST_OBJ_CONTENT; errors raised by the nst calls are propagated.
    """
    response = nst.createInNetCache()
    if "ObjectLoc" not in response:
        raise NSTResponseError("Cannot find object "
                               "locator in CREATE response")
    objectLoc = response["ObjectLoc"]

    try:
        nst.writeTestObj()
        nst.readPrologue(objectLoc)
        _, content = nst.readTestObj()
        if content != TEST_OBJ_CONTENT:
            raise NSTObjectContentError("Read object content does not "
                                        "match written")
    except socket.timeout:
        # No cleanup attempt after a timeout; the callers in this file do
        # not try to delete in that case either.
        raise
    except Exception:
        # The locator has not reached the caller yet, so the caller cannot
        # delete the object; make the best-effort attempt here.
        safeDelete(nst, objectLoc)
        raise
    return objectLoc
287
288
def safeDelete(nst, objectLoc):
    """
    Deletes the object suppressing the errors (best-effort cleanup).

    Does nothing if nst or objectLoc is None. Any Exception raised by
    nst.delete() is ignored; KeyboardInterrupt and SystemExit are not
    suppressed.
    """
    if nst is None or objectLoc is None:
        return
    try:
        nst.delete(objectLoc)
    except Exception:
        # Deliberately ignored: the delete is only a cleanup attempt
        pass
299
300
def main():
    """
    Main function for the netstorage health check.

    Parses the command line, connects to the server and runs the requested
    number of object life cycle loops.

    Returns 0 on success, 1 on a command line error and 2 if the
    connection to the server cannot be established. Exceptions raised by
    withMetadata()/withoutMetadata() are not caught here. OptionParser
    terminates via SystemExit for --help and for malformed options.
    """
    global VERBOSE

    try:
        parser = OptionParser(
        """
        %prog  <connection point>
        """)
        parser.add_option("-v", "--verbose",
                          action="store_true", dest="verbose", default=False,
                          help="be verbose (default: False)")
        parser.add_option("--service", dest="service", default=None,
                          help="service name for the HELLO message")
        parser.add_option("--loops", dest="loops", default=1,
                          help="number of loops")
        parser.add_option("--timeout", dest="timeout", default=5,
                          help="communication timeout")
        parser.add_option("--no-db", action="store_true", dest="no_db",
                          default=False,
                          help="without metadata (default: with metadata)")

        # parse the command line options
        options, args = parser.parse_args()
        VERBOSE = options.verbose

        if len(args) != 1:
            printStderr("Incorrect number of arguments")
            return 1

        # These arguments are always available
        connectionPoint = args[0]
        parts = connectionPoint.split(":")
        if len(parts) != 2:
            printStderr("invalid connection point format. Expected host:port")
            return 1
        host = parts[0]
        try:
            # A port which is not a number is a command line error, not a
            # connection problem, so it is reported here with code 1
            port = int(parts[1])
        except ValueError:
            printStderr("invalid connection point format. Expected host:port")
            return 1

        options.loops = int(options.loops)
        if options.loops <= 0:
            printStderr("Number of loops must be an integer > 0")
            return 1

        options.timeout = int(options.timeout)
        if options.timeout <= 0:
            printStderr("Communication timeout must be an integer > 0")
            return 1

        printVerbose("Connection point: " + connectionPoint)
        printVerbose("Service name: " + str(options.service))
        printVerbose("Number of loops: " + str(options.loops))
        printVerbose("Communication timeout: " + str(options.timeout))
        printVerbose("With metadata: " + str(not options.no_db))
    except Exception as exc:
        printStderr("Error processing command line arguments: " + str(exc))
        return 1

    # First stage - connection
    # There is no bare 'except' here: Ctrl+C must reach the handler at the
    # script entry point instead of being reported as a connection error.
    try:
        nst = NetStorage(host, port)
        nst.connect(options.timeout)
    except socket.timeout:
        printStderr("Error connecting to server: socket timeout")
        return 2
    except Exception as exc:
        printStderr("Error connecting to server: " + str(exc))
        return 2

    # Second stage - the object life cycle loops
    if options.no_db:
        runLoop = withoutMetadata
    else:
        runLoop = withMetadata
    for count in range(1, options.loops + 1):
        printVerbose("Loop #" + str(count))
        runLoop(nst, options.service)

    return 0
382
383
384
def withMetadata(nst, service):
    """
    Runs one object life cycle with the metadata option 'required'.

    Sends HELLO, creates/writes/reads the test object, sets and reads back
    the test attribute and deletes the object.

    Returns 0 on success. Any failure is re-raised as a plain Exception
    whose text names the kind of the problem. Except for a socket timeout,
    a best-effort delete of the object is attempted first.
    KeyboardInterrupt is re-raised as is after the same cleanup attempt.
    """
    # Second stage - using metadata
    objectLoc = None
    try:
        nst.sendHello(service, 'required')
        objectLoc = writeReadObject(nst)
        nst.setAttr(objectLoc, TEST_ATTR_NAME, TEST_ATTR_VALUE)
        response = nst.getAttr(objectLoc, TEST_ATTR_NAME)
        if "AttrValue" not in response:
            raise NSTResponseError("Cannot find attribute value "
                                   "in GETATTR response")
        if response["AttrValue"] != TEST_ATTR_VALUE:
            raise NSTAttrValueError("Read attribute value does not "
                                    "match written")
        nst.delete(objectLoc)
    except socket.timeout:
        # The only case without a cleanup attempt
        raise Exception("Error communicating to server (with metadata): "
                        "socket timeout")
    except KeyboardInterrupt:
        # Ctrl+C must stay a KeyboardInterrupt so that the script entry
        # point reports it as such
        safeDelete(nst, objectLoc)
        raise
    except NSTProtocolError as exc:
        safeDelete(nst, objectLoc)
        raise Exception("NetStorage protocol error (with metadata): " +
                        str(exc))
    except NSTResponseError as exc:
        safeDelete(nst, objectLoc)
        raise Exception("NetStorage response error (with metadata): " +
                        str(exc))
    except NSTObjectContentError as exc:
        safeDelete(nst, objectLoc)
        raise Exception("NetStorage object read/write error "
                        "(with metadata): " + str(exc))
    except NSTAttrValueError as exc:
        safeDelete(nst, objectLoc)
        raise Exception("NetStorage attribute read/write error "
                        "(with metadata): " + str(exc))
    except Exception as exc:
        safeDelete(nst, objectLoc)
        raise Exception("Object life cycle error (with metadata): " +
                        str(exc))
    return 0
423
424
def withoutMetadata(nst, service):
    """
    Runs one object life cycle with the metadata option 'disabled'.

    Sends HELLO, creates/writes/reads the test object and deletes it.

    Returns 0 on success. Any failure is re-raised as a plain Exception
    whose text names the kind of the problem. Except for a socket timeout,
    a best-effort delete of the object is attempted first.
    KeyboardInterrupt is re-raised as is after the same cleanup attempt.
    """
    objectLoc = None
    try:
        nst.sendHello(service, 'disabled')
        objectLoc = writeReadObject(nst)
        nst.delete(objectLoc)
    except socket.timeout:
        # The only case without a cleanup attempt
        raise Exception("Error communicating to server (without metadata): "
                        "socket timeout")
    except KeyboardInterrupt:
        # Ctrl+C must stay a KeyboardInterrupt so that the script entry
        # point reports it as such
        safeDelete(nst, objectLoc)
        raise
    except NSTProtocolError as exc:
        safeDelete(nst, objectLoc)
        raise Exception("NetStorage protocol error (without metadata): " +
                        str(exc))
    except NSTResponseError as exc:
        safeDelete(nst, objectLoc)
        raise Exception("NetStorage response error (without metadata): " +
                        str(exc))
    except NSTObjectContentError as exc:
        safeDelete(nst, objectLoc)
        raise Exception("NetStorage object read/write error "
                        "(without metadata): " + str(exc))
    except Exception as exc:
        safeDelete(nst, objectLoc)
        raise Exception("NetStorage object life cycle error "
                        "(without metadata): " + str(exc))
    return 0
452
453
454# The script execution entry point
if __name__ == "__main__":
    # Exit codes: the value returned by main(), 4 - Ctrl+C,
    # 5 - exception, 6 - anything else.
    # NOTE(review): VERBOSE is set only inside main(), so the message below
    # is never printed, even with -v.
    printVerbose("---------- Start ----------")
    try:
        returnValue = main()
    except KeyboardInterrupt:
        # Ctrl+C
        printStderr("Ctrl + C received")
        returnValue = 4
    except SystemExit as excpt:
        # OptionParser leaves via sys.exit() after --help and after a
        # malformed option. It has already printed its own text, so no
        # error message is added. A non-zero status is mapped to 1, the
        # code main() uses for command line errors.
        if excpt.code in (None, 0):
            returnValue = 0
        else:
            returnValue = 1
    except Exception as excpt:
        printStderr(str(excpt))
        returnValue = 5
    except:
        # Last resort at the top level: whatever is not covered above
        printStderr("Generic unknown script error")
        returnValue = 6

    printVerbose("Return code: " + str(returnValue))
    printVerbose("---------- Finish ---------")
    sys.exit(returnValue)
473