1 /*****************************************************************************/ 2 /* Software Testing Automation Framework (STAF) */ 3 /* (C) Copyright IBM Corp. 2002, 2004, 2005 */ 4 /* */ 5 /* This software is licensed under the Eclipse Public License (EPL) V1.0. */ 6 /*****************************************************************************/ 7 8 package com.ibm.staf.service.stax; 9 import com.ibm.staf.*; 10 import com.ibm.staf.wrapper.STAFLog; 11 import java.util.Map; 12 import java.util.HashMap; 13 import java.util.LinkedHashMap; 14 import java.util.TreeMap; 15 import java.util.TreeSet; 16 import java.util.LinkedHashSet; 17 import java.util.LinkedList; 18 import java.util.ArrayList; 19 import java.util.Iterator; 20 import java.util.List; 21 import java.io.File; 22 import java.io.FileWriter; 23 import java.io.IOException; 24 import java.io.PrintWriter; 25 import java.io.StringWriter; 26 27 import java.util.Vector; 28 import java.util.StringTokenizer; 29 import java.util.Date; 30 import java.util.Set; 31 32 import org.python.core.Py; 33 34 import org.python.core.PyObject; 35 import org.python.core.PyCode; 36 37 /* Jython 2.1: 38 import org.python.core.__builtin__; 39 */ 40 // Jython 2.5: 41 import org.python.core.CompileMode; 42 43 /** 44 * The representation of a STAX job. A STAX job is created by the STAXParser 45 * when the STAX service receives an EXECUTE request specifying a file or string 46 * containing XML that defines a STAX job. 47 */ 48 public class STAXJob implements STAXThreadCompleteListener, 49 STAXSTAFQueueListener 50 { 51 // For debugging - Counts and prints the number of cache gets/adds 52 static final boolean COUNT_PYCODE_CACHES = false; 53 54 static final String STAX_JOB_EVENT = new String("Job"); 55 56 // Logfile types: 57 58 /** 59 * Indicates to log to the STAX Service Log file 60 */ 61 static final int SERVICE_LOG = 1; 62 63 /** 64 * Indicates to log to the STAX Job Log file 65 */ 66 public static final int JOB_LOG = 2; 67 68 /** 69 * Indicates to log to the STAX Job User Log file 70 */ 71 public static final int USER_JOB_LOG = 3; 72 73 /** 74 * Indicates to log to the JVM Log file 75 */ 76 public static final int JVM_LOG = 4; 77 78 public static final int NO_NOTIFY_ONEND = 0; 79 public static final int NOTIFY_ONEND_BY_HANDLE = 1; 80 public static final int NOTIFY_ONEND_BY_NAME = 2; 81 82 /** 83 * Job completion status codes and their string values 84 */ 85 public static final int ABNORMAL_STATUS = -1; 86 public static final int NORMAL_STATUS = 0; 87 public static final int TERMINATED_STATUS = 1; 88 public static final int UNKNOWN_STATUS = 2; 89 90 public static final String ABNORMAL_STATUS_STRING = "Abnormal"; 91 public static final String NORMAL_STATUS_STRING = "Normal"; 92 public static final String TERMINATED_STATUS_STRING = "Terminated"; 93 public static final String UNKNOWN_STATUS_STRING = "Unknown"; 94 95 /** 96 * Job state codes and their string values 97 */ 98 public static final int PENDING_STATE = 0; 99 public static final int RUNNING_STATE = 1; 100 public static final String PENDING_STATE_STRING = "Pending"; 101 public static final String RUNNING_STATE_STRING = "Running"; 102 103 static final int BREAKPOINT_FUNCTION = 0; 104 static final int BREAKPOINT_LINE = 1; 105 106 /** 107 * Creates a new STAXJob instance passing in a STAX object which 108 * represents the STAX service that is executing this job. 109 */ STAXJob(STAX staxService)110 public STAXJob(STAX staxService) { 111 this(staxService, new STAXDocument()); 112 } 113 114 /** 115 * Creates a new STAXJob instance using an existing STAX 116 * document. 117 * 118 * @param staxService the STAX service. 119 * @param document the STAX document to be executed by 120 * this job. 121 */ STAXJob(STAX staxService, STAXDocument document)122 public STAXJob(STAX staxService, STAXDocument document) { 123 fSTAX = staxService; 124 fDocument = document; 125 STAXThread thread = new STAXThread(this); 126 thread.addCompletionNotifiee(this); 127 fThreadMap.put(thread.getThreadNumberAsInteger(), thread); 128 fClearlogs = fSTAX.getClearlogs(); 129 fLogTCElapsedTime = fSTAX.getLogTCElapsedTime(); 130 fLogTCNumStarts = fSTAX.getLogTCNumStarts(); 131 fLogTCStartStop = fSTAX.getLogTCStartStop(); 132 fPythonOutput = fSTAX.getPythonOutput(); 133 fPythonLogLevel = fSTAX.getPythonLogLevel(); 134 fMaxSTAXThreads = fSTAX.getMaxSTAXThreads(); 135 } 136 137 /** 138 * Gets the STAX object which represents the job's STAX service 139 * @return an instance of the job's STAX service 140 */ getSTAX()141 public STAX getSTAX() { return fSTAX; } 142 143 /** 144 * Gets the STAX document that is executed by this job. 145 * 146 * @return the job's STAX document. 147 */ getSTAXDocument()148 public STAXDocument getSTAXDocument() { return fDocument; } 149 150 /** 151 * Sets the STAX document to be executed by this job 152 * @param document the STAX document to be executed by this job 153 */ setSTAXDocument(STAXDocument document)154 public void setSTAXDocument(STAXDocument document) 155 { 156 fDocument = document; 157 } 158 159 /** 160 * Gets the next number for a thread in a job 161 * @return a number for the next thread in a job 162 */ getNextThreadNumber()163 public int getNextThreadNumber() 164 { 165 synchronized (fNextThreadNumberSynch) 166 { 167 return fNextThreadNumber++; 168 } 169 } 170 171 /** 172 * Gets the next number for a breakpoint in a job 173 * @return a number for the next breakpoint in a job 174 */ getNextBreakpointNumber()175 public int getNextBreakpointNumber() 176 { 177 synchronized (fNextBreakpointNumberSynch) 178 { 179 return fNextBreakpointNumber++; 180 } 181 } 182 setDefaultCallAction(STAXCallAction action)183 public void setDefaultCallAction(STAXCallAction action) 184 { 185 fDefaultCallAction = action; 186 } 187 188 /** 189 * Gets the name of the function that should be called to start the 190 * execution of a job 191 * @return the name of the starting function for a job 192 */ getStartFunction()193 public String getStartFunction() { return fDocument.getStartFunction(); } 194 195 /** 196 * Sets the name of the function that should be called to start the 197 * execution of a job 198 * @param startFunction the name of the starting function for a job 199 */ setStartFunction(String startFunction)200 public void setStartFunction(String startFunction) 201 { fDocument.setStartFunction(startFunction); } 202 getStartFuncArgs()203 public String getStartFuncArgs() { return fDocument.getStartFunctionArgs(); } 204 setStartFuncArgs(String startFuncArgs)205 public void setStartFuncArgs(String startFuncArgs) 206 { fDocument.setStartFuncArgs(startFuncArgs); } 207 setExecuteAndHold()208 public void setExecuteAndHold() 209 { fExecuteAndHold = true; } 210 211 // Sets the value of the FUNCTION option on a STAX EXECUTE request setStartFunctionOverride(String startFunction)212 public void setStartFunctionOverride(String startFunction) 213 { fStartFunction = startFunction; } 214 getStartFunctionOverride()215 public String getStartFunctionOverride() { return fStartFunction; } 216 217 // Sets the value of the ARGS option on a STAX EXECUTE request setStartFuncArgsOverride(String startFuncArgs)218 public void setStartFuncArgsOverride(String startFuncArgs) 219 { fStartFuncArgs = startFuncArgs; } 220 getStartFuncArgsOverride()221 public String getStartFuncArgsOverride() { return fStartFuncArgs; } 222 getJobName()223 public String getJobName() { return fJobName; } 224 setJobName(String jobName)225 public void setJobName(String jobName) { fJobName = jobName; } 226 getJobNumber()227 public int getJobNumber() { return fJobNumber; } 228 setJobNumber(int jobNumber)229 public void setJobNumber(int jobNumber) { fJobNumber = jobNumber; } 230 getNextProcNumber()231 public int getNextProcNumber() 232 { 233 synchronized (fNextProcNumberSynch) 234 { 235 return fProcNumber++; 236 } 237 } 238 getNextCmdNumber()239 public int getNextCmdNumber() 240 { 241 synchronized (fNextCmdNumberSynch) 242 { 243 return fCmdNumber++; 244 } 245 } 246 getNextProcessKey()247 public int getNextProcessKey() 248 { 249 synchronized (fNextProcessKeySynch) 250 { 251 return fProcessKey++; 252 } 253 } getJobDataDir()254 public String getJobDataDir() { return fJobDataDir; } 255 setJobDataDir(String jobDataDir)256 public void setJobDataDir(String jobDataDir) { fJobDataDir = jobDataDir; } 257 getXmlMachine()258 public String getXmlMachine() { return fXmlMachine; } 259 setXmlMachine(String machName)260 public void setXmlMachine(String machName) 261 { fXmlMachine = machName; } 262 getXmlFile()263 public String getXmlFile() { return fXmlFile; } 264 setXmlFile(String fileName)265 public void setXmlFile(String fileName) { fXmlFile = fileName; } 266 getScripts()267 public List getScripts() { return fScripts; } 268 setScript(String script)269 public void setScript(String script) { fScripts.add(script); } 270 getScriptFiles()271 public List getScriptFiles() { return fScriptFiles; } 272 setScriptFile(String fileName)273 public void setScriptFile(String fileName) { fScriptFiles.add(fileName); } 274 getScriptFileMachine()275 public String getScriptFileMachine() { return fScriptFileMachine; } 276 setScriptFileMachine(String machName)277 public void setScriptFileMachine(String machName) 278 { fScriptFileMachine = machName; } 279 getSourceMachine()280 public String getSourceMachine() { return fSourceMachine; } 281 setSourceMachine(String machName)282 public void setSourceMachine(String machName) 283 { fSourceMachine = machName; } 284 getSourceHandleName()285 public String getSourceHandleName() { return fSourceHandleName; } 286 setSourceHandleName(String handleName)287 public void setSourceHandleName(String handleName) 288 { fSourceHandleName = handleName; } 289 getSourceHandle()290 public int getSourceHandle() { return fSourceHandle; } 291 setSourceHandle(int handle)292 public void setSourceHandle(int handle) 293 { fSourceHandle = handle; } 294 getClearlogs()295 public boolean getClearlogs() { return fClearlogs; } 296 getClearLogsAsString()297 public String getClearLogsAsString() 298 { 299 if (fClearlogs) 300 return "Enabled"; 301 else 302 return "Disabled"; 303 } 304 setClearlogs(boolean clearlogs)305 public void setClearlogs(boolean clearlogs) 306 { fClearlogs = clearlogs; } 307 getMaxSTAXThreads()308 public int getMaxSTAXThreads() { return fMaxSTAXThreads; } 309 getWaitTimeout()310 public String getWaitTimeout() { return fWaitTimeout; } 311 setWaitTimeout(String timeout)312 public void setWaitTimeout(String timeout) 313 { fWaitTimeout = timeout; } 314 getNotifyOnEnd()315 public int getNotifyOnEnd() { return fNotifyOnEnd; } 316 getNotifyOnEndAsString()317 public String getNotifyOnEndAsString() 318 { 319 if (fNotifyOnEnd == STAXJob.NOTIFY_ONEND_BY_NAME) 320 return "By Name"; 321 else if (fNotifyOnEnd == STAXJob.NOTIFY_ONEND_BY_HANDLE) 322 return "By Handle"; 323 else 324 return "No"; 325 } 326 setNotifyOnEnd(int notifyFlag)327 public void setNotifyOnEnd(int notifyFlag) { fNotifyOnEnd = notifyFlag; } 328 getState()329 public int getState() { return fState; } 330 getStateAsString()331 public String getStateAsString() 332 { 333 if (fState == PENDING_STATE) 334 return PENDING_STATE_STRING; 335 else 336 return RUNNING_STATE_STRING; 337 } 338 setState(int state)339 public void setState(int state) { fState = state; } 340 getResult()341 public PyObject getResult() { return fResult; } 342 setResult(PyObject result)343 public void setResult(PyObject result) 344 { fResult = result; } 345 getCompletionStatus()346 public int getCompletionStatus() { return fCompletionStatus; } 347 getCompletionStatusAsString()348 public String getCompletionStatusAsString() 349 { 350 if (fCompletionStatus == STAXJob.NORMAL_STATUS) 351 return STAXJob.NORMAL_STATUS_STRING; 352 else if (fCompletionStatus == STAXJob.TERMINATED_STATUS) 353 return STAXJob.TERMINATED_STATUS_STRING; 354 else if (fCompletionStatus == STAXJob.ABNORMAL_STATUS) 355 return STAXJob.ABNORMAL_STATUS_STRING; 356 else 357 return STAXJob.UNKNOWN_STATUS_STRING; 358 } 359 setCompletionStatus(int status)360 public void setCompletionStatus(int status) 361 { 362 fCompletionStatus = status; 363 } 364 getJobLogErrorsMC()365 public STAFMarshallingContext getJobLogErrorsMC() 366 { 367 return fJobLogErrorsMC; 368 } 369 getTestcaseList()370 public List getTestcaseList() { return fTestcaseList; } 371 setTestcaseList(List testcaseList)372 public void setTestcaseList(List testcaseList) 373 { fTestcaseList = testcaseList; } 374 getTestcaseTotalsMap()375 public Map getTestcaseTotalsMap() { return fTestcaseTotalsMap; } 376 setTestcaseTotalsMap(Map testcaseTotalsMap)377 public void setTestcaseTotalsMap(Map testcaseTotalsMap) 378 { fTestcaseTotalsMap = testcaseTotalsMap; } 379 getStartTimestamp()380 public STAXTimestamp getStartTimestamp() { return fStartTimestamp; } 381 setStartTimestamp()382 public void setStartTimestamp() 383 { 384 // Get the current date and time and set as the starting date/time 385 fStartTimestamp = new STAXTimestamp(); 386 } 387 getEndTimestamp()388 public STAXTimestamp getEndTimestamp() { return fEndTimestamp; } 389 getJobNumberAsInteger()390 public Integer getJobNumberAsInteger() { return new Integer(fJobNumber); } 391 getSTAFHandle()392 public STAFHandle getSTAFHandle() { return fHandle; } 393 setSTAFHandle()394 public void setSTAFHandle() throws STAFException 395 { 396 fHandle = new STAFHandle("STAX/Job/" + fJobNumber); 397 } 398 getLogTCElapsedTime()399 public boolean getLogTCElapsedTime() { return fLogTCElapsedTime; } 400 getLogTCElapsedTimeAsString()401 public String getLogTCElapsedTimeAsString() 402 { 403 if (fLogTCElapsedTime) 404 return "Enabled"; 405 else 406 return "Disabled"; 407 } 408 setLogTCElapsedTime(boolean logTCElapsedTime)409 public void setLogTCElapsedTime(boolean logTCElapsedTime) 410 { fLogTCElapsedTime = logTCElapsedTime; } 411 getLogTCNumStarts()412 public boolean getLogTCNumStarts() { return fLogTCNumStarts; } 413 getLogTCNumStartsAsString()414 public String getLogTCNumStartsAsString() 415 { 416 if (fLogTCNumStarts) 417 return "Enabled"; 418 else 419 return "Disabled"; 420 } 421 setLogTCNumStarts(boolean logTCNumStarts)422 public void setLogTCNumStarts(boolean logTCNumStarts) 423 { fLogTCNumStarts = logTCNumStarts; } 424 getLogTCStartStop()425 public boolean getLogTCStartStop() { return fLogTCStartStop; } 426 getLogTCStartStopAsString()427 public String getLogTCStartStopAsString() 428 { 429 if (fLogTCStartStop) 430 return "Enabled"; 431 else 432 return "Disabled"; 433 } 434 setLogTCStartStop(boolean logTCStartStop)435 public void setLogTCStartStop(boolean logTCStartStop) 436 { fLogTCStartStop = logTCStartStop; } 437 getPythonOutput()438 public int getPythonOutput() { return fPythonOutput; } 439 setPythonOutput(int pythonOutput)440 public void setPythonOutput(int pythonOutput) 441 { 442 fPythonOutput = pythonOutput; 443 } 444 getPythonLogLevel()445 public String getPythonLogLevel() { return fPythonLogLevel; } 446 setPythonLogLevel(String logLevel)447 public void setPythonLogLevel(String logLevel) 448 { 449 fPythonLogLevel = logLevel; 450 } 451 getNumThreads()452 public int getNumThreads() 453 { 454 return fThreadMap.size(); 455 } 456 getThreadMapCopy()457 public Map getThreadMapCopy() 458 { 459 return fThreadMap; 460 } 461 getThread(Integer threadNumber)462 public STAXThread getThread(Integer threadNumber) 463 { 464 synchronized (fThreadMap) 465 { 466 return (STAXThread)fThreadMap.get(threadNumber); 467 } 468 } 469 addThread(STAXThread thread)470 public void addThread(STAXThread thread) 471 { 472 synchronized (fThreadMap) 473 { 474 fThreadMap.put(thread.getThreadNumberAsInteger(), thread); 475 } 476 } 477 addThreadIfDoesNotExceedMax(STAXThread thread)478 public void addThreadIfDoesNotExceedMax(STAXThread thread) 479 throws STAXExceedsMaxThreadsException 480 { 481 synchronized (fThreadMap) 482 { 483 if ((fMaxSTAXThreads != 0) && 484 (fThreadMap.size() >= fMaxSTAXThreads)) 485 { 486 throw new STAXExceedsMaxThreadsException( 487 "Exceeded MaxSTAXThreads=" + fMaxSTAXThreads + 488 " (the maximum number of STAX Threads that " + 489 "can be running simultaneously in a job)."); 490 } 491 492 fThreadMap.put(thread.getThreadNumberAsInteger(), thread); 493 } 494 } 495 removeThread(Integer threadNumber)496 public void removeThread(Integer threadNumber) 497 { 498 synchronized (fThreadMap) 499 { 500 fThreadMap.remove(threadNumber); 501 } 502 } 503 addFunction(STAXFunctionAction function)504 public void addFunction(STAXFunctionAction function) 505 { 506 fDocument.addFunction(function); 507 } 508 getFunction(String name)509 public STAXAction getFunction(String name) 510 { 511 return fDocument.getFunction(name); 512 } 513 getBreakpointFirstFunction()514 public boolean getBreakpointFirstFunction() 515 { 516 return fBreakpointFirstFunction; 517 } 518 setBreakpointFirstFunction(boolean breakpointFirstFunction)519 public void setBreakpointFirstFunction(boolean breakpointFirstFunction) 520 { 521 fBreakpointFirstFunction = breakpointFirstFunction; 522 } 523 getBreakpointSubjobFirstFunction()524 public boolean getBreakpointSubjobFirstFunction() 525 { 526 return fBreakpointSubjobFirstFunction; 527 } 528 setBreakpointSubjobFirstFunction( boolean breakpointSubjobFirstFunction)529 public void setBreakpointSubjobFirstFunction( 530 boolean breakpointSubjobFirstFunction) 531 { 532 fBreakpointSubjobFirstFunction = breakpointSubjobFirstFunction; 533 } 534 functionExists(String name)535 public boolean functionExists(String name) 536 { 537 return fDocument.functionExists(name); 538 } 539 addDefaultAction(STAXAction action)540 public void addDefaultAction(STAXAction action) 541 { 542 fDocument.addDefaultAction(action); 543 } 544 addCompletionNotifiee(STAXJobCompleteListener listener)545 public void addCompletionNotifiee(STAXJobCompleteListener listener) 546 { 547 synchronized (fCompletionNotifiees) 548 { 549 fCompletionNotifiees.addLast(listener); 550 } 551 } 552 addCompletionNotifiee2(STAXJobCompleteNotifiee notifiee)553 public STAFResult addCompletionNotifiee2(STAXJobCompleteNotifiee notifiee) 554 { 555 synchronized (fCompletionNotifiees) 556 { 557 // Check if the notifiee is already in the list 558 559 Iterator iter = fCompletionNotifiees.iterator(); 560 561 while (iter.hasNext()) 562 { 563 Object notifieeObj = iter.next(); 564 565 if (notifieeObj instanceof 566 com.ibm.staf.service.stax.STAXJobCompleteNotifiee) 567 { 568 STAXJobCompleteNotifiee aNotifiee = 569 (STAXJobCompleteNotifiee)notifieeObj; 570 571 if (aNotifiee.getMachine().equals(notifiee.getMachine()) && 572 aNotifiee.getHandle() == notifiee.getHandle() && 573 aNotifiee.getHandleName().equals(notifiee.getHandleName())) 574 { 575 return new STAFResult( 576 STAFResult.AlreadyExists, 577 "A notifiee is already registered for machine=" + 578 notifiee.getMachine() + 579 ", handle=" + notifiee.getHandle() + 580 ", handleName=" + notifiee.getHandleName()); 581 } 582 } 583 584 } 585 586 // Add to the end of the list 587 588 fCompletionNotifiees.addLast(notifiee); 589 } 590 591 return new STAFResult(STAFResult.Ok, ""); 592 } 593 removeCompletionNotifiee(STAXJobCompleteNotifiee notifiee)594 public STAFResult removeCompletionNotifiee(STAXJobCompleteNotifiee notifiee) 595 { 596 boolean found = false; 597 598 synchronized (fCompletionNotifiees) 599 { 600 // Check if notifiee exists. If so, remove the notifiee 601 602 Iterator iter = fCompletionNotifiees.iterator(); 603 604 while (iter.hasNext()) 605 { 606 Object notifieeObj = iter.next(); 607 608 if (notifieeObj instanceof 609 com.ibm.staf.service.stax.STAXJobCompleteNotifiee) 610 { 611 STAXJobCompleteNotifiee aNotifiee = 612 (STAXJobCompleteNotifiee)notifieeObj; 613 614 if (aNotifiee.getMachine().equals(notifiee.getMachine()) && 615 aNotifiee.getHandle() == notifiee.getHandle() && 616 aNotifiee.getHandleName().equals(notifiee.getHandleName())) 617 { 618 // Remove notifiee from list 619 620 fCompletionNotifiees.remove(aNotifiee); 621 found = true; 622 return new STAFResult(STAFResult.Ok, ""); 623 } 624 } 625 } 626 } 627 628 return new STAFResult( 629 STAFResult.DoesNotExist, 630 "No notifiee registered for machine=" + notifiee.getMachine() + 631 ", handle=" + notifiee.getHandle() + 632 ", handleName=" + notifiee.getHandleName()); 633 } 634 getCompletionNotifiees()635 public LinkedList getCompletionNotifiees() 636 { 637 synchronized(fCompletionNotifiees) 638 { 639 return new LinkedList(fCompletionNotifiees); 640 } 641 } 642 643 // Gets the compiled Jython code for the specified code string. 644 // Note that we cache compiled Jython code so that if it is used more 645 // than once, it eliminates the overhead of recompiling the code string 646 // each time it is executed. 647 // 648 // Input Arguments: 649 // - codeString: must be valid Python code; however, like ordinary 650 // Python code, the existence of variables will not be 651 // checked until the code is executed. 652 // - kind: is either 'exec' if the string is made up of 653 // statements, 'eval' if it is an expression. 654 getCompiledPyCode(String codeString, String kind)655 public PyCode getCompiledPyCode(String codeString, String kind) 656 { 657 synchronized(fCompiledPyCodeCache) 658 { 659 PyCode codeObject = (PyCode)fCompiledPyCodeCache.get(codeString); 660 661 if (codeObject == null) 662 { 663 if (COUNT_PYCODE_CACHES) fCompiledPyCodeCacheAdds++; 664 665 if (kind.equals("eval")) 666 { 667 // Set to avoid error of setting code object to nothing 668 if (codeString.equals("")) codeString = "None"; 669 670 /* Jython 2.1: 671 codeObject = __builtin__.compile( 672 "STAXPyEvalResult = " + codeString, 673 "<pyEval string>", "exec"); 674 */ 675 // Jython 2.5: 676 codeObject = Py.compile_flags( 677 codeString, "<pyEval string>", 678 CompileMode.eval, Py.getCompilerFlags()); 679 } 680 else 681 { 682 /* Jython 2.1: 683 codeObject = __builtin__.compile( 684 codeString, "<pyExec string>", "exec"); 685 */ 686 // Jython 2.5: 687 codeObject = Py.compile_flags( 688 codeString, "<pyExec string>", 689 CompileMode.exec, Py.getCompilerFlags()); 690 } 691 692 fCompiledPyCodeCache.put(codeString, codeObject); 693 } 694 else 695 { 696 if (COUNT_PYCODE_CACHES) fCompiledPyCodeCacheGets++; 697 } 698 699 return codeObject; 700 } 701 } 702 703 // Queue listener methods 704 registerSTAFQueueListener(String msgType, STAXSTAFQueueListener listener)705 public void registerSTAFQueueListener(String msgType, 706 STAXSTAFQueueListener listener) 707 { 708 synchronized (fQueueListenerMap) 709 { 710 TreeSet listenerSet = (TreeSet)fQueueListenerMap.get(msgType); 711 712 if (listenerSet == null) 713 { 714 listenerSet = new TreeSet(new STAXObjectComparator()); 715 fQueueListenerMap.put(msgType, listenerSet); 716 } 717 718 listenerSet.add(listener); 719 } 720 } 721 unregisterSTAFQueueListener(String msgType, STAXSTAFQueueListener listener)722 public void unregisterSTAFQueueListener(String msgType, 723 STAXSTAFQueueListener listener) 724 { 725 synchronized (fQueueListenerMap) 726 { 727 TreeSet listenerSet = (TreeSet)fQueueListenerMap.get(msgType); 728 729 if (listenerSet != null) 730 listenerSet.remove(listener); 731 } 732 } 733 734 // 735 // Data management functions 736 // 737 setData(String dataName, Object data)738 public boolean setData(String dataName, Object data) 739 { 740 // Return true if added successfully, else return false. 741 742 synchronized (fDataMap) 743 { 744 if (!fDataMap.containsKey(dataName)) 745 { 746 fDataMap.put(dataName, data); 747 return true; 748 } 749 } 750 751 return false; 752 } 753 getData(String dataName)754 public Object getData(String dataName) 755 { 756 synchronized (fDataMap) 757 { return fDataMap.get(dataName); } 758 } 759 removeData(String dataName)760 public boolean removeData(String dataName) 761 { 762 // Return true if removed successfully, else return false. 763 764 synchronized (fDataMap) 765 { 766 if (fDataMap.containsKey(dataName)) 767 { 768 fDataMap.remove(dataName); 769 return true; 770 } 771 } 772 773 return false; 774 } 775 776 // 777 // Execution methods 778 // 779 startExecution()780 public void startExecution() 781 throws STAFException, STAXException 782 { 783 // Check if the MAXRETURNFILESIZE setting is not 0 (no maximum size 784 // limit) and if so, set variable STAF/MaxReturnFileSize for the STAX 785 // job handle 786 787 if (fSTAX.getMaxReturnFileSize() != 0) 788 { 789 String request = "SET VAR " + STAFUtil.wrapData( 790 "STAF/MaxReturnFileSize=" + fSTAX.getMaxReturnFileSize()); 791 792 STAFResult result = fHandle.submit2("local", "VAR", request); 793 794 if (result.rc != STAFResult.Ok) 795 { 796 String msg = "The STAX service could not set the maximum " + 797 "file size variable for Job ID " + fJobNumber + 798 "\nSTAF local VAR " + request + " failed with RC=" + 799 result.rc + ", Result=" + result.result; 800 801 log(STAXJob.JOB_LOG, "error", msg); 802 } 803 } 804 805 fSTAFQueueMonitor = new STAFQueueMonitor(this); 806 fSTAFQueueMonitor.start(); 807 808 // Initialize data for the Job 809 810 fSTAX.visitJobManagementHandlers(new STAXVisitorHelper(this) 811 { 812 public void visit(Object o, Iterator iter) 813 { 814 STAXJobManagementHandler handler = (STAXJobManagementHandler)o; 815 816 handler.initJob((STAXJob)fData); 817 } 818 }); 819 820 // Register to listen for messages that STAF requests have completed 821 822 registerSTAFQueueListener("STAF/RequestComplete", this); 823 824 // Get the main thread, thread 1 825 826 STAXThread thread = (STAXThread)fThreadMap.get(new Integer(1)); 827 828 // Use a custom OutputStream to redirect the Python Interpreter's 829 // stdout and stderr to somewhere other than the JVM Log and/or 830 // to reformat the output (e.g. add a timestamp, job ID) when 831 // logging to the JVM Log. Use the log level specified for the 832 // job's python output. 833 834 thread.setPythonInterpreterStdout(new STAXPythonOutput(this)); 835 836 // Override the log level to use to always be "Error" for stderr 837 thread.setPythonInterpreterStderr(new STAXPythonOutput(this, "Error")); 838 839 // Get the starting function for the job 840 841 STAXAction action = (STAXAction)fDocument.getFunction( 842 fDocument.getStartFunction()); 843 844 if (action == null) 845 { 846 throw new STAXInvalidStartFunctionException( 847 "'" + fDocument.getStartFunction() + 848 "' is not a valid function name. No function with " + 849 "this name is defined."); 850 } 851 else 852 { 853 // Call the main function passing any arguments 854 855 String startFunctionUneval = "'" + fDocument.getStartFunction() + 856 "'"; 857 858 if (fDefaultCallAction != null) 859 { 860 STAXCallAction callAction = fDefaultCallAction; 861 callAction.setFunction(startFunctionUneval); 862 callAction.setArgs(fDocument.getStartFunctionArgs()); 863 action = callAction; 864 } 865 else 866 { 867 STAXCallAction callAction = new STAXCallAction( 868 startFunctionUneval, fDocument.getStartFunctionArgs()); 869 callAction.setLineNumber( 870 "<External>", "<Error in ARGS option>"); 871 callAction.setXmlFile(getXmlFile()); 872 callAction.setXmlMachine(getXmlMachine()); 873 action = callAction; 874 } 875 } 876 877 ArrayList actionList = new ArrayList(); 878 879 // If HOLD was specified on EXECUTE request, add hold action as 880 // the first action in the actionList. 881 if (fExecuteAndHold) 882 actionList.add(new STAXHoldAction("'main'")); 883 884 // Add additional Python variables about the Job 885 thread.pySetVar("STAXJobID", new Integer(fJobNumber)); 886 thread.pySetVar("STAXJobName", fJobName); 887 thread.pySetVar("STAXJobXMLFile", fXmlFile); 888 thread.pySetVar("STAXJobXMLMachine", fXmlMachine); 889 thread.pySetVar("STAXJobStartDate", fStartTimestamp.getDateString()); 890 thread.pySetVar("STAXJobStartTime", fStartTimestamp.getTimeString()); 891 thread.pySetVar("STAXJobSourceMachine", fSourceMachine); 892 thread.pySetVar("STAXJobSourceHandleName", fSourceHandleName); 893 thread.pySetVar("STAXJobSourceHandle", new Integer(fSourceHandle)); 894 thread.pySetVar("STAXJobStartFunctionName", 895 fDocument.getStartFunction()); 896 thread.pySetVar("STAXJobStartFunctionArgs", 897 fDocument.getStartFunctionArgs()); 898 thread.pySetVar("STAXCurrentFunction", Py.None); 899 thread.pySetVar("STAXCurrentXMLFile", fXmlFile); 900 thread.pySetVar("STAXCurrentXMLMachine", fXmlMachine); 901 902 if (!fScriptFileMachine.equals("")) 903 thread.pySetVar("STAXJobScriptFileMachine", fScriptFileMachine); 904 else 905 thread.pySetVar("STAXJobScriptFileMachine", fSourceMachine); 906 907 thread.pySetVar("STAXJobScriptFiles", fScriptFiles.toArray()); 908 thread.pySetVar("STAXJobHandle", fHandle); 909 910 thread.pySetVar("STAXServiceName", fSTAX.getServiceName()); 911 thread.pySetVar("STAXServiceMachine", fSTAX.getLocalMachineName()); 912 thread.pySetVar("STAXServiceMachineNickname", 913 fSTAX.getLocalMachineNickname()); 914 thread.pySetVar("STAXEventServiceName", fSTAX.getEventServiceName()); 915 thread.pySetVar("STAXEventServiceMachine", 916 fSTAX.getEventServiceMachine()); 917 918 thread.pySetVar("STAXJobUserLog", new STAFLog( 919 STAFLog.MACHINE, 920 fSTAX.getServiceName().toUpperCase() + "_Job_" + 921 fJobNumber + "_User", 922 fHandle, 0)); 923 thread.pySetVar("STAXJobLogName", 924 fSTAX.getServiceName().toUpperCase() + 925 "_Job_" + fJobNumber); 926 thread.pySetVar("STAXJobUserLogName", 927 fSTAX.getServiceName().toUpperCase() + 928 "_Job_" + fJobNumber + "_User"); 929 930 thread.pySetVar("STAXJobWriteLocation", fJobDataDir); 931 thread.pySetVar("STAXMessageLog", new Integer(0)); 932 thread.pySetVar("STAXLogMessage", new Integer(0)); 933 934 if (fLogTCElapsedTime) 935 thread.pySetVar("STAXLogTCElapsedTime", new Integer(1)); 936 else 937 thread.pySetVar("STAXLogTCElapsedTime", new Integer(0)); 938 939 if (fLogTCNumStarts) 940 thread.pySetVar("STAXLogTCNumStarts", new Integer(1)); 941 else 942 thread.pySetVar("STAXLogTCNumStarts", new Integer(0)); 943 944 if (fLogTCStartStop) 945 thread.pySetVar("STAXLogTCStartStop", new Integer(1)); 946 else 947 thread.pySetVar("STAXLogTCStartStop", new Integer(0)); 948 949 thread.pySetVar("STAXPythonOutput", 950 STAXPythonOutput.getPythonOutputAsString( 951 getPythonOutput())); 952 953 thread.pySetVar("STAXPythonLogLevel", getPythonLogLevel()); 954 955 // Add default signal handlers to the actionList for the main block. 956 addDefaultSignalHandlers(actionList); 957 958 // Put the "main" function/block at the bottom of the stack, 959 // with its action being a sequence group that contains the 960 // default actions (scripts and signalhandlers) in the <stax> 961 // element and then the call of the main function. 962 963 LinkedList defaultActions = fDocument.getDefaultActions(); 964 965 while (!defaultActions.isEmpty()) 966 { 967 actionList.add((STAXAction)defaultActions.removeLast()); 968 } 969 970 actionList.add(action); // Add call of main function 971 972 // Create a sequence action for the "main" function/block 973 STAXActionDefaultImpl mainSequence = new STAXSequenceAction(actionList); 974 mainSequence.setLineNumber("sequence", "<Internal>"); 975 mainSequence.setXmlFile(getXmlFile()); 976 mainSequence.setXmlMachine(getXmlMachine()); 977 978 STAXActionDefaultImpl mainBlock = new STAXBlockAction( 979 "main", mainSequence); 980 mainBlock.setLineNumber("block", "<Internal>"); 981 mainBlock.setXmlFile(getXmlFile()); 982 mainBlock.setXmlMachine(getXmlMachine()); 983 984 thread.pushAction(mainBlock); 985 986 // Change the job's state from pending to running 987 fState = RUNNING_STATE; 988 989 // Generate the job is running event 990 991 HashMap jobRunningMap = new HashMap(); 992 jobRunningMap.put("type", "job"); 993 jobRunningMap.put("status", "run"); 994 jobRunningMap.put("jobID", String.valueOf(fJobNumber)); 995 jobRunningMap.put("startFunction", fDocument.getStartFunction()); 996 jobRunningMap.put("jobName", fJobName); 997 jobRunningMap.put("startTimestamp", 998 fStartTimestamp.getTimestampString()); 999 1000 generateEvent(STAXJob.STAX_JOB_EVENT, jobRunningMap, true); 1001 1002 thread.pySetVar("STAXJob", this); 1003 1004 thread.setBreakpointFirstFunction(fBreakpointFirstFunction); 1005 1006 // Schedule the thread to run 1007 thread.schedule(); 1008 } 1009 1010 /** 1011 * This is a recursive function that checks if a function needs to 1012 * import other xml files that contain functions it requires. 1013 * 1014 * Checks if the specified function has any function-import sub-elements 1015 * that specify files to be imported. If so, it parses the files (if not 1016 * already in the cache) and adds their required functions to the job's 1017 * fFunctionMap. For any new functions added to the job's fFunctionMap, 1018 * it recursively calls the addImportedFunctions method. 1019 */ addImportedFunctions(STAXFunctionAction functionAction)1020 public void addImportedFunctions(STAXFunctionAction functionAction) 1021 throws STAFException, STAXException 1022 { 1023 List importList = functionAction.getImportList(); 1024 1025 if (importList.size() == 0) 1026 return; 1027 1028 // Process function-import list (since not empty) 1029 1030 String evalElem = "function-import"; 1031 int evalIndex = 0; 1032 Iterator iter = importList.iterator(); 1033 1034 while (iter.hasNext()) 1035 { 1036 STAXFunctionImport functionImport = 1037 (STAXFunctionImport)iter.next(); 1038 1039 String machine = functionImport.getMachine(); 1040 String file = functionImport.getFile(); 1041 String directory = functionImport.getDirectory(); 1042 1043 boolean directorySpecified = false; 1044 1045 if (directory != null) 1046 directorySpecified = true; 1047 1048 boolean machineSpecified = false; 1049 1050 if (machine != null) 1051 { 1052 machineSpecified = true; 1053 1054 // Check if "machine" contains any STAF variables 1055 1056 if (machine.indexOf("{") != -1) 1057 { 1058 // Resolve variables on the local STAX service machine 1059 1060 STAFResult result = this.submitSync( 1061 "local", "VAR", "RESOLVE STRING " + 1062 STAFUtil.wrapData(machine)); 1063 1064 if (result.rc != STAFResult.Ok) 1065 { 1066 String errorMsg = "Cause: Error resolving STAF " + 1067 "variables in the \"machine\" attribute " + 1068 "for element type \" function-import\"," + 1069 "\nRC: " + result.rc + 1070 ", Result: " + result.result; 1071 1072 functionAction.setElementInfo( 1073 new STAXElementInfo( 1074 evalElem, "machine", evalIndex, errorMsg)); 1075 1076 throw new STAXFunctionImportException( 1077 STAXUtil.formatErrorMessage(functionAction)); 1078 } 1079 1080 machine = result.result; 1081 } 1082 } 1083 else 1084 { 1085 machine = fXmlMachine; 1086 } 1087 1088 // Check if file or directory contains any STAF variables. 1089 1090 String evalAttr = "file"; 1091 String unresValue = file; 1092 1093 if (directorySpecified) 1094 { 1095 evalAttr = "directory"; 1096 unresValue = directory; 1097 } 1098 1099 if (unresValue.indexOf("{") != -1) 1100 { 1101 // Resolve variables on the local STAX service machine 1102 1103 STAFResult result = this.submitSync( 1104 "local", "VAR", "RESOLVE STRING " + 1105 STAFUtil.wrapData(unresValue)); 1106 1107 if (result.rc != STAFResult.Ok) 1108 { 1109 String errorMsg = "Cause: Error resolving STAF " + 1110 "variables in the \"" + evalAttr + "\" attribute " + 1111 "for element type \"function-import\"." + 1112 "\nRC: " + result.rc + 1113 ", Result: " + result.result; 1114 1115 functionAction.setElementInfo( 1116 new STAXElementInfo( 1117 evalElem, evalAttr, evalIndex, errorMsg)); 1118 1119 throw new STAXFunctionImportException( 1120 STAXUtil.formatErrorMessage(functionAction)); 1121 } 1122 1123 if (!directorySpecified) 1124 file = result.result; 1125 else 1126 directory = result.result; 1127 } 1128 1129 // Handle any functions specified to be imported 1130 1131 String functions = functionImport.getFunctions(); 1132 1133 Vector importedFunctionList = new Vector(); 1134 1135 if (functions != null) 1136 { 1137 // Convert string containing a whitespace-separated list of 1138 // functions to a vector. 1139 // 1140 // Note: The tokenizer uses the default delimiter set, which 1141 // is " \t\n\r\f" and consists of the space character, 1142 // the tab character, the newline character, the 1143 // carriage-return character, and the form-feed character. 1144 // Delimiter characters themselves will not be treated as 1145 // tokens. 1146 1147 StringTokenizer st = new StringTokenizer(functions); 1148 1149 while (st.hasMoreElements()) 1150 { 1151 importedFunctionList.add(st.nextElement()); 1152 } 1153 } 1154 1155 // Get the file separator for the machine where the import file/ 1156 // directory resides as this is needed to normalize the path name 1157 // and may also be needed to determine if its a relative file 1158 // name, and to get the parent directory. 1159 1160 STAFResult result = STAXFileCache.getFileSep( 1161 machine, this.getSTAX().getSTAFHandle()); 1162 1163 if (result.rc != STAFResult.Ok) 1164 { 1165 String errorMsg = "Cause: No response from machine \"" + 1166 machine + "\" when trying to get the contents of a " + 1167 "file specified by element type \"function-import\"." + 1168 "\nRC: " + result.rc + ", Result: " + result.result; 1169 1170 functionAction.setElementInfo( 1171 new STAXElementInfo( 1172 evalElem, "machine", evalIndex, errorMsg)); 1173 1174 throw new STAXFunctionImportException( 1175 STAXUtil.formatErrorMessage(functionAction)); 1176 } 1177 1178 String fileSep = result.result; 1179 1180 // Set a flag to indicate if the file/directory name is 1181 // case-sensitive (e.g. true if it resides on a Unix machine, 1182 // false if Windows) 1183 1184 boolean caseSensitiveFileName = true; 1185 1186 if (fileSep.equals("\\")) 1187 { 1188 // Windows machine so not case-sensitive 1189 1190 caseSensitiveFileName = false; 1191 } 1192 1193 // Import the file specified in the function-import element. 1194 // If the file specified in the function-import element isn't 1195 // already in the file cache, get the file and parse it. 1196 // Then, add the functions defined in this file to the main job's 1197 // function map. 1198 1199 // If no machine attribute is specified, then the file specified 1200 // could be a relative file name so need to assign its absolute 1201 // file name 1202 1203 if (!machineSpecified) 1204 { 1205 // Check if a relative path was specified 1206 1207 String entry = file; 1208 1209 if (directorySpecified) 1210 entry = directory; 1211 1212 if (STAXUtil.isRelativePath(entry, fileSep)) 1213 { 1214 // Assign the absolute name assuming it is relative 1215 // to the parent xml file's path 1216 1217 String currentFile = functionAction.getXmlFile(); 1218 1219 if (currentFile.equals(STAX.INLINE_DATA)) 1220 { 1221 // Cannot specify a relative file path if the parent 1222 // xml file is STAX.INLINE_DATA 1223 1224 String errorMsg = "Cause: Cannot specify a " + 1225 "relative path in attribute \"" + evalAttr + 1226 "\" for element type \"function-import\" when " + 1227 "the parent xml file is " + STAX.INLINE_DATA; 1228 1229 functionAction.setElementInfo( 1230 new STAXElementInfo( 1231 evalElem, STAXElementInfo.NO_ATTRIBUTE_NAME, 1232 evalIndex, errorMsg)); 1233 1234 throw new STAXFunctionImportException( 1235 STAXUtil.formatErrorMessage(functionAction)); 1236 } 1237 1238 entry = STAXUtil.getParentPath(currentFile, fileSep) + 1239 entry; 1240 1241 if (!directorySpecified) 1242 file = entry; 1243 else 1244 directory = entry; 1245 } 1246 } 1247 1248 // Normalize the import file/directory name so that we have a 1249 // better chance at matching file names that are already cached 1250 1251 if (!directorySpecified) 1252 file = STAXUtil.normalizeFilePath(file, fileSep); 1253 else 1254 directory = STAXUtil.normalizeFilePath(directory, fileSep); 1255 1256 // Create a STAX XML Parser 1257 1258 STAXParser parser = null; 1259 1260 try 1261 { 1262 parser = new STAXParser(getSTAX()); 1263 } 1264 catch (Exception ex) 1265 { 1266 String errorMsg = ex.getClass().getName() + "\n" + 1267 ex.getMessage(); 1268 1269 functionAction.setElementInfo( 1270 new STAXElementInfo( 1271 evalElem, STAXElementInfo.NO_ATTRIBUTE_NAME, 1272 evalIndex, errorMsg)); 1273 1274 throw new STAXFunctionImportException( 1275 STAXUtil.formatErrorMessage(functionAction)); 1276 } 1277 1278 // Create a list of files to process 1279 1280 List theFileList = new ArrayList(); 1281 1282 if (!directorySpecified) 1283 { 1284 // There will be just one file to process 1285 1286 theFileList.add(file); 1287 } 1288 else 1289 { 1290 // Submit a FS LIST DIRECTORY request for all *.xml files 1291 // in the directory 1292 1293 result = submitSync( 1294 machine, "FS", "LIST DIRECTORY " + 1295 STAFUtil.wrapData(directory) + 1296 " TYPE F EXT xml CASEINSENSITIVE"); 1297 1298 if (result.rc != 0) 1299 { 1300 evalAttr = STAXElementInfo.NO_ATTRIBUTE_NAME; 1301 String errorMsg = "Cause: "; 1302 1303 if (result.rc == STAFResult.NoPathToMachine) 1304 { 1305 errorMsg = errorMsg + "No response from machine "; 1306 evalAttr = "machine"; 1307 } 1308 else 1309 { 1310 errorMsg = errorMsg + "Error "; 1311 } 1312 1313 errorMsg = errorMsg + "when submitting a FS LIST " + 1314 "DIRECTORY request to list all *.xml files in the " + 1315 "directory specified by element type " + 1316 "\"function-import\":" + 1317 "\n Directory: " + directory + 1318 "\n Machine: " + machine + 1319 "\n\nRC: " + result.rc + ", Result: " + result.result; 1320 1321 functionAction.setElementInfo( 1322 new STAXElementInfo( 1323 evalElem, evalAttr, evalIndex, errorMsg)); 1324 1325 throw new STAXFunctionImportException( 1326 STAXUtil.formatErrorMessage(functionAction)); 1327 } 1328 1329 Iterator dirListIter = ((List)result.resultObj).iterator(); 1330 1331 while (dirListIter.hasNext()) 1332 { 1333 theFileList.add(directory + fileSep + 1334 (String)dirListIter.next()); 1335 } 1336 } 1337 1338 Iterator fileListIter = theFileList.iterator(); 1339 1340 while (fileListIter.hasNext()) 1341 { 1342 file = (String)fileListIter.next(); 1343 1344 Date dLastModified = null; 1345 STAXJob job = null; 1346 1347 // If file caching is enabled, find the modification date of 1348 // the file being imported 1349 1350 if (getSTAX().getFileCaching()) 1351 { 1352 if (STAXFileCache.get().isLocalMachine(machine)) 1353 { 1354 File fileObj = new File(file); 1355 1356 // Make sure the file exists 1357 1358 if (fileObj.exists()) 1359 { 1360 long lastModified = fileObj.lastModified(); 1361 1362 if (lastModified > 0) 1363 { 1364 // Chop off the milliseconds because some 1365 // systems don't report modTime to milliseconds 1366 1367 lastModified = ((long)(lastModified/1000))*1000; 1368 1369 dLastModified = new Date(lastModified); 1370 } 1371 } 1372 } 1373 1374 if (dLastModified == null) 1375 { 1376 // Find the remote file mod time using STAF 1377 1378 STAFResult entryResult = submitSync( 1379 machine, "FS", "GET ENTRY " + 1380 STAFUtil.wrapData(file) + " MODTIME"); 1381 1382 if (entryResult.rc == 0) 1383 { 1384 String modDate = entryResult.result; 1385 dLastModified = STAXFileCache.convertSTAXDate( 1386 modDate); 1387 } 1388 } 1389 1390 // Check for an up-to-date file in the cache 1391 1392 if ((dLastModified != null) && 1393 STAXFileCache.get().checkCache( 1394 machine, file, dLastModified, 1395 caseSensitiveFileName)) 1396 { 1397 // Get the doc from cache 1398 1399 STAXDocument doc = STAXFileCache.get().getDocument( 1400 machine, file, caseSensitiveFileName); 1401 1402 if (doc != null) 1403 { 1404 job = new STAXJob(getSTAX(), doc); 1405 } 1406 } 1407 } 1408 1409 // If the file was not in cache, then retrieve it using STAF 1410 1411 if (job == null) 1412 { 1413 result = submitSync( 1414 machine, "FS", "GET FILE " + STAFUtil.wrapData(file)); 1415 1416 if (result.rc != 0) 1417 { 1418 evalAttr = STAXElementInfo.NO_ATTRIBUTE_NAME; 1419 String errorMsg = "Cause: "; 1420 1421 if (result.rc == STAFResult.NoPathToMachine) 1422 { 1423 errorMsg = errorMsg + "No response from machine "; 1424 evalAttr = "machine"; 1425 } 1426 else 1427 { 1428 errorMsg = errorMsg + "Error "; 1429 } 1430 1431 errorMsg = errorMsg + "when submitting a FS GET " + 1432 "request to get a file specified by element type " + 1433 "\"function-import\":" + 1434 "\n File: " + file + 1435 "\n Machine: " + machine + 1436 "\n\nRC: " + result.rc + ", Result: " + result.result; 1437 1438 functionAction.setElementInfo( 1439 new STAXElementInfo( 1440 evalElem, evalAttr, evalIndex, errorMsg)); 1441 1442 throw new STAXFunctionImportException( 1443 STAXUtil.formatErrorMessage(functionAction)); 1444 } 1445 1446 // Parse the XML document 1447 1448 try 1449 { 1450 job = parser.parse(result.result, file, machine); 1451 } 1452 catch (Exception ex) 1453 { 1454 String errorMsg = "Cause: " + 1455 ex.getClass().getName() + "\n"; 1456 1457 // Make sure that the error message contains the File: 1458 // and Machine: where the error occurred 1459 1460 if (ex.getMessage().indexOf("File: ") == -1 || 1461 ex.getMessage().indexOf("Machine: ") == -1) 1462 { 1463 errorMsg = errorMsg + "\nFile: " + file + 1464 ", Machine: " + machine; 1465 } 1466 1467 errorMsg = errorMsg + ex.getMessage(); 1468 1469 functionAction.setElementInfo( 1470 new STAXElementInfo( 1471 evalElem, STAXElementInfo.NO_ATTRIBUTE_NAME, 1472 evalIndex, errorMsg)); 1473 1474 throw new STAXFunctionImportException( 1475 STAXUtil.formatErrorMessage(functionAction)); 1476 } 1477 1478 // Add the XML document to the cache 1479 1480 if (getSTAX().getFileCaching() && (dLastModified != null)) 1481 { 1482 STAXFileCache.get().addDocument( 1483 machine, file, job.getSTAXDocument(), 1484 dLastModified, caseSensitiveFileName); 1485 } 1486 } 1487 1488 // Get a map of the functions in this xml file being imported 1489 1490 HashMap functionMap = job.getSTAXDocument().getFunctionMap(); 1491 1492 // Verify that if function names were specified in the 1493 // function-import element, verify that all the function names 1494 // specified exist in the xml file 1495 1496 if (!importedFunctionList.isEmpty()) 1497 { 1498 Iterator functionIterator = importedFunctionList.iterator(); 1499 1500 while (functionIterator.hasNext()) 1501 { 1502 String functionName = (String)functionIterator.next(); 1503 1504 if (!functionMap.containsKey(functionName)) 1505 { 1506 // Function name specified in the function-import 1507 // element's "functions" attribute does not exist 1508 1509 String errorMsg = "Cause: Function \"" + 1510 functionName + "\" does not exist in file \"" + 1511 file + "\" on machine \"" + machine + "\"."; 1512 1513 functionAction.setElementInfo( 1514 new STAXElementInfo( 1515 evalElem, STAXElementInfo.NO_ATTRIBUTE_NAME, 1516 evalIndex, errorMsg)); 1517 1518 throw new STAXFunctionImportException( 1519 STAXUtil.formatErrorMessage(functionAction)); 1520 } 1521 } 1522 } 1523 1524 // Create a set containing the functions requested to be 1525 // imported. If no functions were specifically requested to 1526 // be imported, add all the functions in the xml file to the 1527 // set. 1528 1529 Set functionSet; 1530 1531 if (importedFunctionList.isEmpty()) 1532 functionSet = functionMap.keySet(); 1533 else 1534 functionSet = new LinkedHashSet(importedFunctionList); 1535 1536 // Add the functions requested to be imported via a 1537 // function-import element to the main job's fFunctionMap and 1538 // add any other functions that these functions also require 1539 // to the main job's fFunctionMap 1540 1541 Iterator functionIterator = functionSet.iterator(); 1542 Vector requiredFunctionList = new Vector(); 1543 1544 while (functionIterator.hasNext()) 1545 { 1546 String functionName = (String)functionIterator.next(); 1547 1548 STAXFunctionAction function = 1549 (STAXFunctionAction)functionMap.get(functionName); 1550 1551 // If this function does not exist, add the function to 1552 // the job's fFunctionMap 1553 1554 if (!(functionExists(functionName))) 1555 { 1556 // Add the function to the job's function map 1557 1558 this.addFunction(function); 1559 1560 // Check if this function requires any other functions 1561 // and if so, add them to the list of required 1562 // functions 1563 1564 StringTokenizer requiredFunctions = 1565 new StringTokenizer(function.getRequires(), " "); 1566 1567 while (requiredFunctions.hasMoreElements()) 1568 { 1569 requiredFunctionList.add( 1570 requiredFunctions.nextElement()); 1571 } 1572 1573 // Recursive call to add functions this functions 1574 // requires if specified by function-import elements 1575 1576 addImportedFunctions(function); 1577 } 1578 } 1579 1580 // Process required functions (functions that the imported 1581 // functions also require) 1582 1583 for (int i = 0; i < requiredFunctionList.size(); i++) 1584 { 1585 String functionName = 1586 (String)requiredFunctionList.elementAt(i); 1587 1588 if (!functionExists(functionName)) 1589 { 1590 addRequiredFunctions(functionName, functionMap); 1591 } 1592 } 1593 } // End while (fileListIter.hasNext()) 1594 1595 evalIndex++; 1596 } // End while 1597 } 1598 1599 /** 1600 * This recursive method adds the specified required function to the 1601 * main job's fFunctionMap. And, if this function has any function-import 1602 * elements, it recursively adds these functions that are required by the 1603 * imported function. And, if this function has any required functions 1604 * from the same xml file, it recursively adds these required functions. 1605 */ addRequiredFunctions(String functionName, HashMap functionMap)1606 private void addRequiredFunctions(String functionName, HashMap functionMap) 1607 throws STAFException, STAXException 1608 { 1609 STAXFunctionAction function = 1610 (STAXFunctionAction)functionMap.get(functionName); 1611 1612 this.addFunction(function); 1613 1614 // If this function has any function-import elements, recursively 1615 // add these functions from the imported xml file that are 1616 // required by the imported function to the main job's 1617 // fFunctionMap if not already present 1618 1619 addImportedFunctions(function); 1620 1621 // If this function has a "requires" attribute specifying 1622 // additional functions in the same xml file that it requires, 1623 // recursively add these required functions to the main job's 1624 // fFunctionMap if not already present 1625 1626 StringTokenizer requiredFunctions = new StringTokenizer( 1627 function.getRequires(), " "); 1628 1629 while (requiredFunctions.hasMoreElements()) 1630 { 1631 String reqFunctionName = (String)requiredFunctions.nextElement(); 1632 1633 if (!functionExists(reqFunctionName)) 1634 { 1635 addRequiredFunctions(reqFunctionName, functionMap); 1636 } 1637 } 1638 } 1639 addDefaultSignalHandlers(ArrayList actionList)1640 public void addDefaultSignalHandlers(ArrayList actionList) 1641 { 1642 // Array of default SignalHandlers. Each signal is described by 1643 // a signal name, an action, and a signal message variable name. 1644 // If the signal message variable name is not null, a message will 1645 // be sent to the STAX Monitor and logged with level 'error'. 1646 1647 String[][] defaultSHArray = 1648 { 1649 { 1650 "STAXPythonEvaluationError", "terminate", "STAXPythonEvalMsg" 1651 }, 1652 { 1653 "STAXProcessStartError", "continue", "STAXProcessStartErrorMsg" 1654 }, 1655 { 1656 "STAXProcessStartTimeout", "continue", 1657 "STAXProcessStartTimeoutMsg" 1658 }, 1659 { 1660 "STAXCommandStartError", "terminate", 1661 "STAXCommandStartErrorMsg" 1662 }, 1663 { 1664 "STAXFunctionDoesNotExist", "terminate", 1665 "STAXFunctionDoesNotExistMsg" 1666 }, 1667 { 1668 "STAXInvalidBlockName", "terminate", "STAXInvalidBlockNameMsg" 1669 }, 1670 { 1671 "STAXBlockDoesNotExist", "continue", "STAXBlockDoesNotExistMsg" 1672 }, 1673 { 1674 "STAXLogError", "continue", "STAXLogMsg" 1675 }, 1676 { 1677 "STAXTestcaseMissingError", "continue", 1678 "STAXTestcaseMissingMsg" 1679 }, 1680 { 1681 "STAXInvalidTcStatusResult", "continue", 1682 "STAXInvalidTcStatusResultMsg" 1683 }, 1684 { 1685 "STAXInvalidTimerValue", "terminate", 1686 "STAXInvalidTimerValueMsg" 1687 }, 1688 { 1689 "STAXNoSuchSignalHandler", "continue", 1690 "STAXNoSuchSignalHandlerMsg" 1691 }, 1692 { 1693 "STAXEmptyList", "continue", null 1694 }, 1695 { 1696 "STAXMaxThreadsExceeded", "terminate", 1697 "STAXMaxThreadsExceededMsg" 1698 }, 1699 { 1700 "STAXInvalidMaxThreads", "terminate", 1701 "STAXInvalidMaxThreadsMsg" 1702 }, 1703 { 1704 "STAXFunctionArgValidate", "terminate", 1705 "STAXFunctionArgValidateMsg" 1706 }, 1707 { 1708 "STAXImportError", "terminate", "STAXImportErrorMsg" 1709 }, 1710 { 1711 "STAXFunctionImportError", "terminate", 1712 "STAXFunctionImportErrorMsg" 1713 }, 1714 { 1715 "STAXInvalidTestcaseMode", "continue", 1716 "STAXInvalidTestcaseModeMsg" 1717 } 1718 }; 1719 1720 // Add default SignalHandlers to actionList 1721 1722 for (int i = 0; i < defaultSHArray.length; i++) 1723 { 1724 ArrayList signalHandlerActionList = new ArrayList(); 1725 1726 String signalName = defaultSHArray[i][0]; 1727 String signalAction = defaultSHArray[i][1]; 1728 String signalMsgVarName = defaultSHArray[i][2]; 1729 String signalMsgText = ""; 1730 1731 if (signalAction.equals("terminate")) 1732 { 1733 signalMsgText = "'" + signalName + " signal raised. " + 1734 "Terminating job. '" + " + " + signalMsgVarName; 1735 } 1736 else if (signalAction.equals("continue")) 1737 { 1738 signalMsgText = "'" + signalName + " signal raised. " + 1739 "Continuing job. '" + " + " + signalMsgVarName; 1740 } 1741 1742 if (signalMsgVarName == null) 1743 { 1744 // Add a No Operation (nop) Action to the action list 1745 1746 signalHandlerActionList.add(new STAXNopAction()); 1747 } 1748 else 1749 { 1750 // Add a Log Action to the action list 1751 1752 int logfile = STAXJob.JOB_LOG; 1753 1754 if (signalName == "STAXLogError") 1755 { 1756 // Log the error message in the STAX JVM log (otherwise 1757 // may get a duplicate STAXLogError signal) 1758 1759 logfile = STAXJob.JVM_LOG; 1760 } 1761 1762 // Log the message with level 'Error' and send the message 1763 // to the STAX Monitor 1764 1765 signalHandlerActionList.add(new STAXLogAction( 1766 signalMsgText, "'error'", "1", "1", logfile)); 1767 } 1768 1769 if (signalAction.equals("terminate")) 1770 { 1771 // Add a Terminate Job Action to the action list 1772 1773 signalHandlerActionList.add( 1774 new STAXTerminateAction("'main'")); 1775 } 1776 1777 // Add the signalhandler action to the action list 1778 1779 if (signalHandlerActionList.size() == 1) 1780 { 1781 // Don't need a STAXSequenceAction since only 1 action in list 1782 1783 actionList.add(new STAXSignalHandlerAction( 1784 "'" + signalName + "'", 1785 (STAXAction)signalHandlerActionList.get(0))); 1786 } 1787 else 1788 { 1789 // Wrap the action list is a STAXSequenceAction 1790 1791 actionList.add(new STAXSignalHandlerAction( 1792 "'" + signalName + "'", 1793 new STAXSequenceAction(signalHandlerActionList))); 1794 } 1795 } 1796 } 1797 1798 // 1799 // Event methods 1800 // 1801 generateEvent(String eventSubType, Map details)1802 public void generateEvent(String eventSubType, Map details) 1803 { 1804 generateEvent(eventSubType, details, false); 1805 } 1806 generateEvent(String eventSubType, Map details, boolean notifyAll)1807 public void generateEvent(String eventSubType, Map details, 1808 boolean notifyAll) 1809 { 1810 StringBuffer detailsString = new StringBuffer(); 1811 String key = ""; 1812 String value = ""; 1813 1814 Iterator keyIter = details.keySet().iterator(); 1815 1816 while (keyIter.hasNext()) 1817 { 1818 key = (String)keyIter.next(); 1819 1820 value = (String)details.get(key); 1821 1822 if (value == null) 1823 { 1824 // do nothing 1825 } 1826 else 1827 { 1828 detailsString.append("PROPERTY ").append( 1829 STAFUtil.wrapData(key + "=" + value)).append(" "); 1830 } 1831 } 1832 1833 // The type machine should always be the local machine 1834 // The details parm must already be in the :length: format 1835 STAFResult result = submitSync( 1836 fSTAX.getEventServiceMachine(), 1837 fSTAX.getEventServiceName(), "GENERATE TYPE " + 1838 fSTAX.getServiceName().toUpperCase() + "/" + 1839 fSTAX.getLocalMachineName() + "/" + fJobNumber + 1840 " SUBTYPE " + STAFUtil.wrapData(eventSubType) + 1841 " ASYNC " + detailsString.toString()); 1842 1843 // Debug 1844 if (false) 1845 { 1846 System.out.println("Event:\n" + "GENERATE TYPE " + 1847 fSTAX.getServiceName().toUpperCase() + "/" + 1848 fSTAX.getLocalMachineName() + "/" + fJobNumber + 1849 " SUBTYPE " + STAFUtil.wrapData(eventSubType) + 1850 " ASYNC " + details); 1851 } 1852 1853 if (notifyAll) 1854 { 1855 submitSync(fSTAX.getEventServiceMachine(), 1856 fSTAX.getEventServiceName(), "GENERATE TYPE " + 1857 fSTAX.getServiceName().toUpperCase() + "/" + 1858 fSTAX.getLocalMachineName() + " SUBTYPE " + 1859 STAFUtil.wrapData(eventSubType) + 1860 " ASYNC " + detailsString.toString()); 1861 1862 // Debug 1863 if (false) 1864 { 1865 System.out.println("Event:\n" + "GENERATE TYPE " + 1866 fSTAX.getServiceName().toUpperCase() + "/" + 1867 fSTAX.getLocalMachineName() + " SUBTYPE " + 1868 STAFUtil.wrapData(eventSubType) + 1869 " ASYNC " + details); 1870 } 1871 } 1872 } 1873 log(int logfile, String level, String message)1874 public STAFResult log(int logfile, String level, String message) 1875 { 1876 if (logfile == STAXJob.JVM_LOG) 1877 { 1878 // Log to the JVM log instead of a STAX Job log 1879 1880 STAXTimestamp currentTimestamp = new STAXTimestamp(); 1881 1882 System.out.println( 1883 currentTimestamp.getTimestampString() + " " + message); 1884 1885 return new STAFResult(); 1886 } 1887 1888 String serviceName = fSTAX.getServiceName().toUpperCase(); 1889 String logName; 1890 1891 // Don't resolve messages logged to the STAX_Service log and to 1892 // STAX Job logs to avoid possible RC 13 errors which would prevent 1893 // the message from being logged. 1894 1895 boolean noResolveMessage = false; 1896 1897 if (logfile == STAXJob.SERVICE_LOG) 1898 { 1899 logName = serviceName + "_Service"; 1900 noResolveMessage = true; 1901 } 1902 else if (logfile == STAXJob.JOB_LOG) 1903 { 1904 logName = serviceName + "_Job_" + fJobNumber; 1905 noResolveMessage = true; 1906 } 1907 else if (logfile == STAXJob.USER_JOB_LOG) 1908 { 1909 logName = serviceName + "_Job_" + fJobNumber + "_User"; 1910 } 1911 else 1912 { 1913 // Log to STAX_Service log if invalid logfile specified 1914 1915 STAXTimestamp currentTimestamp = new STAXTimestamp(); 1916 System.out.println(currentTimestamp.getTimestampString() + 1917 " STAX Service Error: Invalid Logfile " + 1918 logfile); 1919 1920 logName = serviceName + "_Service"; 1921 noResolveMessage = true; 1922 } 1923 1924 String logRequest = "LOG MACHINE LOGNAME " + 1925 STAFUtil.wrapData(logName) + " LEVEL " + level; 1926 1927 if (noResolveMessage) logRequest += " NORESOLVEMESSAGE"; 1928 1929 logRequest += " MESSAGE " + STAFUtil.wrapData(message); 1930 1931 STAFResult result = submitSync("LOCAL", "LOG", logRequest); 1932 1933 // Check if the result was unsuccessful except ignore the following 1934 // errors: 1935 // - UnknownService (2) in case the LOG service is not registered 1936 // - HandleDoesNotExist (5) in case the STAX job's handle has been 1937 // unregistered indicating the job has completed 1938 1939 if ((result.rc != STAFResult.Ok) && 1940 (result.rc != STAFResult.UnknownService) && 1941 (result.rc != STAFResult.HandleDoesNotExist)) 1942 { 1943 if (logfile != STAXJob.USER_JOB_LOG) 1944 { 1945 STAXTimestamp currentTimestamp = new STAXTimestamp(); 1946 1947 System.out.println( 1948 currentTimestamp.getTimestampString() + 1949 " Log request failed with RC " + result.rc + 1950 " and Result " + result.result + 1951 " level: " + level + 1952 " logRequest: " + logRequest); 1953 } 1954 else if ((result.rc == STAFResult.VariableDoesNotExist) && 1955 (!noResolveMessage)) 1956 { 1957 // Retry logging the error message without resolving 1958 // variables in the message 1959 1960 logRequest += " NORESOLVEMESSAGE"; 1961 1962 result = submitSync("LOCAL", "LOG", logRequest); 1963 } 1964 } 1965 1966 return result; 1967 } 1968 clearLogs()1969 public void clearLogs() 1970 { 1971 String serviceName = fSTAX.getServiceName().toUpperCase(); 1972 String jobLogName; 1973 String userJobLogName; 1974 1975 jobLogName = serviceName + "_Job_" + fJobNumber; 1976 userJobLogName = serviceName + "_Job_" + fJobNumber + "_User"; 1977 1978 String logRequest = "DELETE MACHINE " + 1979 fSTAX.getLocalMachineNickname() + " LOGNAME " + 1980 STAFUtil.wrapData(jobLogName) + " CONFIRM"; 1981 1982 STAFResult result = submitSync("LOCAL", "LOG", logRequest); 1983 1984 logRequest = "DELETE MACHINE " + fSTAX.getLocalMachineNickname() + 1985 " LOGNAME " + STAFUtil.wrapData(userJobLogName) + " CONFIRM"; 1986 1987 result = submitSync("LOCAL", "LOG", logRequest); 1988 } 1989 clearJobDataDir()1990 public void clearJobDataDir() 1991 { 1992 // Delete the job data directory and recreate it 1993 1994 File dir = new File(fJobDataDir); 1995 1996 if (dir.exists()) 1997 { 1998 String deleteDirRequest = "DELETE ENTRY " + 1999 STAFUtil.wrapData(fJobDataDir) + " RECURSE CONFIRM"; 2000 2001 submitSync("local", "FS", deleteDirRequest); 2002 } 2003 2004 if (!dir.exists()) 2005 { 2006 dir.mkdirs(); 2007 } 2008 } 2009 getBreakpointFunctionList()2010 public List getBreakpointFunctionList() 2011 { 2012 return fBreakpointFunctionList; 2013 } 2014 setBreakpointFunctionList(List functionList)2015 public void setBreakpointFunctionList(List functionList) 2016 { 2017 synchronized(fBreakpointFunctionList) 2018 { 2019 fBreakpointFunctionList = functionList; 2020 } 2021 } 2022 addBreakpointFunction(String functionName)2023 public int addBreakpointFunction(String functionName) 2024 { 2025 int breakpointID = getNextBreakpointNumber(); 2026 2027 synchronized(fBreakpointFunctionList) 2028 { 2029 fBreakpointFunctionList.add(functionName); 2030 } 2031 2032 synchronized (fBreakpointsMap) 2033 { 2034 fBreakpointsMap.put(String.valueOf(breakpointID), 2035 new STAXBreakpoint(BREAKPOINT_FUNCTION, 2036 functionName, "", "", "")); 2037 } 2038 2039 return breakpointID; 2040 } 2041 isBreakpointFunction(String functionName)2042 public boolean isBreakpointFunction(String functionName) 2043 { 2044 synchronized (fBreakpointsMap) 2045 { 2046 Iterator iter = fBreakpointsMap.keySet().iterator(); 2047 2048 while (iter.hasNext()) 2049 { 2050 String id = (String)iter.next(); 2051 2052 STAXBreakpoint breakpoint = 2053 (STAXBreakpoint)fBreakpointsMap.get(id); 2054 2055 if (functionName.equals(breakpoint.getFunction())) 2056 { 2057 return true; 2058 } 2059 } 2060 } 2061 return false; 2062 } 2063 getBreakpointLineList()2064 public List getBreakpointLineList() 2065 { 2066 return fBreakpointLineList; 2067 } 2068 getBreakpointsMap()2069 public TreeMap getBreakpointsMap() 2070 { 2071 return fBreakpointsMap; 2072 } 2073 setBreakpointLineList(List lineList)2074 public void setBreakpointLineList(List lineList) 2075 { 2076 synchronized(fBreakpointLineList) 2077 { 2078 fBreakpointLineList = lineList; 2079 } 2080 } 2081 removeBreakpoint(String id)2082 public STAFResult removeBreakpoint(String id) 2083 { 2084 synchronized(fBreakpointsMap) 2085 { 2086 if (!(fBreakpointsMap.containsKey(id))) 2087 { 2088 return new STAFResult(STAFResult.DoesNotExist, id); 2089 } 2090 else 2091 { 2092 fBreakpointsMap.remove(id); 2093 2094 return new STAFResult(STAFResult.Ok); 2095 } 2096 } 2097 } 2098 addBreakpointLine(String line, String file, String machine)2099 public int addBreakpointLine(String line, String file, String machine) 2100 { 2101 int breakpointID = getNextBreakpointNumber(); 2102 2103 synchronized(fBreakpointLineList) 2104 { 2105 fBreakpointLineList.add(line + " " + machine + " " + file); 2106 } 2107 2108 synchronized (fBreakpointsMap) 2109 { 2110 fBreakpointsMap.put(String.valueOf(breakpointID), 2111 new STAXBreakpoint(BREAKPOINT_LINE, 2112 "", line, file, machine)); 2113 } 2114 2115 return breakpointID; 2116 } 2117 isBreakpointLine(String line, String file, String machine)2118 public boolean isBreakpointLine(String line, String file, String machine) 2119 { 2120 synchronized (fBreakpointsMap) 2121 { 2122 Iterator iter = fBreakpointsMap.keySet().iterator(); 2123 2124 while (iter.hasNext()) 2125 { 2126 String id = (String)iter.next(); 2127 2128 STAXBreakpoint breakpoint = 2129 (STAXBreakpoint)fBreakpointsMap.get(id); 2130 2131 if (line.equals(breakpoint.getLine()) && 2132 file.equals(breakpoint.getFile()) && 2133 (breakpoint.getMachine().equals("") || 2134 machine.equalsIgnoreCase(breakpoint.getMachine()))) 2135 { 2136 return true; 2137 } 2138 } 2139 } 2140 return false; 2141 } 2142 breakpointsEmpty()2143 public boolean breakpointsEmpty() 2144 { 2145 return (fBreakpointsMap.isEmpty()); 2146 } 2147 2148 // Submit methods 2149 submitAsync(String location, String service, String request, STAXSTAFRequestCompleteListener listener)2150 public STAFResult submitAsync(String location, String service, 2151 String request, STAXSTAFRequestCompleteListener listener) 2152 { 2153 STAFResult result; 2154 2155 synchronized(fRequestMap) 2156 { 2157 result = fHandle.submit2( 2158 STAFHandle.ReqQueue, location, service, request); 2159 2160 if (result.rc == STAFResult.Ok) 2161 { 2162 try 2163 { 2164 // Convert request number to a negative integer if greater 2165 // then Integer.MAX_VALUE 2166 2167 Integer requestNumber = STAXUtil.convertRequestNumber( 2168 result.result); 2169 2170 fRequestMap.put(requestNumber, listener); 2171 2172 result.result = requestNumber.toString(); 2173 } 2174 catch (NumberFormatException e) 2175 { 2176 System.out.println( 2177 (new STAXTimestamp()).getTimestampString() + 2178 " STAXJob::submitAsync - " + e.toString()); 2179 2180 result.result = "0"; 2181 } 2182 } 2183 } 2184 2185 return result; 2186 } 2187 submitSync(String location, String service, String request)2188 public STAFResult submitSync(String location, String service, 2189 String request) 2190 { 2191 STAFHandle theHandle = fHandle; 2192 2193 // If the STAX job's handle has not yet been assigned, use the 2194 // STAX service's handle to submit the STAF service request 2195 2196 if (fHandle == null) 2197 theHandle = fSTAX.getSTAFHandle(); 2198 2199 STAFResult result = theHandle.submit2( 2200 STAFHandle.ReqSync, location, service, request); 2201 2202 return result; 2203 } 2204 submitAsyncForget(String location, String service, String request)2205 public STAFResult submitAsyncForget(String location, String service, 2206 String request) 2207 { 2208 STAFResult result = fHandle.submit2(STAFHandle.ReqFireAndForget, 2209 location, service, request); 2210 2211 return result; 2212 } 2213 2214 // STAXSTAFQueueListener method 2215 handleQueueMessage(STAXSTAFMessage message, STAXJob job)2216 public void handleQueueMessage(STAXSTAFMessage message, STAXJob job) 2217 { 2218 int requestNumber = message.getRequestNumber(); 2219 2220 STAXSTAFRequestCompleteListener listener = null; 2221 2222 synchronized (fRequestMap) 2223 { 2224 Integer key = new Integer(requestNumber); 2225 2226 listener = (STAXSTAFRequestCompleteListener)fRequestMap.get(key); 2227 2228 if (listener != null) 2229 { 2230 fRequestMap.remove(key); 2231 } 2232 } 2233 2234 if (listener != null) 2235 { 2236 listener.requestComplete(requestNumber, new STAFResult( 2237 message.getRequestRC(), message.getRequestResult())); 2238 } 2239 else 2240 { // Log a message in the job log 2241 String msg = "STAXJob.handleQueueMessage: " + 2242 " No listener found for message:\n" + 2243 STAFMarshallingContext.unmarshall( 2244 message.getResult()).toString(); 2245 job.log(STAXJob.JOB_LOG, "warning", msg); 2246 } 2247 } 2248 2249 // STAXThreadCompleteListener method 2250 threadComplete(STAXThread thread, int endCode)2251 public void threadComplete(STAXThread thread, int endCode) 2252 { 2253 // Debug: 2254 if (false) 2255 { 2256 System.out.println("Thread #" + thread.getThreadNumber() + 2257 " complete"); 2258 } 2259 2260 boolean jobComplete = false; 2261 2262 synchronized (fThreadMap) 2263 { 2264 fThreadMap.remove(thread.getThreadNumberAsInteger()); 2265 2266 if (fThreadMap.isEmpty()) 2267 jobComplete = true; 2268 } 2269 2270 if (jobComplete == true) 2271 { 2272 // Perform terminateJob on interested parties 2273 2274 fSTAX.visitJobManagementHandlers(new STAXVisitorHelper(this) 2275 { 2276 public void visit(Object o, Iterator iter) 2277 { 2278 STAXJobManagementHandler handler = 2279 (STAXJobManagementHandler)o; 2280 2281 handler.terminateJob((STAXJob)fData); 2282 } 2283 }); 2284 2285 // Set the result so that it is available upon job completion. 2286 2287 String resultToEval = "STAXResult"; 2288 2289 try 2290 { 2291 if (thread.pyBoolEval("isinstance(STAXResult, STAXGlobal)")) 2292 { 2293 // Use the STAXGlobal class's get() method so that the job 2294 // result when using toString() will be it's contents and 2295 // not org.python.core.PyFinalizableInstance 2296 2297 resultToEval = "STAXResult.get()"; 2298 } 2299 } 2300 catch (STAXPythonEvaluationException e) 2301 { /* Ignore error and assume not a STAXGlobal object */ } 2302 2303 try 2304 { 2305 fResult = thread.pyObjectEval(resultToEval); 2306 } 2307 catch (STAXPythonEvaluationException e) 2308 { 2309 fResult = Py.None; 2310 } 2311 2312 // Log the result from the job in a Status message in the Job log 2313 2314 STAFMarshallingContext mc = STAFMarshallingContext. 2315 unmarshall(fResult.toString()); 2316 log(STAXJob.JOB_LOG, "status", "Job Result: " + mc); 2317 2318 // Log a Stop message for the job in the STAX Service and Job logs 2319 2320 String msg = "JobID: " + fJobNumber; 2321 log(STAXJob.SERVICE_LOG, "stop", msg); 2322 log(STAXJob.JOB_LOG, "stop", msg); 2323 2324 // Get the current date and time and set as job ending date/time 2325 2326 fEndTimestamp = new STAXTimestamp(); 2327 2328 // Generate job completion event 2329 2330 HashMap jobEndMap = new HashMap(); 2331 jobEndMap.put("type", "job"); 2332 jobEndMap.put("block", "main"); 2333 jobEndMap.put("status", "end"); 2334 jobEndMap.put("jobID", String.valueOf(fJobNumber)); 2335 jobEndMap.put("result", fResult.toString()); 2336 jobEndMap.put("jobCompletionStatus", getCompletionStatusAsString()); 2337 2338 generateEvent(STAXJob.STAX_JOB_EVENT, jobEndMap, true); 2339 2340 // Query its job log for any messages with level "Error". 2341 // Save the result's marshalling context so it's available when 2342 // writing the job result to a file and when the RETURNRESULT 2343 // option is specified on a STAX EXECUTE request 2344 2345 fJobLogErrorsMC = getJobLogErrors(); 2346 2347 // Write the job result information to files in the STAX job 2348 // directory 2349 2350 writeJobResultsToFile(); 2351 2352 // Send a STAF/Service/STAX/End message to indicate the job has 2353 // completed 2354 2355 submitSync("local", "QUEUE", "QUEUE TYPE STAF/Service/STAX/End " + 2356 "MESSAGE " + STAFUtil.wrapData("")); 2357 2358 // Debug 2359 if (COUNT_PYCODE_CACHES && STAX.CACHE_PYTHON_CODE) 2360 { 2361 System.out.println( 2362 "Job " + fJobNumber + ": " + 2363 " cacheGets=" + fCompiledPyCodeCacheGets + 2364 " cacheAdds=" + fCompiledPyCodeCacheAdds); 2365 } 2366 2367 while (!fCompletionNotifiees.isEmpty()) 2368 { 2369 STAXJobCompleteListener listener = 2370 (STAXJobCompleteListener)fCompletionNotifiees.removeFirst(); 2371 2372 if (listener != null) listener.jobComplete(this); 2373 } 2374 2375 try 2376 { 2377 fHandle.unRegister(); 2378 } 2379 catch (STAFException e) 2380 { 2381 /* Do Nothing */ 2382 } 2383 2384 // Clear out job's private variables 2385 2386 fScripts = new ArrayList(); 2387 fScriptFiles = new ArrayList(); 2388 fThreadMap = new LinkedHashMap(); 2389 fCompletionNotifiees = new LinkedList(); 2390 fCompiledPyCodeCache = new HashMap(); 2391 2392 // Commented out setting these variables to new (empty) objects, 2393 // as this could cause the "Testcases" and "Testcase Totals" values 2394 // in the job result to be empty due to timing issues 2395 //fTestcaseList = new ArrayList(); 2396 //fTestcaseTotalsMap = new HashMap(); 2397 2398 // Commented out setting the following variables to null because 2399 // STAFQueueMonitor.notifyListeners() is running in another 2400 // thread and could access these variables and get a NPE. 2401 // fQueueListenerMap = null; 2402 // fHandle = null; 2403 // fSTAFQueueMonitor = null; 2404 // fRequestMap = new HashMap(); 2405 // 2406 // synchronized (fDataMap) 2407 // { 2408 // fDataMap = new HashMap(); 2409 // } 2410 } 2411 } 2412 2413 /** 2414 * This method is called to clean up a job that is in a pending state. 2415 * This is a job that has had a job number assigned to it but did not 2416 * start execution because either an error occurred (e.g. xml parsing, 2417 * etc) before the job actually started execution or because the TEST 2418 * option was specified, which indicates to test that the job doesn't 2419 * contain xml parsing or Python compile errors and to not actually 2420 * run the job. 2421 */ cleanupPendingJob(STAFResult result)2422 public void cleanupPendingJob(STAFResult result) 2423 { 2424 STAXJob job = fSTAX.removeFromJobMap(getJobNumberAsInteger()); 2425 2426 if (job == null) 2427 { 2428 // The job is not in the job map 2429 return; 2430 } 2431 2432 if (result.rc != STAFResult.Ok) 2433 { 2434 setCompletionStatus(TERMINATED_STATUS); 2435 2436 // Log the error message in the job log 2437 2438 if (STAFMarshallingContext.isMarshalledData(result.result)) 2439 { 2440 try 2441 { 2442 STAFMarshallingContext mc = 2443 STAFMarshallingContext.unmarshall(result.result); 2444 Map resultMap = (Map)mc.getRootObject(); 2445 result.result = (String)resultMap.get("errorMsg"); 2446 } 2447 catch (Exception e) 2448 { 2449 // Ignore any errors 2450 System.out.println(e.toString()); 2451 } 2452 } 2453 2454 log(STAXJob.JOB_LOG, "error", result.result); 2455 } 2456 2457 // Log the testcase totals in the Job log by first calling the 2458 // STAXTestcaseActionFactory's initJob() method (to initialize the 2459 // testcase totals to 0) and then by calling its terminateJob() method 2460 // which logs the testcase totals 2461 2462 STAXJobManagementHandler testcaseHandler = 2463 (STAXJobManagementHandler)fSTAX.getActionFactory("testcase"); 2464 2465 testcaseHandler.initJob(this); 2466 testcaseHandler.terminateJob(this); 2467 2468 // Log the result from the job in a Status message in the Job log 2469 2470 STAFMarshallingContext mc = STAFMarshallingContext. 2471 unmarshall(fResult.toString()); 2472 log(STAXJob.JOB_LOG, "status", "Job Result: " + mc); 2473 2474 // Log a Stop message for the job in the STAX Service and Job logs 2475 2476 String msg = "JobID: " + fJobNumber; 2477 log(STAXJob.SERVICE_LOG, "stop", msg); 2478 log(STAXJob.JOB_LOG, "stop", msg); 2479 2480 if (fEndTimestamp == null) 2481 fEndTimestamp = new STAXTimestamp(); 2482 2483 // Generate job completion event 2484 2485 HashMap jobEndMap = new HashMap(); 2486 jobEndMap.put("type", "job"); 2487 jobEndMap.put("block", "main"); 2488 jobEndMap.put("status", "end"); 2489 jobEndMap.put("jobID", String.valueOf(fJobNumber)); 2490 jobEndMap.put("result", fResult.toString()); 2491 jobEndMap.put("jobCompletionStatus", getCompletionStatusAsString()); 2492 2493 generateEvent(STAXJob.STAX_JOB_EVENT, jobEndMap, true); 2494 2495 // Query its job log for any messages with level "Error". 2496 // Save the result's marshalling context so it's available when 2497 // writing the job result to a file and when the RETURNRESULT 2498 // option is specified on a STAX EXECUTE request 2499 2500 fJobLogErrorsMC = getJobLogErrors(); 2501 2502 // Write the job result information to files in the STAX job 2503 // directory 2504 2505 writeJobResultsToFile(); 2506 2507 if (fHandle != null) 2508 { 2509 // Unregister the job's handle 2510 try 2511 { 2512 fHandle.unRegister(); 2513 } 2514 catch (STAFException e) 2515 { 2516 /* Do Nothing */ 2517 } 2518 } 2519 } 2520 2521 /** 2522 * Query the STAX Job Log to get any messages with level "Error" that 2523 * were logged for this job. 2524 * 2525 * @return STAFMarshallingContext Return a marshalling context for the 2526 * result from the LOG QUERY request whose root object is a list of 2527 * error messages. 2528 */ getJobLogErrors()2529 private STAFMarshallingContext getJobLogErrors() 2530 { 2531 String jobLogName = fSTAX.getServiceName().toUpperCase() + "_Job_" + 2532 getJobNumber(); 2533 2534 String request = "QUERY MACHINE " + fSTAX.getLocalMachineNickname() + 2535 " LOGNAME " + STAFUtil.wrapData(jobLogName) + 2536 " LEVELMASK Error" + 2537 " FROM " + getStartTimestamp().getTimestampString(); 2538 2539 STAFResult result = submitSync("local", "LOG", request); 2540 2541 // RC 4010 from the LOG QUERY request means exceeded the default 2542 // maximum query records 2543 2544 if ((result.rc == STAFResult.Ok) || (result.rc == 4010)) 2545 return result.resultContext; 2546 2547 // Ignore the following errors from the LOG QUERY request: 2548 // - UnknownService (2) in case the LOG service is not registered 2549 // - HandleDoesNotExist (5) in case the STAX job's handle has been 2550 // unregistered indicating the job has completed 2551 2552 if ((result.rc != STAFResult.UnknownService) && 2553 (result.rc != STAFResult.HandleDoesNotExist)) 2554 { 2555 // Log an error in the JVM log 2556 2557 STAXTimestamp currentTimestamp = new STAXTimestamp(); 2558 2559 System.out.println( 2560 currentTimestamp.getTimestampString() + " " + 2561 "STAXJob::getJobLogErrors() failed for Job ID: " + 2562 getJobNumber() + "\nSTAF local LOG " + request + 2563 " RC=" + result.rc + ", Result=" + result.result); 2564 } 2565 2566 return null; 2567 } 2568 2569 /* 2570 * Write the marshalled job result information to files in the STAX job 2571 * directory 2572 */ writeJobResultsToFile()2573 private void writeJobResultsToFile() 2574 { 2575 // Create the marshalling context for the results without the testcase 2576 // list 2577 2578 STAFMarshallingContext resultMC = new STAFMarshallingContext(); 2579 resultMC.setMapClassDefinition(fSTAX.fResultMapClass); 2580 resultMC.setMapClassDefinition( 2581 STAXTestcaseActionFactory.fTestcaseTotalsMapClass); 2582 2583 Map resultMap = fSTAX.fResultMapClass.createInstance(); 2584 2585 resultMap = addCommonFieldsToResultMap(resultMap); 2586 2587 resultMC.setRootObject(resultMap); 2588 2589 String marshalledResultString = resultMC.marshall(); 2590 2591 // Write the marshalled string for the results without the testcase 2592 // list to file marshalledResults.txt in directory fJobDataDir 2593 2594 writeStringToFile(fSTAX.getResultFileName(fJobNumber), 2595 marshalledResultString); 2596 2597 // Create the marshalling context for the results with the testcase 2598 // list 2599 2600 resultMC = new STAFMarshallingContext(); 2601 resultMC.setMapClassDefinition(fSTAX.fDetailedResultMapClass); 2602 resultMC.setMapClassDefinition( 2603 STAXTestcaseActionFactory.fTestcaseTotalsMapClass); 2604 resultMC.setMapClassDefinition( 2605 STAXTestcaseActionFactory.fQueryTestcaseMapClass); 2606 2607 resultMap = fSTAX.fDetailedResultMapClass.createInstance(); 2608 2609 resultMap = addCommonFieldsToResultMap(resultMap); 2610 2611 resultMap.put("testcaseList", fTestcaseList); 2612 2613 resultMC.setRootObject(resultMap); 2614 2615 marshalledResultString = resultMC.marshall(); 2616 2617 // Write the marshalled string for the results with the testcase 2618 // list to file marshalledResultsLong.txt in directory fJobDataDir 2619 2620 writeStringToFile(fSTAX.getDetailedResultFileName(fJobNumber), 2621 marshalledResultString); 2622 } 2623 addCommonFieldsToResultMap(Map inResultMap)2624 private Map addCommonFieldsToResultMap(Map inResultMap) 2625 { 2626 Map resultMap = new HashMap(inResultMap); 2627 2628 if (!fJobName.equals("")) 2629 resultMap.put("jobName", fJobName); 2630 2631 resultMap.put("startTimestamp", 2632 fStartTimestamp.getTimestampString()); 2633 resultMap.put("endTimestamp", 2634 fEndTimestamp.getTimestampString()); 2635 resultMap.put("status", getCompletionStatusAsString()); 2636 resultMap.put("result", fResult.toString()); 2637 resultMap.put("testcaseTotals", fTestcaseTotalsMap); 2638 resultMap.put("jobLogErrors", fJobLogErrorsMC); 2639 resultMap.put("xmlFileName", fXmlFile); 2640 resultMap.put("fileMachine", fXmlMachine); 2641 resultMap.put("function", getStartFunction()); 2642 resultMap.put("arguments", 2643 STAFUtil.maskPrivateData(getStartFuncArgs())); 2644 resultMap.put("scriptList", fScripts); 2645 resultMap.put("scriptFileList", fScriptFiles); 2646 2647 if (!fScriptFileMachine.equals("")) 2648 resultMap.put("scriptMachine", fScriptFileMachine); 2649 2650 return resultMap; 2651 } 2652 2653 /** 2654 * Write the contents of a string to the specified file name 2655 */ writeStringToFile(String fileName, String data)2656 private void writeStringToFile(String fileName, String data) 2657 { 2658 FileWriter out = null; 2659 2660 try 2661 { 2662 File outFile = new File(fileName); 2663 out = new FileWriter(outFile); 2664 out.write(data); 2665 } 2666 catch (IOException e) 2667 { 2668 log(STAXJob.JVM_LOG, "Error", 2669 "Error writing to job results file: " + fileName + ".\n" + 2670 e.toString()); 2671 } 2672 finally 2673 { 2674 try 2675 { 2676 if (out != null) out.close(); 2677 } 2678 catch (IOException e) 2679 { 2680 // Do nothing 2681 } 2682 } 2683 } 2684 2685 private STAX fSTAX; 2686 private STAXDocument fDocument; 2687 private STAXCallAction fDefaultCallAction = null; 2688 private Object fNextThreadNumberSynch = new Object(); 2689 private int fNextThreadNumber = 1; 2690 private int fJobNumber = 0; 2691 private Object fNextProcNumberSynch = new Object(); 2692 private int fProcNumber = 1; 2693 private Object fNextCmdNumberSynch = new Object(); 2694 private int fCmdNumber = 1; 2695 private Object fNextProcessKeySynch = new Object(); 2696 private int fProcessKey = 1; 2697 private String fJobDataDir = new String(); 2698 private String fJobName = new String(); 2699 private String fXmlMachine = new String(); 2700 private String fXmlFile = new String(); 2701 private boolean fClearlogs; 2702 private int fMaxSTAXThreads; 2703 private String fWaitTimeout = null; 2704 private String fStartFunction = new String(); 2705 private String fStartFuncArgs = new String(); 2706 private List fScripts = new ArrayList(); 2707 private List fScriptFiles = new ArrayList(); 2708 private String fScriptFileMachine = new String(); 2709 private Map fThreadMap = new LinkedHashMap(); 2710 private STAFHandle fHandle = null; 2711 private STAFQueueMonitor fSTAFQueueMonitor = null; 2712 private LinkedList fCompletionNotifiees = new LinkedList(); 2713 private boolean fExecuteAndHold = false; 2714 private String fSourceMachine = new String(); // Source == Originating 2715 private String fSourceHandleName = new String(); 2716 private int fSourceHandle; 2717 private int fNotifyOnEnd = STAXJob.NO_NOTIFY_ONEND; 2718 private int fState = PENDING_STATE; 2719 private PyObject fResult = Py.None; 2720 private int fCompletionStatus = STAXJob.NORMAL_STATUS; 2721 private STAXTimestamp fStartTimestamp; 2722 private STAXTimestamp fEndTimestamp; 2723 private HashMap fQueueListenerMap = new HashMap(); 2724 private HashMap fDataMap = new HashMap(); 2725 private HashMap fRequestMap = new HashMap(); // Map of active STAF requests 2726 private boolean fLogTCElapsedTime; 2727 private boolean fLogTCNumStarts; 2728 private boolean fLogTCStartStop; 2729 private int fPythonOutput; 2730 private String fPythonLogLevel; 2731 private HashMap fCompiledPyCodeCache = new HashMap(); 2732 private long fCompiledPyCodeCacheAdds = 0; 2733 private long fCompiledPyCodeCacheGets = 0; 2734 private List fTestcaseList = new ArrayList(); 2735 private List fBreakpointFunctionList = new ArrayList(); 2736 private List fBreakpointLineList = new ArrayList(); 2737 private Map fTestcaseTotalsMap = new HashMap(); 2738 private TreeMap fBreakpointsMap = new TreeMap(); 2739 private Object fNextBreakpointNumberSynch = new Object(); 2740 private int fNextBreakpointNumber = 1; 2741 private boolean fBreakpointFirstFunction = false; 2742 private boolean fBreakpointSubjobFirstFunction = false; 2743 private STAFMarshallingContext fJobLogErrorsMC = 2744 new STAFMarshallingContext(); 2745 2746 class STAFQueueMonitor extends Thread 2747 { STAFQueueMonitor(STAXJob job)2748 STAFQueueMonitor(STAXJob job) 2749 { 2750 fJob = job; 2751 } 2752 2753 /** 2754 * Log an error message in the JVM log and the STAX Job log 2755 */ logMessage(String message, Throwable t)2756 public void logMessage(String message, Throwable t) 2757 { 2758 STAXTimestamp currentTimestamp = new STAXTimestamp(); 2759 2760 message = "STAXJob$STAFQueueMonitor.run(): " + message; 2761 2762 if (t != null) 2763 { 2764 // Add the Java stack trace to the message 2765 2766 StringWriter sw = new StringWriter(); 2767 t.printStackTrace(new PrintWriter(sw)); 2768 2769 if (t.getMessage() != null) 2770 message += "\n" + t.getMessage() + "\n" + sw.toString(); 2771 else 2772 message += "\n" + sw.toString(); 2773 } 2774 2775 // Truncate the message to a maximum size of 3000 (in case 2776 // the result from the QUEUE GET WAIT request is very large) 2777 2778 if (message.length() > 3000) 2779 message = message.substring(0, 3000) + "..."; 2780 2781 // Log an error message in the STAX JVM log 2782 2783 System.out.println( 2784 currentTimestamp.getTimestampString() + 2785 " Error: STAX Job ID " + fJob.getJobNumber() + ". " + 2786 message); 2787 2788 // Log an error message in the STAX Job log 2789 2790 fJob.log(STAXJob.JOB_LOG, "Error", message); 2791 } 2792 run()2793 public void run() 2794 { 2795 STAFMarshallingContext mc; 2796 List messageList; 2797 int maxMessages = fJob.getSTAX().getMaxGetQueueMessages(); 2798 String request = "GET WAIT FIRST " + maxMessages; 2799 STAFResult result; 2800 int numErrors = 0; 2801 2802 // Maximum consecutive errors submitting a local QUEUE GET WAIT 2803 // request before we decide to exit the infinite loop 2804 int maxErrors = 5; 2805 2806 // Process messages on the STAX job handle's queue until we get 2807 // a "STAF/Service/STAX/End" message or until an error occurs 5 2808 // consecutive times submitting a STAF local QUEUE GET request 2809 // (so that we don't get stuck in an infinite loop eating CPU). 2810 2811 for (;;) 2812 { 2813 result = null; 2814 2815 try 2816 { 2817 // For better performance when more than 1 message is on 2818 // the queue waiting to be processed, get multiple 2819 // messages off the queue at a time (up to maxMessages) 2820 // so that the total size of the messages hopefully won't 2821 // cause an OutOfMemory problem. 2822 // Note: If an error occurs unmarshalling the list of 2823 // messages, all these messages will be lost. 2824 2825 result = submitSync("local", "QUEUE", request); 2826 2827 if (result == null) 2828 { 2829 numErrors++; 2830 2831 logMessage( 2832 "STAF local QUEUE " + request + 2833 " returned null. This may have been caused by " + 2834 "running out of memory creating the result.", 2835 null); 2836 2837 if (numErrors < maxErrors) 2838 { 2839 continue; 2840 } 2841 else 2842 { 2843 logMessage( 2844 "Exiting this thread after the QUEUE GET " + 2845 "request failed " + maxErrors + 2846 " consecutive times.", null); 2847 2848 return; // Exit STAFQueueMonitor thread 2849 } 2850 } 2851 2852 if (result.rc == STAFResult.Ok) 2853 { 2854 numErrors = 0; 2855 } 2856 else if (result.rc == STAFResult.HandleDoesNotExist) 2857 { 2858 // This means that the STAX job's handle has been 2859 // unregistered which means that the STAX job is no 2860 // longer running so we should exit this thread. 2861 // We've seen this happen before this thread gets 2862 // the message with type "STAF/Service/STAX/End" off 2863 // the queue. 2864 2865 return; // Exit STAFQueueMonitor thread 2866 } 2867 else 2868 { 2869 numErrors++; 2870 2871 logMessage( 2872 "STAF local QUEUE " + request + 2873 " failed with RC=" + result.rc + ", Result=" + 2874 result.result, null); 2875 2876 if (numErrors < maxErrors) 2877 { 2878 continue; 2879 } 2880 else 2881 { 2882 logMessage( 2883 "Exiting this thread after the QUEUE GET " + 2884 "request failed " + maxErrors + 2885 " consecutive times", null); 2886 2887 return; // Exit STAFQueueMonitor thread 2888 } 2889 } 2890 } 2891 catch (Throwable t) 2892 { 2893 // Note: One possible reason for an exception occurring 2894 // submitting a QUEUE GET WAIT request can be because 2895 // an error occurred when the result was auto-unmarshalled 2896 // due to invalid marshalled data in the result (though 2897 // in STAF V3.3.3 a fix was made to the Java unmarshall 2898 // method so that this problem should no longer occur). 2899 2900 numErrors++; 2901 2902 logMessage( 2903 "Exception getting message(s) off the queue.", t); 2904 2905 if (numErrors < maxErrors) 2906 { 2907 continue; 2908 } 2909 else 2910 { 2911 logMessage( 2912 "Exiting this thread after the QUEUE GET " + 2913 "request failed " + maxErrors + 2914 " consecutive times", null); 2915 2916 return; // Exit STAFQueueMonitor thread 2917 } 2918 } 2919 2920 try 2921 { 2922 // Unmarshall the result from a QUEUE GET request, but 2923 // don't unmarshall indirect objects 2924 2925 mc = STAFMarshallingContext.unmarshall( 2926 result.result, 2927 STAFMarshallingContext.IGNORE_INDIRECT_OBJECTS); 2928 2929 messageList = (List)mc.getRootObject(); 2930 } 2931 catch (Throwable t) 2932 { 2933 // Log an error message and continue 2934 2935 logMessage( 2936 "Exception unmarshalling queued messages. " + 2937 "\nMarshalled string: " + result.result, t); 2938 continue; 2939 } 2940 2941 // Iterate through the list of messages removed from our 2942 // handle's queue and process each message 2943 2944 Iterator iter = messageList.iterator(); 2945 STAXSTAFMessage msg; 2946 2947 while (iter.hasNext()) 2948 { 2949 // Process the message 2950 2951 try 2952 { 2953 msg = new STAXSTAFMessage((Map)iter.next()); 2954 } 2955 catch (Throwable t) 2956 { 2957 // Log an error message and continue 2958 2959 logMessage("Exception handling a queued message.", t); 2960 continue; 2961 } 2962 2963 String type = msg.getType(); 2964 2965 if (type != null && 2966 type.equalsIgnoreCase("STAF/Service/STAX/End")) 2967 { 2968 return; // Exit STAFQueueMonitorThread 2969 } 2970 2971 try 2972 { 2973 notifyListeners(msg); 2974 } 2975 catch (Throwable t) 2976 { 2977 // Log an error message and continue 2978 2979 logMessage( 2980 "Exception notifying listeners for a message " + 2981 "with type '" + type + 2982 "' from handle " + msg.getHandle() + 2983 " on machine " + msg.getMachine() + 2984 " \nMessage: " + msg.getMessage(), t); 2985 continue; 2986 } 2987 } 2988 } 2989 } 2990 notifyListeners(STAXSTAFMessage msg)2991 public void notifyListeners(STAXSTAFMessage msg) 2992 { 2993 // Perform a lookup of registered message handlers for 2994 // this message type and pass the STAXSTAFMessage to 2995 // the registered message handlers. 2996 2997 String theType = msg.getType(); 2998 int listenersFound = 0; 2999 3000 synchronized (fQueueListenerMap) 3001 { 3002 Iterator mapIter = fQueueListenerMap.keySet().iterator(); 3003 3004 while (mapIter.hasNext()) 3005 { 3006 String msgType = (String)mapIter.next(); 3007 3008 if ((msgType == null) || 3009 // Messages from 3.x clients that STAX is interested 3010 // in will have a type 3011 (theType != null && 3012 theType.equalsIgnoreCase(msgType)) || 3013 // The following is for messages from 2.x clients 3014 // which will have a null type and the message will 3015 // begin with the type (which for processes will be 3016 // STAF/PROCESS/END instead of STAF/Process/End 3017 (theType == null && msg.getMessage().toUpperCase(). 3018 startsWith(msgType.toUpperCase()))) 3019 { 3020 TreeSet listenerSet = (TreeSet)fQueueListenerMap.get( 3021 msgType); 3022 3023 if (listenerSet == null) continue; 3024 3025 Iterator iter = listenerSet.iterator(); 3026 3027 while (iter.hasNext()) 3028 { 3029 STAXSTAFQueueListener listener = 3030 (STAXSTAFQueueListener)iter.next(); 3031 3032 listener.handleQueueMessage(msg, fJob); 3033 listenersFound++; 3034 } 3035 } 3036 } 3037 3038 if ((listenersFound == 0) && 3039 (theType.equals("STAF/RequestComplete") || 3040 theType.equals("STAF/Process/End"))) 3041 { 3042 // Log a warning message in the job log as this could 3043 // indicate a problem in the how the STAX service is 3044 // handling these messages 3045 3046 try 3047 { 3048 fJob.log(STAXJob.JOB_LOG, "warning", 3049 "STAXJob.notifyListeners: " + 3050 "No listener found for message:\n" + 3051 STAFMarshallingContext.unmarshall( 3052 msg.getResult()).toString()); 3053 } 3054 catch (Throwable t) 3055 { 3056 // Ignore 3057 } 3058 } 3059 } 3060 } 3061 3062 STAXJob fJob; 3063 3064 } // end STAFQueueMonitor 3065 3066 } 3067