1 /*****************************************************************************/
2 /* Software Testing Automation Framework (STAF)                              */
3 /* (C) Copyright IBM Corp. 2002, 2004, 2005                                  */
4 /*                                                                           */
5 /* This software is licensed under the Eclipse Public License (EPL) V1.0.    */
6 /*****************************************************************************/
7 
8 package com.ibm.staf.service.stax;
9 import com.ibm.staf.*;
10 import com.ibm.staf.wrapper.STAFLog;
11 import java.util.Map;
12 import java.util.HashMap;
13 import java.util.LinkedHashMap;
14 import java.util.TreeMap;
15 import java.util.TreeSet;
16 import java.util.LinkedHashSet;
17 import java.util.LinkedList;
18 import java.util.ArrayList;
19 import java.util.Iterator;
20 import java.util.List;
21 import java.io.File;
22 import java.io.FileWriter;
23 import java.io.IOException;
24 import java.io.PrintWriter;
25 import java.io.StringWriter;
26 
27 import java.util.Vector;
28 import java.util.StringTokenizer;
29 import java.util.Date;
30 import java.util.Set;
31 
32 import org.python.core.Py;
33 
34 import org.python.core.PyObject;
35 import org.python.core.PyCode;
36 
37 /* Jython 2.1:
38 import org.python.core.__builtin__;
39 */
40 // Jython 2.5:
41 import org.python.core.CompileMode;
42 
43 /**
44  * The representation of a STAX job.  A STAX job is created by the STAXParser
45  * when the STAX service receives an EXECUTE request specifying a file or string
46  * containing XML that defines a STAX job.
47  */
48 public class STAXJob implements STAXThreadCompleteListener,
49                                 STAXSTAFQueueListener
50 {
51     // For debugging - Counts and prints the number of cache gets/adds
52     static final boolean COUNT_PYCODE_CACHES = false;
53 
54     static final String STAX_JOB_EVENT = new String("Job");
55 
56     // Logfile types:
57 
58     /**
59      * Indicates to log to the STAX Service Log file
60      */
61     static final int SERVICE_LOG = 1;
62 
63     /**
64      * Indicates to log to the STAX Job Log file
65      */
66     public static final int JOB_LOG = 2;
67 
68     /**
69      * Indicates to log to the STAX Job User Log file
70      */
71     public static final int USER_JOB_LOG = 3;
72 
73     /**
74      * Indicates to log to the JVM Log file
75      */
76     public static final int JVM_LOG = 4;
77 
78     public static final int NO_NOTIFY_ONEND = 0;
79     public static final int NOTIFY_ONEND_BY_HANDLE = 1;
80     public static final int NOTIFY_ONEND_BY_NAME = 2;
81 
82     /**
83      * Job completion status codes and their string values
84      */
85     public static final int ABNORMAL_STATUS = -1;
86     public static final int NORMAL_STATUS = 0;
87     public static final int TERMINATED_STATUS = 1;
88     public static final int UNKNOWN_STATUS = 2;
89 
90     public static final String ABNORMAL_STATUS_STRING = "Abnormal";
91     public static final String NORMAL_STATUS_STRING = "Normal";
92     public static final String TERMINATED_STATUS_STRING = "Terminated";
93     public static final String UNKNOWN_STATUS_STRING = "Unknown";
94 
95     /**
96      * Job state codes and their string values
97      */
98     public static final int PENDING_STATE = 0;
99     public static final int RUNNING_STATE = 1;
100     public static final String PENDING_STATE_STRING = "Pending";
101     public static final String RUNNING_STATE_STRING = "Running";
102 
103     static final int BREAKPOINT_FUNCTION = 0;
104     static final int BREAKPOINT_LINE = 1;
105 
106     /**
107       * Creates a new STAXJob instance passing in a STAX object which
108       * represents the STAX service that is executing this job.
109       */
STAXJob(STAX staxService)110     public STAXJob(STAX staxService) {
111         this(staxService, new STAXDocument());
112     }
113 
114     /**
115      * Creates a new STAXJob instance using an existing STAX
116      * document.
117      *
118      * @param staxService the STAX service.
119      * @param document the STAX document to be executed by
120      * this job.
121      */
STAXJob(STAX staxService, STAXDocument document)122     public STAXJob(STAX staxService, STAXDocument document) {
123         fSTAX = staxService;
124         fDocument = document;
125         STAXThread thread = new STAXThread(this);
126         thread.addCompletionNotifiee(this);
127         fThreadMap.put(thread.getThreadNumberAsInteger(), thread);
128         fClearlogs = fSTAX.getClearlogs();
129         fLogTCElapsedTime = fSTAX.getLogTCElapsedTime();
130         fLogTCNumStarts   = fSTAX.getLogTCNumStarts();
131         fLogTCStartStop   = fSTAX.getLogTCStartStop();
132         fPythonOutput     = fSTAX.getPythonOutput();
133         fPythonLogLevel   = fSTAX.getPythonLogLevel();
134         fMaxSTAXThreads   = fSTAX.getMaxSTAXThreads();
135     }
136 
137     /**
138      * Gets the STAX object which represents the job's STAX service
139      * @return an instance of the job's STAX service
140      */
getSTAX()141     public STAX getSTAX() { return fSTAX; }
142 
143     /**
144      * Gets the STAX document that is executed by this job.
145      *
146      * @return the job's STAX document.
147      */
getSTAXDocument()148     public STAXDocument getSTAXDocument() { return fDocument; }
149 
150     /**
151      * Sets the STAX document to be executed by this job
152      * @param document the STAX document to be executed by this job
153      */
setSTAXDocument(STAXDocument document)154     public void setSTAXDocument(STAXDocument document)
155     {
156         fDocument = document;
157     }
158 
159     /**
160      * Gets the next number for a thread in a job
161      * @return a number for the next thread in a job
162      */
getNextThreadNumber()163     public int getNextThreadNumber()
164     {
165         synchronized (fNextThreadNumberSynch)
166         {
167             return fNextThreadNumber++;
168         }
169     }
170 
171     /**
172      * Gets the next number for a breakpoint in a job
173      * @return a number for the next breakpoint in a job
174      */
getNextBreakpointNumber()175     public int getNextBreakpointNumber()
176     {
177         synchronized (fNextBreakpointNumberSynch)
178         {
179             return fNextBreakpointNumber++;
180         }
181     }
182 
setDefaultCallAction(STAXCallAction action)183     public void setDefaultCallAction(STAXCallAction action)
184     {
185         fDefaultCallAction = action;
186     }
187 
188     /**
189      * Gets the name of the function that should be called to start the
190      * execution of a job
191      * @return the name of the starting function for a job
192      */
getStartFunction()193     public String getStartFunction() { return fDocument.getStartFunction(); }
194 
195     /**
196      * Sets the name of the function that should be called to start the
197      * execution of a job
198      * @param  startFunction   the name of the starting function for a job
199      */
setStartFunction(String startFunction)200     public void setStartFunction(String startFunction)
201     { fDocument.setStartFunction(startFunction); }
202 
getStartFuncArgs()203     public String getStartFuncArgs() { return fDocument.getStartFunctionArgs(); }
204 
setStartFuncArgs(String startFuncArgs)205     public void setStartFuncArgs(String startFuncArgs)
206     { fDocument.setStartFuncArgs(startFuncArgs); }
207 
setExecuteAndHold()208     public void setExecuteAndHold()
209     { fExecuteAndHold = true; }
210 
211     // Sets the value of the FUNCTION option on a STAX EXECUTE request
setStartFunctionOverride(String startFunction)212     public void setStartFunctionOverride(String startFunction)
213     { fStartFunction = startFunction; }
214 
getStartFunctionOverride()215     public String getStartFunctionOverride() { return fStartFunction; }
216 
217     // Sets the value of the ARGS option on a STAX EXECUTE request
setStartFuncArgsOverride(String startFuncArgs)218     public void setStartFuncArgsOverride(String startFuncArgs)
219     { fStartFuncArgs = startFuncArgs; }
220 
getStartFuncArgsOverride()221     public String getStartFuncArgsOverride() { return fStartFuncArgs; }
222 
getJobName()223     public String getJobName() { return fJobName; }
224 
setJobName(String jobName)225     public void setJobName(String jobName) { fJobName = jobName; }
226 
getJobNumber()227     public int getJobNumber() { return fJobNumber; }
228 
setJobNumber(int jobNumber)229     public void setJobNumber(int jobNumber) { fJobNumber = jobNumber; }
230 
getNextProcNumber()231     public int getNextProcNumber()
232     {
233         synchronized (fNextProcNumberSynch)
234         {
235             return fProcNumber++;
236         }
237     }
238 
getNextCmdNumber()239     public int getNextCmdNumber()
240     {
241         synchronized (fNextCmdNumberSynch)
242         {
243             return fCmdNumber++;
244         }
245     }
246 
getNextProcessKey()247     public int getNextProcessKey()
248     {
249         synchronized (fNextProcessKeySynch)
250         {
251             return fProcessKey++;
252         }
253     }
getJobDataDir()254     public String getJobDataDir() { return fJobDataDir; }
255 
setJobDataDir(String jobDataDir)256     public void setJobDataDir(String jobDataDir) { fJobDataDir = jobDataDir; }
257 
getXmlMachine()258     public String getXmlMachine() { return fXmlMachine; }
259 
setXmlMachine(String machName)260     public void setXmlMachine(String machName)
261     { fXmlMachine = machName; }
262 
getXmlFile()263     public String getXmlFile() { return fXmlFile; }
264 
setXmlFile(String fileName)265     public void setXmlFile(String fileName) { fXmlFile = fileName; }
266 
getScripts()267     public List getScripts() { return fScripts; }
268 
setScript(String script)269     public void setScript(String script) { fScripts.add(script); }
270 
getScriptFiles()271     public List getScriptFiles() { return fScriptFiles; }
272 
setScriptFile(String fileName)273     public void setScriptFile(String fileName) { fScriptFiles.add(fileName); }
274 
getScriptFileMachine()275     public String getScriptFileMachine() { return fScriptFileMachine; }
276 
setScriptFileMachine(String machName)277     public void setScriptFileMachine(String machName)
278     { fScriptFileMachine = machName; }
279 
getSourceMachine()280     public String getSourceMachine() { return fSourceMachine; }
281 
setSourceMachine(String machName)282     public void setSourceMachine(String machName)
283     { fSourceMachine = machName; }
284 
getSourceHandleName()285     public String getSourceHandleName() { return fSourceHandleName; }
286 
setSourceHandleName(String handleName)287     public void setSourceHandleName(String handleName)
288     { fSourceHandleName = handleName; }
289 
getSourceHandle()290     public int getSourceHandle() { return fSourceHandle; }
291 
setSourceHandle(int handle)292     public void setSourceHandle(int handle)
293     { fSourceHandle = handle; }
294 
getClearlogs()295     public boolean getClearlogs() { return fClearlogs; }
296 
getClearLogsAsString()297     public String getClearLogsAsString()
298     {
299         if (fClearlogs)
300             return "Enabled";
301         else
302             return "Disabled";
303     }
304 
setClearlogs(boolean clearlogs)305     public void setClearlogs(boolean clearlogs)
306     { fClearlogs = clearlogs; }
307 
getMaxSTAXThreads()308     public int getMaxSTAXThreads() { return fMaxSTAXThreads; }
309 
getWaitTimeout()310     public String getWaitTimeout() { return fWaitTimeout; }
311 
setWaitTimeout(String timeout)312     public void setWaitTimeout(String timeout)
313     { fWaitTimeout = timeout; }
314 
getNotifyOnEnd()315     public int getNotifyOnEnd() { return fNotifyOnEnd; }
316 
getNotifyOnEndAsString()317     public String getNotifyOnEndAsString()
318     {
319         if (fNotifyOnEnd == STAXJob.NOTIFY_ONEND_BY_NAME)
320             return "By Name";
321         else if (fNotifyOnEnd == STAXJob.NOTIFY_ONEND_BY_HANDLE)
322             return "By Handle";
323         else
324             return "No";
325     }
326 
setNotifyOnEnd(int notifyFlag)327     public void setNotifyOnEnd(int notifyFlag) { fNotifyOnEnd = notifyFlag; }
328 
getState()329     public int getState() { return fState; }
330 
getStateAsString()331     public String getStateAsString()
332     {
333         if (fState == PENDING_STATE)
334             return PENDING_STATE_STRING;
335         else
336             return RUNNING_STATE_STRING;
337     }
338 
setState(int state)339     public void setState(int state) { fState = state; }
340 
getResult()341     public PyObject getResult() { return fResult; }
342 
setResult(PyObject result)343     public void setResult(PyObject result)
344     { fResult = result; }
345 
getCompletionStatus()346     public int getCompletionStatus() { return fCompletionStatus; }
347 
getCompletionStatusAsString()348     public String getCompletionStatusAsString()
349     {
350         if (fCompletionStatus == STAXJob.NORMAL_STATUS)
351             return STAXJob.NORMAL_STATUS_STRING;
352         else if (fCompletionStatus == STAXJob.TERMINATED_STATUS)
353             return STAXJob.TERMINATED_STATUS_STRING;
354         else if (fCompletionStatus == STAXJob.ABNORMAL_STATUS)
355             return STAXJob.ABNORMAL_STATUS_STRING;
356         else
357             return STAXJob.UNKNOWN_STATUS_STRING;
358     }
359 
setCompletionStatus(int status)360     public void setCompletionStatus(int status)
361     {
362         fCompletionStatus = status;
363     }
364 
getJobLogErrorsMC()365     public STAFMarshallingContext getJobLogErrorsMC()
366     {
367         return fJobLogErrorsMC;
368     }
369 
getTestcaseList()370     public List getTestcaseList() { return fTestcaseList; }
371 
setTestcaseList(List testcaseList)372     public void setTestcaseList(List testcaseList)
373     { fTestcaseList = testcaseList; }
374 
getTestcaseTotalsMap()375     public Map getTestcaseTotalsMap() { return fTestcaseTotalsMap; }
376 
setTestcaseTotalsMap(Map testcaseTotalsMap)377     public void setTestcaseTotalsMap(Map testcaseTotalsMap)
378     { fTestcaseTotalsMap = testcaseTotalsMap; }
379 
getStartTimestamp()380     public STAXTimestamp getStartTimestamp() { return fStartTimestamp; }
381 
setStartTimestamp()382     public void setStartTimestamp()
383     {
384         // Get the current date and time and set as the starting date/time
385         fStartTimestamp = new STAXTimestamp();
386     }
387 
getEndTimestamp()388     public STAXTimestamp getEndTimestamp() { return fEndTimestamp; }
389 
getJobNumberAsInteger()390     public Integer getJobNumberAsInteger() { return new Integer(fJobNumber); }
391 
getSTAFHandle()392     public STAFHandle getSTAFHandle() { return fHandle; }
393 
setSTAFHandle()394     public void setSTAFHandle() throws STAFException
395     {
396         fHandle = new STAFHandle("STAX/Job/" + fJobNumber);
397     }
398 
getLogTCElapsedTime()399     public boolean getLogTCElapsedTime() { return fLogTCElapsedTime; }
400 
getLogTCElapsedTimeAsString()401     public String getLogTCElapsedTimeAsString()
402     {
403         if (fLogTCElapsedTime)
404             return "Enabled";
405         else
406             return "Disabled";
407     }
408 
setLogTCElapsedTime(boolean logTCElapsedTime)409     public void setLogTCElapsedTime(boolean logTCElapsedTime)
410     { fLogTCElapsedTime = logTCElapsedTime; }
411 
getLogTCNumStarts()412     public boolean getLogTCNumStarts() { return fLogTCNumStarts; }
413 
getLogTCNumStartsAsString()414     public String getLogTCNumStartsAsString()
415     {
416         if (fLogTCNumStarts)
417             return "Enabled";
418         else
419             return "Disabled";
420     }
421 
setLogTCNumStarts(boolean logTCNumStarts)422     public void setLogTCNumStarts(boolean logTCNumStarts)
423     { fLogTCNumStarts = logTCNumStarts; }
424 
getLogTCStartStop()425     public boolean getLogTCStartStop() { return fLogTCStartStop; }
426 
getLogTCStartStopAsString()427     public String getLogTCStartStopAsString()
428     {
429         if (fLogTCStartStop)
430             return "Enabled";
431         else
432             return "Disabled";
433     }
434 
setLogTCStartStop(boolean logTCStartStop)435     public void setLogTCStartStop(boolean logTCStartStop)
436     { fLogTCStartStop = logTCStartStop; }
437 
getPythonOutput()438     public int getPythonOutput() { return fPythonOutput; }
439 
setPythonOutput(int pythonOutput)440     public void setPythonOutput(int pythonOutput)
441     {
442         fPythonOutput = pythonOutput;
443     }
444 
getPythonLogLevel()445     public String getPythonLogLevel() { return fPythonLogLevel; }
446 
setPythonLogLevel(String logLevel)447     public void setPythonLogLevel(String logLevel)
448     {
449         fPythonLogLevel = logLevel;
450     }
451 
getNumThreads()452     public int getNumThreads()
453     {
454         return fThreadMap.size();
455     }
456 
getThreadMapCopy()457     public Map getThreadMapCopy()
458     {
459         return fThreadMap;
460     }
461 
getThread(Integer threadNumber)462     public STAXThread getThread(Integer threadNumber)
463     {
464         synchronized (fThreadMap)
465         {
466             return (STAXThread)fThreadMap.get(threadNumber);
467         }
468     }
469 
addThread(STAXThread thread)470     public void addThread(STAXThread thread)
471     {
472         synchronized (fThreadMap)
473         {
474             fThreadMap.put(thread.getThreadNumberAsInteger(), thread);
475         }
476     }
477 
addThreadIfDoesNotExceedMax(STAXThread thread)478     public void addThreadIfDoesNotExceedMax(STAXThread thread)
479         throws STAXExceedsMaxThreadsException
480     {
481         synchronized (fThreadMap)
482         {
483             if ((fMaxSTAXThreads != 0) &&
484                 (fThreadMap.size() >= fMaxSTAXThreads))
485             {
486                 throw new STAXExceedsMaxThreadsException(
487                     "Exceeded MaxSTAXThreads=" + fMaxSTAXThreads +
488                     " (the maximum number of STAX Threads that " +
489                     "can be running simultaneously in a job).");
490             }
491 
492             fThreadMap.put(thread.getThreadNumberAsInteger(), thread);
493         }
494     }
495 
removeThread(Integer threadNumber)496     public void removeThread(Integer threadNumber)
497     {
498         synchronized (fThreadMap)
499         {
500             fThreadMap.remove(threadNumber);
501         }
502     }
503 
addFunction(STAXFunctionAction function)504     public void addFunction(STAXFunctionAction function)
505     {
506         fDocument.addFunction(function);
507     }
508 
getFunction(String name)509     public STAXAction getFunction(String name)
510     {
511         return fDocument.getFunction(name);
512     }
513 
getBreakpointFirstFunction()514     public boolean getBreakpointFirstFunction()
515     {
516         return fBreakpointFirstFunction;
517     }
518 
setBreakpointFirstFunction(boolean breakpointFirstFunction)519     public void setBreakpointFirstFunction(boolean breakpointFirstFunction)
520     {
521         fBreakpointFirstFunction = breakpointFirstFunction;
522     }
523 
getBreakpointSubjobFirstFunction()524     public boolean getBreakpointSubjobFirstFunction()
525     {
526         return fBreakpointSubjobFirstFunction;
527     }
528 
setBreakpointSubjobFirstFunction( boolean breakpointSubjobFirstFunction)529     public void setBreakpointSubjobFirstFunction(
530                     boolean breakpointSubjobFirstFunction)
531     {
532         fBreakpointSubjobFirstFunction = breakpointSubjobFirstFunction;
533     }
534 
functionExists(String name)535     public boolean functionExists(String name)
536     {
537         return fDocument.functionExists(name);
538     }
539 
addDefaultAction(STAXAction action)540     public void addDefaultAction(STAXAction action)
541     {
542         fDocument.addDefaultAction(action);
543     }
544 
addCompletionNotifiee(STAXJobCompleteListener listener)545     public void addCompletionNotifiee(STAXJobCompleteListener listener)
546     {
547         synchronized (fCompletionNotifiees)
548         {
549             fCompletionNotifiees.addLast(listener);
550         }
551     }
552 
addCompletionNotifiee2(STAXJobCompleteNotifiee notifiee)553     public STAFResult addCompletionNotifiee2(STAXJobCompleteNotifiee notifiee)
554     {
555         synchronized (fCompletionNotifiees)
556         {
557             // Check if the notifiee is already in the list
558 
559             Iterator iter = fCompletionNotifiees.iterator();
560 
561             while (iter.hasNext())
562             {
563                 Object notifieeObj = iter.next();
564 
565                 if (notifieeObj instanceof
566                     com.ibm.staf.service.stax.STAXJobCompleteNotifiee)
567                 {
568                     STAXJobCompleteNotifiee aNotifiee =
569                         (STAXJobCompleteNotifiee)notifieeObj;
570 
571                     if (aNotifiee.getMachine().equals(notifiee.getMachine()) &&
572                         aNotifiee.getHandle() == notifiee.getHandle() &&
573                         aNotifiee.getHandleName().equals(notifiee.getHandleName()))
574                     {
575                         return new STAFResult(
576                             STAFResult.AlreadyExists,
577                             "A notifiee is already registered for machine=" +
578                             notifiee.getMachine() +
579                             ", handle=" + notifiee.getHandle() +
580                             ", handleName=" + notifiee.getHandleName());
581                     }
582                 }
583 
584             }
585 
586             // Add to the end of the list
587 
588             fCompletionNotifiees.addLast(notifiee);
589         }
590 
591         return new STAFResult(STAFResult.Ok, "");
592     }
593 
removeCompletionNotifiee(STAXJobCompleteNotifiee notifiee)594     public STAFResult removeCompletionNotifiee(STAXJobCompleteNotifiee notifiee)
595     {
596         boolean found = false;
597 
598         synchronized (fCompletionNotifiees)
599         {
600             // Check if notifiee exists.  If so, remove the notifiee
601 
602             Iterator iter = fCompletionNotifiees.iterator();
603 
604             while (iter.hasNext())
605             {
606                 Object notifieeObj = iter.next();
607 
608                 if (notifieeObj instanceof
609                     com.ibm.staf.service.stax.STAXJobCompleteNotifiee)
610                 {
611                     STAXJobCompleteNotifiee aNotifiee =
612                         (STAXJobCompleteNotifiee)notifieeObj;
613 
614                     if (aNotifiee.getMachine().equals(notifiee.getMachine()) &&
615                         aNotifiee.getHandle() == notifiee.getHandle() &&
616                         aNotifiee.getHandleName().equals(notifiee.getHandleName()))
617                     {
618                         // Remove notifiee from list
619 
620                         fCompletionNotifiees.remove(aNotifiee);
621                         found = true;
622                         return new STAFResult(STAFResult.Ok, "");
623                     }
624                 }
625             }
626         }
627 
628         return new STAFResult(
629             STAFResult.DoesNotExist,
630             "No notifiee registered for machine=" + notifiee.getMachine() +
631             ", handle=" + notifiee.getHandle() +
632             ", handleName=" + notifiee.getHandleName());
633     }
634 
getCompletionNotifiees()635     public LinkedList getCompletionNotifiees()
636     {
637         synchronized(fCompletionNotifiees)
638         {
639             return new LinkedList(fCompletionNotifiees);
640         }
641     }
642 
643     // Gets the compiled Jython code for the specified code string.
644     // Note that we cache compiled Jython code so that if it is used more
645     // than once, it eliminates the overhead of recompiling the code string
646     // each time it is executed.
647     //
648     // Input Arguments:
649     //  - codeString:  must be valid Python code; however, like ordinary
650     //                 Python code, the existence of variables will not be
651     //                 checked until the code is executed.
652     //  - kind:        is either 'exec' if the string is made up of
653     //                 statements, 'eval' if it is an expression.
654 
getCompiledPyCode(String codeString, String kind)655     public PyCode getCompiledPyCode(String codeString, String kind)
656     {
657         synchronized(fCompiledPyCodeCache)
658         {
659             PyCode codeObject = (PyCode)fCompiledPyCodeCache.get(codeString);
660 
661             if (codeObject == null)
662             {
663                 if (COUNT_PYCODE_CACHES) fCompiledPyCodeCacheAdds++;
664 
665                 if (kind.equals("eval"))
666                 {
667                     // Set to avoid error of setting code object to nothing
668                     if (codeString.equals("")) codeString = "None";
669 
670                     /* Jython 2.1:
671                     codeObject = __builtin__.compile(
672                         "STAXPyEvalResult = " + codeString,
673                         "<pyEval string>", "exec");
674                     */
675                     // Jython 2.5:
676                     codeObject = Py.compile_flags(
677                         codeString, "<pyEval string>",
678                         CompileMode.eval, Py.getCompilerFlags());
679                 }
680                 else
681                 {
682                     /* Jython 2.1:
683                     codeObject = __builtin__.compile(
684                         codeString, "<pyExec string>", "exec");
685                     */
686                     // Jython 2.5:
687                     codeObject = Py.compile_flags(
688                         codeString, "<pyExec string>",
689                         CompileMode.exec, Py.getCompilerFlags());
690                 }
691 
692                 fCompiledPyCodeCache.put(codeString, codeObject);
693             }
694             else
695             {
696                 if (COUNT_PYCODE_CACHES) fCompiledPyCodeCacheGets++;
697             }
698 
699             return codeObject;
700         }
701     }
702 
703     // Queue listener methods
704 
registerSTAFQueueListener(String msgType, STAXSTAFQueueListener listener)705     public void registerSTAFQueueListener(String msgType,
706                                           STAXSTAFQueueListener listener)
707     {
708         synchronized (fQueueListenerMap)
709         {
710             TreeSet listenerSet = (TreeSet)fQueueListenerMap.get(msgType);
711 
712             if (listenerSet == null)
713             {
714                 listenerSet = new TreeSet(new STAXObjectComparator());
715                 fQueueListenerMap.put(msgType, listenerSet);
716             }
717 
718             listenerSet.add(listener);
719         }
720     }
721 
unregisterSTAFQueueListener(String msgType, STAXSTAFQueueListener listener)722     public void unregisterSTAFQueueListener(String msgType,
723                                             STAXSTAFQueueListener listener)
724     {
725         synchronized (fQueueListenerMap)
726         {
727             TreeSet listenerSet = (TreeSet)fQueueListenerMap.get(msgType);
728 
729             if (listenerSet != null)
730                 listenerSet.remove(listener);
731         }
732     }
733 
734     //
735     // Data management functions
736     //
737 
setData(String dataName, Object data)738     public boolean setData(String dataName, Object data)
739     {
740         // Return true if added successfully, else return false.
741 
742         synchronized (fDataMap)
743         {
744             if (!fDataMap.containsKey(dataName))
745             {
746                 fDataMap.put(dataName, data);
747                 return true;
748             }
749         }
750 
751         return false;
752     }
753 
getData(String dataName)754     public Object getData(String dataName)
755     {
756         synchronized (fDataMap)
757         { return fDataMap.get(dataName); }
758     }
759 
removeData(String dataName)760     public boolean removeData(String dataName)
761     {
762         // Return true if removed successfully, else return false.
763 
764         synchronized (fDataMap)
765         {
766             if (fDataMap.containsKey(dataName))
767             {
768                 fDataMap.remove(dataName);
769                 return true;
770             }
771         }
772 
773         return false;
774     }
775 
776     //
777     // Execution methods
778     //
779 
startExecution()780     public void startExecution()
781         throws STAFException, STAXException
782     {
783         // Check if the MAXRETURNFILESIZE setting is not 0 (no maximum size
784         // limit) and if so, set variable STAF/MaxReturnFileSize for the STAX
785         // job handle
786 
787         if (fSTAX.getMaxReturnFileSize() != 0)
788         {
789             String request = "SET VAR " + STAFUtil.wrapData(
790                 "STAF/MaxReturnFileSize=" + fSTAX.getMaxReturnFileSize());
791 
792             STAFResult result = fHandle.submit2("local", "VAR", request);
793 
794             if (result.rc != STAFResult.Ok)
795             {
796                 String msg = "The STAX service could not set the maximum " +
797                     "file size variable for Job ID " + fJobNumber +
798                     "\nSTAF local VAR " + request + " failed with RC=" +
799                     result.rc + ", Result=" + result.result;
800 
801                 log(STAXJob.JOB_LOG, "error", msg);
802             }
803         }
804 
805         fSTAFQueueMonitor = new STAFQueueMonitor(this);
806         fSTAFQueueMonitor.start();
807 
808         // Initialize data for the Job
809 
810         fSTAX.visitJobManagementHandlers(new STAXVisitorHelper(this)
811         {
812             public void visit(Object o, Iterator iter)
813             {
814                 STAXJobManagementHandler handler = (STAXJobManagementHandler)o;
815 
816                 handler.initJob((STAXJob)fData);
817             }
818         });
819 
820         // Register to listen for messages that STAF requests have completed
821 
822         registerSTAFQueueListener("STAF/RequestComplete", this);
823 
824         // Get the main thread, thread 1
825 
826         STAXThread thread = (STAXThread)fThreadMap.get(new Integer(1));
827 
828         // Use a custom OutputStream to redirect the Python Interpreter's
829         // stdout and stderr to somewhere other than the JVM Log and/or
830         // to reformat the output (e.g. add a timestamp, job ID) when
831         // logging to the JVM Log.  Use the log level specified for the
832         // job's python output.
833 
834         thread.setPythonInterpreterStdout(new STAXPythonOutput(this));
835 
836         // Override the log level to use to always be "Error" for stderr
837         thread.setPythonInterpreterStderr(new STAXPythonOutput(this, "Error"));
838 
839         // Get the starting function for the job
840 
841         STAXAction action = (STAXAction)fDocument.getFunction(
842             fDocument.getStartFunction());
843 
844         if (action == null)
845         {
846             throw new STAXInvalidStartFunctionException(
847                 "'" + fDocument.getStartFunction() +
848                 "' is not a valid function name.  No function with " +
849                 "this name is defined.");
850         }
851         else
852         {
853             // Call the main function passing any arguments
854 
855             String startFunctionUneval = "'" + fDocument.getStartFunction() +
856                 "'";
857 
858             if (fDefaultCallAction != null)
859             {
860                 STAXCallAction callAction = fDefaultCallAction;
861                 callAction.setFunction(startFunctionUneval);
862                 callAction.setArgs(fDocument.getStartFunctionArgs());
863                 action = callAction;
864             }
865             else
866             {
867                 STAXCallAction callAction = new STAXCallAction(
868                     startFunctionUneval, fDocument.getStartFunctionArgs());
869                 callAction.setLineNumber(
870                     "<External>", "<Error in ARGS option>");
871                 callAction.setXmlFile(getXmlFile());
872                 callAction.setXmlMachine(getXmlMachine());
873                 action = callAction;
874             }
875         }
876 
877         ArrayList actionList = new ArrayList();
878 
879         // If HOLD was specified on EXECUTE request, add hold action as
880         // the first action in the actionList.
881         if (fExecuteAndHold)
882            actionList.add(new STAXHoldAction("'main'"));
883 
884         // Add additional Python variables about the Job
885         thread.pySetVar("STAXJobID", new Integer(fJobNumber));
886         thread.pySetVar("STAXJobName", fJobName);
887         thread.pySetVar("STAXJobXMLFile", fXmlFile);
888         thread.pySetVar("STAXJobXMLMachine", fXmlMachine);
889         thread.pySetVar("STAXJobStartDate", fStartTimestamp.getDateString());
890         thread.pySetVar("STAXJobStartTime", fStartTimestamp.getTimeString());
891         thread.pySetVar("STAXJobSourceMachine", fSourceMachine);
892         thread.pySetVar("STAXJobSourceHandleName", fSourceHandleName);
893         thread.pySetVar("STAXJobSourceHandle", new Integer(fSourceHandle));
894         thread.pySetVar("STAXJobStartFunctionName",
895                         fDocument.getStartFunction());
896         thread.pySetVar("STAXJobStartFunctionArgs",
897                         fDocument.getStartFunctionArgs());
898         thread.pySetVar("STAXCurrentFunction", Py.None);
899         thread.pySetVar("STAXCurrentXMLFile", fXmlFile);
900         thread.pySetVar("STAXCurrentXMLMachine", fXmlMachine);
901 
902         if (!fScriptFileMachine.equals(""))
903             thread.pySetVar("STAXJobScriptFileMachine", fScriptFileMachine);
904         else
905             thread.pySetVar("STAXJobScriptFileMachine", fSourceMachine);
906 
907         thread.pySetVar("STAXJobScriptFiles", fScriptFiles.toArray());
908         thread.pySetVar("STAXJobHandle", fHandle);
909 
910         thread.pySetVar("STAXServiceName", fSTAX.getServiceName());
911         thread.pySetVar("STAXServiceMachine", fSTAX.getLocalMachineName());
912         thread.pySetVar("STAXServiceMachineNickname",
913                         fSTAX.getLocalMachineNickname());
914         thread.pySetVar("STAXEventServiceName", fSTAX.getEventServiceName());
915         thread.pySetVar("STAXEventServiceMachine",
916                         fSTAX.getEventServiceMachine());
917 
918         thread.pySetVar("STAXJobUserLog", new STAFLog(
919             STAFLog.MACHINE,
920             fSTAX.getServiceName().toUpperCase() + "_Job_" +
921             fJobNumber + "_User",
922             fHandle, 0));
923         thread.pySetVar("STAXJobLogName",
924                         fSTAX.getServiceName().toUpperCase() +
925                         "_Job_" + fJobNumber);
926         thread.pySetVar("STAXJobUserLogName",
927                         fSTAX.getServiceName().toUpperCase() +
928                         "_Job_" + fJobNumber + "_User");
929 
930         thread.pySetVar("STAXJobWriteLocation", fJobDataDir);
931         thread.pySetVar("STAXMessageLog", new Integer(0));
932         thread.pySetVar("STAXLogMessage", new Integer(0));
933 
934         if (fLogTCElapsedTime)
935             thread.pySetVar("STAXLogTCElapsedTime", new Integer(1));
936         else
937             thread.pySetVar("STAXLogTCElapsedTime", new Integer(0));
938 
939         if (fLogTCNumStarts)
940             thread.pySetVar("STAXLogTCNumStarts", new Integer(1));
941         else
942             thread.pySetVar("STAXLogTCNumStarts", new Integer(0));
943 
944         if (fLogTCStartStop)
945             thread.pySetVar("STAXLogTCStartStop", new Integer(1));
946         else
947             thread.pySetVar("STAXLogTCStartStop", new Integer(0));
948 
949         thread.pySetVar("STAXPythonOutput",
950                         STAXPythonOutput.getPythonOutputAsString(
951                             getPythonOutput()));
952 
953         thread.pySetVar("STAXPythonLogLevel", getPythonLogLevel());
954 
955         // Add default signal handlers to the actionList for the main block.
956         addDefaultSignalHandlers(actionList);
957 
958         // Put the "main" function/block at the bottom of the stack,
959         // with its action being a sequence group that contains the
960         // default actions (scripts and signalhandlers) in the <stax>
961         // element and then the call of the main function.
962 
963         LinkedList defaultActions = fDocument.getDefaultActions();
964 
965         while (!defaultActions.isEmpty())
966         {
967             actionList.add((STAXAction)defaultActions.removeLast());
968         }
969 
970         actionList.add(action);  // Add call of main function
971 
972         // Create a sequence action for the "main" function/block
973         STAXActionDefaultImpl mainSequence = new STAXSequenceAction(actionList);
974         mainSequence.setLineNumber("sequence", "<Internal>");
975         mainSequence.setXmlFile(getXmlFile());
976         mainSequence.setXmlMachine(getXmlMachine());
977 
978         STAXActionDefaultImpl mainBlock = new STAXBlockAction(
979             "main", mainSequence);
980         mainBlock.setLineNumber("block", "<Internal>");
981         mainBlock.setXmlFile(getXmlFile());
982         mainBlock.setXmlMachine(getXmlMachine());
983 
984         thread.pushAction(mainBlock);
985 
986         // Change the job's state from pending to running
987         fState = RUNNING_STATE;
988 
989         // Generate the job is running event
990 
991         HashMap jobRunningMap = new HashMap();
992         jobRunningMap.put("type", "job");
993         jobRunningMap.put("status", "run");
994         jobRunningMap.put("jobID", String.valueOf(fJobNumber));
995         jobRunningMap.put("startFunction", fDocument.getStartFunction());
996         jobRunningMap.put("jobName", fJobName);
997         jobRunningMap.put("startTimestamp",
998                           fStartTimestamp.getTimestampString());
999 
1000         generateEvent(STAXJob.STAX_JOB_EVENT, jobRunningMap, true);
1001 
1002         thread.pySetVar("STAXJob", this);
1003 
1004         thread.setBreakpointFirstFunction(fBreakpointFirstFunction);
1005 
1006         // Schedule the thread to run
1007         thread.schedule();
1008     }
1009 
1010     /**
1011      * This is a recursive function that checks if a function needs to
1012      * import other xml files that contain functions it requires.
1013      *
1014      * Checks if the specified function has any function-import sub-elements
1015      * that specify files to be imported.  If so, it parses the files (if not
1016      * already in the cache) and adds their required functions to the job's
1017      * fFunctionMap. For any new functions added to the job's fFunctionMap,
1018      * it recursively calls the addImportedFunctions method.
1019      */
addImportedFunctions(STAXFunctionAction functionAction)1020     public void addImportedFunctions(STAXFunctionAction functionAction)
1021         throws STAFException, STAXException
1022     {
1023         List importList = functionAction.getImportList();
1024 
1025         if (importList.size() == 0)
1026             return;
1027 
1028         // Process function-import list (since not empty)
1029 
1030         String evalElem = "function-import";
1031         int evalIndex = 0;
1032         Iterator iter = importList.iterator();
1033 
1034         while (iter.hasNext())
1035         {
1036             STAXFunctionImport functionImport =
1037                 (STAXFunctionImport)iter.next();
1038 
1039             String machine = functionImport.getMachine();
1040             String file = functionImport.getFile();
1041             String directory = functionImport.getDirectory();
1042 
1043             boolean directorySpecified = false;
1044 
1045             if (directory != null)
1046                 directorySpecified = true;
1047 
1048             boolean machineSpecified = false;
1049 
1050             if (machine != null)
1051             {
1052                 machineSpecified = true;
1053 
1054                 // Check if "machine" contains any STAF variables
1055 
1056                 if (machine.indexOf("{") != -1)
1057                 {
1058                     // Resolve variables on the local STAX service machine
1059 
1060                     STAFResult result = this.submitSync(
1061                         "local", "VAR", "RESOLVE STRING " +
1062                         STAFUtil.wrapData(machine));
1063 
1064                     if (result.rc != STAFResult.Ok)
1065                     {
1066                         String errorMsg = "Cause:  Error resolving STAF " +
1067                             "variables in the \"machine\" attribute " +
1068                             "for element type \" function-import\"," +
1069                             "\nRC: " + result.rc +
1070                             ", Result: " + result.result;
1071 
1072                         functionAction.setElementInfo(
1073                             new STAXElementInfo(
1074                                 evalElem, "machine", evalIndex, errorMsg));
1075 
1076                         throw new STAXFunctionImportException(
1077                             STAXUtil.formatErrorMessage(functionAction));
1078                     }
1079 
1080                     machine = result.result;
1081                 }
1082             }
1083             else
1084             {
1085                 machine = fXmlMachine;
1086             }
1087 
1088             // Check if file or directory contains any STAF variables.
1089 
1090             String evalAttr = "file";
1091             String unresValue = file;
1092 
1093             if (directorySpecified)
1094             {
1095                 evalAttr = "directory";
1096                 unresValue = directory;
1097             }
1098 
1099             if (unresValue.indexOf("{") != -1)
1100             {
1101                 // Resolve variables on the local STAX service machine
1102 
1103                 STAFResult result = this.submitSync(
1104                     "local", "VAR", "RESOLVE STRING " +
1105                     STAFUtil.wrapData(unresValue));
1106 
1107                 if (result.rc != STAFResult.Ok)
1108                 {
1109                     String errorMsg = "Cause:  Error resolving STAF " +
1110                         "variables in the \"" + evalAttr + "\" attribute " +
1111                         "for element type \"function-import\"." +
1112                         "\nRC: " + result.rc +
1113                         ", Result: " + result.result;
1114 
1115                     functionAction.setElementInfo(
1116                         new STAXElementInfo(
1117                             evalElem, evalAttr, evalIndex, errorMsg));
1118 
1119                     throw new STAXFunctionImportException(
1120                         STAXUtil.formatErrorMessage(functionAction));
1121                 }
1122 
1123                 if (!directorySpecified)
1124                     file = result.result;
1125                 else
1126                     directory = result.result;
1127             }
1128 
1129             // Handle any functions specified to be imported
1130 
1131             String functions = functionImport.getFunctions();
1132 
1133             Vector importedFunctionList = new Vector();
1134 
1135             if (functions != null)
1136             {
1137                 // Convert string containing a whitespace-separated list of
1138                 // functions to a vector.
1139                 //
1140                 // Note: The tokenizer uses the default delimiter set, which
1141                 // is " \t\n\r\f" and consists of the space character,
1142                 // the tab character, the newline character, the
1143                 // carriage-return character, and the form-feed character.
1144                 // Delimiter characters themselves will not be treated as
1145                 // tokens.
1146 
1147                 StringTokenizer st = new StringTokenizer(functions);
1148 
1149                 while (st.hasMoreElements())
1150                 {
1151                     importedFunctionList.add(st.nextElement());
1152                 }
1153             }
1154 
1155             // Get the file separator for the machine where the import file/
1156             // directory resides as this is needed to normalize the path name
1157             // and may also be needed to determine if its a relative file
1158             // name, and to get the parent directory.
1159 
1160             STAFResult result = STAXFileCache.getFileSep(
1161                 machine, this.getSTAX().getSTAFHandle());
1162 
1163             if (result.rc != STAFResult.Ok)
1164             {
1165                 String errorMsg = "Cause:  No response from machine \"" +
1166                     machine + "\" when trying to get the contents of a " +
1167                     "file specified by element type \"function-import\"." +
1168                     "\nRC: " + result.rc + ", Result: " + result.result;
1169 
1170                 functionAction.setElementInfo(
1171                     new STAXElementInfo(
1172                         evalElem, "machine", evalIndex, errorMsg));
1173 
1174                 throw new STAXFunctionImportException(
1175                     STAXUtil.formatErrorMessage(functionAction));
1176             }
1177 
1178             String fileSep = result.result;
1179 
1180             // Set a flag to indicate if the file/directory name is
1181             // case-sensitive (e.g. true if it resides on a Unix machine,
1182             // false if Windows)
1183 
1184             boolean caseSensitiveFileName = true;
1185 
1186             if (fileSep.equals("\\"))
1187             {
1188                 // Windows machine so not case-sensitive
1189 
1190                 caseSensitiveFileName = false;
1191             }
1192 
1193             // Import the file specified in the function-import element.
1194             // If the file specified in the function-import element isn't
1195             // already in the file cache, get the file and parse it.
1196             // Then, add the functions defined in this file to the main job's
1197             // function map.
1198 
1199             // If no machine attribute is specified, then the file specified
1200             // could be a relative file name so need to assign its absolute
1201             // file name
1202 
1203             if (!machineSpecified)
1204             {
1205                 // Check if a relative path was specified
1206 
1207                 String entry = file;
1208 
1209                 if (directorySpecified)
1210                     entry = directory;
1211 
1212                 if (STAXUtil.isRelativePath(entry, fileSep))
1213                 {
1214                     // Assign the absolute name assuming it is relative
1215                     // to the parent xml file's path
1216 
1217                     String currentFile = functionAction.getXmlFile();
1218 
1219                     if (currentFile.equals(STAX.INLINE_DATA))
1220                     {
1221                         // Cannot specify a relative file path if the parent
1222                         // xml file is STAX.INLINE_DATA
1223 
1224                         String errorMsg = "Cause:  Cannot specify a " +
1225                             "relative path in attribute \"" + evalAttr +
1226                             "\" for element type \"function-import\" when " +
1227                             "the parent xml file is " + STAX.INLINE_DATA;
1228 
1229                         functionAction.setElementInfo(
1230                             new STAXElementInfo(
1231                                 evalElem, STAXElementInfo.NO_ATTRIBUTE_NAME,
1232                                 evalIndex, errorMsg));
1233 
1234                         throw new STAXFunctionImportException(
1235                             STAXUtil.formatErrorMessage(functionAction));
1236                     }
1237 
1238                     entry = STAXUtil.getParentPath(currentFile, fileSep) +
1239                         entry;
1240 
1241                     if (!directorySpecified)
1242                         file = entry;
1243                     else
1244                         directory = entry;
1245                 }
1246             }
1247 
1248             // Normalize the import file/directory name so that we have a
1249             // better chance at matching file names that are already cached
1250 
1251             if (!directorySpecified)
1252                 file = STAXUtil.normalizeFilePath(file, fileSep);
1253             else
1254                 directory = STAXUtil.normalizeFilePath(directory, fileSep);
1255 
1256             // Create a STAX XML Parser
1257 
1258             STAXParser parser = null;
1259 
1260             try
1261             {
1262                 parser = new STAXParser(getSTAX());
1263             }
1264             catch (Exception ex)
1265             {
1266                 String errorMsg = ex.getClass().getName() + "\n" +
1267                     ex.getMessage();
1268 
1269                 functionAction.setElementInfo(
1270                     new STAXElementInfo(
1271                         evalElem, STAXElementInfo.NO_ATTRIBUTE_NAME,
1272                         evalIndex, errorMsg));
1273 
1274                 throw new STAXFunctionImportException(
1275                     STAXUtil.formatErrorMessage(functionAction));
1276             }
1277 
1278             // Create a list of files to process
1279 
1280             List theFileList = new ArrayList();
1281 
1282             if (!directorySpecified)
1283             {
1284                 // There will be just one file to process
1285 
1286                 theFileList.add(file);
1287             }
1288             else
1289             {
1290                 // Submit a FS LIST DIRECTORY request for all *.xml files
1291                 // in the directory
1292 
1293                 result = submitSync(
1294                     machine, "FS", "LIST DIRECTORY " +
1295                     STAFUtil.wrapData(directory) +
1296                     " TYPE F EXT xml CASEINSENSITIVE");
1297 
1298                 if (result.rc != 0)
1299                 {
1300                     evalAttr = STAXElementInfo.NO_ATTRIBUTE_NAME;
1301                     String errorMsg = "Cause:  ";
1302 
1303                     if (result.rc == STAFResult.NoPathToMachine)
1304                     {
1305                         errorMsg = errorMsg + "No response from machine ";
1306                         evalAttr = "machine";
1307                     }
1308                     else
1309                     {
1310                         errorMsg = errorMsg + "Error ";
1311                     }
1312 
1313                     errorMsg = errorMsg + "when submitting a FS LIST " +
1314                         "DIRECTORY request to list all *.xml files in the " +
1315                         "directory specified by element type " +
1316                         "\"function-import\":" +
1317                         "\n  Directory: " + directory +
1318                         "\n  Machine: " + machine +
1319                         "\n\nRC: " + result.rc + ", Result: " + result.result;
1320 
1321                     functionAction.setElementInfo(
1322                         new STAXElementInfo(
1323                             evalElem, evalAttr, evalIndex, errorMsg));
1324 
1325                     throw new STAXFunctionImportException(
1326                         STAXUtil.formatErrorMessage(functionAction));
1327                 }
1328 
1329                 Iterator dirListIter = ((List)result.resultObj).iterator();
1330 
1331                 while (dirListIter.hasNext())
1332                 {
1333                     theFileList.add(directory + fileSep +
1334                                     (String)dirListIter.next());
1335                 }
1336             }
1337 
1338             Iterator fileListIter = theFileList.iterator();
1339 
1340             while (fileListIter.hasNext())
1341             {
1342                 file = (String)fileListIter.next();
1343 
1344                 Date dLastModified = null;
1345                 STAXJob job = null;
1346 
1347                 // If file caching is enabled, find the modification date of
1348                 // the file being imported
1349 
1350                 if (getSTAX().getFileCaching())
1351                 {
1352                     if (STAXFileCache.get().isLocalMachine(machine))
1353                     {
1354                         File fileObj = new File(file);
1355 
1356                         // Make sure the file exists
1357 
1358                         if (fileObj.exists())
1359                         {
1360                             long lastModified = fileObj.lastModified();
1361 
1362                             if (lastModified > 0)
1363                             {
1364                                 // Chop off the milliseconds because some
1365                                 // systems don't report modTime to milliseconds
1366 
1367                                 lastModified = ((long)(lastModified/1000))*1000;
1368 
1369                                 dLastModified = new Date(lastModified);
1370                             }
1371                         }
1372                     }
1373 
1374                     if (dLastModified == null)
1375                     {
1376                         // Find the remote file mod time using STAF
1377 
1378                         STAFResult entryResult = submitSync(
1379                             machine, "FS", "GET ENTRY " +
1380                             STAFUtil.wrapData(file) + " MODTIME");
1381 
1382                         if (entryResult.rc == 0)
1383                         {
1384                             String modDate = entryResult.result;
1385                             dLastModified = STAXFileCache.convertSTAXDate(
1386                                 modDate);
1387                         }
1388                     }
1389 
1390                     // Check for an up-to-date file in the cache
1391 
1392                     if ((dLastModified != null) &&
1393                         STAXFileCache.get().checkCache(
1394                             machine, file, dLastModified,
1395                             caseSensitiveFileName))
1396                     {
1397                         // Get the doc from cache
1398 
1399                         STAXDocument doc = STAXFileCache.get().getDocument(
1400                             machine, file, caseSensitiveFileName);
1401 
1402                         if (doc != null)
1403                         {
1404                             job = new STAXJob(getSTAX(), doc);
1405                         }
1406                     }
1407                 }
1408 
1409                 // If the file was not in cache, then retrieve it using STAF
1410 
1411                 if (job == null)
1412                 {
1413                     result = submitSync(
1414                         machine, "FS", "GET FILE " + STAFUtil.wrapData(file));
1415 
1416                     if (result.rc != 0)
1417                     {
1418                         evalAttr = STAXElementInfo.NO_ATTRIBUTE_NAME;
1419                         String errorMsg = "Cause:  ";
1420 
1421                         if (result.rc == STAFResult.NoPathToMachine)
1422                         {
1423                             errorMsg = errorMsg + "No response from machine ";
1424                             evalAttr = "machine";
1425                         }
1426                         else
1427                         {
1428                             errorMsg = errorMsg + "Error ";
1429                         }
1430 
1431                         errorMsg = errorMsg + "when submitting a FS GET " +
1432                             "request to get a file specified by element type " +
1433                             "\"function-import\":" +
1434                             "\n  File: " + file +
1435                             "\n  Machine: " + machine +
1436                             "\n\nRC: " + result.rc + ", Result: " + result.result;
1437 
1438                         functionAction.setElementInfo(
1439                             new STAXElementInfo(
1440                                 evalElem, evalAttr, evalIndex, errorMsg));
1441 
1442                         throw new STAXFunctionImportException(
1443                             STAXUtil.formatErrorMessage(functionAction));
1444                     }
1445 
1446                     // Parse the XML document
1447 
1448                     try
1449                     {
1450                         job = parser.parse(result.result, file, machine);
1451                     }
1452                     catch (Exception ex)
1453                     {
1454                         String errorMsg = "Cause: " +
1455                             ex.getClass().getName() + "\n";
1456 
1457                         // Make sure that the error message contains the File:
1458                         // and Machine: where the error occurred
1459 
1460                         if (ex.getMessage().indexOf("File: ") == -1 ||
1461                             ex.getMessage().indexOf("Machine: ") == -1)
1462                         {
1463                             errorMsg = errorMsg + "\nFile: " + file +
1464                                 ", Machine: " + machine;
1465                         }
1466 
1467                         errorMsg = errorMsg + ex.getMessage();
1468 
1469                         functionAction.setElementInfo(
1470                             new STAXElementInfo(
1471                                 evalElem, STAXElementInfo.NO_ATTRIBUTE_NAME,
1472                                 evalIndex, errorMsg));
1473 
1474                         throw new STAXFunctionImportException(
1475                             STAXUtil.formatErrorMessage(functionAction));
1476                     }
1477 
1478                     // Add the XML document to the cache
1479 
1480                     if (getSTAX().getFileCaching() && (dLastModified != null))
1481                     {
1482                         STAXFileCache.get().addDocument(
1483                             machine, file, job.getSTAXDocument(),
1484                             dLastModified, caseSensitiveFileName);
1485                     }
1486                 }
1487 
1488                 // Get a map of the functions in this xml file being imported
1489 
1490                 HashMap functionMap = job.getSTAXDocument().getFunctionMap();
1491 
1492                 // Verify that if function names were specified in the
1493                 // function-import element, verify that all the function names
1494                 // specified exist in the xml file
1495 
1496                 if (!importedFunctionList.isEmpty())
1497                 {
1498                     Iterator functionIterator = importedFunctionList.iterator();
1499 
1500                     while (functionIterator.hasNext())
1501                     {
1502                         String functionName = (String)functionIterator.next();
1503 
1504                         if (!functionMap.containsKey(functionName))
1505                         {
1506                             // Function name specified in the function-import
1507                             // element's "functions" attribute does not exist
1508 
1509                             String errorMsg = "Cause:  Function \"" +
1510                                 functionName + "\" does not exist in file \"" +
1511                                 file + "\" on machine \"" + machine + "\".";
1512 
1513                             functionAction.setElementInfo(
1514                                 new STAXElementInfo(
1515                                     evalElem, STAXElementInfo.NO_ATTRIBUTE_NAME,
1516                                     evalIndex, errorMsg));
1517 
1518                             throw new STAXFunctionImportException(
1519                                 STAXUtil.formatErrorMessage(functionAction));
1520                         }
1521                     }
1522                 }
1523 
1524                 // Create a set containing the functions requested to be
1525                 // imported.  If no functions were specifically requested to
1526                 // be imported, add all the functions in the xml file to the
1527                 // set.
1528 
1529                 Set functionSet;
1530 
1531                 if (importedFunctionList.isEmpty())
1532                     functionSet = functionMap.keySet();
1533                 else
1534                     functionSet = new LinkedHashSet(importedFunctionList);
1535 
1536                 // Add the functions requested to be imported via a
1537                 // function-import element to the main job's fFunctionMap and
1538                 // add any other functions that these functions also require
1539                 // to the main job's fFunctionMap
1540 
1541                 Iterator functionIterator = functionSet.iterator();
1542                 Vector requiredFunctionList = new Vector();
1543 
1544                 while (functionIterator.hasNext())
1545                 {
1546                     String functionName = (String)functionIterator.next();
1547 
1548                     STAXFunctionAction function =
1549                         (STAXFunctionAction)functionMap.get(functionName);
1550 
1551                     // If this function does not exist, add the function to
1552                     // the job's fFunctionMap
1553 
1554                     if (!(functionExists(functionName)))
1555                     {
1556                         // Add the function to the job's function map
1557 
1558                         this.addFunction(function);
1559 
1560                         // Check if this function requires any other functions
1561                         // and if so, add them to the list of required
1562                         // functions
1563 
1564                         StringTokenizer requiredFunctions =
1565                             new StringTokenizer(function.getRequires(), " ");
1566 
1567                         while (requiredFunctions.hasMoreElements())
1568                         {
1569                             requiredFunctionList.add(
1570                                 requiredFunctions.nextElement());
1571                         }
1572 
1573                         // Recursive call to add functions this functions
1574                         // requires if specified by function-import elements
1575 
1576                         addImportedFunctions(function);
1577                     }
1578                 }
1579 
1580                 // Process required functions (functions that the imported
1581                 // functions also require)
1582 
1583                 for (int i = 0; i < requiredFunctionList.size(); i++)
1584                 {
1585                     String functionName =
1586                         (String)requiredFunctionList.elementAt(i);
1587 
1588                     if (!functionExists(functionName))
1589                     {
1590                         addRequiredFunctions(functionName, functionMap);
1591                     }
1592                 }
1593             } // End while (fileListIter.hasNext())
1594 
1595             evalIndex++;
1596         } // End while
1597     }
1598 
1599     /**
1600      * This recursive method adds the specified required function to the
1601      * main job's fFunctionMap.  And, if this function has any function-import
1602      * elements, it recursively adds these functions that are required by the
1603      * imported function.  And, if this function has any required functions
1604      * from the same xml file, it recursively adds these required functions.
1605      */
addRequiredFunctions(String functionName, HashMap functionMap)1606     private void addRequiredFunctions(String functionName, HashMap functionMap)
1607         throws STAFException, STAXException
1608     {
1609         STAXFunctionAction function =
1610             (STAXFunctionAction)functionMap.get(functionName);
1611 
1612         this.addFunction(function);
1613 
1614         // If this function has any function-import elements, recursively
1615         // add these functions from the imported xml file that are
1616         // required by the imported function to the main job's
1617         // fFunctionMap if not already present
1618 
1619         addImportedFunctions(function);
1620 
1621         // If this function has a "requires" attribute specifying
1622         // additional functions in the same xml file that it requires,
1623         // recursively add these required functions to the main job's
1624         // fFunctionMap if not already present
1625 
1626         StringTokenizer requiredFunctions = new StringTokenizer(
1627             function.getRequires(), " ");
1628 
1629         while (requiredFunctions.hasMoreElements())
1630         {
1631             String reqFunctionName = (String)requiredFunctions.nextElement();
1632 
1633             if (!functionExists(reqFunctionName))
1634             {
1635                 addRequiredFunctions(reqFunctionName, functionMap);
1636             }
1637         }
1638     }
1639 
addDefaultSignalHandlers(ArrayList actionList)1640     public void addDefaultSignalHandlers(ArrayList actionList)
1641     {
1642         // Array of default SignalHandlers.  Each signal is described by
1643         // a signal name, an action, and a signal message variable name.
1644         // If the signal message variable name is not null, a message will
1645         // be sent to the STAX Monitor and logged with level 'error'.
1646 
1647         String[][] defaultSHArray =
1648         {
1649             {
1650                 "STAXPythonEvaluationError", "terminate", "STAXPythonEvalMsg"
1651             },
1652             {
1653                 "STAXProcessStartError", "continue", "STAXProcessStartErrorMsg"
1654             },
1655             {
1656                 "STAXProcessStartTimeout", "continue",
1657                 "STAXProcessStartTimeoutMsg"
1658             },
1659             {
1660                 "STAXCommandStartError", "terminate",
1661                 "STAXCommandStartErrorMsg"
1662             },
1663             {
1664                 "STAXFunctionDoesNotExist", "terminate",
1665                 "STAXFunctionDoesNotExistMsg"
1666             },
1667             {
1668                 "STAXInvalidBlockName", "terminate", "STAXInvalidBlockNameMsg"
1669             },
1670             {
1671                 "STAXBlockDoesNotExist", "continue", "STAXBlockDoesNotExistMsg"
1672             },
1673             {
1674                 "STAXLogError", "continue", "STAXLogMsg"
1675             },
1676             {
1677                 "STAXTestcaseMissingError", "continue",
1678                 "STAXTestcaseMissingMsg"
1679             },
1680             {
1681                 "STAXInvalidTcStatusResult", "continue",
1682                 "STAXInvalidTcStatusResultMsg"
1683             },
1684             {
1685                 "STAXInvalidTimerValue", "terminate",
1686                 "STAXInvalidTimerValueMsg"
1687             },
1688             {
1689                 "STAXNoSuchSignalHandler", "continue",
1690                 "STAXNoSuchSignalHandlerMsg"
1691             },
1692             {
1693                 "STAXEmptyList", "continue", null
1694             },
1695             {
1696                 "STAXMaxThreadsExceeded", "terminate",
1697                 "STAXMaxThreadsExceededMsg"
1698             },
1699             {
1700                 "STAXInvalidMaxThreads", "terminate",
1701                 "STAXInvalidMaxThreadsMsg"
1702             },
1703             {
1704                 "STAXFunctionArgValidate", "terminate",
1705                 "STAXFunctionArgValidateMsg"
1706             },
1707             {
1708                 "STAXImportError", "terminate", "STAXImportErrorMsg"
1709             },
1710             {
1711                 "STAXFunctionImportError", "terminate",
1712                 "STAXFunctionImportErrorMsg"
1713             },
1714             {
1715                 "STAXInvalidTestcaseMode", "continue",
1716                 "STAXInvalidTestcaseModeMsg"
1717             }
1718         };
1719 
1720         // Add default SignalHandlers to actionList
1721 
1722         for (int i = 0; i < defaultSHArray.length; i++)
1723         {
1724             ArrayList signalHandlerActionList = new ArrayList();
1725 
1726             String signalName = defaultSHArray[i][0];
1727             String signalAction = defaultSHArray[i][1];
1728             String signalMsgVarName = defaultSHArray[i][2];
1729             String signalMsgText = "";
1730 
1731             if (signalAction.equals("terminate"))
1732             {
1733                 signalMsgText = "'" + signalName + " signal raised. " +
1734                     "Terminating job. '" + " + " + signalMsgVarName;
1735             }
1736             else if (signalAction.equals("continue"))
1737             {
1738                 signalMsgText = "'" + signalName + " signal raised. " +
1739                     "Continuing job. '" + " + " + signalMsgVarName;
1740             }
1741 
1742             if (signalMsgVarName == null)
1743             {
1744                 // Add a No Operation (nop) Action to the action list
1745 
1746                 signalHandlerActionList.add(new STAXNopAction());
1747             }
1748             else
1749             {
1750                 // Add a Log Action to the action list
1751 
1752                 int logfile = STAXJob.JOB_LOG;
1753 
1754                 if (signalName == "STAXLogError")
1755                 {
1756                     // Log the error message in the STAX JVM log (otherwise
1757                     // may get a duplicate STAXLogError signal)
1758 
1759                     logfile = STAXJob.JVM_LOG;
1760                 }
1761 
1762                 // Log the message with level 'Error' and send the message
1763                 // to the STAX Monitor
1764 
1765                 signalHandlerActionList.add(new STAXLogAction(
1766                     signalMsgText, "'error'", "1", "1", logfile));
1767             }
1768 
1769             if (signalAction.equals("terminate"))
1770             {
1771                 // Add a Terminate Job Action to the action list
1772 
1773                 signalHandlerActionList.add(
1774                     new STAXTerminateAction("'main'"));
1775             }
1776 
1777             // Add the signalhandler action to the action list
1778 
1779             if (signalHandlerActionList.size() == 1)
1780             {
1781                 // Don't need a STAXSequenceAction since only 1 action in list
1782 
1783                 actionList.add(new STAXSignalHandlerAction(
1784                     "'" + signalName + "'",
1785                     (STAXAction)signalHandlerActionList.get(0)));
1786             }
1787             else
1788             {
1789                 // Wrap the action list is a STAXSequenceAction
1790 
1791                 actionList.add(new STAXSignalHandlerAction(
1792                     "'" + signalName + "'",
1793                     new STAXSequenceAction(signalHandlerActionList)));
1794             }
1795         }
1796     }
1797 
1798     //
1799     // Event methods
1800     //
1801 
generateEvent(String eventSubType, Map details)1802     public void generateEvent(String eventSubType, Map details)
1803     {
1804         generateEvent(eventSubType, details, false);
1805     }
1806 
generateEvent(String eventSubType, Map details, boolean notifyAll)1807     public void generateEvent(String eventSubType, Map details,
1808                               boolean notifyAll)
1809     {
1810         StringBuffer detailsString = new StringBuffer();
1811         String key = "";
1812         String value = "";
1813 
1814         Iterator keyIter = details.keySet().iterator();
1815 
1816         while (keyIter.hasNext())
1817         {
1818             key = (String)keyIter.next();
1819 
1820             value = (String)details.get(key);
1821 
1822             if (value == null)
1823             {
1824                 // do nothing
1825             }
1826             else
1827             {
1828                 detailsString.append("PROPERTY ").append(
1829                     STAFUtil.wrapData(key + "=" + value)).append(" ");
1830             }
1831         }
1832 
1833         // The type machine should always be the local machine
1834         // The details parm must already be in the :length: format
1835         STAFResult result = submitSync(
1836                         fSTAX.getEventServiceMachine(),
1837                         fSTAX.getEventServiceName(), "GENERATE TYPE " +
1838                         fSTAX.getServiceName().toUpperCase() + "/" +
1839                         fSTAX.getLocalMachineName() + "/" + fJobNumber +
1840                         " SUBTYPE " + STAFUtil.wrapData(eventSubType) +
1841                         " ASYNC " + detailsString.toString());
1842 
1843         // Debug
1844         if (false)
1845         {
1846             System.out.println("Event:\n" +  "GENERATE TYPE " +
1847                         fSTAX.getServiceName().toUpperCase() + "/" +
1848                         fSTAX.getLocalMachineName() + "/" + fJobNumber +
1849                         " SUBTYPE " + STAFUtil.wrapData(eventSubType) +
1850                         " ASYNC " + details);
1851         }
1852 
1853         if (notifyAll)
1854         {
1855             submitSync(fSTAX.getEventServiceMachine(),
1856                        fSTAX.getEventServiceName(), "GENERATE TYPE " +
1857                        fSTAX.getServiceName().toUpperCase() + "/" +
1858                        fSTAX.getLocalMachineName() + " SUBTYPE " +
1859                        STAFUtil.wrapData(eventSubType) +
1860                        " ASYNC " + detailsString.toString());
1861 
1862             // Debug
1863             if (false)
1864             {
1865                 System.out.println("Event:\n" + "GENERATE TYPE " +
1866                             fSTAX.getServiceName().toUpperCase() + "/" +
1867                             fSTAX.getLocalMachineName() + " SUBTYPE " +
1868                             STAFUtil.wrapData(eventSubType) +
1869                             " ASYNC " + details);
1870             }
1871         }
1872     }
1873 
log(int logfile, String level, String message)1874     public STAFResult log(int logfile, String level, String message)
1875     {
1876         if (logfile == STAXJob.JVM_LOG)
1877         {
1878             // Log to the JVM log instead of a STAX Job log
1879 
1880             STAXTimestamp currentTimestamp = new STAXTimestamp();
1881 
1882             System.out.println(
1883                 currentTimestamp.getTimestampString() + " " + message);
1884 
1885             return new STAFResult();
1886         }
1887 
1888         String serviceName = fSTAX.getServiceName().toUpperCase();
1889         String logName;
1890 
1891         // Don't resolve messages logged to the STAX_Service log and to
1892         // STAX Job logs to avoid possible RC 13 errors which would prevent
1893         // the message from being logged.
1894 
1895         boolean noResolveMessage = false;
1896 
1897         if (logfile == STAXJob.SERVICE_LOG)
1898         {
1899             logName = serviceName + "_Service";
1900             noResolveMessage = true;
1901         }
1902         else if (logfile == STAXJob.JOB_LOG)
1903         {
1904             logName = serviceName + "_Job_" + fJobNumber;
1905             noResolveMessage = true;
1906         }
1907         else if (logfile == STAXJob.USER_JOB_LOG)
1908         {
1909             logName = serviceName + "_Job_" + fJobNumber + "_User";
1910         }
1911         else
1912         {
1913             // Log to STAX_Service log if invalid logfile specified
1914 
1915             STAXTimestamp currentTimestamp = new STAXTimestamp();
1916             System.out.println(currentTimestamp.getTimestampString() +
1917                                " STAX Service Error: Invalid Logfile " +
1918                                logfile);
1919 
1920             logName = serviceName + "_Service";
1921             noResolveMessage = true;
1922         }
1923 
1924         String logRequest = "LOG MACHINE LOGNAME " +
1925             STAFUtil.wrapData(logName) + " LEVEL " + level;
1926 
1927         if (noResolveMessage) logRequest += " NORESOLVEMESSAGE";
1928 
1929         logRequest += " MESSAGE " + STAFUtil.wrapData(message);
1930 
1931         STAFResult result = submitSync("LOCAL", "LOG", logRequest);
1932 
1933         // Check if the result was unsuccessful except ignore the following
1934         // errors:
1935         // - UnknownService (2) in case the LOG service is not registered
1936         // - HandleDoesNotExist (5) in case the STAX job's handle has been
1937         //   unregistered indicating the job has completed
1938 
1939         if ((result.rc != STAFResult.Ok) &&
1940             (result.rc != STAFResult.UnknownService) &&
1941             (result.rc != STAFResult.HandleDoesNotExist))
1942         {
1943             if (logfile != STAXJob.USER_JOB_LOG)
1944             {
1945                 STAXTimestamp currentTimestamp = new STAXTimestamp();
1946 
1947                 System.out.println(
1948                     currentTimestamp.getTimestampString() +
1949                     " Log request failed with RC " + result.rc +
1950                     " and Result " + result.result +
1951                     "  level: " + level +
1952                     "  logRequest: " + logRequest);
1953             }
1954             else if ((result.rc == STAFResult.VariableDoesNotExist) &&
1955                      (!noResolveMessage))
1956             {
1957                 // Retry logging the error message without resolving
1958                 // variables in the message
1959 
1960                 logRequest += " NORESOLVEMESSAGE";
1961 
1962                 result = submitSync("LOCAL", "LOG", logRequest);
1963             }
1964         }
1965 
1966         return result;
1967     }
1968 
clearLogs()1969     public void clearLogs()
1970     {
1971         String serviceName = fSTAX.getServiceName().toUpperCase();
1972         String jobLogName;
1973         String userJobLogName;
1974 
1975         jobLogName = serviceName + "_Job_" + fJobNumber;
1976         userJobLogName = serviceName + "_Job_" + fJobNumber + "_User";
1977 
1978         String logRequest = "DELETE MACHINE " +
1979             fSTAX.getLocalMachineNickname() + " LOGNAME " +
1980             STAFUtil.wrapData(jobLogName) + " CONFIRM";
1981 
1982         STAFResult result = submitSync("LOCAL", "LOG", logRequest);
1983 
1984         logRequest = "DELETE MACHINE " + fSTAX.getLocalMachineNickname() +
1985             " LOGNAME " +  STAFUtil.wrapData(userJobLogName) + " CONFIRM";
1986 
1987         result = submitSync("LOCAL", "LOG", logRequest);
1988     }
1989 
clearJobDataDir()1990     public void clearJobDataDir()
1991     {
1992         // Delete the job data directory and recreate it
1993 
1994         File dir = new File(fJobDataDir);
1995 
1996         if (dir.exists())
1997         {
1998             String deleteDirRequest = "DELETE ENTRY " +
1999                 STAFUtil.wrapData(fJobDataDir) + " RECURSE CONFIRM";
2000 
2001             submitSync("local", "FS", deleteDirRequest);
2002         }
2003 
2004         if (!dir.exists())
2005         {
2006             dir.mkdirs();
2007         }
2008     }
2009 
getBreakpointFunctionList()2010     public List getBreakpointFunctionList()
2011     {
2012         return fBreakpointFunctionList;
2013     }
2014 
setBreakpointFunctionList(List functionList)2015     public void setBreakpointFunctionList(List functionList)
2016     {
2017         synchronized(fBreakpointFunctionList)
2018         {
2019             fBreakpointFunctionList = functionList;
2020         }
2021     }
2022 
addBreakpointFunction(String functionName)2023     public int addBreakpointFunction(String functionName)
2024     {
2025         int breakpointID = getNextBreakpointNumber();
2026 
2027         synchronized(fBreakpointFunctionList)
2028         {
2029             fBreakpointFunctionList.add(functionName);
2030         }
2031 
2032         synchronized (fBreakpointsMap)
2033         {
2034             fBreakpointsMap.put(String.valueOf(breakpointID),
2035                 new STAXBreakpoint(BREAKPOINT_FUNCTION,
2036                                   functionName, "", "", ""));
2037         }
2038 
2039         return breakpointID;
2040     }
2041 
isBreakpointFunction(String functionName)2042     public boolean isBreakpointFunction(String functionName)
2043     {
2044         synchronized (fBreakpointsMap)
2045         {
2046             Iterator iter = fBreakpointsMap.keySet().iterator();
2047 
2048             while (iter.hasNext())
2049             {
2050                 String id = (String)iter.next();
2051 
2052                 STAXBreakpoint breakpoint =
2053                     (STAXBreakpoint)fBreakpointsMap.get(id);
2054 
2055                 if (functionName.equals(breakpoint.getFunction()))
2056                 {
2057                     return true;
2058                 }
2059             }
2060         }
2061         return false;
2062     }
2063 
getBreakpointLineList()2064     public List getBreakpointLineList()
2065     {
2066         return fBreakpointLineList;
2067     }
2068 
getBreakpointsMap()2069     public TreeMap getBreakpointsMap()
2070     {
2071         return fBreakpointsMap;
2072     }
2073 
setBreakpointLineList(List lineList)2074     public void setBreakpointLineList(List lineList)
2075     {
2076         synchronized(fBreakpointLineList)
2077         {
2078             fBreakpointLineList = lineList;
2079         }
2080     }
2081 
removeBreakpoint(String id)2082     public STAFResult removeBreakpoint(String id)
2083     {
2084         synchronized(fBreakpointsMap)
2085         {
2086             if (!(fBreakpointsMap.containsKey(id)))
2087             {
2088                 return new STAFResult(STAFResult.DoesNotExist, id);
2089             }
2090             else
2091             {
2092                 fBreakpointsMap.remove(id);
2093 
2094                 return new STAFResult(STAFResult.Ok);
2095             }
2096         }
2097     }
2098 
addBreakpointLine(String line, String file, String machine)2099     public int addBreakpointLine(String line, String file, String machine)
2100     {
2101         int breakpointID = getNextBreakpointNumber();
2102 
2103         synchronized(fBreakpointLineList)
2104         {
2105             fBreakpointLineList.add(line + " " + machine + " " + file);
2106         }
2107 
2108         synchronized (fBreakpointsMap)
2109         {
2110             fBreakpointsMap.put(String.valueOf(breakpointID),
2111                 new STAXBreakpoint(BREAKPOINT_LINE,
2112                                    "", line, file, machine));
2113         }
2114 
2115         return breakpointID;
2116     }
2117 
isBreakpointLine(String line, String file, String machine)2118     public boolean isBreakpointLine(String line, String file, String machine)
2119     {
2120         synchronized (fBreakpointsMap)
2121         {
2122             Iterator iter = fBreakpointsMap.keySet().iterator();
2123 
2124             while (iter.hasNext())
2125             {
2126                 String id = (String)iter.next();
2127 
2128                 STAXBreakpoint breakpoint =
2129                     (STAXBreakpoint)fBreakpointsMap.get(id);
2130 
2131                 if (line.equals(breakpoint.getLine()) &&
2132                     file.equals(breakpoint.getFile()) &&
2133                     (breakpoint.getMachine().equals("") ||
2134                     machine.equalsIgnoreCase(breakpoint.getMachine())))
2135                 {
2136                     return true;
2137                 }
2138             }
2139         }
2140         return false;
2141     }
2142 
breakpointsEmpty()2143     public boolean breakpointsEmpty()
2144     {
2145         return (fBreakpointsMap.isEmpty());
2146     }
2147 
2148     // Submit methods
2149 
submitAsync(String location, String service, String request, STAXSTAFRequestCompleteListener listener)2150     public STAFResult submitAsync(String location, String service,
2151         String request, STAXSTAFRequestCompleteListener listener)
2152     {
2153         STAFResult result;
2154 
2155         synchronized(fRequestMap)
2156         {
2157             result = fHandle.submit2(
2158                 STAFHandle.ReqQueue, location, service, request);
2159 
2160             if (result.rc == STAFResult.Ok)
2161             {
2162                 try
2163                 {
2164                     // Convert request number to a negative integer if greater
2165                     // then Integer.MAX_VALUE
2166 
2167                     Integer requestNumber = STAXUtil.convertRequestNumber(
2168                         result.result);
2169 
2170                     fRequestMap.put(requestNumber, listener);
2171 
2172                     result.result = requestNumber.toString();
2173                 }
2174                 catch (NumberFormatException e)
2175                 {
2176                     System.out.println(
2177                         (new STAXTimestamp()).getTimestampString() +
2178                         " STAXJob::submitAsync - " + e.toString());
2179 
2180                     result.result = "0";
2181                 }
2182             }
2183         }
2184 
2185         return result;
2186     }
2187 
submitSync(String location, String service, String request)2188     public STAFResult submitSync(String location, String service,
2189                                  String request)
2190     {
2191         STAFHandle theHandle = fHandle;
2192 
2193         // If the STAX job's handle has not yet been assigned, use the
2194         // STAX service's handle to submit the STAF service request
2195 
2196         if (fHandle == null)
2197             theHandle = fSTAX.getSTAFHandle();
2198 
2199         STAFResult result = theHandle.submit2(
2200             STAFHandle.ReqSync, location, service, request);
2201 
2202         return result;
2203     }
2204 
submitAsyncForget(String location, String service, String request)2205     public STAFResult submitAsyncForget(String location, String service,
2206                                         String request)
2207     {
2208         STAFResult result = fHandle.submit2(STAFHandle.ReqFireAndForget,
2209             location, service, request);
2210 
2211         return result;
2212     }
2213 
2214     // STAXSTAFQueueListener method
2215 
handleQueueMessage(STAXSTAFMessage message, STAXJob job)2216     public void handleQueueMessage(STAXSTAFMessage message, STAXJob job)
2217     {
2218         int requestNumber = message.getRequestNumber();
2219 
2220         STAXSTAFRequestCompleteListener listener = null;
2221 
2222         synchronized (fRequestMap)
2223         {
2224             Integer key = new Integer(requestNumber);
2225 
2226             listener = (STAXSTAFRequestCompleteListener)fRequestMap.get(key);
2227 
2228             if (listener != null)
2229             {
2230                 fRequestMap.remove(key);
2231             }
2232         }
2233 
2234         if (listener != null)
2235         {
2236             listener.requestComplete(requestNumber, new STAFResult(
2237                 message.getRequestRC(), message.getRequestResult()));
2238         }
2239         else
2240         {   // Log a message in the job log
2241             String msg = "STAXJob.handleQueueMessage: " +
2242                          " No listener found for message:\n" +
2243                          STAFMarshallingContext.unmarshall(
2244                              message.getResult()).toString();
2245             job.log(STAXJob.JOB_LOG, "warning", msg);
2246         }
2247     }
2248 
2249     // STAXThreadCompleteListener method
2250 
threadComplete(STAXThread thread, int endCode)2251     public void threadComplete(STAXThread thread, int endCode)
2252     {
2253         // Debug:
2254         if (false)
2255         {
2256             System.out.println("Thread #" + thread.getThreadNumber() +
2257                                " complete");
2258         }
2259 
2260         boolean jobComplete = false;
2261 
2262         synchronized (fThreadMap)
2263         {
2264             fThreadMap.remove(thread.getThreadNumberAsInteger());
2265 
2266             if (fThreadMap.isEmpty())
2267                 jobComplete = true;
2268         }
2269 
2270         if (jobComplete == true)
2271         {
2272             // Perform terminateJob on interested parties
2273 
2274             fSTAX.visitJobManagementHandlers(new STAXVisitorHelper(this)
2275             {
2276                 public void visit(Object o, Iterator iter)
2277                 {
2278                     STAXJobManagementHandler handler =
2279                                              (STAXJobManagementHandler)o;
2280 
2281                     handler.terminateJob((STAXJob)fData);
2282                 }
2283             });
2284 
2285             // Set the result so that it is available upon job completion.
2286 
2287             String resultToEval = "STAXResult";
2288 
2289             try
2290             {
2291                 if (thread.pyBoolEval("isinstance(STAXResult, STAXGlobal)"))
2292                 {
2293                     // Use the STAXGlobal class's get() method so that the job
2294                     // result when using toString() will be it's contents and
2295                     // not org.python.core.PyFinalizableInstance
2296 
2297                     resultToEval = "STAXResult.get()";
2298                 }
2299             }
2300             catch (STAXPythonEvaluationException e)
2301             {   /* Ignore error and assume not a STAXGlobal object */ }
2302 
2303             try
2304             {
2305                 fResult = thread.pyObjectEval(resultToEval);
2306             }
2307             catch (STAXPythonEvaluationException e)
2308             {
2309                 fResult = Py.None;
2310             }
2311 
2312             // Log the result from the job in a Status message in the Job log
2313 
2314             STAFMarshallingContext mc = STAFMarshallingContext.
2315                 unmarshall(fResult.toString());
2316             log(STAXJob.JOB_LOG, "status", "Job Result: " + mc);
2317 
2318             // Log a Stop message for the job in the STAX Service and Job logs
2319 
2320             String msg = "JobID: " + fJobNumber;
2321             log(STAXJob.SERVICE_LOG, "stop", msg);
2322             log(STAXJob.JOB_LOG, "stop", msg);
2323 
2324             // Get the current date and time and set as job ending date/time
2325 
2326             fEndTimestamp = new STAXTimestamp();
2327 
2328             // Generate job completion event
2329 
2330             HashMap jobEndMap = new HashMap();
2331             jobEndMap.put("type", "job");
2332             jobEndMap.put("block", "main");
2333             jobEndMap.put("status", "end");
2334             jobEndMap.put("jobID", String.valueOf(fJobNumber));
2335             jobEndMap.put("result", fResult.toString());
2336             jobEndMap.put("jobCompletionStatus", getCompletionStatusAsString());
2337 
2338             generateEvent(STAXJob.STAX_JOB_EVENT, jobEndMap, true);
2339 
2340             // Query its job log for any messages with level "Error".
2341             // Save the result's marshalling context so it's available when
2342             // writing the job result to a file and when the RETURNRESULT
2343             // option is specified on a STAX EXECUTE request
2344 
2345             fJobLogErrorsMC = getJobLogErrors();
2346 
2347             // Write the job result information to files in the STAX job
2348             // directory
2349 
2350             writeJobResultsToFile();
2351 
2352             // Send a STAF/Service/STAX/End message to indicate the job has
2353             // completed
2354 
2355             submitSync("local", "QUEUE", "QUEUE TYPE STAF/Service/STAX/End " +
2356                        "MESSAGE " + STAFUtil.wrapData(""));
2357 
2358             // Debug
2359             if (COUNT_PYCODE_CACHES && STAX.CACHE_PYTHON_CODE)
2360             {
2361                 System.out.println(
2362                     "Job " + fJobNumber + ": " +
2363                     " cacheGets=" + fCompiledPyCodeCacheGets +
2364                     " cacheAdds=" + fCompiledPyCodeCacheAdds);
2365             }
2366 
2367             while (!fCompletionNotifiees.isEmpty())
2368             {
2369                 STAXJobCompleteListener listener =
2370                     (STAXJobCompleteListener)fCompletionNotifiees.removeFirst();
2371 
2372                 if (listener != null) listener.jobComplete(this);
2373             }
2374 
2375             try
2376             {
2377                 fHandle.unRegister();
2378             }
2379             catch (STAFException e)
2380             {
2381                 /* Do Nothing */
2382             }
2383 
2384             // Clear out job's private variables
2385 
2386             fScripts = new ArrayList();
2387             fScriptFiles = new ArrayList();
2388             fThreadMap = new LinkedHashMap();
2389             fCompletionNotifiees = new LinkedList();
2390             fCompiledPyCodeCache = new HashMap();
2391 
2392             // Commented out setting these variables to new (empty) objects,
2393             // as this could cause the "Testcases" and "Testcase Totals" values
2394             // in the job result to be empty due to timing issues
2395             //fTestcaseList = new ArrayList();
2396             //fTestcaseTotalsMap = new HashMap();
2397 
2398             // Commented out setting the following variables to null because
2399             // STAFQueueMonitor.notifyListeners() is running in another
2400             // thread and could access these variables and get a NPE.
2401             // fQueueListenerMap = null;
2402             // fHandle = null;
2403             // fSTAFQueueMonitor = null;
2404             // fRequestMap = new HashMap();
2405             //
2406             // synchronized (fDataMap)
2407             // {
2408             //     fDataMap = new HashMap();
2409             // }
2410         }
2411     }
2412 
2413     /**
2414      * This method is called to clean up a job that is in a pending state.
2415      * This is a job that has had a job number assigned to it but did not
2416      * start execution because either an error occurred (e.g. xml parsing,
2417      * etc) before the job actually started execution or because the TEST
2418      * option was specified, which indicates to test that the job doesn't
2419      * contain xml parsing or Python compile errors and to not actually
2420      * run the job.
2421      */
cleanupPendingJob(STAFResult result)2422     public void cleanupPendingJob(STAFResult result)
2423     {
2424         STAXJob job = fSTAX.removeFromJobMap(getJobNumberAsInteger());
2425 
2426         if (job == null)
2427         {
2428             // The job is not in the job map
2429             return;
2430         }
2431 
2432         if (result.rc != STAFResult.Ok)
2433         {
2434             setCompletionStatus(TERMINATED_STATUS);
2435 
2436             // Log the error message in the job log
2437 
2438             if (STAFMarshallingContext.isMarshalledData(result.result))
2439             {
2440                 try
2441                 {
2442                     STAFMarshallingContext mc =
2443                         STAFMarshallingContext.unmarshall(result.result);
2444                     Map resultMap = (Map)mc.getRootObject();
2445                     result.result = (String)resultMap.get("errorMsg");
2446                 }
2447                 catch (Exception e)
2448                 {
2449                     // Ignore any errors
2450                     System.out.println(e.toString());
2451                 }
2452             }
2453 
2454             log(STAXJob.JOB_LOG, "error", result.result);
2455         }
2456 
2457         // Log the testcase totals in the Job log by first calling the
2458         // STAXTestcaseActionFactory's initJob() method (to initialize the
2459         // testcase totals to 0) and then by calling its terminateJob() method
2460         // which logs the testcase totals
2461 
2462         STAXJobManagementHandler testcaseHandler =
2463             (STAXJobManagementHandler)fSTAX.getActionFactory("testcase");
2464 
2465         testcaseHandler.initJob(this);
2466         testcaseHandler.terminateJob(this);
2467 
2468         // Log the result from the job in a Status message in the Job log
2469 
2470         STAFMarshallingContext mc = STAFMarshallingContext.
2471             unmarshall(fResult.toString());
2472         log(STAXJob.JOB_LOG, "status", "Job Result: " + mc);
2473 
2474         // Log a Stop message for the job in the STAX Service and Job logs
2475 
2476         String msg = "JobID: " + fJobNumber;
2477         log(STAXJob.SERVICE_LOG, "stop", msg);
2478         log(STAXJob.JOB_LOG, "stop", msg);
2479 
2480         if (fEndTimestamp == null)
2481             fEndTimestamp = new STAXTimestamp();
2482 
2483         // Generate job completion event
2484 
2485         HashMap jobEndMap = new HashMap();
2486         jobEndMap.put("type", "job");
2487         jobEndMap.put("block", "main");
2488         jobEndMap.put("status", "end");
2489         jobEndMap.put("jobID", String.valueOf(fJobNumber));
2490         jobEndMap.put("result", fResult.toString());
2491         jobEndMap.put("jobCompletionStatus", getCompletionStatusAsString());
2492 
2493         generateEvent(STAXJob.STAX_JOB_EVENT, jobEndMap, true);
2494 
2495         // Query its job log for any messages with level "Error".
2496         // Save the result's marshalling context so it's available when
2497         // writing the job result to a file and when the RETURNRESULT
2498         // option is specified on a STAX EXECUTE request
2499 
2500         fJobLogErrorsMC = getJobLogErrors();
2501 
2502         // Write the job result information to files in the STAX job
2503         // directory
2504 
2505         writeJobResultsToFile();
2506 
2507         if (fHandle != null)
2508         {
2509             // Unregister the job's handle
2510             try
2511             {
2512                 fHandle.unRegister();
2513             }
2514             catch (STAFException e)
2515             {
2516                 /* Do Nothing */
2517             }
2518         }
2519     }
2520 
2521     /**
2522      * Query the STAX Job Log to get any messages with level "Error" that
2523      * were logged for this job.
2524      *
2525      * @return STAFMarshallingContext Return a marshalling context for the
2526      * result from the LOG QUERY request whose root object is a list of
2527      * error messages.
2528      */
getJobLogErrors()2529     private STAFMarshallingContext getJobLogErrors()
2530     {
2531         String jobLogName = fSTAX.getServiceName().toUpperCase() + "_Job_" +
2532             getJobNumber();
2533 
2534         String request = "QUERY MACHINE " + fSTAX.getLocalMachineNickname() +
2535             " LOGNAME " + STAFUtil.wrapData(jobLogName) +
2536             " LEVELMASK Error" +
2537             " FROM " + getStartTimestamp().getTimestampString();
2538 
2539         STAFResult result = submitSync("local", "LOG", request);
2540 
2541         // RC 4010 from the LOG QUERY request means exceeded the default
2542         // maximum query records
2543 
2544         if ((result.rc == STAFResult.Ok) || (result.rc == 4010))
2545             return result.resultContext;
2546 
2547         // Ignore the following errors from the LOG QUERY request:
2548         // - UnknownService (2) in case the LOG service is not registered
2549         // - HandleDoesNotExist (5) in case the STAX job's handle has been
2550         //   unregistered indicating the job has completed
2551 
2552         if ((result.rc != STAFResult.UnknownService) &&
2553             (result.rc != STAFResult.HandleDoesNotExist))
2554         {
2555             // Log an error in the JVM log
2556 
2557             STAXTimestamp currentTimestamp = new STAXTimestamp();
2558 
2559             System.out.println(
2560                 currentTimestamp.getTimestampString() + " " +
2561                 "STAXJob::getJobLogErrors() failed for Job ID: " +
2562                 getJobNumber() + "\nSTAF local LOG " + request +
2563                 "  RC=" + result.rc + ", Result=" + result.result);
2564         }
2565 
2566         return null;
2567     }
2568 
2569     /*
2570      * Write the marshalled job result information to files in the STAX job
2571      * directory
2572      */
writeJobResultsToFile()2573     private void writeJobResultsToFile()
2574     {
2575         // Create the marshalling context for the results without the testcase
2576         // list
2577 
2578         STAFMarshallingContext resultMC = new STAFMarshallingContext();
2579         resultMC.setMapClassDefinition(fSTAX.fResultMapClass);
2580         resultMC.setMapClassDefinition(
2581             STAXTestcaseActionFactory.fTestcaseTotalsMapClass);
2582 
2583         Map resultMap = fSTAX.fResultMapClass.createInstance();
2584 
2585         resultMap = addCommonFieldsToResultMap(resultMap);
2586 
2587         resultMC.setRootObject(resultMap);
2588 
2589         String marshalledResultString = resultMC.marshall();
2590 
2591         // Write the marshalled string for the results without the testcase
2592         // list to file marshalledResults.txt in directory fJobDataDir
2593 
2594         writeStringToFile(fSTAX.getResultFileName(fJobNumber),
2595                           marshalledResultString);
2596 
2597         // Create the marshalling context for the results with the testcase
2598         // list
2599 
2600         resultMC = new STAFMarshallingContext();
2601         resultMC.setMapClassDefinition(fSTAX.fDetailedResultMapClass);
2602         resultMC.setMapClassDefinition(
2603             STAXTestcaseActionFactory.fTestcaseTotalsMapClass);
2604         resultMC.setMapClassDefinition(
2605             STAXTestcaseActionFactory.fQueryTestcaseMapClass);
2606 
2607         resultMap = fSTAX.fDetailedResultMapClass.createInstance();
2608 
2609         resultMap = addCommonFieldsToResultMap(resultMap);
2610 
2611         resultMap.put("testcaseList", fTestcaseList);
2612 
2613         resultMC.setRootObject(resultMap);
2614 
2615         marshalledResultString = resultMC.marshall();
2616 
2617         // Write the marshalled string for the results with the testcase
2618         // list to file marshalledResultsLong.txt in directory fJobDataDir
2619 
2620         writeStringToFile(fSTAX.getDetailedResultFileName(fJobNumber),
2621                           marshalledResultString);
2622     }
2623 
addCommonFieldsToResultMap(Map inResultMap)2624     private Map addCommonFieldsToResultMap(Map inResultMap)
2625     {
2626         Map resultMap = new HashMap(inResultMap);
2627 
2628         if (!fJobName.equals(""))
2629             resultMap.put("jobName", fJobName);
2630 
2631         resultMap.put("startTimestamp",
2632                       fStartTimestamp.getTimestampString());
2633         resultMap.put("endTimestamp",
2634                       fEndTimestamp.getTimestampString());
2635         resultMap.put("status", getCompletionStatusAsString());
2636         resultMap.put("result", fResult.toString());
2637         resultMap.put("testcaseTotals", fTestcaseTotalsMap);
2638         resultMap.put("jobLogErrors", fJobLogErrorsMC);
2639         resultMap.put("xmlFileName", fXmlFile);
2640         resultMap.put("fileMachine", fXmlMachine);
2641         resultMap.put("function", getStartFunction());
2642         resultMap.put("arguments",
2643                       STAFUtil.maskPrivateData(getStartFuncArgs()));
2644         resultMap.put("scriptList", fScripts);
2645         resultMap.put("scriptFileList", fScriptFiles);
2646 
2647         if (!fScriptFileMachine.equals(""))
2648             resultMap.put("scriptMachine", fScriptFileMachine);
2649 
2650         return resultMap;
2651     }
2652 
2653     /**
2654      *  Write the contents of a string to the specified file name
2655      */
writeStringToFile(String fileName, String data)2656     private void writeStringToFile(String fileName, String data)
2657     {
2658         FileWriter out = null;
2659 
2660         try
2661         {
2662             File outFile = new File(fileName);
2663             out = new FileWriter(outFile);
2664             out.write(data);
2665         }
2666         catch (IOException e)
2667         {
2668             log(STAXJob.JVM_LOG, "Error",
2669                 "Error writing to job results file: " + fileName + ".\n" +
2670                 e.toString());
2671         }
2672         finally
2673         {
2674             try
2675             {
2676                 if (out != null) out.close();
2677             }
2678             catch (IOException e)
2679             {
2680                 // Do nothing
2681             }
2682         }
2683     }
2684 
2685     private STAX fSTAX;
2686     private STAXDocument fDocument;
2687     private STAXCallAction fDefaultCallAction = null;
2688     private Object fNextThreadNumberSynch = new Object();
2689     private int fNextThreadNumber = 1;
2690     private int fJobNumber = 0;
2691     private Object fNextProcNumberSynch = new Object();
2692     private int fProcNumber = 1;
2693     private Object fNextCmdNumberSynch = new Object();
2694     private int fCmdNumber = 1;
2695     private Object fNextProcessKeySynch = new Object();
2696     private int fProcessKey = 1;
2697     private String fJobDataDir = new String();
2698     private String fJobName = new String();
2699     private String fXmlMachine = new String();
2700     private String fXmlFile = new String();
2701     private boolean fClearlogs;
2702     private int fMaxSTAXThreads;
2703     private String fWaitTimeout = null;
2704     private String fStartFunction = new String();
2705     private String fStartFuncArgs = new String();
2706     private List fScripts = new ArrayList();
2707     private List fScriptFiles = new ArrayList();
2708     private String fScriptFileMachine = new String();
2709     private Map fThreadMap = new LinkedHashMap();
2710     private STAFHandle fHandle = null;
2711     private STAFQueueMonitor fSTAFQueueMonitor = null;
2712     private LinkedList fCompletionNotifiees = new LinkedList();
2713     private boolean fExecuteAndHold = false;
2714     private String fSourceMachine = new String();  // Source == Originating
2715     private String fSourceHandleName = new String();
2716     private int fSourceHandle;
2717     private int fNotifyOnEnd = STAXJob.NO_NOTIFY_ONEND;
2718     private int fState = PENDING_STATE;
2719     private PyObject fResult = Py.None;
2720     private int fCompletionStatus = STAXJob.NORMAL_STATUS;
2721     private STAXTimestamp fStartTimestamp;
2722     private STAXTimestamp fEndTimestamp;
2723     private HashMap fQueueListenerMap = new HashMap();
2724     private HashMap fDataMap = new HashMap();
2725     private HashMap fRequestMap = new HashMap(); // Map of active STAF requests
2726     private boolean fLogTCElapsedTime;
2727     private boolean fLogTCNumStarts;
2728     private boolean fLogTCStartStop;
2729     private int fPythonOutput;
2730     private String fPythonLogLevel;
2731     private HashMap fCompiledPyCodeCache = new HashMap();
2732     private long fCompiledPyCodeCacheAdds = 0;
2733     private long fCompiledPyCodeCacheGets = 0;
2734     private List fTestcaseList = new ArrayList();
2735     private List fBreakpointFunctionList = new ArrayList();
2736     private List fBreakpointLineList = new ArrayList();
2737     private Map fTestcaseTotalsMap = new HashMap();
2738     private TreeMap fBreakpointsMap = new TreeMap();
2739     private Object fNextBreakpointNumberSynch = new Object();
2740     private int fNextBreakpointNumber = 1;
2741     private boolean fBreakpointFirstFunction = false;
2742     private boolean fBreakpointSubjobFirstFunction = false;
2743     private STAFMarshallingContext fJobLogErrorsMC =
2744         new STAFMarshallingContext();
2745 
2746     class STAFQueueMonitor extends Thread
2747     {
STAFQueueMonitor(STAXJob job)2748         STAFQueueMonitor(STAXJob job)
2749         {
2750             fJob = job;
2751         }
2752 
2753         /**
2754          *  Log an error message in the JVM log and the STAX Job log
2755          */
logMessage(String message, Throwable t)2756         public void logMessage(String message, Throwable t)
2757         {
2758             STAXTimestamp currentTimestamp = new STAXTimestamp();
2759 
2760             message = "STAXJob$STAFQueueMonitor.run(): " + message;
2761 
2762             if (t != null)
2763             {
2764                 // Add the Java stack trace to the message
2765 
2766                 StringWriter sw = new StringWriter();
2767                 t.printStackTrace(new PrintWriter(sw));
2768 
2769                 if (t.getMessage() != null)
2770                     message += "\n" + t.getMessage() + "\n" + sw.toString();
2771                 else
2772                     message += "\n" + sw.toString();
2773             }
2774 
2775             // Truncate the message to a maximum size of 3000 (in case
2776             // the result from the QUEUE GET WAIT request is very large)
2777 
2778             if (message.length() > 3000)
2779                 message = message.substring(0, 3000) + "...";
2780 
2781             // Log an error message in the STAX JVM log
2782 
2783             System.out.println(
2784                 currentTimestamp.getTimestampString() +
2785                 " Error: STAX Job ID " + fJob.getJobNumber() + ". " +
2786                 message);
2787 
2788             // Log an error message in the STAX Job log
2789 
2790             fJob.log(STAXJob.JOB_LOG, "Error", message);
2791         }
2792 
run()2793         public void run()
2794         {
2795             STAFMarshallingContext mc;
2796             List messageList;
2797             int maxMessages = fJob.getSTAX().getMaxGetQueueMessages();
2798             String request = "GET WAIT FIRST " + maxMessages;
2799             STAFResult result;
2800             int numErrors = 0;
2801 
2802             // Maximum consecutive errors submitting a local QUEUE GET WAIT
2803             // request before we decide to exit the infinite loop
2804             int maxErrors = 5;
2805 
2806             // Process messages on the STAX job handle's queue until we get
2807             // a "STAF/Service/STAX/End" message or until an error occurs 5
2808             // consecutive times submitting a STAF local QUEUE GET request
2809             // (so that we don't get stuck in an infinite loop eating CPU).
2810 
2811             for (;;)
2812             {
2813                 result = null;
2814 
2815                 try
2816                 {
2817                     // For better performance when more than 1 message is on
2818                     // the queue waiting to be processed, get multiple
2819                     // messages off the queue at a time (up to maxMessages)
2820                     // so that the total size of the messages hopefully won't
2821                     // cause an OutOfMemory problem.
2822                     // Note:  If an error occurs unmarshalling the list of
2823                     // messages, all these messages will be lost.
2824 
2825                     result = submitSync("local", "QUEUE", request);
2826 
2827                     if (result == null)
2828                     {
2829                         numErrors++;
2830 
2831                         logMessage(
2832                             "STAF local QUEUE " + request +
2833                             " returned null. This may have been caused by " +
2834                             "running out of memory creating the result.",
2835                             null);
2836 
2837                         if (numErrors < maxErrors)
2838                         {
2839                             continue;
2840                         }
2841                         else
2842                         {
2843                             logMessage(
2844                                 "Exiting this thread after the QUEUE GET " +
2845                                 "request failed " + maxErrors +
2846                                 " consecutive times.", null);
2847 
2848                             return;  // Exit STAFQueueMonitor thread
2849                         }
2850                     }
2851 
2852                     if (result.rc == STAFResult.Ok)
2853                     {
2854                         numErrors = 0;
2855                     }
2856                     else if (result.rc == STAFResult.HandleDoesNotExist)
2857                     {
2858                         // This means that the STAX job's handle has been
2859                         // unregistered which means that the STAX job is no
2860                         // longer running so we should exit this thread.
2861                         // We've seen this happen before this thread gets
2862                         // the message with type "STAF/Service/STAX/End" off
2863                         // the queue.
2864 
2865                         return;  // Exit STAFQueueMonitor thread
2866                     }
2867                     else
2868                     {
2869                         numErrors++;
2870 
2871                         logMessage(
2872                             "STAF local QUEUE " + request +
2873                             " failed with RC=" + result.rc + ", Result=" +
2874                             result.result, null);
2875 
2876                         if (numErrors < maxErrors)
2877                         {
2878                             continue;
2879                         }
2880                         else
2881                         {
2882                             logMessage(
2883                                 "Exiting this thread after the QUEUE GET " +
2884                                 "request failed " + maxErrors +
2885                                 " consecutive times", null);
2886 
2887                             return;  // Exit STAFQueueMonitor thread
2888                         }
2889                     }
2890                 }
2891                 catch (Throwable t)
2892                 {
2893                     // Note: One possible reason for an exception occurring
2894                     // submitting a QUEUE GET WAIT request can be because
2895                     // an error occurred when the result was auto-unmarshalled
2896                     // due to invalid marshalled data in the result (though
2897                     // in STAF V3.3.3 a fix was made to the Java unmarshall
2898                     // method so that this problem should no longer occur).
2899 
2900                     numErrors++;
2901 
2902                     logMessage(
2903                         "Exception getting message(s) off the queue.", t);
2904 
2905                     if (numErrors < maxErrors)
2906                     {
2907                         continue;
2908                     }
2909                     else
2910                     {
2911                         logMessage(
2912                             "Exiting this thread after the QUEUE GET " +
2913                             "request failed " + maxErrors +
2914                             " consecutive times", null);
2915 
2916                         return;  // Exit STAFQueueMonitor thread
2917                     }
2918                 }
2919 
2920                 try
2921                 {
2922                     // Unmarshall the result from a QUEUE GET request, but
2923                     // don't unmarshall indirect objects
2924 
2925                     mc = STAFMarshallingContext.unmarshall(
2926                         result.result,
2927                         STAFMarshallingContext.IGNORE_INDIRECT_OBJECTS);
2928 
2929                     messageList = (List)mc.getRootObject();
2930                 }
2931                 catch (Throwable t)
2932                 {
2933                     // Log an error message and continue
2934 
2935                     logMessage(
2936                         "Exception unmarshalling queued messages. " +
2937                         "\nMarshalled string: " + result.result, t);
2938                     continue;
2939                 }
2940 
2941                 // Iterate through the list of messages removed from our
2942                 // handle's queue and process each message
2943 
2944                 Iterator iter = messageList.iterator();
2945                 STAXSTAFMessage msg;
2946 
2947                 while (iter.hasNext())
2948                 {
2949                     // Process the message
2950 
2951                     try
2952                     {
2953                         msg = new STAXSTAFMessage((Map)iter.next());
2954                     }
2955                     catch (Throwable t)
2956                     {
2957                         // Log an error message and continue
2958 
2959                         logMessage("Exception handling a queued message.", t);
2960                         continue;
2961                     }
2962 
2963                     String type = msg.getType();
2964 
2965                     if (type != null &&
2966                         type.equalsIgnoreCase("STAF/Service/STAX/End"))
2967                     {
2968                         return;  // Exit STAFQueueMonitorThread
2969                     }
2970 
2971                     try
2972                     {
2973                         notifyListeners(msg);
2974                     }
2975                     catch (Throwable t)
2976                     {
2977                         // Log an error message and continue
2978 
2979                         logMessage(
2980                             "Exception notifying listeners for a message " +
2981                             "with type '" + type +
2982                             "' from handle " + msg.getHandle() +
2983                             " on machine " + msg.getMachine() +
2984                             " \nMessage: " + msg.getMessage(), t);
2985                         continue;
2986                     }
2987                 }
2988             }
2989         }
2990 
notifyListeners(STAXSTAFMessage msg)2991         public void notifyListeners(STAXSTAFMessage msg)
2992         {
2993             // Perform a lookup of registered message handlers for
2994             // this message type and pass the STAXSTAFMessage to
2995             // the registered message handlers.
2996 
2997             String theType = msg.getType();
2998             int listenersFound = 0;
2999 
3000             synchronized (fQueueListenerMap)
3001             {
3002                 Iterator mapIter = fQueueListenerMap.keySet().iterator();
3003 
3004                 while (mapIter.hasNext())
3005                 {
3006                     String msgType = (String)mapIter.next();
3007 
3008                     if ((msgType == null) ||
3009                         // Messages from 3.x clients that STAX is interested
3010                         // in will have a type
3011                         (theType != null &&
3012                          theType.equalsIgnoreCase(msgType)) ||
3013                         // The following is for messages from 2.x clients
3014                         // which will have a null type and the message will
3015                         // begin with the type (which for processes will be
3016                         // STAF/PROCESS/END instead of STAF/Process/End
3017                         (theType == null && msg.getMessage().toUpperCase().
3018                          startsWith(msgType.toUpperCase())))
3019                     {
3020                         TreeSet listenerSet = (TreeSet)fQueueListenerMap.get(
3021                             msgType);
3022 
3023                         if (listenerSet == null) continue;
3024 
3025                         Iterator iter = listenerSet.iterator();
3026 
3027                         while (iter.hasNext())
3028                         {
3029                             STAXSTAFQueueListener listener =
3030                                 (STAXSTAFQueueListener)iter.next();
3031 
3032                             listener.handleQueueMessage(msg, fJob);
3033                             listenersFound++;
3034                         }
3035                     }
3036                 }
3037 
3038                 if ((listenersFound == 0) &&
3039                     (theType.equals("STAF/RequestComplete") ||
3040                      theType.equals("STAF/Process/End")))
3041                 {
3042                     // Log a warning message in the job log as this could
3043                     // indicate a problem in the how the STAX service is
3044                     // handling these messages
3045 
3046                     try
3047                     {
3048                         fJob.log(STAXJob.JOB_LOG, "warning",
3049                                  "STAXJob.notifyListeners: " +
3050                                  "No listener found for message:\n" +
3051                                  STAFMarshallingContext.unmarshall(
3052                                      msg.getResult()).toString());
3053                     }
3054                     catch (Throwable t)
3055                     {
3056                         // Ignore
3057                     }
3058                 }
3059             }
3060         }
3061 
3062         STAXJob fJob;
3063 
3064     } // end STAFQueueMonitor
3065 
3066 }
3067