1 /*****************************************************************************/ 2 /* Software Testing Automation Framework (STAF) */ 3 /* (C) Copyright IBM Corp. 2002, 2004, 2005 */ 4 /* */ 5 /* This software is licensed under the Eclipse Public License (EPL) V1.0. */ 6 /*****************************************************************************/ 7 8 package com.ibm.staf.service.stax; 9 import com.ibm.staf.*; 10 import com.ibm.staf.wrapper.STAFLog; 11 import java.util.Map; 12 import java.util.HashMap; 13 import java.util.LinkedHashMap; 14 import java.util.TreeMap; 15 import java.util.TreeSet; 16 import java.util.LinkedHashSet; 17 import java.util.LinkedList; 18 import java.util.ArrayList; 19 import java.util.Iterator; 20 import java.util.List; 21 import java.util.Comparator; 22 import java.io.File; 23 import java.io.FileWriter; 24 import java.io.IOException; 25 import java.io.PrintWriter; 26 import java.io.StringWriter; 27 28 import java.util.Vector; 29 import java.util.StringTokenizer; 30 import java.util.Date; 31 import java.util.Set; 32 33 import org.python.core.Py; 34 35 import org.python.core.PyObject; 36 import org.python.core.PyCode; 37 import org.python.core.CompilerFlags; 38 39 /* Jython 2.1: 40 import org.python.core.__builtin__; 41 */ 42 // Jython 2.5: 43 import org.python.core.CompileMode; 44 45 /** 46 * The representation of a STAX job. A STAX job is created by the STAXParser 47 * when the STAX service receives an EXECUTE request specifying a file or string 48 * containing XML that defines a STAX job. 49 */ 50 public class STAXJob implements STAXThreadCompleteListener, 51 STAXSTAFQueueListener 52 { 53 // For debugging - Counts and prints the number of cache gets/adds 54 static final boolean COUNT_PYCODE_CACHES = false; 55 56 static final String STAX_JOB_EVENT = new String("Job"); 57 58 // Logfile types: 59 60 /** 61 * Indicates to log to the STAX Service Log file 62 */ 63 static final int SERVICE_LOG = 1; 64 65 /** 66 * Indicates to log to the STAX Job Log file 67 */ 68 public static final int JOB_LOG = 2; 69 70 /** 71 * Indicates to log to the STAX Job User Log file 72 */ 73 public static final int USER_JOB_LOG = 3; 74 75 /** 76 * Indicates to log to the JVM Log file 77 */ 78 public static final int JVM_LOG = 4; 79 80 public static final int NO_NOTIFY_ONEND = 0; 81 public static final int NOTIFY_ONEND_BY_HANDLE = 1; 82 public static final int NOTIFY_ONEND_BY_NAME = 2; 83 84 /** 85 * Job completion status codes and their string values 86 */ 87 public static final int ABNORMAL_STATUS = -1; 88 public static final int NORMAL_STATUS = 0; 89 public static final int TERMINATED_STATUS = 1; 90 public static final int UNKNOWN_STATUS = 2; 91 92 public static final String ABNORMAL_STATUS_STRING = "Abnormal"; 93 public static final String NORMAL_STATUS_STRING = "Normal"; 94 public static final String TERMINATED_STATUS_STRING = "Terminated"; 95 public static final String UNKNOWN_STATUS_STRING = "Unknown"; 96 97 /** 98 * Job state codes and their string values 99 */ 100 public static final int PENDING_STATE = 0; 101 public static final int RUNNING_STATE = 1; 102 public static final String PENDING_STATE_STRING = "Pending"; 103 public static final String RUNNING_STATE_STRING = "Running"; 104 105 static final int BREAKPOINT_FUNCTION = 0; 106 static final int BREAKPOINT_LINE = 1; 107 108 /** 109 * Creates a new STAXJob instance passing in a STAX object which 110 * represents the STAX service that is executing this job. 111 */ STAXJob(STAX staxService)112 public STAXJob(STAX staxService) { 113 this(staxService, new STAXDocument()); 114 } 115 116 /** 117 * Creates a new STAXJob instance using an existing STAX 118 * document. 119 * 120 * @param staxService the STAX service. 121 * @param document the STAX document to be executed by 122 * this job. 123 */ STAXJob(STAX staxService, STAXDocument document)124 public STAXJob(STAX staxService, STAXDocument document) { 125 fSTAX = staxService; 126 fDocument = document; 127 STAXThread thread = new STAXThread(this); 128 thread.addCompletionNotifiee(this); 129 fThreadMap.put(thread.getThreadNumberAsInteger(), thread); 130 fClearlogs = fSTAX.getClearlogs(); 131 fLogTCElapsedTime = fSTAX.getLogTCElapsedTime(); 132 fLogTCNumStarts = fSTAX.getLogTCNumStarts(); 133 fLogTCStartStop = fSTAX.getLogTCStartStop(); 134 fPythonOutput = fSTAX.getPythonOutput(); 135 fPythonLogLevel = fSTAX.getPythonLogLevel(); 136 fInvalidLogLevelAction = fSTAX.getInvalidLogLevelAction(); 137 fMaxSTAXThreads = fSTAX.getMaxSTAXThreads(); 138 } 139 140 /** 141 * Gets the STAX object which represents the job's STAX service 142 * @return an instance of the job's STAX service 143 */ getSTAX()144 public STAX getSTAX() { return fSTAX; } 145 146 /** 147 * Gets the STAX document that is executed by this job. 148 * 149 * @return the job's STAX document. 150 */ getSTAXDocument()151 public STAXDocument getSTAXDocument() { return fDocument; } 152 153 /** 154 * Sets the STAX document to be executed by this job 155 * @param document the STAX document to be executed by this job 156 */ setSTAXDocument(STAXDocument document)157 public void setSTAXDocument(STAXDocument document) 158 { 159 fDocument = document; 160 } 161 162 /** 163 * Gets the next number for a thread in a job 164 * @return a number for the next thread in a job 165 */ getNextThreadNumber()166 public int getNextThreadNumber() 167 { 168 synchronized (fNextThreadNumberSynch) 169 { 170 return fNextThreadNumber++; 171 } 172 } 173 174 /** 175 * Gets the next number for a breakpoint in a job 176 * @return a number for the next breakpoint in a job 177 */ getNextBreakpointNumber()178 public int getNextBreakpointNumber() 179 { 180 synchronized (fNextBreakpointNumberSynch) 181 { 182 return fNextBreakpointNumber++; 183 } 184 } 185 setDefaultCallAction(STAXCallAction action)186 public void setDefaultCallAction(STAXCallAction action) 187 { 188 fDefaultCallAction = action; 189 } 190 191 /** 192 * Gets the name of the function that should be called to start the 193 * execution of a job 194 * @return the name of the starting function for a job 195 */ getStartFunction()196 public String getStartFunction() { return fDocument.getStartFunction(); } 197 198 /** 199 * Sets the name of the function that should be called to start the 200 * execution of a job 201 * @param startFunction the name of the starting function for a job 202 */ setStartFunction(String startFunction)203 public void setStartFunction(String startFunction) 204 { fDocument.setStartFunction(startFunction); } 205 getStartFuncArgs()206 public String getStartFuncArgs() { return fDocument.getStartFunctionArgs(); } 207 setStartFuncArgs(String startFuncArgs)208 public void setStartFuncArgs(String startFuncArgs) 209 { fDocument.setStartFuncArgs(startFuncArgs); } 210 setExecuteAndHold(String holdTimeout)211 public void setExecuteAndHold(String holdTimeout) 212 { fExecuteAndHold = holdTimeout; } 213 214 // Sets the value of the FUNCTION option on a STAX EXECUTE request setStartFunctionOverride(String startFunction)215 public void setStartFunctionOverride(String startFunction) 216 { fStartFunction = startFunction; } 217 getStartFunctionOverride()218 public String getStartFunctionOverride() { return fStartFunction; } 219 220 // Sets the value of the ARGS option on a STAX EXECUTE request setStartFuncArgsOverride(String startFuncArgs)221 public void setStartFuncArgsOverride(String startFuncArgs) 222 { fStartFuncArgs = startFuncArgs; } 223 getStartFuncArgsOverride()224 public String getStartFuncArgsOverride() { return fStartFuncArgs; } 225 getJobName()226 public String getJobName() { return fJobName; } 227 setJobName(String jobName)228 public void setJobName(String jobName) { fJobName = jobName; } 229 getJobNumber()230 public int getJobNumber() { return fJobNumber; } 231 setJobNumber(int jobNumber)232 public void setJobNumber(int jobNumber) { fJobNumber = jobNumber; } 233 getNextProcNumber()234 public int getNextProcNumber() 235 { 236 synchronized (fNextProcNumberSynch) 237 { 238 return fProcNumber++; 239 } 240 } 241 getNextCmdNumber()242 public int getNextCmdNumber() 243 { 244 synchronized (fNextCmdNumberSynch) 245 { 246 return fCmdNumber++; 247 } 248 } 249 getNextProcessKey()250 public int getNextProcessKey() 251 { 252 synchronized (fNextProcessKeySynch) 253 { 254 return fProcessKey++; 255 } 256 } getJobDataDir()257 public String getJobDataDir() { return fJobDataDir; } 258 setJobDataDir(String jobDataDir)259 public void setJobDataDir(String jobDataDir) { fJobDataDir = jobDataDir; } 260 getXmlMachine()261 public String getXmlMachine() { return fXmlMachine; } 262 setXmlMachine(String machName)263 public void setXmlMachine(String machName) 264 { fXmlMachine = machName; } 265 getXmlFile()266 public String getXmlFile() { return fXmlFile; } 267 setXmlFile(String fileName)268 public void setXmlFile(String fileName) { fXmlFile = fileName; } 269 getScripts()270 public List<String> getScripts() { return fScripts; } 271 setScript(String script)272 public void setScript(String script) { fScripts.add(script); } 273 getScriptFiles()274 public List<String> getScriptFiles() { return fScriptFiles; } 275 setScriptFile(String fileName)276 public void setScriptFile(String fileName) { fScriptFiles.add(fileName); } 277 getScriptFileMachine()278 public String getScriptFileMachine() { return fScriptFileMachine; } 279 setScriptFileMachine(String machName)280 public void setScriptFileMachine(String machName) 281 { fScriptFileMachine = machName; } 282 getSourceMachine()283 public String getSourceMachine() { return fSourceMachine; } 284 setSourceMachine(String machName)285 public void setSourceMachine(String machName) 286 { fSourceMachine = machName; } 287 getSourceHandleName()288 public String getSourceHandleName() { return fSourceHandleName; } 289 setSourceHandleName(String handleName)290 public void setSourceHandleName(String handleName) 291 { fSourceHandleName = handleName; } 292 getSourceHandle()293 public int getSourceHandle() { return fSourceHandle; } 294 setSourceHandle(int handle)295 public void setSourceHandle(int handle) 296 { fSourceHandle = handle; } 297 getClearlogs()298 public boolean getClearlogs() { return fClearlogs; } 299 getClearLogsAsString()300 public String getClearLogsAsString() 301 { 302 if (fClearlogs) 303 return "Enabled"; 304 else 305 return "Disabled"; 306 } 307 setClearlogs(boolean clearlogs)308 public void setClearlogs(boolean clearlogs) 309 { fClearlogs = clearlogs; } 310 getMaxSTAXThreads()311 public int getMaxSTAXThreads() { return fMaxSTAXThreads; } 312 getWaitTimeout()313 public String getWaitTimeout() { return fWaitTimeout; } 314 setWaitTimeout(String timeout)315 public void setWaitTimeout(String timeout) 316 { fWaitTimeout = timeout; } 317 getNotifyOnEnd()318 public int getNotifyOnEnd() { return fNotifyOnEnd; } 319 getNotifyOnEndAsString()320 public String getNotifyOnEndAsString() 321 { 322 if (fNotifyOnEnd == STAXJob.NOTIFY_ONEND_BY_NAME) 323 return "By Name"; 324 else if (fNotifyOnEnd == STAXJob.NOTIFY_ONEND_BY_HANDLE) 325 return "By Handle"; 326 else 327 return "No"; 328 } 329 setNotifyOnEnd(int notifyFlag)330 public void setNotifyOnEnd(int notifyFlag) { fNotifyOnEnd = notifyFlag; } 331 getState()332 public int getState() { return fState; } 333 getStateAsString()334 public String getStateAsString() 335 { 336 if (fState == PENDING_STATE) 337 return PENDING_STATE_STRING; 338 else 339 return RUNNING_STATE_STRING; 340 } 341 setState(int state)342 public void setState(int state) { fState = state; } 343 getResult()344 public PyObject getResult() { return fResult; } 345 setResult(PyObject result)346 public void setResult(PyObject result) 347 { fResult = result; } 348 getCompletionStatus()349 public int getCompletionStatus() { return fCompletionStatus; } 350 getCompletionStatusAsString()351 public String getCompletionStatusAsString() 352 { 353 if (fCompletionStatus == STAXJob.NORMAL_STATUS) 354 return STAXJob.NORMAL_STATUS_STRING; 355 else if (fCompletionStatus == STAXJob.TERMINATED_STATUS) 356 return STAXJob.TERMINATED_STATUS_STRING; 357 else if (fCompletionStatus == STAXJob.ABNORMAL_STATUS) 358 return STAXJob.ABNORMAL_STATUS_STRING; 359 else 360 return STAXJob.UNKNOWN_STATUS_STRING; 361 } 362 setCompletionStatus(int status)363 public void setCompletionStatus(int status) 364 { 365 fCompletionStatus = status; 366 } 367 getJobLogErrorsMC()368 public STAFMarshallingContext getJobLogErrorsMC() 369 { 370 return fJobLogErrorsMC; 371 } 372 getTestcaseList()373 public List<Map<String, Object>> getTestcaseList() { return fTestcaseList; } 374 setTestcaseList(List<Map<String, Object>> testcaseList)375 public void setTestcaseList(List<Map<String, Object>> testcaseList) 376 { fTestcaseList = testcaseList; } 377 getTestcaseTotalsMap()378 public Map<String, String> getTestcaseTotalsMap() 379 { return fTestcaseTotalsMap; } 380 setTestcaseTotalsMap(Map<String, String> testcaseTotalsMap)381 public void setTestcaseTotalsMap(Map<String, String> testcaseTotalsMap) 382 { fTestcaseTotalsMap = testcaseTotalsMap; } 383 getStartTimestamp()384 public STAXTimestamp getStartTimestamp() { return fStartTimestamp; } 385 setStartTimestamp()386 public void setStartTimestamp() 387 { 388 // Get the current date and time and set as the starting date/time 389 fStartTimestamp = new STAXTimestamp(); 390 } 391 getEndTimestamp()392 public STAXTimestamp getEndTimestamp() { return fEndTimestamp; } 393 getJobNumberAsInteger()394 public Integer getJobNumberAsInteger() { return new Integer(fJobNumber); } 395 getSTAFHandle()396 public STAFHandle getSTAFHandle() { return fHandle; } 397 setSTAFHandle()398 public void setSTAFHandle() throws STAFException 399 { 400 fHandle = new STAFHandle("STAX/Job/" + fJobNumber); 401 } 402 getLogTCElapsedTime()403 public boolean getLogTCElapsedTime() { return fLogTCElapsedTime; } 404 getLogTCElapsedTimeAsString()405 public String getLogTCElapsedTimeAsString() 406 { 407 if (fLogTCElapsedTime) 408 return "Enabled"; 409 else 410 return "Disabled"; 411 } 412 setLogTCElapsedTime(boolean logTCElapsedTime)413 public void setLogTCElapsedTime(boolean logTCElapsedTime) 414 { fLogTCElapsedTime = logTCElapsedTime; } 415 getLogTCNumStarts()416 public boolean getLogTCNumStarts() { return fLogTCNumStarts; } 417 getLogTCNumStartsAsString()418 public String getLogTCNumStartsAsString() 419 { 420 if (fLogTCNumStarts) 421 return "Enabled"; 422 else 423 return "Disabled"; 424 } 425 setLogTCNumStarts(boolean logTCNumStarts)426 public void setLogTCNumStarts(boolean logTCNumStarts) 427 { fLogTCNumStarts = logTCNumStarts; } 428 getLogTCStartStop()429 public boolean getLogTCStartStop() { return fLogTCStartStop; } 430 getLogTCStartStopAsString()431 public String getLogTCStartStopAsString() 432 { 433 if (fLogTCStartStop) 434 return "Enabled"; 435 else 436 return "Disabled"; 437 } 438 setLogTCStartStop(boolean logTCStartStop)439 public void setLogTCStartStop(boolean logTCStartStop) 440 { fLogTCStartStop = logTCStartStop; } 441 getPythonOutput()442 public int getPythonOutput() { return fPythonOutput; } 443 setPythonOutput(int pythonOutput)444 public void setPythonOutput(int pythonOutput) 445 { 446 fPythonOutput = pythonOutput; 447 } 448 getPythonLogLevel()449 public String getPythonLogLevel() { return fPythonLogLevel; } 450 setPythonLogLevel(String logLevel)451 public void setPythonLogLevel(String logLevel) 452 { 453 fPythonLogLevel = logLevel; 454 } 455 getInvalidLogLevelAction()456 public int getInvalidLogLevelAction() { return fInvalidLogLevelAction; } 457 setInvalidLogLevelAction(int action)458 public void setInvalidLogLevelAction(int action) 459 { 460 fInvalidLogLevelAction = action; 461 } 462 getNumThreads()463 public int getNumThreads() 464 { 465 return fThreadMap.size(); 466 } 467 getThreadMapCopy()468 public Map<Integer, STAXThread> getThreadMapCopy() 469 { 470 return fThreadMap; 471 } 472 getThread(Integer threadNumber)473 public STAXThread getThread(Integer threadNumber) 474 { 475 synchronized (fThreadMap) 476 { 477 return fThreadMap.get(threadNumber); 478 } 479 } 480 addThread(STAXThread thread)481 public void addThread(STAXThread thread) 482 { 483 synchronized (fThreadMap) 484 { 485 fThreadMap.put(thread.getThreadNumberAsInteger(), thread); 486 } 487 } 488 addThreadIfDoesNotExceedMax(STAXThread thread)489 public void addThreadIfDoesNotExceedMax(STAXThread thread) 490 throws STAXExceedsMaxThreadsException 491 { 492 synchronized (fThreadMap) 493 { 494 if ((fMaxSTAXThreads != 0) && 495 (fThreadMap.size() >= fMaxSTAXThreads)) 496 { 497 throw new STAXExceedsMaxThreadsException( 498 "Exceeded MaxSTAXThreads=" + fMaxSTAXThreads + 499 " (the maximum number of STAX Threads that " + 500 "can be running simultaneously in a job)."); 501 } 502 503 fThreadMap.put(thread.getThreadNumberAsInteger(), thread); 504 } 505 } 506 removeThread(Integer threadNumber)507 public void removeThread(Integer threadNumber) 508 { 509 synchronized (fThreadMap) 510 { 511 fThreadMap.remove(threadNumber); 512 } 513 } 514 getTimedEventQueue()515 public STAXTimedEventQueue getTimedEventQueue() 516 { 517 if (fSTAX.getTimedEventQueuePerJob()) 518 return fTimedEventQueue; 519 else 520 return fSTAX.getTimedEventQueue(); 521 } 522 addFunction(STAXFunctionAction function)523 public void addFunction(STAXFunctionAction function) 524 { 525 fDocument.addFunction(function); 526 } 527 getFunction(String name)528 public STAXAction getFunction(String name) 529 { 530 return fDocument.getFunction(name); 531 } 532 getBreakpointFirstFunction()533 public boolean getBreakpointFirstFunction() 534 { 535 return fBreakpointFirstFunction; 536 } 537 setBreakpointFirstFunction(boolean breakpointFirstFunction)538 public void setBreakpointFirstFunction(boolean breakpointFirstFunction) 539 { 540 fBreakpointFirstFunction = breakpointFirstFunction; 541 } 542 getBreakpointSubjobFirstFunction()543 public boolean getBreakpointSubjobFirstFunction() 544 { 545 return fBreakpointSubjobFirstFunction; 546 } 547 setBreakpointSubjobFirstFunction( boolean breakpointSubjobFirstFunction)548 public void setBreakpointSubjobFirstFunction( 549 boolean breakpointSubjobFirstFunction) 550 { 551 fBreakpointSubjobFirstFunction = breakpointSubjobFirstFunction; 552 } 553 functionExists(String name)554 public boolean functionExists(String name) 555 { 556 return fDocument.functionExists(name); 557 } 558 addDefaultAction(STAXAction action)559 public void addDefaultAction(STAXAction action) 560 { 561 fDocument.addDefaultAction(action); 562 } 563 addCompletionNotifiee(STAXJobCompleteListener listener)564 public void addCompletionNotifiee(STAXJobCompleteListener listener) 565 { 566 synchronized (fCompletionNotifiees) 567 { 568 fCompletionNotifiees.addLast(listener); 569 } 570 } 571 addCompletionNotifiee2(STAXJobCompleteNotifiee notifiee)572 public STAFResult addCompletionNotifiee2(STAXJobCompleteNotifiee notifiee) 573 { 574 synchronized (fCompletionNotifiees) 575 { 576 // Check if the notifiee is already in the list 577 578 for (STAXJobCompleteListener listener : fCompletionNotifiees) 579 { 580 if (listener instanceof 581 com.ibm.staf.service.stax.STAXJobCompleteNotifiee) 582 { 583 STAXJobCompleteNotifiee aNotifiee = 584 (STAXJobCompleteNotifiee)listener; 585 586 if (aNotifiee.getMachine().equals(notifiee.getMachine()) && 587 aNotifiee.getHandle() == notifiee.getHandle() && 588 aNotifiee.getHandleName().equals(notifiee.getHandleName())) 589 { 590 return new STAFResult( 591 STAFResult.AlreadyExists, 592 "A notifiee is already registered for machine=" + 593 notifiee.getMachine() + 594 ", handle=" + notifiee.getHandle() + 595 ", handleName=" + notifiee.getHandleName()); 596 } 597 } 598 599 } 600 601 // Add to the end of the list 602 603 fCompletionNotifiees.addLast(notifiee); 604 } 605 606 return new STAFResult(STAFResult.Ok, ""); 607 } 608 removeCompletionNotifiee(STAXJobCompleteNotifiee notifiee)609 public STAFResult removeCompletionNotifiee(STAXJobCompleteNotifiee notifiee) 610 { 611 boolean found = false; 612 613 synchronized (fCompletionNotifiees) 614 { 615 // Check if notifiee exists. If so, remove the notifiee 616 617 Iterator<STAXJobCompleteListener> iter = 618 fCompletionNotifiees.iterator(); 619 620 while (iter.hasNext()) 621 { 622 STAXJobCompleteListener listener = iter.next(); 623 624 if (listener instanceof 625 com.ibm.staf.service.stax.STAXJobCompleteNotifiee) 626 { 627 STAXJobCompleteNotifiee aNotifiee = 628 (STAXJobCompleteNotifiee)listener; 629 630 if (aNotifiee.getMachine().equals(notifiee.getMachine()) && 631 aNotifiee.getHandle() == notifiee.getHandle() && 632 aNotifiee.getHandleName().equals(notifiee.getHandleName())) 633 { 634 // Remove notifiee from list 635 636 fCompletionNotifiees.remove(aNotifiee); 637 found = true; 638 return new STAFResult(STAFResult.Ok, ""); 639 } 640 } 641 } 642 } 643 644 return new STAFResult( 645 STAFResult.DoesNotExist, 646 "No notifiee registered for machine=" + notifiee.getMachine() + 647 ", handle=" + notifiee.getHandle() + 648 ", handleName=" + notifiee.getHandleName()); 649 } 650 getCompletionNotifiees()651 public LinkedList<STAXJobCompleteListener> getCompletionNotifiees() 652 { 653 synchronized(fCompletionNotifiees) 654 { 655 return new LinkedList<STAXJobCompleteListener> 656 (fCompletionNotifiees); 657 } 658 } 659 660 // Gets the compiled Jython code for the specified code string. 661 // Note that we cache compiled Jython code so that if it is used more 662 // than once, it eliminates the overhead of recompiling the code string 663 // each time it is executed. 664 // 665 // Input Arguments: 666 // - codeString: must be valid Python code; however, like ordinary 667 // Python code, the existence of variables will not be 668 // checked until the code is executed. 669 // - kind: is either 'exec' if the string is made up of 670 // statements, 'eval' if it is an expression. 671 getCompiledPyCode(String codeString, String kind)672 public PyCode getCompiledPyCode(String codeString, String kind) 673 { 674 synchronized(fCompiledPyCodeCache) 675 { 676 PyCode codeObject = fCompiledPyCodeCache.get(codeString); 677 678 if (codeObject == null) 679 { 680 if (COUNT_PYCODE_CACHES) fCompiledPyCodeCacheAdds++; 681 682 if (kind.equals("eval")) 683 { 684 // Set to avoid error of setting code object to nothing 685 if (codeString.equals("")) codeString = "None"; 686 687 /* Jython 2.1: 688 codeObject = __builtin__.compile( 689 "STAXPyEvalResult = " + codeString, 690 "<pyEval string>", "exec"); 691 */ 692 // Jython 2.5: 693 codeObject = Py.compile_flags( 694 codeString, "<pyEval string>", 695 CompileMode.eval, 696 Py.getCompilerFlags().combine( 697 CompilerFlags.PyCF_SOURCE_IS_UTF8)); 698 } 699 else 700 { 701 /* Jython 2.1: 702 codeObject = __builtin__.compile( 703 codeString, "<pyExec string>", "exec"); 704 */ 705 // Jython 2.5: 706 codeObject = Py.compile_flags( 707 codeString, "<pyExec string>", 708 CompileMode.exec, 709 Py.getCompilerFlags().combine( 710 CompilerFlags.PyCF_SOURCE_IS_UTF8)); 711 } 712 713 fCompiledPyCodeCache.put(codeString, codeObject); 714 } 715 else 716 { 717 if (COUNT_PYCODE_CACHES) fCompiledPyCodeCacheGets++; 718 } 719 720 return codeObject; 721 } 722 } 723 724 // Queue listener methods 725 registerSTAFQueueListener(String msgType, STAXSTAFQueueListener listener)726 public void registerSTAFQueueListener(String msgType, 727 STAXSTAFQueueListener listener) 728 { 729 synchronized (fQueueListenerMap) 730 { 731 TreeSet<STAXSTAFQueueListener> listenerSet = 732 fQueueListenerMap.get(msgType); 733 734 if (listenerSet == null) 735 { 736 listenerSet = new TreeSet<STAXSTAFQueueListener>( 737 new STAXSTAFQueueListenerComparator()); 738 fQueueListenerMap.put(msgType, listenerSet); 739 } 740 741 listenerSet.add(listener); 742 } 743 } 744 unregisterSTAFQueueListener(String msgType, STAXSTAFQueueListener listener)745 public void unregisterSTAFQueueListener(String msgType, 746 STAXSTAFQueueListener listener) 747 { 748 synchronized (fQueueListenerMap) 749 { 750 TreeSet<STAXSTAFQueueListener> listenerSet = 751 fQueueListenerMap.get(msgType); 752 753 if (listenerSet != null) 754 listenerSet.remove(listener); 755 } 756 } 757 758 // 759 // Data management functions 760 // 761 setData(String dataName, Object data)762 public boolean setData(String dataName, Object data) 763 { 764 // Return true if added successfully, else return false. 765 766 synchronized (fDataMap) 767 { 768 if (!fDataMap.containsKey(dataName)) 769 { 770 fDataMap.put(dataName, data); 771 return true; 772 } 773 } 774 775 return false; 776 } 777 getData(String dataName)778 public Object getData(String dataName) 779 { 780 synchronized (fDataMap) 781 { return fDataMap.get(dataName); } 782 } 783 removeData(String dataName)784 public boolean removeData(String dataName) 785 { 786 // Return true if removed successfully, else return false. 787 788 synchronized (fDataMap) 789 { 790 if (fDataMap.containsKey(dataName)) 791 { 792 fDataMap.remove(dataName); 793 return true; 794 } 795 } 796 797 return false; 798 } 799 800 // 801 // Execution methods 802 // 803 startExecution()804 public void startExecution() 805 throws STAFException, STAXException 806 { 807 // Check if the MAXRETURNFILESIZE setting is not 0 (no maximum size 808 // limit) and if so, set variable STAF/MaxReturnFileSize for the STAX 809 // job handle 810 811 if (fSTAX.getMaxReturnFileSize() != 0) 812 { 813 String request = "SET VAR " + STAFUtil.wrapData( 814 "STAF/MaxReturnFileSize=" + fSTAX.getMaxReturnFileSize()); 815 816 STAFResult result = fHandle.submit2("local", "VAR", request); 817 818 if (result.rc != STAFResult.Ok) 819 { 820 String msg = "The STAX service could not set the maximum " + 821 "file size variable for Job ID " + fJobNumber + 822 "\nSTAF local VAR " + request + " failed with RC=" + 823 result.rc + ", Result=" + result.result; 824 825 log(STAXJob.JOB_LOG, "error", msg); 826 } 827 } 828 829 if (fSTAX.getTimedEventQueuePerJob()) 830 { 831 // Create a STAXTimedEventQueue thread for this job and start it 832 fTimedEventQueue = new STAXTimedEventQueue(); 833 } 834 835 fSTAFQueueMonitor = new STAFQueueMonitor(this); 836 fSTAFQueueMonitor.start(); 837 838 // Initialize data for the Job 839 840 fSTAX.visitJobManagementHandlers(new STAXVisitorHelper(this) 841 { 842 public void visit(Object o, Iterator iter) 843 { 844 STAXJobManagementHandler handler = (STAXJobManagementHandler)o; 845 846 handler.initJob((STAXJob)fData); 847 } 848 }); 849 850 // Register to listen for messages that STAF requests have completed 851 852 registerSTAFQueueListener("STAF/RequestComplete", this); 853 854 // Get the main thread, thread 1 855 856 STAXThread thread = fThreadMap.get(new Integer(1)); 857 858 // Use a custom OutputStream to redirect the Python Interpreter's 859 // stdout and stderr to somewhere other than the JVM Log and/or 860 // to reformat the output (e.g. add a timestamp, job ID) when 861 // logging to the JVM Log. Use the log level specified for the 862 // job's python output. 863 864 thread.setPythonInterpreterStdout(new STAXPythonOutput(this)); 865 866 // Override the log level to use to always be "Error" for stderr 867 thread.setPythonInterpreterStderr(new STAXPythonOutput(this, "Error")); 868 869 // Get the starting function for the job 870 871 STAXAction action = fDocument.getFunction( 872 fDocument.getStartFunction()); 873 874 if (action == null) 875 { 876 throw new STAXInvalidStartFunctionException( 877 "'" + fDocument.getStartFunction() + 878 "' is not a valid function name. No function with " + 879 "this name is defined."); 880 } 881 else 882 { 883 // Call the main function passing any arguments 884 885 String startFunctionUneval = "'" + fDocument.getStartFunction() + 886 "'"; 887 888 if (fDefaultCallAction != null) 889 { 890 STAXCallAction callAction = fDefaultCallAction; 891 callAction.setFunction(startFunctionUneval); 892 callAction.setArgs(fDocument.getStartFunctionArgs()); 893 action = callAction; 894 } 895 else 896 { 897 STAXCallAction callAction = new STAXCallAction( 898 startFunctionUneval, fDocument.getStartFunctionArgs()); 899 callAction.setLineNumber( 900 "<External>", "<Error in ARGS option>"); 901 callAction.setXmlFile(getXmlFile()); 902 callAction.setXmlMachine(getXmlMachine()); 903 action = callAction; 904 } 905 } 906 907 ArrayList<STAXAction> actionList = new ArrayList<STAXAction>(); 908 909 // If HOLD was specified on EXECUTE request, add hold action as 910 // the first action in the actionList. 911 912 if (fExecuteAndHold != null) 913 { 914 if (fExecuteAndHold.length() == 0) 915 { 916 // Add a hold action with no timeout 917 918 actionList.add(new STAXHoldAction("'main'")); 919 } 920 else 921 { 922 // Add a hold action with a timeout 923 924 actionList.add( 925 new STAXHoldAction("'main'", "1", fExecuteAndHold)); 926 } 927 } 928 929 // Add additional Python variables about the Job 930 thread.pySetVar("STAXJobID", new Integer(fJobNumber)); 931 thread.pySetVar("STAXJobName", fJobName); 932 thread.pySetVar("STAXJobXMLFile", fXmlFile); 933 thread.pySetVar("STAXJobXMLMachine", fXmlMachine); 934 thread.pySetVar("STAXJobStartDate", fStartTimestamp.getDateString()); 935 thread.pySetVar("STAXJobStartTime", fStartTimestamp.getTimeString()); 936 thread.pySetVar("STAXJobSourceMachine", fSourceMachine); 937 thread.pySetVar("STAXJobSourceHandleName", fSourceHandleName); 938 thread.pySetVar("STAXJobSourceHandle", new Integer(fSourceHandle)); 939 thread.pySetVar("STAXJobStartFunctionName", 940 fDocument.getStartFunction()); 941 thread.pySetVar("STAXJobStartFunctionArgs", 942 fDocument.getStartFunctionArgs()); 943 thread.pySetVar("STAXCurrentFunction", Py.None); 944 thread.pySetVar("STAXCurrentXMLFile", fXmlFile); 945 thread.pySetVar("STAXCurrentXMLMachine", fXmlMachine); 946 947 if (!fScriptFileMachine.equals("")) 948 thread.pySetVar("STAXJobScriptFileMachine", fScriptFileMachine); 949 else 950 thread.pySetVar("STAXJobScriptFileMachine", fSourceMachine); 951 952 thread.pySetVar("STAXJobScriptFiles", fScriptFiles.toArray()); 953 thread.pySetVar("STAXJobHandle", fHandle); 954 955 thread.pySetVar("STAXServiceName", fSTAX.getServiceName()); 956 thread.pySetVar("STAXServiceMachine", fSTAX.getLocalMachineName()); 957 thread.pySetVar("STAXServiceMachineNickname", 958 fSTAX.getLocalMachineNickname()); 959 thread.pySetVar("STAXEventServiceName", fSTAX.getEventServiceName()); 960 thread.pySetVar("STAXEventServiceMachine", 961 fSTAX.getEventServiceMachine()); 962 thread.pySetVar("STAXServicePath", fSTAX.getServicePath()); 963 964 thread.pySetVar("STAXJobUserLog", new STAFLog( 965 STAFLog.MACHINE, 966 fSTAX.getServiceName().toUpperCase() + "_Job_" + 967 fJobNumber + "_User", 968 fHandle, 0)); 969 thread.pySetVar("STAXJobLogName", 970 fSTAX.getServiceName().toUpperCase() + 971 "_Job_" + fJobNumber); 972 thread.pySetVar("STAXJobUserLogName", 973 fSTAX.getServiceName().toUpperCase() + 974 "_Job_" + fJobNumber + "_User"); 975 976 thread.pySetVar("STAXJobWriteLocation", fJobDataDir); 977 thread.pySetVar("STAXMessageLog", new Integer(0)); 978 thread.pySetVar("STAXLogMessage", new Integer(0)); 979 980 if (fLogTCElapsedTime) 981 thread.pySetVar("STAXLogTCElapsedTime", new Integer(1)); 982 else 983 thread.pySetVar("STAXLogTCElapsedTime", new Integer(0)); 984 985 if (fLogTCNumStarts) 986 thread.pySetVar("STAXLogTCNumStarts", new Integer(1)); 987 else 988 thread.pySetVar("STAXLogTCNumStarts", new Integer(0)); 989 990 if (fLogTCStartStop) 991 thread.pySetVar("STAXLogTCStartStop", new Integer(1)); 992 else 993 thread.pySetVar("STAXLogTCStartStop", new Integer(0)); 994 995 thread.pySetVar("STAXPythonOutput", 996 STAXPythonOutput.getPythonOutputAsString( 997 getPythonOutput())); 998 999 thread.pySetVar("STAXPythonLogLevel", getPythonLogLevel()); 1000 1001 thread.pySetVar("STAXInvalidLogLevelAction", 1002 STAXLogAction.getInvalidLogLevelActionAsString( 1003 getInvalidLogLevelAction())); 1004 1005 // Add default signal handlers to the actionList for the main block. 1006 addDefaultSignalHandlers(actionList); 1007 1008 // Put the "main" function/block at the bottom of the stack, 1009 // with its action being a sequence group that contains the 1010 // default actions (scripts and signalhandlers) in the <stax> 1011 // element and then the call of the main function. 1012 1013 LinkedList<STAXAction> defaultActions = fDocument.getDefaultActions(); 1014 1015 while (!defaultActions.isEmpty()) 1016 { 1017 actionList.add(defaultActions.removeLast()); 1018 } 1019 1020 actionList.add(action); // Add call of main function 1021 1022 // Create a sequence action for the "main" function/block 1023 STAXActionDefaultImpl mainSequence = new STAXSequenceAction(actionList); 1024 mainSequence.setLineNumber("sequence", "<Internal>"); 1025 mainSequence.setXmlFile(getXmlFile()); 1026 mainSequence.setXmlMachine(getXmlMachine()); 1027 1028 STAXActionDefaultImpl mainBlock = new STAXBlockAction( 1029 "main", mainSequence); 1030 mainBlock.setLineNumber("block", "<Internal>"); 1031 mainBlock.setXmlFile(getXmlFile()); 1032 mainBlock.setXmlMachine(getXmlMachine()); 1033 1034 thread.pushAction(mainBlock); 1035 1036 // Change the job's state from pending to running 1037 fState = RUNNING_STATE; 1038 1039 // Generate the job is running event 1040 1041 HashMap<String, String> jobRunningMap = new HashMap<String, String>(); 1042 jobRunningMap.put("type", "job"); 1043 jobRunningMap.put("status", "run"); 1044 jobRunningMap.put("jobID", String.valueOf(fJobNumber)); 1045 jobRunningMap.put("startFunction", fDocument.getStartFunction()); 1046 jobRunningMap.put("jobName", fJobName); 1047 jobRunningMap.put("startTimestamp", 1048 fStartTimestamp.getTimestampString()); 1049 1050 generateEvent(STAXJob.STAX_JOB_EVENT, jobRunningMap, true); 1051 1052 thread.pySetVar("STAXJob", this); 1053 1054 thread.setBreakpointFirstFunction(fBreakpointFirstFunction); 1055 1056 // Schedule the thread to run 1057 thread.schedule(); 1058 } 1059 1060 /** 1061 * This is a recursive function that checks if a function needs to 1062 * import other xml files that contain functions it requires. 1063 * 1064 * Checks if the specified function has any function-import sub-elements 1065 * that specify files to be imported. If so, it parses the files (if not 1066 * already in the cache) and adds their required functions to the job's 1067 * fFunctionMap. For any new functions added to the job's fFunctionMap, 1068 * it recursively calls the addImportedFunctions method. 1069 */ addImportedFunctions(STAXFunctionAction functionAction)1070 public void addImportedFunctions(STAXFunctionAction functionAction) 1071 throws STAFException, STAXException 1072 { 1073 List<STAXFunctionImport> importList = functionAction.getImportList(); 1074 1075 if (importList.size() == 0) 1076 return; 1077 1078 // Process function-import list (since not empty) 1079 1080 String evalElem = "function-import"; 1081 int evalIndex = 0; 1082 1083 for (STAXFunctionImport functionImport : importList) 1084 { 1085 String machine = functionImport.getMachine(); 1086 String file = functionImport.getFile(); 1087 String directory = functionImport.getDirectory(); 1088 1089 boolean directorySpecified = false; 1090 1091 if (directory != null) 1092 directorySpecified = true; 1093 1094 boolean machineSpecified = false; 1095 1096 if (machine != null) 1097 { 1098 machineSpecified = true; 1099 1100 // Check if "machine" contains any STAF variables 1101 1102 if (machine.indexOf("{") != -1) 1103 { 1104 // Resolve variables on the local STAX service machine 1105 1106 STAFResult result = this.submitSync( 1107 "local", "VAR", "RESOLVE STRING " + 1108 STAFUtil.wrapData(machine)); 1109 1110 if (result.rc != STAFResult.Ok) 1111 { 1112 String errorMsg = "Cause: Error resolving STAF " + 1113 "variables in the \"machine\" attribute " + 1114 "for element type \" function-import\"," + 1115 "\nRC: " + result.rc + 1116 ", Result: " + result.result; 1117 1118 functionAction.setElementInfo( 1119 new STAXElementInfo( 1120 evalElem, "machine", evalIndex, errorMsg)); 1121 1122 throw new STAXFunctionImportException( 1123 STAXUtil.formatErrorMessage(functionAction)); 1124 } 1125 1126 machine = result.result; 1127 } 1128 } 1129 else 1130 { 1131 machine = fXmlMachine; 1132 } 1133 1134 // Check if file or directory contains any STAF variables. 1135 1136 String evalAttr = "file"; 1137 String unresValue = file; 1138 1139 if (directorySpecified) 1140 { 1141 evalAttr = "directory"; 1142 unresValue = directory; 1143 } 1144 1145 if (unresValue.indexOf("{") != -1) 1146 { 1147 // Resolve variables on the local STAX service machine 1148 1149 STAFResult result = this.submitSync( 1150 "local", "VAR", "RESOLVE STRING " + 1151 STAFUtil.wrapData(unresValue)); 1152 1153 if (result.rc != STAFResult.Ok) 1154 { 1155 String errorMsg = "Cause: Error resolving STAF " + 1156 "variables in the \"" + evalAttr + "\" attribute " + 1157 "for element type \"function-import\"." + 1158 "\nRC: " + result.rc + 1159 ", Result: " + result.result; 1160 1161 functionAction.setElementInfo( 1162 new STAXElementInfo( 1163 evalElem, evalAttr, evalIndex, errorMsg)); 1164 1165 throw new STAXFunctionImportException( 1166 STAXUtil.formatErrorMessage(functionAction)); 1167 } 1168 1169 if (!directorySpecified) 1170 file = result.result; 1171 else 1172 directory = result.result; 1173 } 1174 1175 // Handle any functions specified to be imported 1176 1177 String functions = functionImport.getFunctions(); 1178 1179 Vector<String> importedFunctionList = new Vector<String>(); 1180 1181 if (functions != null) 1182 { 1183 // Convert string containing a whitespace-separated list of 1184 // functions to a vector. 1185 // 1186 // Note: The tokenizer uses the default delimiter set, which 1187 // is " \t\n\r\f" and consists of the space character, 1188 // the tab character, the newline character, the 1189 // carriage-return character, and the form-feed character. 1190 // Delimiter characters themselves will not be treated as 1191 // tokens. 1192 1193 StringTokenizer st = new StringTokenizer(functions); 1194 1195 while (st.hasMoreTokens()) 1196 { 1197 importedFunctionList.add(st.nextToken()); 1198 } 1199 } 1200 1201 // Get the file separator for the machine where the import file/ 1202 // directory resides as this is needed to normalize the path name 1203 // and may also be needed to determine if its a relative file 1204 // name, and to get the parent directory. 1205 1206 STAFResult result = STAXFileCache.getFileSep( 1207 machine, this.getSTAX().getSTAFHandle()); 1208 1209 if (result.rc != STAFResult.Ok) 1210 { 1211 String errorMsg = "Cause: No response from machine \"" + 1212 machine + "\" when trying to get the contents of a " + 1213 "file specified by element type \"function-import\"." + 1214 "\nRC: " + result.rc + ", Result: " + result.result; 1215 1216 functionAction.setElementInfo( 1217 new STAXElementInfo( 1218 evalElem, "machine", evalIndex, errorMsg)); 1219 1220 throw new STAXFunctionImportException( 1221 STAXUtil.formatErrorMessage(functionAction)); 1222 } 1223 1224 String fileSep = result.result; 1225 1226 // Set a flag to indicate if the file/directory name is 1227 // case-sensitive (e.g. true if it resides on a Unix machine, 1228 // false if Windows) 1229 1230 boolean caseSensitiveFileName = true; 1231 1232 if (fileSep.equals("\\")) 1233 { 1234 // Windows machine so not case-sensitive 1235 1236 caseSensitiveFileName = false; 1237 } 1238 1239 // Import the file specified in the function-import element. 1240 // If the file specified in the function-import element isn't 1241 // already in the file cache, get the file and parse it. 1242 // Then, add the functions defined in this file to the main job's 1243 // function map. 1244 1245 // If no machine attribute is specified, then the file specified 1246 // could be a relative file name so need to assign its absolute 1247 // file name 1248 1249 if (!machineSpecified) 1250 { 1251 // Check if a relative path was specified 1252 1253 String entry = file; 1254 1255 if (directorySpecified) 1256 entry = directory; 1257 1258 if (STAXUtil.isRelativePath(entry, fileSep)) 1259 { 1260 // Assign the absolute name assuming it is relative 1261 // to the parent xml file's path 1262 1263 String currentFile = functionAction.getXmlFile(); 1264 1265 if (currentFile.equals(STAX.INLINE_DATA)) 1266 { 1267 // Cannot specify a relative file path if the parent 1268 // xml file is STAX.INLINE_DATA 1269 1270 String errorMsg = "Cause: Cannot specify a " + 1271 "relative path in attribute \"" + evalAttr + 1272 "\" for element type \"function-import\" when " + 1273 "the parent xml file is " + STAX.INLINE_DATA; 1274 1275 functionAction.setElementInfo( 1276 new STAXElementInfo( 1277 evalElem, STAXElementInfo.NO_ATTRIBUTE_NAME, 1278 evalIndex, errorMsg)); 1279 1280 throw new STAXFunctionImportException( 1281 STAXUtil.formatErrorMessage(functionAction)); 1282 } 1283 1284 entry = STAXUtil.getParentPath(currentFile, fileSep) + 1285 entry; 1286 1287 if (!directorySpecified) 1288 file = entry; 1289 else 1290 directory = entry; 1291 } 1292 } 1293 1294 // Normalize the import file/directory name so that we have a 1295 // better chance at matching file names that are already cached 1296 1297 if (!directorySpecified) 1298 file = STAXUtil.normalizeFilePath(file, fileSep); 1299 else 1300 directory = STAXUtil.normalizeFilePath(directory, fileSep); 1301 1302 // Create a STAX XML Parser 1303 1304 STAXParser parser = null; 1305 1306 try 1307 { 1308 parser = new STAXParser(getSTAX()); 1309 } 1310 catch (Exception ex) 1311 { 1312 String errorMsg = ex.getClass().getName() + "\n" + 1313 ex.getMessage(); 1314 1315 functionAction.setElementInfo( 1316 new STAXElementInfo( 1317 evalElem, STAXElementInfo.NO_ATTRIBUTE_NAME, 1318 evalIndex, errorMsg)); 1319 1320 throw new STAXFunctionImportException( 1321 STAXUtil.formatErrorMessage(functionAction)); 1322 } 1323 1324 // Create a list of files to process 1325 1326 List<String> theFileList = new ArrayList<String>(); 1327 1328 if (!directorySpecified) 1329 { 1330 // There will be just one file to process 1331 1332 theFileList.add(file); 1333 } 1334 else 1335 { 1336 // Submit a FS LIST DIRECTORY request for all *.xml files 1337 // in the directory 1338 1339 result = submitSync( 1340 machine, "FS", "LIST DIRECTORY " + 1341 STAFUtil.wrapData(directory) + 1342 " TYPE F EXT xml CASEINSENSITIVE"); 1343 1344 if (result.rc != 0) 1345 { 1346 evalAttr = STAXElementInfo.NO_ATTRIBUTE_NAME; 1347 String errorMsg = "Cause: "; 1348 1349 if (result.rc == STAFResult.NoPathToMachine) 1350 { 1351 errorMsg = errorMsg + "No response from machine "; 1352 evalAttr = "machine"; 1353 } 1354 else 1355 { 1356 errorMsg = errorMsg + "Error "; 1357 } 1358 1359 errorMsg = errorMsg + "when submitting a FS LIST " + 1360 "DIRECTORY request to list all *.xml files in the " + 1361 "directory specified by element type " + 1362 "\"function-import\":" + 1363 "\n Directory: " + directory + 1364 "\n Machine: " + machine + 1365 "\n\nRC: " + result.rc + ", Result: " + result.result; 1366 1367 functionAction.setElementInfo( 1368 new STAXElementInfo( 1369 evalElem, evalAttr, evalIndex, errorMsg)); 1370 1371 throw new STAXFunctionImportException( 1372 STAXUtil.formatErrorMessage(functionAction)); 1373 } 1374 1375 Iterator fileIter = ((List)result.resultObj).iterator(); 1376 1377 while (fileIter.hasNext()) 1378 { 1379 theFileList.add( 1380 directory + fileSep + (String)fileIter.next()); 1381 } 1382 } 1383 1384 for (String fileName : theFileList) 1385 { 1386 Date dLastModified = null; 1387 STAXJob job = null; 1388 1389 // If file caching is enabled, find the modification date of 1390 // the file being imported 1391 1392 if (getSTAX().getFileCaching()) 1393 { 1394 if (STAXFileCache.get().isLocalMachine(machine)) 1395 { 1396 File fileObj = new File(fileName); 1397 1398 // Make sure the file exists 1399 1400 if (fileObj.exists()) 1401 { 1402 long lastModified = fileObj.lastModified(); 1403 1404 if (lastModified > 0) 1405 { 1406 // Chop off the milliseconds because some 1407 // systems don't report modTime to milliseconds 1408 1409 lastModified = ((long)(lastModified/1000))*1000; 1410 1411 dLastModified = new Date(lastModified); 1412 } 1413 } 1414 } 1415 1416 if (dLastModified == null) 1417 { 1418 // Find the remote file mod time using STAF 1419 1420 STAFResult entryResult = submitSync( 1421 machine, "FS", "GET ENTRY " + 1422 STAFUtil.wrapData(fileName) + " MODTIME"); 1423 1424 if (entryResult.rc == 0) 1425 { 1426 String modDate = entryResult.result; 1427 dLastModified = STAXFileCache.convertSTAXDate( 1428 modDate); 1429 } 1430 } 1431 1432 // Check for an up-to-date file in the cache 1433 1434 if ((dLastModified != null) && 1435 STAXFileCache.get().checkCache( 1436 machine, fileName, dLastModified, 1437 caseSensitiveFileName)) 1438 { 1439 // Get the doc from cache 1440 1441 STAXDocument doc = STAXFileCache.get().getDocument( 1442 machine, fileName, caseSensitiveFileName); 1443 1444 if (doc != null) 1445 { 1446 job = new STAXJob(getSTAX(), doc); 1447 } 1448 } 1449 } 1450 1451 // If the file was not in cache, then retrieve it using STAF 1452 1453 if (job == null) 1454 { 1455 result = submitSync( 1456 machine, "FS", "GET FILE " + STAFUtil.wrapData( 1457 fileName)); 1458 1459 if (result.rc != 0) 1460 { 1461 evalAttr = STAXElementInfo.NO_ATTRIBUTE_NAME; 1462 String errorMsg = "Cause: "; 1463 1464 if (result.rc == STAFResult.NoPathToMachine) 1465 { 1466 errorMsg = errorMsg + "No response from machine "; 1467 evalAttr = "machine"; 1468 } 1469 else 1470 { 1471 errorMsg = errorMsg + "Error "; 1472 } 1473 1474 errorMsg = errorMsg + "when submitting a FS GET " + 1475 "request to get a file specified by element type " + 1476 "\"function-import\":" + 1477 "\n File: " + fileName + 1478 "\n Machine: " + machine + 1479 "\n\nRC: " + result.rc + ", Result: " + result.result; 1480 1481 functionAction.setElementInfo( 1482 new STAXElementInfo( 1483 evalElem, evalAttr, evalIndex, errorMsg)); 1484 1485 throw new STAXFunctionImportException( 1486 STAXUtil.formatErrorMessage(functionAction)); 1487 } 1488 1489 // Parse the XML document 1490 1491 try 1492 { 1493 job = parser.parse(result.result, fileName, machine); 1494 } 1495 catch (Exception ex) 1496 { 1497 String errorMsg = "Cause: " + 1498 ex.getClass().getName() + "\n"; 1499 1500 // Make sure that the error message contains the File: 1501 // and Machine: where the error occurred 1502 1503 if (ex.getMessage().indexOf("File: ") == -1 || 1504 ex.getMessage().indexOf("Machine: ") == -1) 1505 { 1506 errorMsg = errorMsg + "\nFile: " + fileName + 1507 ", Machine: " + machine; 1508 } 1509 1510 errorMsg = errorMsg + ex.getMessage(); 1511 1512 functionAction.setElementInfo( 1513 new STAXElementInfo( 1514 evalElem, STAXElementInfo.NO_ATTRIBUTE_NAME, 1515 evalIndex, errorMsg)); 1516 1517 throw new STAXFunctionImportException( 1518 STAXUtil.formatErrorMessage(functionAction)); 1519 } 1520 1521 // Add the XML document to the cache 1522 1523 if (getSTAX().getFileCaching() && (dLastModified != null)) 1524 { 1525 STAXFileCache.get().addDocument( 1526 machine, fileName, job.getSTAXDocument(), 1527 dLastModified, caseSensitiveFileName); 1528 } 1529 } 1530 1531 // Get a map of the functions in this xml file being imported 1532 1533 HashMap<String, STAXFunctionAction> functionMap = 1534 job.getSTAXDocument().getFunctionMap(); 1535 1536 // Verify that if function names were specified in the 1537 // function-import element, verify that all the function names 1538 // specified exist in the xml file 1539 1540 if (!importedFunctionList.isEmpty()) 1541 { 1542 for (String functionName : importedFunctionList) 1543 { 1544 if (!functionMap.containsKey(functionName)) 1545 { 1546 // Function name specified in the function-import 1547 // element's "functions" attribute does not exist 1548 1549 String errorMsg = "Cause: Function \"" + 1550 functionName + "\" does not exist in file \"" + 1551 fileName + "\" on machine \"" + machine + "\"."; 1552 1553 functionAction.setElementInfo( 1554 new STAXElementInfo( 1555 evalElem, STAXElementInfo.NO_ATTRIBUTE_NAME, 1556 evalIndex, errorMsg)); 1557 1558 throw new STAXFunctionImportException( 1559 STAXUtil.formatErrorMessage(functionAction)); 1560 } 1561 } 1562 } 1563 1564 // Create a set containing the functions requested to be 1565 // imported. If no functions were specifically requested to 1566 // be imported, add all the functions in the xml file to the 1567 // set. 1568 1569 Set<String> functionSet; 1570 1571 if (importedFunctionList.isEmpty()) 1572 functionSet = functionMap.keySet(); 1573 else 1574 functionSet = new LinkedHashSet<String>(importedFunctionList); 1575 1576 // Add the functions requested to be imported via a 1577 // function-import element to the main job's fFunctionMap and 1578 // add any other functions that these functions also require 1579 // to the main job's fFunctionMap 1580 1581 Vector<String> requiredFunctionList = new Vector<String>(); 1582 1583 for (String functionName : functionSet) 1584 { 1585 STAXFunctionAction function = functionMap.get(functionName); 1586 1587 // If this function does not exist, add the function to 1588 // the job's fFunctionMap 1589 1590 if (!(functionExists(functionName))) 1591 { 1592 // Add the function to the job's function map 1593 1594 this.addFunction(function); 1595 1596 // Check if this function requires any other functions 1597 // and if so, add them to the list of required 1598 // functions 1599 1600 StringTokenizer requiredFunctions = 1601 new StringTokenizer(function.getRequires(), " "); 1602 1603 while (requiredFunctions.hasMoreTokens()) 1604 { 1605 requiredFunctionList.add( 1606 requiredFunctions.nextToken()); 1607 } 1608 1609 // Recursive call to add functions this functions 1610 // requires if specified by function-import elements 1611 1612 addImportedFunctions(function); 1613 } 1614 } 1615 1616 // Process required functions (functions that the imported 1617 // functions also require) 1618 1619 for (int i = 0; i < requiredFunctionList.size(); i++) 1620 { 1621 String functionName = 1622 (String)requiredFunctionList.elementAt(i); 1623 1624 if (!functionExists(functionName)) 1625 { 1626 addRequiredFunctions(functionName, functionMap); 1627 } 1628 } 1629 } // End while (fileListIter.hasNext()) 1630 1631 evalIndex++; 1632 } // End while 1633 } 1634 1635 /** 1636 * This recursive method adds the specified required function to the 1637 * main job's fFunctionMap. And, if this function has any function-import 1638 * elements, it recursively adds these functions that are required by the 1639 * imported function. And, if this function has any required functions 1640 * from the same xml file, it recursively adds these required functions. 1641 */ addRequiredFunctions(String functionName, HashMap functionMap)1642 private void addRequiredFunctions(String functionName, HashMap functionMap) 1643 throws STAFException, STAXException 1644 { 1645 STAXFunctionAction function = 1646 (STAXFunctionAction)functionMap.get(functionName); 1647 1648 this.addFunction(function); 1649 1650 // If this function has any function-import elements, recursively 1651 // add these functions from the imported xml file that are 1652 // required by the imported function to the main job's 1653 // fFunctionMap if not already present 1654 1655 addImportedFunctions(function); 1656 1657 // If this function has a "requires" attribute specifying 1658 // additional functions in the same xml file that it requires, 1659 // recursively add these required functions to the main job's 1660 // fFunctionMap if not already present 1661 1662 StringTokenizer requiredFunctions = new StringTokenizer( 1663 function.getRequires(), " "); 1664 1665 while (requiredFunctions.hasMoreElements()) 1666 { 1667 String reqFunctionName = (String)requiredFunctions.nextElement(); 1668 1669 if (!functionExists(reqFunctionName)) 1670 { 1671 addRequiredFunctions(reqFunctionName, functionMap); 1672 } 1673 } 1674 } 1675 addDefaultSignalHandlers(ArrayList<STAXAction> actionList)1676 public void addDefaultSignalHandlers(ArrayList<STAXAction> actionList) 1677 { 1678 // Array of default SignalHandlers. Each signal is described by 1679 // a signal name, an action, and a signal message variable name. 1680 // If the signal message variable name is not null, a message will 1681 // be sent to the STAX Monitor and logged with level 'error'. 1682 1683 String[][] defaultSHArray = 1684 { 1685 { 1686 "STAXPythonEvaluationError", "terminate", "STAXPythonEvalMsg" 1687 }, 1688 { 1689 "STAXProcessStartError", "continue", "STAXProcessStartErrorMsg" 1690 }, 1691 { 1692 "STAXProcessStartTimeout", "continue", 1693 "STAXProcessStartTimeoutMsg" 1694 }, 1695 { 1696 "STAXCommandStartError", "terminate", 1697 "STAXCommandStartErrorMsg" 1698 }, 1699 { 1700 "STAXFunctionDoesNotExist", "terminate", 1701 "STAXFunctionDoesNotExistMsg" 1702 }, 1703 { 1704 "STAXInvalidBlockName", "terminate", "STAXInvalidBlockNameMsg" 1705 }, 1706 { 1707 "STAXBlockDoesNotExist", "continue", "STAXBlockDoesNotExistMsg" 1708 }, 1709 { 1710 "STAXLogError", "continue", "STAXLogMsg" 1711 }, 1712 { 1713 "STAXTestcaseMissingError", "continue", 1714 "STAXTestcaseMissingMsg" 1715 }, 1716 { 1717 "STAXInvalidTcStatusResult", "continue", 1718 "STAXInvalidTcStatusResultMsg" 1719 }, 1720 { 1721 "STAXInvalidTimerValue", "terminate", 1722 "STAXInvalidTimerValueMsg" 1723 }, 1724 { 1725 "STAXNoSuchSignalHandler", "continue", 1726 "STAXNoSuchSignalHandlerMsg" 1727 }, 1728 { 1729 "STAXEmptyList", "continue", null 1730 }, 1731 { 1732 "STAXMaxThreadsExceeded", "terminate", 1733 "STAXMaxThreadsExceededMsg" 1734 }, 1735 { 1736 "STAXInvalidMaxThreads", "terminate", 1737 "STAXInvalidMaxThreadsMsg" 1738 }, 1739 { 1740 "STAXFunctionArgValidate", "terminate", 1741 "STAXFunctionArgValidateMsg" 1742 }, 1743 { 1744 "STAXImportError", "terminate", "STAXImportErrorMsg" 1745 }, 1746 { 1747 "STAXFunctionImportError", "terminate", 1748 "STAXFunctionImportErrorMsg" 1749 }, 1750 { 1751 "STAXInvalidTestcaseMode", "continue", 1752 "STAXInvalidTestcaseModeMsg" 1753 } 1754 }; 1755 1756 // Add default SignalHandlers to actionList 1757 1758 for (int i = 0; i < defaultSHArray.length; i++) 1759 { 1760 ArrayList<STAXAction> signalHandlerActionList = 1761 new ArrayList<STAXAction>(); 1762 1763 String signalName = defaultSHArray[i][0]; 1764 String signalAction = defaultSHArray[i][1]; 1765 String signalMsgVarName = defaultSHArray[i][2]; 1766 String signalMsgText = ""; 1767 1768 if (signalAction.equals("terminate")) 1769 { 1770 signalMsgText = "'" + signalName + " signal raised. " + 1771 "Terminating job. '" + " + " + signalMsgVarName; 1772 } 1773 else if (signalAction.equals("continue")) 1774 { 1775 signalMsgText = "'" + signalName + " signal raised. " + 1776 "Continuing job. '" + " + " + signalMsgVarName; 1777 } 1778 1779 if (signalMsgVarName == null) 1780 { 1781 // Add a No Operation (nop) Action to the action list 1782 1783 signalHandlerActionList.add(new STAXNopAction()); 1784 } 1785 else 1786 { 1787 // Add a Log Action to the action list 1788 1789 int logfile = STAXJob.JOB_LOG; 1790 1791 if (signalName == "STAXLogError") 1792 { 1793 // Log the error message in the STAX JVM log (otherwise 1794 // may get a duplicate STAXLogError signal) 1795 1796 logfile = STAXJob.JVM_LOG; 1797 } 1798 1799 // Log the message with level 'Error' and send the message 1800 // to the STAX Monitor 1801 1802 signalHandlerActionList.add(new STAXLogAction( 1803 signalMsgText, "'error'", "1", "1", logfile)); 1804 } 1805 1806 if (signalAction.equals("terminate")) 1807 { 1808 // Add a Terminate Job Action to the action list 1809 1810 signalHandlerActionList.add( 1811 new STAXTerminateAction("'main'")); 1812 } 1813 1814 // Add the signalhandler action to the action list 1815 1816 if (signalHandlerActionList.size() == 1) 1817 { 1818 // Don't need a STAXSequenceAction since only 1 action in list 1819 1820 actionList.add(new STAXSignalHandlerAction( 1821 "'" + signalName + "'", 1822 signalHandlerActionList.get(0))); 1823 } 1824 else 1825 { 1826 // Wrap the action list is a STAXSequenceAction 1827 1828 actionList.add(new STAXSignalHandlerAction( 1829 "'" + signalName + "'", 1830 new STAXSequenceAction(signalHandlerActionList))); 1831 } 1832 } 1833 } 1834 1835 // 1836 // Event methods 1837 // 1838 generateEvent(String eventSubType, Map<String, String> details)1839 public void generateEvent(String eventSubType, Map<String, String> details) 1840 { 1841 generateEvent(eventSubType, details, false); 1842 } 1843 generateEvent(String eventSubType, Map<String, String> details, boolean notifyAll)1844 public void generateEvent(String eventSubType, Map<String, String> details, 1845 boolean notifyAll) 1846 { 1847 // Check if event generation has been disabled and if so, do nothing 1848 1849 if (!getSTAX().getEventGeneration()) 1850 return; 1851 1852 // Generate an event that can be used to monitor a STAX job 1853 1854 StringBuffer detailsString = new StringBuffer(); 1855 String key = ""; 1856 String value = ""; 1857 1858 for (Map.Entry<String, String> entry : details.entrySet()) 1859 { 1860 key = entry.getKey(); 1861 value = entry.getValue(); 1862 1863 if (value != null) 1864 { 1865 detailsString.append("PROPERTY ").append( 1866 STAFUtil.wrapData(key + "=" + value)).append(" "); 1867 } 1868 } 1869 1870 // The type machine should always be the local machine 1871 // The details parm must already be in the :length: format 1872 submitSync( 1873 fSTAX.getEventServiceMachine(), 1874 fSTAX.getEventServiceName(), 1875 "GENERATE TYPE " + fSTAX.getServiceName().toUpperCase() + "/" + 1876 fSTAX.getLocalMachineName() + "/" + fJobNumber + 1877 " SUBTYPE " + STAFUtil.wrapData(eventSubType) + 1878 " ASYNC " + detailsString.toString()); 1879 1880 // Debug 1881 if (false) 1882 { 1883 STAX.logToJVMLog( 1884 "Debug", fJobNumber, 0, 1885 "STAXJob::GenerateEvent:\n" + "GENERATE TYPE " + 1886 fSTAX.getServiceName().toUpperCase() + "/" + 1887 fSTAX.getLocalMachineName() + "/" + fJobNumber + 1888 " SUBTYPE " + STAFUtil.wrapData(eventSubType) + 1889 " ASYNC " + details); 1890 } 1891 1892 if (notifyAll) 1893 { 1894 submitSync( 1895 fSTAX.getEventServiceMachine(), 1896 fSTAX.getEventServiceName(), 1897 "GENERATE TYPE " + fSTAX.getServiceName().toUpperCase() + "/" + 1898 fSTAX.getLocalMachineName() + 1899 " SUBTYPE " + STAFUtil.wrapData(eventSubType) + 1900 " ASYNC " + detailsString.toString()); 1901 1902 // Debug 1903 if (false) 1904 { 1905 STAX.logToJVMLog( 1906 "Debug", fJobNumber, 0, 1907 "STAXJob::GenerateEvent:\n" + "GENERATE TYPE " + 1908 fSTAX.getServiceName().toUpperCase() + "/" + 1909 fSTAX.getLocalMachineName() + 1910 " SUBTYPE " + STAFUtil.wrapData(eventSubType) + 1911 " ASYNC " + details); 1912 } 1913 } 1914 } 1915 log(int logfile, String level, String message)1916 public STAFResult log(int logfile, String level, String message) 1917 { 1918 if (logfile == STAXJob.JVM_LOG) 1919 { 1920 // Log to the JVM log instead of a STAX Job log 1921 1922 STAX.logToJVMLog(level, fJobNumber, 0, message); 1923 1924 return new STAFResult(); 1925 } 1926 1927 String serviceName = fSTAX.getServiceName().toUpperCase(); 1928 String logName; 1929 1930 // Don't resolve messages logged to the STAX_Service log and to 1931 // STAX Job logs to avoid possible RC 13 errors which would prevent 1932 // the message from being logged. 1933 1934 boolean noResolveMessage = false; 1935 1936 if (logfile == STAXJob.SERVICE_LOG) 1937 { 1938 logName = serviceName + "_Service"; 1939 noResolveMessage = true; 1940 } 1941 else if (logfile == STAXJob.JOB_LOG) 1942 { 1943 logName = serviceName + "_Job_" + fJobNumber; 1944 noResolveMessage = true; 1945 } 1946 else if (logfile == STAXJob.USER_JOB_LOG) 1947 { 1948 logName = serviceName + "_Job_" + fJobNumber + "_User"; 1949 } 1950 else 1951 { 1952 // Log to STAX_Service log if invalid logfile specified 1953 1954 STAXTimestamp currentTimestamp = new STAXTimestamp(); 1955 System.out.println(currentTimestamp.getTimestampString() + 1956 " STAX Service Error: Invalid Logfile " + 1957 logfile); 1958 1959 logName = serviceName + "_Service"; 1960 noResolveMessage = true; 1961 } 1962 1963 String logRequest = "LOG MACHINE LOGNAME " + 1964 STAFUtil.wrapData(logName) + " LEVEL " + level; 1965 1966 if (noResolveMessage) logRequest += " NORESOLVEMESSAGE"; 1967 1968 logRequest += " MESSAGE " + STAFUtil.wrapData(message); 1969 1970 STAFResult result = submitSync("LOCAL", "LOG", logRequest); 1971 1972 // Check if the result was unsuccessful except ignore the following 1973 // errors: 1974 // - UnknownService (2) in case the LOG service is not registered 1975 // - HandleDoesNotExist (5) in case the STAX job's handle has been 1976 // unregistered indicating the job has completed 1977 1978 if ((result.rc != STAFResult.Ok) && 1979 (result.rc != STAFResult.UnknownService) && 1980 (result.rc != STAFResult.HandleDoesNotExist)) 1981 { 1982 if (logfile != STAXJob.USER_JOB_LOG) 1983 { 1984 STAX.logToJVMLog( 1985 "Error", fJobNumber, 0, 1986 "STAXJob::log: Log request failed with RC " + result.rc + 1987 " and Result " + result.result + 1988 " level: " + level + 1989 " logRequest: " + logRequest); 1990 } 1991 else if ((result.rc == STAFResult.VariableDoesNotExist) && 1992 (!noResolveMessage)) 1993 { 1994 // Retry logging the error message without resolving 1995 // variables in the message 1996 1997 logRequest += " NORESOLVEMESSAGE"; 1998 1999 result = submitSync("LOCAL", "LOG", logRequest); 2000 } 2001 } 2002 2003 return result; 2004 } 2005 clearLogs()2006 public void clearLogs() 2007 { 2008 String serviceName = fSTAX.getServiceName().toUpperCase(); 2009 String jobLogName; 2010 String userJobLogName; 2011 2012 jobLogName = serviceName + "_Job_" + fJobNumber; 2013 userJobLogName = serviceName + "_Job_" + fJobNumber + "_User"; 2014 2015 String logRequest = "DELETE MACHINE " + 2016 fSTAX.getLocalMachineNickname() + " LOGNAME " + 2017 STAFUtil.wrapData(jobLogName) + " CONFIRM"; 2018 2019 STAFResult result = submitSync("LOCAL", "LOG", logRequest); 2020 2021 logRequest = "DELETE MACHINE " + fSTAX.getLocalMachineNickname() + 2022 " LOGNAME " + STAFUtil.wrapData(userJobLogName) + " CONFIRM"; 2023 2024 result = submitSync("LOCAL", "LOG", logRequest); 2025 } 2026 clearJobDataDir()2027 public void clearJobDataDir() 2028 { 2029 // Delete the job data directory and recreate it 2030 2031 File dir = new File(fJobDataDir); 2032 2033 if (dir.exists()) 2034 { 2035 String deleteDirRequest = "DELETE ENTRY " + 2036 STAFUtil.wrapData(fJobDataDir) + " RECURSE CONFIRM"; 2037 2038 submitSync("local", "FS", deleteDirRequest); 2039 } 2040 2041 if (!dir.exists()) 2042 { 2043 dir.mkdirs(); 2044 } 2045 } 2046 getBreakpointFunctionList()2047 public List<String> getBreakpointFunctionList() 2048 { 2049 return fBreakpointFunctionList; 2050 } 2051 setBreakpointFunctionList(List<String> functionList)2052 public void setBreakpointFunctionList(List<String> functionList) 2053 { 2054 synchronized(fBreakpointFunctionList) 2055 { 2056 fBreakpointFunctionList = functionList; 2057 } 2058 } 2059 addBreakpointFunction(String functionName)2060 public int addBreakpointFunction(String functionName) 2061 { 2062 int breakpointID = getNextBreakpointNumber(); 2063 2064 synchronized(fBreakpointFunctionList) 2065 { 2066 fBreakpointFunctionList.add(functionName); 2067 } 2068 2069 synchronized (fBreakpointsMap) 2070 { 2071 fBreakpointsMap.put(String.valueOf(breakpointID), 2072 new STAXBreakpoint(BREAKPOINT_FUNCTION, 2073 functionName, "", "", "")); 2074 } 2075 2076 return breakpointID; 2077 } 2078 isBreakpointFunction(String functionName)2079 public boolean isBreakpointFunction(String functionName) 2080 { 2081 synchronized (fBreakpointsMap) 2082 { 2083 for (STAXBreakpoint breakpoint : fBreakpointsMap.values()) 2084 { 2085 if (functionName.equals(breakpoint.getFunction())) 2086 { 2087 return true; 2088 } 2089 } 2090 } 2091 2092 return false; 2093 } 2094 getBreakpointLineList()2095 public List<String> getBreakpointLineList() 2096 { 2097 return fBreakpointLineList; 2098 } 2099 getBreakpointsMap()2100 public TreeMap<String, STAXBreakpoint> getBreakpointsMap() 2101 { 2102 return fBreakpointsMap; 2103 } 2104 setBreakpointLineList(List<String> lineList)2105 public void setBreakpointLineList(List<String> lineList) 2106 { 2107 synchronized(fBreakpointLineList) 2108 { 2109 fBreakpointLineList = lineList; 2110 } 2111 } 2112 removeBreakpoint(String id)2113 public STAFResult removeBreakpoint(String id) 2114 { 2115 synchronized(fBreakpointsMap) 2116 { 2117 if (!(fBreakpointsMap.containsKey(id))) 2118 { 2119 return new STAFResult(STAFResult.DoesNotExist, id); 2120 } 2121 else 2122 { 2123 fBreakpointsMap.remove(id); 2124 2125 return new STAFResult(STAFResult.Ok); 2126 } 2127 } 2128 } 2129 addBreakpointLine(String line, String file, String machine)2130 public int addBreakpointLine(String line, String file, String machine) 2131 { 2132 int breakpointID = getNextBreakpointNumber(); 2133 2134 synchronized(fBreakpointLineList) 2135 { 2136 fBreakpointLineList.add(line + " " + machine + " " + file); 2137 } 2138 2139 synchronized (fBreakpointsMap) 2140 { 2141 fBreakpointsMap.put(String.valueOf(breakpointID), 2142 new STAXBreakpoint(BREAKPOINT_LINE, 2143 "", line, file, machine)); 2144 } 2145 2146 return breakpointID; 2147 } 2148 isBreakpointLine(String line, String file, String machine)2149 public boolean isBreakpointLine(String line, String file, String machine) 2150 { 2151 synchronized (fBreakpointsMap) 2152 { 2153 for (STAXBreakpoint breakpoint : fBreakpointsMap.values()) 2154 { 2155 if (line.equals(breakpoint.getLine()) && 2156 file.equals(breakpoint.getFile()) && 2157 (breakpoint.getMachine().equals("") || 2158 machine.equalsIgnoreCase(breakpoint.getMachine()))) 2159 { 2160 return true; 2161 } 2162 } 2163 } 2164 2165 return false; 2166 } 2167 breakpointsEmpty()2168 public boolean breakpointsEmpty() 2169 { 2170 return (fBreakpointsMap.isEmpty()); 2171 } 2172 2173 // Submit methods 2174 submitAsync(String location, String service, String request, STAXSTAFRequestCompleteListener listener)2175 public STAFResult submitAsync(String location, String service, 2176 String request, STAXSTAFRequestCompleteListener listener) 2177 { 2178 STAFResult result; 2179 2180 synchronized(fRequestMap) 2181 { 2182 result = fHandle.submit2( 2183 STAFHandle.ReqQueue, location, service, request); 2184 2185 if (result.rc == STAFResult.Ok) 2186 { 2187 try 2188 { 2189 // Convert request number to a negative integer if greater 2190 // then Integer.MAX_VALUE 2191 2192 Integer requestNumber = STAXUtil.convertRequestNumber( 2193 result.result); 2194 2195 fRequestMap.put(requestNumber, listener); 2196 2197 result.result = requestNumber.toString(); 2198 } 2199 catch (NumberFormatException e) 2200 { 2201 STAX.logToJVMLog("Error", fJobNumber, 0, 2202 "STAXJob::submitAsync - " + e.toString()); 2203 2204 result.result = "0"; 2205 } 2206 } 2207 } 2208 2209 return result; 2210 } 2211 submitSync(String location, String service, String request)2212 public STAFResult submitSync(String location, String service, 2213 String request) 2214 { 2215 STAFHandle theHandle = fHandle; 2216 2217 // If the STAX job's handle has not yet been assigned, use the 2218 // STAX service's handle to submit the STAF service request 2219 2220 if (fHandle == null) 2221 theHandle = fSTAX.getSTAFHandle(); 2222 2223 STAFResult result = theHandle.submit2( 2224 STAFHandle.ReqSync, location, service, request); 2225 2226 return result; 2227 } 2228 submitAsyncForget(String location, String service, String request)2229 public STAFResult submitAsyncForget(String location, String service, 2230 String request) 2231 { 2232 STAFResult result = fHandle.submit2(STAFHandle.ReqFireAndForget, 2233 location, service, request); 2234 2235 return result; 2236 } 2237 2238 // STAXSTAFQueueListener method 2239 handleQueueMessage(STAXSTAFMessage message, STAXJob job)2240 public void handleQueueMessage(STAXSTAFMessage message, STAXJob job) 2241 { 2242 int requestNumber = message.getRequestNumber(); 2243 2244 STAXSTAFRequestCompleteListener listener = null; 2245 2246 synchronized (fRequestMap) 2247 { 2248 Integer key = new Integer(requestNumber); 2249 2250 listener = fRequestMap.get(key); 2251 2252 if (listener != null) 2253 { 2254 fRequestMap.remove(key); 2255 } 2256 } 2257 2258 if (listener != null) 2259 { 2260 listener.requestComplete(requestNumber, new STAFResult( 2261 message.getRequestRC(), message.getRequestResult())); 2262 } 2263 else 2264 { // Log a message in the job log 2265 String msg = "STAXJob.handleQueueMessage: " + 2266 " No listener found for message:\n" + 2267 STAFMarshallingContext.unmarshall( 2268 message.getResult()).toString(); 2269 job.log(STAXJob.JOB_LOG, "warning", msg); 2270 } 2271 } 2272 2273 // STAXThreadCompleteListener method 2274 threadComplete(STAXThread thread, int endCode)2275 public void threadComplete(STAXThread thread, int endCode) 2276 { 2277 // Debug: 2278 if (false) 2279 { 2280 STAX.logToJVMLog( 2281 "Debug", thread, 2282 "STAXJob::threadComplete: Thread #" + 2283 thread.getThreadNumber() + " complete"); 2284 } 2285 2286 // Flush the stdout and stderr to make sure that all output is 2287 // printed. This is because if a STAX job uses sys.stdout.write 2288 // or sys.stderr.write in a script element, it is possible that 2289 // these messages have not all been flushed from the buffer and, 2290 // thus, not logged by the STAXPythonOutput class. 2291 // This was added as requested by Bug #1510. 2292 try 2293 { 2294 thread.pyExec("sys.stdout.flush(); sys.stderr.flush()"); 2295 } 2296 catch (STAXPythonEvaluationException e) 2297 { 2298 // Do nothing 2299 } 2300 2301 boolean jobComplete = false; 2302 2303 synchronized (fThreadMap) 2304 { 2305 fThreadMap.remove(thread.getThreadNumberAsInteger()); 2306 2307 if (fThreadMap.isEmpty()) 2308 jobComplete = true; 2309 } 2310 2311 if (jobComplete == true) 2312 { 2313 // Perform terminateJob on interested parties 2314 2315 fSTAX.visitJobManagementHandlers(new STAXVisitorHelper(this) 2316 { 2317 public void visit(Object o, Iterator iter) 2318 { 2319 STAXJobManagementHandler handler = 2320 (STAXJobManagementHandler)o; 2321 2322 handler.terminateJob((STAXJob)fData); 2323 } 2324 }); 2325 2326 // End the STAXTimedEventQueue thread for the job 2327 if (fSTAX.getTimedEventQueuePerJob()) 2328 fTimedEventQueue.end(); 2329 2330 // Set the result so that it is available upon job completion. 2331 2332 String resultToEval = "STAXResult"; 2333 2334 try 2335 { 2336 if (thread.pyBoolEval("isinstance(STAXResult, STAXGlobal)")) 2337 { 2338 // Use the STAXGlobal class's get() method so that the job 2339 // result when using toString() will be it's contents and 2340 // not org.python.core.PyFinalizableInstance 2341 2342 resultToEval = "STAXResult.get()"; 2343 } 2344 } 2345 catch (STAXPythonEvaluationException e) 2346 { /* Ignore error and assume not a STAXGlobal object */ } 2347 2348 try 2349 { 2350 fResult = thread.pyObjectEval(resultToEval); 2351 } 2352 catch (STAXPythonEvaluationException e) 2353 { 2354 fResult = Py.None; 2355 } 2356 2357 // Log the result from the job in a Status message in the Job log 2358 2359 STAFMarshallingContext mc = STAFMarshallingContext. 2360 unmarshall(fResult.toString()); 2361 log(STAXJob.JOB_LOG, "status", "Job Result: " + mc); 2362 2363 // Log a Stop message for the job in the STAX Service and Job logs 2364 2365 String msg = "JobID: " + fJobNumber; 2366 log(STAXJob.SERVICE_LOG, "stop", msg); 2367 log(STAXJob.JOB_LOG, "stop", msg); 2368 2369 // Get the current date and time and set as job ending date/time 2370 2371 fEndTimestamp = new STAXTimestamp(); 2372 2373 // Generate job completion event 2374 2375 HashMap<String, String> jobEndMap = new HashMap<String, String>(); 2376 jobEndMap.put("type", "job"); 2377 jobEndMap.put("block", "main"); 2378 jobEndMap.put("status", "end"); 2379 jobEndMap.put("jobID", String.valueOf(fJobNumber)); 2380 jobEndMap.put("result", fResult.toString()); 2381 jobEndMap.put("jobCompletionStatus", getCompletionStatusAsString()); 2382 2383 generateEvent(STAXJob.STAX_JOB_EVENT, jobEndMap, true); 2384 2385 // Query its job log for any messages with level "Error". 2386 // Save the result's marshalling context so it's available when 2387 // writing the job result to a file and when the RETURNRESULT 2388 // option is specified on a STAX EXECUTE request 2389 2390 fJobLogErrorsMC = getJobLogErrors(); 2391 2392 // Write the job result information to files in the STAX job 2393 // directory 2394 2395 writeJobResultsToFile(); 2396 2397 // Send a STAF/Service/STAX/End message to indicate the job has 2398 // completed 2399 2400 submitSync("local", "QUEUE", "QUEUE TYPE STAF/Service/STAX/End " + 2401 "MESSAGE " + STAFUtil.wrapData("")); 2402 2403 // Debug 2404 if (COUNT_PYCODE_CACHES && STAX.CACHE_PYTHON_CODE) 2405 { 2406 STAX.logToJVMLog( 2407 "Debug", thread, 2408 "STAXJob::threadComplete: Job " + fJobNumber + ": " + 2409 " cacheGets=" + fCompiledPyCodeCacheGets + 2410 " cacheAdds=" + fCompiledPyCodeCacheAdds); 2411 } 2412 2413 while (!fCompletionNotifiees.isEmpty()) 2414 { 2415 STAXJobCompleteListener listener = 2416 fCompletionNotifiees.removeFirst(); 2417 2418 if (listener != null) listener.jobComplete(this); 2419 } 2420 2421 try 2422 { 2423 fHandle.unRegister(); 2424 } 2425 catch (STAFException e) 2426 { 2427 /* Do Nothing */ 2428 } 2429 2430 // Clear out job's private variables 2431 2432 fScripts = new ArrayList<String>(); 2433 fScriptFiles = new ArrayList<String>(); 2434 fThreadMap = new LinkedHashMap<Integer, STAXThread>(); 2435 fCompletionNotifiees = new LinkedList<STAXJobCompleteListener>(); 2436 fCompiledPyCodeCache = new HashMap<String, PyCode>(); 2437 2438 // Commented out setting these variables to new (empty) objects, 2439 // as this could cause the "Testcases" and "Testcase Totals" values 2440 // in the job result to be empty due to timing issues 2441 //fTestcaseList = new ArrayList(); 2442 //fTestcaseTotalsMap = new HashMap(); 2443 2444 // Commented out setting the following variables to null because 2445 // STAFQueueMonitor.notifyListeners() is running in another 2446 // thread and could access these variables and get a NPE. 2447 // fQueueListenerMap = null; 2448 // fHandle = null; 2449 // fSTAFQueueMonitor = null; 2450 } 2451 } 2452 2453 /** 2454 * This method is called to clean up a job that is in a pending state. 2455 * This is a job that has had a job number assigned to it but did not 2456 * start execution because either an error occurred (e.g. xml parsing, 2457 * etc) before the job actually started execution or because the TEST 2458 * option was specified, which indicates to test that the job doesn't 2459 * contain xml parsing or Python compile errors and to not actually 2460 * run the job. 2461 */ cleanupPendingJob(STAFResult result)2462 public void cleanupPendingJob(STAFResult result) 2463 { 2464 STAXJob job = fSTAX.removeFromJobMap(getJobNumberAsInteger()); 2465 2466 if (job == null) 2467 { 2468 // The job is not in the job map 2469 return; 2470 } 2471 2472 if (result.rc != STAFResult.Ok) 2473 { 2474 setCompletionStatus(TERMINATED_STATUS); 2475 2476 // Log the error message in the job log 2477 2478 if (STAFMarshallingContext.isMarshalledData(result.result)) 2479 { 2480 try 2481 { 2482 STAFMarshallingContext mc = 2483 STAFMarshallingContext.unmarshall(result.result); 2484 Map resultMap = (Map)mc.getRootObject(); 2485 result.result = (String)resultMap.get("errorMsg"); 2486 } 2487 catch (Exception e) 2488 { 2489 STAX.logToJVMLog( 2490 "Error", fJobNumber, 0, 2491 "STAXJob::cleanupPendingJob: " + e.toString()); 2492 } 2493 } 2494 2495 log(STAXJob.JOB_LOG, "error", result.result); 2496 } 2497 2498 // Log the testcase totals in the Job log by first calling the 2499 // STAXTestcaseActionFactory's initJob() method (to initialize the 2500 // testcase totals to 0) and then by calling its terminateJob() method 2501 // which logs the testcase totals 2502 2503 STAXJobManagementHandler testcaseHandler = 2504 (STAXJobManagementHandler)fSTAX.getActionFactory("testcase"); 2505 2506 testcaseHandler.initJob(this); 2507 testcaseHandler.terminateJob(this); 2508 2509 // Log the result from the job in a Status message in the Job log 2510 2511 STAFMarshallingContext mc = STAFMarshallingContext. 2512 unmarshall(fResult.toString()); 2513 log(STAXJob.JOB_LOG, "status", "Job Result: " + mc); 2514 2515 // Log a Stop message for the job in the STAX Service and Job logs 2516 2517 String msg = "JobID: " + fJobNumber; 2518 log(STAXJob.SERVICE_LOG, "stop", msg); 2519 log(STAXJob.JOB_LOG, "stop", msg); 2520 2521 if (fEndTimestamp == null) 2522 fEndTimestamp = new STAXTimestamp(); 2523 2524 // Generate job completion event 2525 2526 HashMap<String, String> jobEndMap = new HashMap<String, String>(); 2527 jobEndMap.put("type", "job"); 2528 jobEndMap.put("block", "main"); 2529 jobEndMap.put("status", "end"); 2530 jobEndMap.put("jobID", String.valueOf(fJobNumber)); 2531 jobEndMap.put("result", fResult.toString()); 2532 jobEndMap.put("jobCompletionStatus", getCompletionStatusAsString()); 2533 2534 generateEvent(STAXJob.STAX_JOB_EVENT, jobEndMap, true); 2535 2536 // Query its job log for any messages with level "Error". 2537 // Save the result's marshalling context so it's available when 2538 // writing the job result to a file and when the RETURNRESULT 2539 // option is specified on a STAX EXECUTE request 2540 2541 fJobLogErrorsMC = getJobLogErrors(); 2542 2543 // Write the job result information to files in the STAX job 2544 // directory 2545 2546 writeJobResultsToFile(); 2547 2548 if (fHandle != null) 2549 { 2550 // Unregister the job's handle 2551 try 2552 { 2553 fHandle.unRegister(); 2554 } 2555 catch (STAFException e) 2556 { 2557 /* Do Nothing */ 2558 } 2559 } 2560 } 2561 2562 /** 2563 * Query the STAX Job Log to get any messages with level "Error" that 2564 * were logged for this job. 2565 * 2566 * @return STAFMarshallingContext Return a marshalling context for the 2567 * result from the LOG QUERY request whose root object is a list of 2568 * error messages. 2569 */ getJobLogErrors()2570 private STAFMarshallingContext getJobLogErrors() 2571 { 2572 String jobLogName = fSTAX.getServiceName().toUpperCase() + "_Job_" + 2573 getJobNumber(); 2574 2575 String request = "QUERY MACHINE " + fSTAX.getLocalMachineNickname() + 2576 " LOGNAME " + STAFUtil.wrapData(jobLogName) + 2577 " LEVELMASK Error" + 2578 " FROM " + getStartTimestamp().getTimestampString(); 2579 2580 STAFResult result = submitSync("local", "LOG", request); 2581 2582 // RC 4010 from the LOG QUERY request means exceeded the default 2583 // maximum query records 2584 2585 if ((result.rc == STAFResult.Ok) || (result.rc == 4010)) 2586 return result.resultContext; 2587 2588 // Ignore the following errors from the LOG QUERY request: 2589 // - UnknownService (2) in case the LOG service is not registered 2590 // - HandleDoesNotExist (5) in case the STAX job's handle has been 2591 // unregistered indicating the job has completed 2592 2593 if ((result.rc != STAFResult.UnknownService) && 2594 (result.rc != STAFResult.HandleDoesNotExist)) 2595 { 2596 // Log an error in the JVM log 2597 2598 STAX.logToJVMLog( 2599 "Error", getJobNumber(), 0, 2600 "STAXJob::getJobLogErrors() failed for Job ID: " + 2601 getJobNumber() + "\nSTAF local LOG " + request + 2602 " RC=" + result.rc + ", Result=" + result.result); 2603 } 2604 2605 return null; 2606 } 2607 2608 /* 2609 * Write the marshalled job result information to files in the STAX job 2610 * directory 2611 */ writeJobResultsToFile()2612 private void writeJobResultsToFile() 2613 { 2614 // Create the marshalling context for the results without the testcase 2615 // list 2616 2617 STAFMarshallingContext resultMC = new STAFMarshallingContext(); 2618 resultMC.setMapClassDefinition(fSTAX.fResultMapClass); 2619 resultMC.setMapClassDefinition( 2620 STAXTestcaseActionFactory.fTestcaseTotalsMapClass); 2621 2622 Map<String, Object> resultMap = new TreeMap<String, Object>(); 2623 resultMap.put("staf-map-class-name", fSTAX.fResultMapClass.name()); 2624 2625 resultMap = addCommonFieldsToResultMap(resultMap); 2626 2627 resultMC.setRootObject(resultMap); 2628 2629 String marshalledResultString = resultMC.marshall(); 2630 2631 // Write the marshalled string for the results without the testcase 2632 // list to file marshalledResults.txt in directory fJobDataDir 2633 2634 writeStringToFile(fSTAX.getResultFileName(fJobNumber), 2635 marshalledResultString); 2636 2637 // Create the marshalling context for the results with the testcase 2638 // list 2639 2640 resultMC = new STAFMarshallingContext(); 2641 resultMC.setMapClassDefinition(fSTAX.fDetailedResultMapClass); 2642 resultMC.setMapClassDefinition( 2643 STAXTestcaseActionFactory.fTestcaseTotalsMapClass); 2644 resultMC.setMapClassDefinition( 2645 STAXTestcaseActionFactory.fQueryTestcaseMapClass); 2646 2647 resultMap = new TreeMap<String, Object>(); 2648 resultMap.put("staf-map-class-name", 2649 fSTAX.fDetailedResultMapClass.name()); 2650 2651 resultMap = addCommonFieldsToResultMap(resultMap); 2652 2653 resultMap.put("testcaseList", fTestcaseList); 2654 2655 resultMC.setRootObject(resultMap); 2656 2657 marshalledResultString = resultMC.marshall(); 2658 2659 // Write the marshalled string for the results with the testcase 2660 // list to file marshalledResultsLong.txt in directory fJobDataDir 2661 2662 writeStringToFile(fSTAX.getDetailedResultFileName(fJobNumber), 2663 marshalledResultString); 2664 } 2665 addCommonFieldsToResultMap( Map<String, Object> inResultMap)2666 private Map<String, Object> addCommonFieldsToResultMap( 2667 Map<String, Object> inResultMap) 2668 { 2669 Map<String, Object> resultMap = new HashMap<String, Object>(inResultMap); 2670 2671 if (!fJobName.equals("")) 2672 resultMap.put("jobName", fJobName); 2673 2674 resultMap.put("startTimestamp", 2675 fStartTimestamp.getTimestampString()); 2676 resultMap.put("endTimestamp", 2677 fEndTimestamp.getTimestampString()); 2678 resultMap.put("status", getCompletionStatusAsString()); 2679 resultMap.put("result", fResult.toString()); 2680 resultMap.put("testcaseTotals", fTestcaseTotalsMap); 2681 resultMap.put("jobLogErrors", fJobLogErrorsMC); 2682 resultMap.put("xmlFileName", fXmlFile); 2683 resultMap.put("fileMachine", fXmlMachine); 2684 resultMap.put("function", getStartFunction()); 2685 resultMap.put("arguments", 2686 STAFUtil.maskPrivateData(getStartFuncArgs())); 2687 resultMap.put("scriptList", fScripts); 2688 resultMap.put("scriptFileList", fScriptFiles); 2689 2690 if (!fScriptFileMachine.equals("")) 2691 resultMap.put("scriptMachine", fScriptFileMachine); 2692 2693 return resultMap; 2694 } 2695 2696 /** 2697 * Write the contents of a string to the specified file name 2698 */ writeStringToFile(String fileName, String data)2699 private void writeStringToFile(String fileName, String data) 2700 { 2701 FileWriter out = null; 2702 2703 try 2704 { 2705 File outFile = new File(fileName); 2706 out = new FileWriter(outFile); 2707 out.write(data); 2708 } 2709 catch (IOException e) 2710 { 2711 log(STAXJob.JVM_LOG, "Error", 2712 "Error writing to job results file: " + fileName + ".\n" + 2713 e.toString()); 2714 } 2715 finally 2716 { 2717 try 2718 { 2719 if (out != null) out.close(); 2720 } 2721 catch (IOException e) 2722 { 2723 // Do nothing 2724 } 2725 } 2726 } 2727 2728 private STAX fSTAX; 2729 private STAXDocument fDocument; 2730 private STAXCallAction fDefaultCallAction = null; 2731 private Object fNextThreadNumberSynch = new Object(); 2732 private int fNextThreadNumber = 1; 2733 private int fJobNumber = 0; 2734 private Object fNextProcNumberSynch = new Object(); 2735 private int fProcNumber = 1; 2736 private Object fNextCmdNumberSynch = new Object(); 2737 private int fCmdNumber = 1; 2738 private Object fNextProcessKeySynch = new Object(); 2739 private int fProcessKey = 1; 2740 private String fJobDataDir = new String(); 2741 private String fJobName = new String(); 2742 private String fXmlMachine = new String(); 2743 private String fXmlFile = new String(); 2744 private boolean fClearlogs; 2745 private int fMaxSTAXThreads; 2746 private String fWaitTimeout = null; 2747 private String fStartFunction = new String(); 2748 private String fStartFuncArgs = new String(); 2749 private List<String> fScripts = new ArrayList<String>(); 2750 private List<String> fScriptFiles = new ArrayList<String>(); 2751 private String fScriptFileMachine = new String(); 2752 private Map<Integer, STAXThread> fThreadMap = 2753 new LinkedHashMap<Integer, STAXThread>(); 2754 private STAFHandle fHandle = null; 2755 private STAFQueueMonitor fSTAFQueueMonitor = null; 2756 private LinkedList<STAXJobCompleteListener> fCompletionNotifiees = 2757 new LinkedList<STAXJobCompleteListener>(); 2758 2759 /** 2760 * Used if the STAX parameter TimedEventQueue is set to "Job" to indicate 2761 * that a STAXTimedEventQueue should be created for each job instead of 2762 * using a common STAXTimedEventQueue for all jobs. 2763 */ 2764 private STAXTimedEventQueue fTimedEventQueue = null; 2765 2766 // fExecuteAndHold: null indicates no hold 2767 // blank indicates hold with no timeout 2768 // otherwise indicates hold with a timeout 2769 private String fExecuteAndHold = null; 2770 2771 private String fSourceMachine = new String(); // Source == Originating 2772 private String fSourceHandleName = new String(); 2773 private int fSourceHandle; 2774 private int fNotifyOnEnd = STAXJob.NO_NOTIFY_ONEND; 2775 private int fState = PENDING_STATE; 2776 private PyObject fResult = Py.None; 2777 private int fCompletionStatus = STAXJob.NORMAL_STATUS; 2778 private STAXTimestamp fStartTimestamp; 2779 private STAXTimestamp fEndTimestamp; 2780 private HashMap<String, TreeSet<STAXSTAFQueueListener>> fQueueListenerMap = 2781 new HashMap<String, TreeSet<STAXSTAFQueueListener>>(); 2782 private HashMap<String, Object> fDataMap = new HashMap<String, Object>(); 2783 2784 // Map of active STAF requests 2785 private HashMap<Integer, STAXSTAFRequestCompleteListener> fRequestMap = 2786 new HashMap<Integer, STAXSTAFRequestCompleteListener>(); 2787 2788 private boolean fLogTCElapsedTime; 2789 private boolean fLogTCNumStarts; 2790 private boolean fLogTCStartStop; 2791 private int fPythonOutput; 2792 private String fPythonLogLevel; 2793 private int fInvalidLogLevelAction; 2794 private HashMap<String, PyCode> fCompiledPyCodeCache = 2795 new HashMap<String, PyCode>(); 2796 private long fCompiledPyCodeCacheAdds = 0; 2797 private long fCompiledPyCodeCacheGets = 0; 2798 private List<Map<String, Object>> fTestcaseList = 2799 new ArrayList<Map<String, Object>>(); 2800 private List<String> fBreakpointFunctionList = new ArrayList<String>(); 2801 private List<String> fBreakpointLineList = new ArrayList<String>(); 2802 private Map<String, String> fTestcaseTotalsMap = 2803 new HashMap<String, String>(); 2804 private TreeMap<String, STAXBreakpoint> fBreakpointsMap = 2805 new TreeMap<String, STAXBreakpoint>(); 2806 private Object fNextBreakpointNumberSynch = new Object(); 2807 private int fNextBreakpointNumber = 1; 2808 private boolean fBreakpointFirstFunction = false; 2809 private boolean fBreakpointSubjobFirstFunction = false; 2810 private STAFMarshallingContext fJobLogErrorsMC = 2811 new STAFMarshallingContext(); 2812 2813 class STAXSTAFQueueListenerComparator 2814 implements Comparator<STAXSTAFQueueListener> 2815 { compare(STAXSTAFQueueListener o1, STAXSTAFQueueListener o2)2816 public int compare(STAXSTAFQueueListener o1, STAXSTAFQueueListener o2) 2817 { 2818 if (o1.hashCode() < o2.hashCode()) return -1; 2819 else if (o1.hashCode() > o2.hashCode()) return 1; 2820 2821 return 0; 2822 } 2823 } 2824 2825 class STAFQueueMonitor extends Thread 2826 { STAFQueueMonitor(STAXJob job)2827 STAFQueueMonitor(STAXJob job) 2828 { 2829 fJob = job; 2830 } 2831 2832 /** 2833 * Log an error message in the JVM log and the STAX Job log 2834 */ logMessage(String message, Throwable t)2835 public void logMessage(String message, Throwable t) 2836 { 2837 STAXTimestamp currentTimestamp = new STAXTimestamp(); 2838 2839 message = "STAXJob$STAFQueueMonitor.run(): " + message; 2840 2841 if (t != null) 2842 { 2843 // Add the Java stack trace to the message 2844 2845 StringWriter sw = new StringWriter(); 2846 t.printStackTrace(new PrintWriter(sw)); 2847 2848 if (t.getMessage() != null) 2849 message += "\n" + t.getMessage() + "\n" + sw.toString(); 2850 else 2851 message += "\n" + sw.toString(); 2852 } 2853 2854 // Truncate the message to a maximum size of 3000 (in case 2855 // the result from the QUEUE GET WAIT request is very large) 2856 2857 if (message.length() > 3000) 2858 message = message.substring(0, 3000) + "..."; 2859 2860 // Log an error message in the STAX JVM log 2861 2862 STAX.logToJVMLog( 2863 "Error", fJob.getJobNumber(), 0, message); 2864 2865 // Log an error message in the STAX Job log 2866 2867 fJob.log(STAXJob.JOB_LOG, "Error", message); 2868 } 2869 run()2870 public void run() 2871 { 2872 STAFMarshallingContext mc; 2873 List messageList; 2874 int maxMessages = fJob.getSTAX().getMaxGetQueueMessages(); 2875 String request = "GET WAIT FIRST " + maxMessages; 2876 STAFResult result; 2877 int numErrors = 0; 2878 2879 // Maximum consecutive errors submitting a local QUEUE GET WAIT 2880 // request before we decide to exit the infinite loop 2881 int maxErrors = 5; 2882 2883 // Process messages on the STAX job handle's queue until we get 2884 // a "STAF/Service/STAX/End" message or until an error occurs 5 2885 // consecutive times submitting a STAF local QUEUE GET request 2886 // (so that we don't get stuck in an infinite loop eating CPU). 2887 2888 for (;;) 2889 { 2890 result = null; 2891 2892 try 2893 { 2894 // For better performance when more than 1 message is on 2895 // the queue waiting to be processed, get multiple 2896 // messages off the queue at a time (up to maxMessages) 2897 // so that the total size of the messages hopefully won't 2898 // cause an OutOfMemory problem. 2899 // Note: If an error occurs unmarshalling the list of 2900 // messages, all these messages will be lost. 2901 2902 result = submitSync("local", "QUEUE", request); 2903 2904 if (result == null) 2905 { 2906 numErrors++; 2907 2908 logMessage( 2909 "STAF local QUEUE " + request + 2910 " returned null. This may have been caused by " + 2911 "running out of memory creating the result.", 2912 null); 2913 2914 if (numErrors < maxErrors) 2915 { 2916 continue; 2917 } 2918 else 2919 { 2920 logMessage( 2921 "Exiting this thread after the QUEUE GET " + 2922 "request failed " + maxErrors + 2923 " consecutive times.", null); 2924 2925 return; // Exit STAFQueueMonitor thread 2926 } 2927 } 2928 2929 if (result.rc == STAFResult.Ok) 2930 { 2931 numErrors = 0; 2932 } 2933 else if (result.rc == STAFResult.HandleDoesNotExist) 2934 { 2935 // This means that the STAX job's handle has been 2936 // unregistered which means that the STAX job is no 2937 // longer running so we should exit this thread. 2938 // We've seen this happen before this thread gets 2939 // the message with type "STAF/Service/STAX/End" off 2940 // the queue. 2941 2942 return; // Exit STAFQueueMonitor thread 2943 } 2944 else 2945 { 2946 numErrors++; 2947 2948 logMessage( 2949 "STAF local QUEUE " + request + 2950 " failed with RC=" + result.rc + ", Result=" + 2951 result.result, null); 2952 2953 if (numErrors < maxErrors) 2954 { 2955 continue; 2956 } 2957 else 2958 { 2959 logMessage( 2960 "Exiting this thread after the QUEUE GET " + 2961 "request failed " + maxErrors + 2962 " consecutive times", null); 2963 2964 return; // Exit STAFQueueMonitor thread 2965 } 2966 } 2967 } 2968 catch (Throwable t) 2969 { 2970 // Note: One possible reason for an exception occurring 2971 // submitting a QUEUE GET WAIT request can be because 2972 // an error occurred when the result was auto-unmarshalled 2973 // due to invalid marshalled data in the result (though 2974 // in STAF V3.3.3 a fix was made to the Java unmarshall 2975 // method so that this problem should no longer occur). 2976 2977 numErrors++; 2978 2979 logMessage( 2980 "Exception getting message(s) off the queue.", t); 2981 2982 if (numErrors < maxErrors) 2983 { 2984 continue; 2985 } 2986 else 2987 { 2988 logMessage( 2989 "Exiting this thread after the QUEUE GET " + 2990 "request failed " + maxErrors + 2991 " consecutive times", null); 2992 2993 return; // Exit STAFQueueMonitor thread 2994 } 2995 } 2996 2997 try 2998 { 2999 // Unmarshall the result from a QUEUE GET request, but 3000 // don't unmarshall indirect objects 3001 3002 mc = STAFMarshallingContext.unmarshall( 3003 result.result, 3004 STAFMarshallingContext.IGNORE_INDIRECT_OBJECTS); 3005 3006 messageList = (List)mc.getRootObject(); 3007 } 3008 catch (Throwable t) 3009 { 3010 // Log an error message and continue 3011 3012 logMessage( 3013 "Exception unmarshalling queued messages. " + 3014 "\nMarshalled string: " + result.result, t); 3015 continue; 3016 } 3017 3018 // Iterate through the list of messages removed from our 3019 // handle's queue and process each message 3020 3021 Iterator iter = messageList.iterator(); 3022 STAXSTAFMessage msg; 3023 3024 while (iter.hasNext()) 3025 { 3026 // Process the message 3027 3028 try 3029 { 3030 msg = new STAXSTAFMessage((Map)iter.next()); 3031 } 3032 catch (Throwable t) 3033 { 3034 // Log an error message and continue 3035 3036 logMessage("Exception handling a queued message.", t); 3037 continue; 3038 } 3039 3040 String type = msg.getType(); 3041 3042 if (type != null && 3043 type.equalsIgnoreCase("STAF/Service/STAX/End")) 3044 { 3045 return; // Exit STAFQueueMonitorThread 3046 } 3047 3048 try 3049 { 3050 notifyListeners(msg); 3051 } 3052 catch (Throwable t) 3053 { 3054 // Log an error message and continue 3055 3056 logMessage( 3057 "Exception notifying listeners for a message " + 3058 "with type '" + type + 3059 "' from handle " + msg.getHandle() + 3060 " on machine " + msg.getMachine() + 3061 " \nMessage: " + msg.getMessage(), t); 3062 continue; 3063 } 3064 } 3065 } 3066 } 3067 notifyListeners(STAXSTAFMessage msg)3068 public void notifyListeners(STAXSTAFMessage msg) 3069 { 3070 // Perform a lookup of registered message handlers for 3071 // this message type and pass the STAXSTAFMessage to 3072 // the registered message handlers. 3073 3074 String theType = msg.getType(); 3075 int listenersFound = 0; 3076 3077 synchronized (fQueueListenerMap) 3078 { 3079 for (Map.Entry<String, TreeSet<STAXSTAFQueueListener>> entry : 3080 fQueueListenerMap.entrySet()) 3081 { 3082 String msgType = entry.getKey(); 3083 3084 if ((msgType == null) || 3085 // Messages from 3.x clients that STAX is interested 3086 // in will have a type 3087 (theType != null && 3088 theType.equalsIgnoreCase(msgType)) || 3089 // The following is for messages from 2.x clients 3090 // which will have a null type and the message will 3091 // begin with the type (which for processes will be 3092 // STAF/PROCESS/END instead of STAF/Process/End 3093 (theType == null && msg.getMessage().toUpperCase(). 3094 startsWith(msgType.toUpperCase()))) 3095 { 3096 TreeSet<STAXSTAFQueueListener> listenerSet = 3097 entry.getValue(); 3098 3099 if (listenerSet == null) continue; 3100 3101 for (STAXSTAFQueueListener listener : listenerSet) 3102 { 3103 listener.handleQueueMessage(msg, fJob); 3104 listenersFound++; 3105 } 3106 } 3107 } 3108 3109 if ((listenersFound == 0) && 3110 (theType.equals("STAF/RequestComplete") || 3111 theType.equals("STAF/Process/End"))) 3112 { 3113 // Log a warning message in the job log as this could 3114 // indicate a problem in the how the STAX service is 3115 // handling these messages 3116 3117 try 3118 { 3119 fJob.log(STAXJob.JOB_LOG, "warning", 3120 "STAXJob.notifyListeners: " + 3121 "No listener found for message:\n" + 3122 STAFMarshallingContext.unmarshall( 3123 msg.getResult()).toString()); 3124 } 3125 catch (Throwable t) 3126 { 3127 // Ignore 3128 } 3129 } 3130 } 3131 } 3132 3133 STAXJob fJob; 3134 3135 } // end STAFQueueMonitor 3136 3137 } 3138