1 /*****************************************************************************/
2 /* Software Testing Automation Framework (STAF)                              */
3 /* (C) Copyright IBM Corp. 2002, 2004, 2005                                  */
4 /*                                                                           */
5 /* This software is licensed under the Eclipse Public License (EPL) V1.0.    */
6 /*****************************************************************************/
7 
8 package com.ibm.staf.service.stax;
9 import com.ibm.staf.*;
10 import com.ibm.staf.wrapper.STAFLog;
11 import java.util.Map;
12 import java.util.HashMap;
13 import java.util.LinkedHashMap;
14 import java.util.TreeMap;
15 import java.util.TreeSet;
16 import java.util.LinkedHashSet;
17 import java.util.LinkedList;
18 import java.util.ArrayList;
19 import java.util.Iterator;
20 import java.util.List;
21 import java.util.Comparator;
22 import java.io.File;
23 import java.io.FileWriter;
24 import java.io.IOException;
25 import java.io.PrintWriter;
26 import java.io.StringWriter;
27 
28 import java.util.Vector;
29 import java.util.StringTokenizer;
30 import java.util.Date;
31 import java.util.Set;
32 
33 import org.python.core.Py;
34 
35 import org.python.core.PyObject;
36 import org.python.core.PyCode;
37 import org.python.core.CompilerFlags;
38 
39 /* Jython 2.1:
40 import org.python.core.__builtin__;
41 */
42 // Jython 2.5:
43 import org.python.core.CompileMode;
44 
45 /**
46  * The representation of a STAX job.  A STAX job is created by the STAXParser
47  * when the STAX service receives an EXECUTE request specifying a file or string
48  * containing XML that defines a STAX job.
49  */
50 public class STAXJob implements STAXThreadCompleteListener,
51                                 STAXSTAFQueueListener
52 {
    // For debugging - Counts and prints the number of cache gets/adds.
    // When true, getCompiledPyCode() increments its cache add/get counters.
    static final boolean COUNT_PYCODE_CACHES = false;

    // NOTE(review): presumably the event type name used for job events -
    // not referenced in the visible part of this file, confirm against usage.
    static final String STAX_JOB_EVENT = new String("Job");

    // Logfile types:

    /**
     * Indicates to log to the STAX Service Log file
     */
    static final int SERVICE_LOG = 1;

    /**
     * Indicates to log to the STAX Job Log file
     */
    public static final int JOB_LOG = 2;

    /**
     * Indicates to log to the STAX Job User Log file
     */
    public static final int USER_JOB_LOG = 3;

    /**
     * Indicates to log to the JVM Log file
     */
    public static final int JVM_LOG = 4;

    // Values for the job's notify-on-end setting.  getNotifyOnEndAsString()
    // renders them as "No", "By Handle" and "By Name" respectively.
    public static final int NO_NOTIFY_ONEND = 0;
    public static final int NOTIFY_ONEND_BY_HANDLE = 1;
    public static final int NOTIFY_ONEND_BY_NAME = 2;

    /**
     * Job completion status codes and their string values.
     * getCompletionStatusAsString() maps each code to the matching string
     * and any other value to UNKNOWN_STATUS_STRING.
     */
    public static final int ABNORMAL_STATUS = -1;
    public static final int NORMAL_STATUS = 0;
    public static final int TERMINATED_STATUS = 1;
    public static final int UNKNOWN_STATUS = 2;

    public static final String ABNORMAL_STATUS_STRING = "Abnormal";
    public static final String NORMAL_STATUS_STRING = "Normal";
    public static final String TERMINATED_STATUS_STRING = "Terminated";
    public static final String UNKNOWN_STATUS_STRING = "Unknown";

    /**
     * Job state codes and their string values.
     * getStateAsString() returns PENDING_STATE_STRING for PENDING_STATE and
     * RUNNING_STATE_STRING for every other value.
     */
    public static final int PENDING_STATE = 0;
    public static final int RUNNING_STATE = 1;
    public static final String PENDING_STATE_STRING = "Pending";
    public static final String RUNNING_STATE_STRING = "Running";

    // NOTE(review): presumably the kinds of breakpoint (on a function vs. on
    // a line) - not referenced in the visible part of this file, confirm.
    static final int BREAKPOINT_FUNCTION = 0;
    static final int BREAKPOINT_LINE = 1;
107 
108     /**
109       * Creates a new STAXJob instance passing in a STAX object which
110       * represents the STAX service that is executing this job.
111       */
STAXJob(STAX staxService)112     public STAXJob(STAX staxService) {
113         this(staxService, new STAXDocument());
114     }
115 
116     /**
117      * Creates a new STAXJob instance using an existing STAX
118      * document.
119      *
120      * @param staxService the STAX service.
121      * @param document the STAX document to be executed by
122      * this job.
123      */
STAXJob(STAX staxService, STAXDocument document)124     public STAXJob(STAX staxService, STAXDocument document) {
125         fSTAX = staxService;
126         fDocument = document;
127         STAXThread thread = new STAXThread(this);
128         thread.addCompletionNotifiee(this);
129         fThreadMap.put(thread.getThreadNumberAsInteger(), thread);
130         fClearlogs = fSTAX.getClearlogs();
131         fLogTCElapsedTime = fSTAX.getLogTCElapsedTime();
132         fLogTCNumStarts   = fSTAX.getLogTCNumStarts();
133         fLogTCStartStop   = fSTAX.getLogTCStartStop();
134         fPythonOutput     = fSTAX.getPythonOutput();
135         fPythonLogLevel   = fSTAX.getPythonLogLevel();
136         fInvalidLogLevelAction = fSTAX.getInvalidLogLevelAction();
137         fMaxSTAXThreads   = fSTAX.getMaxSTAXThreads();
138     }
139 
140     /**
141      * Gets the STAX object which represents the job's STAX service
142      * @return an instance of the job's STAX service
143      */
getSTAX()144     public STAX getSTAX() { return fSTAX; }
145 
146     /**
147      * Gets the STAX document that is executed by this job.
148      *
149      * @return the job's STAX document.
150      */
getSTAXDocument()151     public STAXDocument getSTAXDocument() { return fDocument; }
152 
153     /**
154      * Sets the STAX document to be executed by this job
155      * @param document the STAX document to be executed by this job
156      */
setSTAXDocument(STAXDocument document)157     public void setSTAXDocument(STAXDocument document)
158     {
159         fDocument = document;
160     }
161 
162     /**
163      * Gets the next number for a thread in a job
164      * @return a number for the next thread in a job
165      */
getNextThreadNumber()166     public int getNextThreadNumber()
167     {
168         synchronized (fNextThreadNumberSynch)
169         {
170             return fNextThreadNumber++;
171         }
172     }
173 
174     /**
175      * Gets the next number for a breakpoint in a job
176      * @return a number for the next breakpoint in a job
177      */
getNextBreakpointNumber()178     public int getNextBreakpointNumber()
179     {
180         synchronized (fNextBreakpointNumberSynch)
181         {
182             return fNextBreakpointNumber++;
183         }
184     }
185 
setDefaultCallAction(STAXCallAction action)186     public void setDefaultCallAction(STAXCallAction action)
187     {
188         fDefaultCallAction = action;
189     }
190 
191     /**
192      * Gets the name of the function that should be called to start the
193      * execution of a job
194      * @return the name of the starting function for a job
195      */
getStartFunction()196     public String getStartFunction() { return fDocument.getStartFunction(); }
197 
198     /**
199      * Sets the name of the function that should be called to start the
200      * execution of a job
201      * @param  startFunction   the name of the starting function for a job
202      */
setStartFunction(String startFunction)203     public void setStartFunction(String startFunction)
204     { fDocument.setStartFunction(startFunction); }
205 
getStartFuncArgs()206     public String getStartFuncArgs() { return fDocument.getStartFunctionArgs(); }
207 
setStartFuncArgs(String startFuncArgs)208     public void setStartFuncArgs(String startFuncArgs)
209     { fDocument.setStartFuncArgs(startFuncArgs); }
210 
setExecuteAndHold(String holdTimeout)211     public void setExecuteAndHold(String holdTimeout)
212     { fExecuteAndHold = holdTimeout; }
213 
214     // Sets the value of the FUNCTION option on a STAX EXECUTE request
setStartFunctionOverride(String startFunction)215     public void setStartFunctionOverride(String startFunction)
216     { fStartFunction = startFunction; }
217 
getStartFunctionOverride()218     public String getStartFunctionOverride() { return fStartFunction; }
219 
220     // Sets the value of the ARGS option on a STAX EXECUTE request
setStartFuncArgsOverride(String startFuncArgs)221     public void setStartFuncArgsOverride(String startFuncArgs)
222     { fStartFuncArgs = startFuncArgs; }
223 
getStartFuncArgsOverride()224     public String getStartFuncArgsOverride() { return fStartFuncArgs; }
225 
getJobName()226     public String getJobName() { return fJobName; }
227 
setJobName(String jobName)228     public void setJobName(String jobName) { fJobName = jobName; }
229 
getJobNumber()230     public int getJobNumber() { return fJobNumber; }
231 
setJobNumber(int jobNumber)232     public void setJobNumber(int jobNumber) { fJobNumber = jobNumber; }
233 
getNextProcNumber()234     public int getNextProcNumber()
235     {
236         synchronized (fNextProcNumberSynch)
237         {
238             return fProcNumber++;
239         }
240     }
241 
getNextCmdNumber()242     public int getNextCmdNumber()
243     {
244         synchronized (fNextCmdNumberSynch)
245         {
246             return fCmdNumber++;
247         }
248     }
249 
getNextProcessKey()250     public int getNextProcessKey()
251     {
252         synchronized (fNextProcessKeySynch)
253         {
254             return fProcessKey++;
255         }
256     }
getJobDataDir()257     public String getJobDataDir() { return fJobDataDir; }
258 
setJobDataDir(String jobDataDir)259     public void setJobDataDir(String jobDataDir) { fJobDataDir = jobDataDir; }
260 
getXmlMachine()261     public String getXmlMachine() { return fXmlMachine; }
262 
setXmlMachine(String machName)263     public void setXmlMachine(String machName)
264     { fXmlMachine = machName; }
265 
getXmlFile()266     public String getXmlFile() { return fXmlFile; }
267 
setXmlFile(String fileName)268     public void setXmlFile(String fileName) { fXmlFile = fileName; }
269 
getScripts()270     public List<String> getScripts() { return fScripts; }
271 
setScript(String script)272     public void setScript(String script) { fScripts.add(script); }
273 
getScriptFiles()274     public List<String> getScriptFiles() { return fScriptFiles; }
275 
setScriptFile(String fileName)276     public void setScriptFile(String fileName) { fScriptFiles.add(fileName); }
277 
getScriptFileMachine()278     public String getScriptFileMachine() { return fScriptFileMachine; }
279 
setScriptFileMachine(String machName)280     public void setScriptFileMachine(String machName)
281     { fScriptFileMachine = machName; }
282 
getSourceMachine()283     public String getSourceMachine() { return fSourceMachine; }
284 
setSourceMachine(String machName)285     public void setSourceMachine(String machName)
286     { fSourceMachine = machName; }
287 
getSourceHandleName()288     public String getSourceHandleName() { return fSourceHandleName; }
289 
setSourceHandleName(String handleName)290     public void setSourceHandleName(String handleName)
291     { fSourceHandleName = handleName; }
292 
getSourceHandle()293     public int getSourceHandle() { return fSourceHandle; }
294 
setSourceHandle(int handle)295     public void setSourceHandle(int handle)
296     { fSourceHandle = handle; }
297 
getClearlogs()298     public boolean getClearlogs() { return fClearlogs; }
299 
getClearLogsAsString()300     public String getClearLogsAsString()
301     {
302         if (fClearlogs)
303             return "Enabled";
304         else
305             return "Disabled";
306     }
307 
setClearlogs(boolean clearlogs)308     public void setClearlogs(boolean clearlogs)
309     { fClearlogs = clearlogs; }
310 
getMaxSTAXThreads()311     public int getMaxSTAXThreads() { return fMaxSTAXThreads; }
312 
getWaitTimeout()313     public String getWaitTimeout() { return fWaitTimeout; }
314 
setWaitTimeout(String timeout)315     public void setWaitTimeout(String timeout)
316     { fWaitTimeout = timeout; }
317 
getNotifyOnEnd()318     public int getNotifyOnEnd() { return fNotifyOnEnd; }
319 
getNotifyOnEndAsString()320     public String getNotifyOnEndAsString()
321     {
322         if (fNotifyOnEnd == STAXJob.NOTIFY_ONEND_BY_NAME)
323             return "By Name";
324         else if (fNotifyOnEnd == STAXJob.NOTIFY_ONEND_BY_HANDLE)
325             return "By Handle";
326         else
327             return "No";
328     }
329 
setNotifyOnEnd(int notifyFlag)330     public void setNotifyOnEnd(int notifyFlag) { fNotifyOnEnd = notifyFlag; }
331 
getState()332     public int getState() { return fState; }
333 
getStateAsString()334     public String getStateAsString()
335     {
336         if (fState == PENDING_STATE)
337             return PENDING_STATE_STRING;
338         else
339             return RUNNING_STATE_STRING;
340     }
341 
setState(int state)342     public void setState(int state) { fState = state; }
343 
getResult()344     public PyObject getResult() { return fResult; }
345 
setResult(PyObject result)346     public void setResult(PyObject result)
347     { fResult = result; }
348 
getCompletionStatus()349     public int getCompletionStatus() { return fCompletionStatus; }
350 
getCompletionStatusAsString()351     public String getCompletionStatusAsString()
352     {
353         if (fCompletionStatus == STAXJob.NORMAL_STATUS)
354             return STAXJob.NORMAL_STATUS_STRING;
355         else if (fCompletionStatus == STAXJob.TERMINATED_STATUS)
356             return STAXJob.TERMINATED_STATUS_STRING;
357         else if (fCompletionStatus == STAXJob.ABNORMAL_STATUS)
358             return STAXJob.ABNORMAL_STATUS_STRING;
359         else
360             return STAXJob.UNKNOWN_STATUS_STRING;
361     }
362 
setCompletionStatus(int status)363     public void setCompletionStatus(int status)
364     {
365         fCompletionStatus = status;
366     }
367 
getJobLogErrorsMC()368     public STAFMarshallingContext getJobLogErrorsMC()
369     {
370         return fJobLogErrorsMC;
371     }
372 
getTestcaseList()373     public List<Map<String, Object>> getTestcaseList() { return fTestcaseList; }
374 
setTestcaseList(List<Map<String, Object>> testcaseList)375     public void setTestcaseList(List<Map<String, Object>> testcaseList)
376     { fTestcaseList = testcaseList; }
377 
getTestcaseTotalsMap()378     public Map<String, String> getTestcaseTotalsMap()
379     { return fTestcaseTotalsMap; }
380 
setTestcaseTotalsMap(Map<String, String> testcaseTotalsMap)381     public void setTestcaseTotalsMap(Map<String, String> testcaseTotalsMap)
382     { fTestcaseTotalsMap = testcaseTotalsMap; }
383 
getStartTimestamp()384     public STAXTimestamp getStartTimestamp() { return fStartTimestamp; }
385 
setStartTimestamp()386     public void setStartTimestamp()
387     {
388         // Get the current date and time and set as the starting date/time
389         fStartTimestamp = new STAXTimestamp();
390     }
391 
getEndTimestamp()392     public STAXTimestamp getEndTimestamp() { return fEndTimestamp; }
393 
getJobNumberAsInteger()394     public Integer getJobNumberAsInteger() { return new Integer(fJobNumber); }
395 
getSTAFHandle()396     public STAFHandle getSTAFHandle() { return fHandle; }
397 
setSTAFHandle()398     public void setSTAFHandle() throws STAFException
399     {
400         fHandle = new STAFHandle("STAX/Job/" + fJobNumber);
401     }
402 
getLogTCElapsedTime()403     public boolean getLogTCElapsedTime() { return fLogTCElapsedTime; }
404 
getLogTCElapsedTimeAsString()405     public String getLogTCElapsedTimeAsString()
406     {
407         if (fLogTCElapsedTime)
408             return "Enabled";
409         else
410             return "Disabled";
411     }
412 
setLogTCElapsedTime(boolean logTCElapsedTime)413     public void setLogTCElapsedTime(boolean logTCElapsedTime)
414     { fLogTCElapsedTime = logTCElapsedTime; }
415 
getLogTCNumStarts()416     public boolean getLogTCNumStarts() { return fLogTCNumStarts; }
417 
getLogTCNumStartsAsString()418     public String getLogTCNumStartsAsString()
419     {
420         if (fLogTCNumStarts)
421             return "Enabled";
422         else
423             return "Disabled";
424     }
425 
setLogTCNumStarts(boolean logTCNumStarts)426     public void setLogTCNumStarts(boolean logTCNumStarts)
427     { fLogTCNumStarts = logTCNumStarts; }
428 
getLogTCStartStop()429     public boolean getLogTCStartStop() { return fLogTCStartStop; }
430 
getLogTCStartStopAsString()431     public String getLogTCStartStopAsString()
432     {
433         if (fLogTCStartStop)
434             return "Enabled";
435         else
436             return "Disabled";
437     }
438 
setLogTCStartStop(boolean logTCStartStop)439     public void setLogTCStartStop(boolean logTCStartStop)
440     { fLogTCStartStop = logTCStartStop; }
441 
getPythonOutput()442     public int getPythonOutput() { return fPythonOutput; }
443 
setPythonOutput(int pythonOutput)444     public void setPythonOutput(int pythonOutput)
445     {
446         fPythonOutput = pythonOutput;
447     }
448 
getPythonLogLevel()449     public String getPythonLogLevel() { return fPythonLogLevel; }
450 
setPythonLogLevel(String logLevel)451     public void setPythonLogLevel(String logLevel)
452     {
453         fPythonLogLevel = logLevel;
454     }
455 
getInvalidLogLevelAction()456     public int getInvalidLogLevelAction() { return fInvalidLogLevelAction; }
457 
setInvalidLogLevelAction(int action)458     public void setInvalidLogLevelAction(int action)
459     {
460         fInvalidLogLevelAction = action;
461     }
462 
getNumThreads()463     public int getNumThreads()
464     {
465         return fThreadMap.size();
466     }
467 
getThreadMapCopy()468     public Map<Integer, STAXThread> getThreadMapCopy()
469     {
470         return fThreadMap;
471     }
472 
getThread(Integer threadNumber)473     public STAXThread getThread(Integer threadNumber)
474     {
475         synchronized (fThreadMap)
476         {
477             return fThreadMap.get(threadNumber);
478         }
479     }
480 
addThread(STAXThread thread)481     public void addThread(STAXThread thread)
482     {
483         synchronized (fThreadMap)
484         {
485             fThreadMap.put(thread.getThreadNumberAsInteger(), thread);
486         }
487     }
488 
addThreadIfDoesNotExceedMax(STAXThread thread)489     public void addThreadIfDoesNotExceedMax(STAXThread thread)
490         throws STAXExceedsMaxThreadsException
491     {
492         synchronized (fThreadMap)
493         {
494             if ((fMaxSTAXThreads != 0) &&
495                 (fThreadMap.size() >= fMaxSTAXThreads))
496             {
497                 throw new STAXExceedsMaxThreadsException(
498                     "Exceeded MaxSTAXThreads=" + fMaxSTAXThreads +
499                     " (the maximum number of STAX Threads that " +
500                     "can be running simultaneously in a job).");
501             }
502 
503             fThreadMap.put(thread.getThreadNumberAsInteger(), thread);
504         }
505     }
506 
removeThread(Integer threadNumber)507     public void removeThread(Integer threadNumber)
508     {
509         synchronized (fThreadMap)
510         {
511             fThreadMap.remove(threadNumber);
512         }
513     }
514 
getTimedEventQueue()515     public STAXTimedEventQueue getTimedEventQueue()
516     {
517         if (fSTAX.getTimedEventQueuePerJob())
518             return fTimedEventQueue;
519         else
520             return fSTAX.getTimedEventQueue();
521     }
522 
addFunction(STAXFunctionAction function)523     public void addFunction(STAXFunctionAction function)
524     {
525         fDocument.addFunction(function);
526     }
527 
getFunction(String name)528     public STAXAction getFunction(String name)
529     {
530         return fDocument.getFunction(name);
531     }
532 
getBreakpointFirstFunction()533     public boolean getBreakpointFirstFunction()
534     {
535         return fBreakpointFirstFunction;
536     }
537 
setBreakpointFirstFunction(boolean breakpointFirstFunction)538     public void setBreakpointFirstFunction(boolean breakpointFirstFunction)
539     {
540         fBreakpointFirstFunction = breakpointFirstFunction;
541     }
542 
getBreakpointSubjobFirstFunction()543     public boolean getBreakpointSubjobFirstFunction()
544     {
545         return fBreakpointSubjobFirstFunction;
546     }
547 
setBreakpointSubjobFirstFunction( boolean breakpointSubjobFirstFunction)548     public void setBreakpointSubjobFirstFunction(
549                     boolean breakpointSubjobFirstFunction)
550     {
551         fBreakpointSubjobFirstFunction = breakpointSubjobFirstFunction;
552     }
553 
functionExists(String name)554     public boolean functionExists(String name)
555     {
556         return fDocument.functionExists(name);
557     }
558 
addDefaultAction(STAXAction action)559     public void addDefaultAction(STAXAction action)
560     {
561         fDocument.addDefaultAction(action);
562     }
563 
addCompletionNotifiee(STAXJobCompleteListener listener)564     public void addCompletionNotifiee(STAXJobCompleteListener listener)
565     {
566         synchronized (fCompletionNotifiees)
567         {
568             fCompletionNotifiees.addLast(listener);
569         }
570     }
571 
addCompletionNotifiee2(STAXJobCompleteNotifiee notifiee)572     public STAFResult addCompletionNotifiee2(STAXJobCompleteNotifiee notifiee)
573     {
574         synchronized (fCompletionNotifiees)
575         {
576             // Check if the notifiee is already in the list
577 
578             for (STAXJobCompleteListener listener : fCompletionNotifiees)
579             {
580                 if (listener instanceof
581                     com.ibm.staf.service.stax.STAXJobCompleteNotifiee)
582                 {
583                     STAXJobCompleteNotifiee aNotifiee =
584                         (STAXJobCompleteNotifiee)listener;
585 
586                     if (aNotifiee.getMachine().equals(notifiee.getMachine()) &&
587                         aNotifiee.getHandle() == notifiee.getHandle() &&
588                         aNotifiee.getHandleName().equals(notifiee.getHandleName()))
589                     {
590                         return new STAFResult(
591                             STAFResult.AlreadyExists,
592                             "A notifiee is already registered for machine=" +
593                             notifiee.getMachine() +
594                             ", handle=" + notifiee.getHandle() +
595                             ", handleName=" + notifiee.getHandleName());
596                     }
597                 }
598 
599             }
600 
601             // Add to the end of the list
602 
603             fCompletionNotifiees.addLast(notifiee);
604         }
605 
606         return new STAFResult(STAFResult.Ok, "");
607     }
608 
removeCompletionNotifiee(STAXJobCompleteNotifiee notifiee)609     public STAFResult removeCompletionNotifiee(STAXJobCompleteNotifiee notifiee)
610     {
611         boolean found = false;
612 
613         synchronized (fCompletionNotifiees)
614         {
615             // Check if notifiee exists.  If so, remove the notifiee
616 
617             Iterator<STAXJobCompleteListener> iter =
618                 fCompletionNotifiees.iterator();
619 
620             while (iter.hasNext())
621             {
622                 STAXJobCompleteListener listener = iter.next();
623 
624                 if (listener instanceof
625                     com.ibm.staf.service.stax.STAXJobCompleteNotifiee)
626                 {
627                     STAXJobCompleteNotifiee aNotifiee =
628                         (STAXJobCompleteNotifiee)listener;
629 
630                     if (aNotifiee.getMachine().equals(notifiee.getMachine()) &&
631                         aNotifiee.getHandle() == notifiee.getHandle() &&
632                         aNotifiee.getHandleName().equals(notifiee.getHandleName()))
633                     {
634                         // Remove notifiee from list
635 
636                         fCompletionNotifiees.remove(aNotifiee);
637                         found = true;
638                         return new STAFResult(STAFResult.Ok, "");
639                     }
640                 }
641             }
642         }
643 
644         return new STAFResult(
645             STAFResult.DoesNotExist,
646             "No notifiee registered for machine=" + notifiee.getMachine() +
647             ", handle=" + notifiee.getHandle() +
648             ", handleName=" + notifiee.getHandleName());
649     }
650 
getCompletionNotifiees()651     public LinkedList<STAXJobCompleteListener> getCompletionNotifiees()
652     {
653         synchronized(fCompletionNotifiees)
654         {
655             return new LinkedList<STAXJobCompleteListener>
656                 (fCompletionNotifiees);
657         }
658     }
659 
660     // Gets the compiled Jython code for the specified code string.
661     // Note that we cache compiled Jython code so that if it is used more
662     // than once, it eliminates the overhead of recompiling the code string
663     // each time it is executed.
664     //
665     // Input Arguments:
666     //  - codeString:  must be valid Python code; however, like ordinary
667     //                 Python code, the existence of variables will not be
668     //                 checked until the code is executed.
669     //  - kind:        is either 'exec' if the string is made up of
670     //                 statements, 'eval' if it is an expression.
671 
getCompiledPyCode(String codeString, String kind)672     public PyCode getCompiledPyCode(String codeString, String kind)
673     {
674         synchronized(fCompiledPyCodeCache)
675         {
676             PyCode codeObject = fCompiledPyCodeCache.get(codeString);
677 
678             if (codeObject == null)
679             {
680                 if (COUNT_PYCODE_CACHES) fCompiledPyCodeCacheAdds++;
681 
682                 if (kind.equals("eval"))
683                 {
684                     // Set to avoid error of setting code object to nothing
685                     if (codeString.equals("")) codeString = "None";
686 
687                     /* Jython 2.1:
688                     codeObject = __builtin__.compile(
689                         "STAXPyEvalResult = " + codeString,
690                         "<pyEval string>", "exec");
691                     */
692                     // Jython 2.5:
693                     codeObject = Py.compile_flags(
694                         codeString, "<pyEval string>",
695                         CompileMode.eval,
696                         Py.getCompilerFlags().combine(
697                             CompilerFlags.PyCF_SOURCE_IS_UTF8));
698                 }
699                 else
700                 {
701                     /* Jython 2.1:
702                     codeObject = __builtin__.compile(
703                         codeString, "<pyExec string>", "exec");
704                     */
705                     // Jython 2.5:
706                     codeObject = Py.compile_flags(
707                         codeString, "<pyExec string>",
708                         CompileMode.exec,
709                         Py.getCompilerFlags().combine(
710                             CompilerFlags.PyCF_SOURCE_IS_UTF8));
711                 }
712 
713                 fCompiledPyCodeCache.put(codeString, codeObject);
714             }
715             else
716             {
717                 if (COUNT_PYCODE_CACHES) fCompiledPyCodeCacheGets++;
718             }
719 
720             return codeObject;
721         }
722     }
723 
724     // Queue listener methods
725 
registerSTAFQueueListener(String msgType, STAXSTAFQueueListener listener)726     public void registerSTAFQueueListener(String msgType,
727                                           STAXSTAFQueueListener listener)
728     {
729         synchronized (fQueueListenerMap)
730         {
731             TreeSet<STAXSTAFQueueListener> listenerSet =
732                 fQueueListenerMap.get(msgType);
733 
734             if (listenerSet == null)
735             {
736                 listenerSet = new TreeSet<STAXSTAFQueueListener>(
737                     new STAXSTAFQueueListenerComparator());
738                 fQueueListenerMap.put(msgType, listenerSet);
739             }
740 
741             listenerSet.add(listener);
742         }
743     }
744 
unregisterSTAFQueueListener(String msgType, STAXSTAFQueueListener listener)745     public void unregisterSTAFQueueListener(String msgType,
746                                             STAXSTAFQueueListener listener)
747     {
748         synchronized (fQueueListenerMap)
749         {
750             TreeSet<STAXSTAFQueueListener> listenerSet =
751                 fQueueListenerMap.get(msgType);
752 
753             if (listenerSet != null)
754                 listenerSet.remove(listener);
755         }
756     }
757 
758     //
759     // Data management functions
760     //
761 
setData(String dataName, Object data)762     public boolean setData(String dataName, Object data)
763     {
764         // Return true if added successfully, else return false.
765 
766         synchronized (fDataMap)
767         {
768             if (!fDataMap.containsKey(dataName))
769             {
770                 fDataMap.put(dataName, data);
771                 return true;
772             }
773         }
774 
775         return false;
776     }
777 
getData(String dataName)778     public Object getData(String dataName)
779     {
780         synchronized (fDataMap)
781         { return fDataMap.get(dataName); }
782     }
783 
removeData(String dataName)784     public boolean removeData(String dataName)
785     {
786         // Return true if removed successfully, else return false.
787 
788         synchronized (fDataMap)
789         {
790             if (fDataMap.containsKey(dataName))
791             {
792                 fDataMap.remove(dataName);
793                 return true;
794             }
795         }
796 
797         return false;
798     }
799 
800     //
801     // Execution methods
802     //
803 
startExecution()804     public void startExecution()
805         throws STAFException, STAXException
806     {
807         // Check if the MAXRETURNFILESIZE setting is not 0 (no maximum size
808         // limit) and if so, set variable STAF/MaxReturnFileSize for the STAX
809         // job handle
810 
811         if (fSTAX.getMaxReturnFileSize() != 0)
812         {
813             String request = "SET VAR " + STAFUtil.wrapData(
814                 "STAF/MaxReturnFileSize=" + fSTAX.getMaxReturnFileSize());
815 
816             STAFResult result = fHandle.submit2("local", "VAR", request);
817 
818             if (result.rc != STAFResult.Ok)
819             {
820                 String msg = "The STAX service could not set the maximum " +
821                     "file size variable for Job ID " + fJobNumber +
822                     "\nSTAF local VAR " + request + " failed with RC=" +
823                     result.rc + ", Result=" + result.result;
824 
825                 log(STAXJob.JOB_LOG, "error", msg);
826             }
827         }
828 
829         if (fSTAX.getTimedEventQueuePerJob())
830         {
831             // Create a STAXTimedEventQueue thread for this job and start it
832             fTimedEventQueue = new STAXTimedEventQueue();
833         }
834 
835         fSTAFQueueMonitor = new STAFQueueMonitor(this);
836         fSTAFQueueMonitor.start();
837 
838         // Initialize data for the Job
839 
840         fSTAX.visitJobManagementHandlers(new STAXVisitorHelper(this)
841         {
842             public void visit(Object o, Iterator iter)
843             {
844                 STAXJobManagementHandler handler = (STAXJobManagementHandler)o;
845 
846                 handler.initJob((STAXJob)fData);
847             }
848         });
849 
850         // Register to listen for messages that STAF requests have completed
851 
852         registerSTAFQueueListener("STAF/RequestComplete", this);
853 
854         // Get the main thread, thread 1
855 
856         STAXThread thread = fThreadMap.get(new Integer(1));
857 
858         // Use a custom OutputStream to redirect the Python Interpreter's
859         // stdout and stderr to somewhere other than the JVM Log and/or
860         // to reformat the output (e.g. add a timestamp, job ID) when
861         // logging to the JVM Log.  Use the log level specified for the
862         // job's python output.
863 
864         thread.setPythonInterpreterStdout(new STAXPythonOutput(this));
865 
866         // Override the log level to use to always be "Error" for stderr
867         thread.setPythonInterpreterStderr(new STAXPythonOutput(this, "Error"));
868 
869         // Get the starting function for the job
870 
871         STAXAction action = fDocument.getFunction(
872             fDocument.getStartFunction());
873 
874         if (action == null)
875         {
876             throw new STAXInvalidStartFunctionException(
877                 "'" + fDocument.getStartFunction() +
878                 "' is not a valid function name.  No function with " +
879                 "this name is defined.");
880         }
881         else
882         {
883             // Call the main function passing any arguments
884 
885             String startFunctionUneval = "'" + fDocument.getStartFunction() +
886                 "'";
887 
888             if (fDefaultCallAction != null)
889             {
890                 STAXCallAction callAction = fDefaultCallAction;
891                 callAction.setFunction(startFunctionUneval);
892                 callAction.setArgs(fDocument.getStartFunctionArgs());
893                 action = callAction;
894             }
895             else
896             {
897                 STAXCallAction callAction = new STAXCallAction(
898                     startFunctionUneval, fDocument.getStartFunctionArgs());
899                 callAction.setLineNumber(
900                     "<External>", "<Error in ARGS option>");
901                 callAction.setXmlFile(getXmlFile());
902                 callAction.setXmlMachine(getXmlMachine());
903                 action = callAction;
904             }
905         }
906 
907         ArrayList<STAXAction> actionList = new ArrayList<STAXAction>();
908 
909         // If HOLD was specified on EXECUTE request, add hold action as
910         // the first action in the actionList.
911 
912         if (fExecuteAndHold != null)
913         {
914             if (fExecuteAndHold.length() == 0)
915             {
916                 // Add a hold action with no timeout
917 
918                 actionList.add(new STAXHoldAction("'main'"));
919             }
920             else
921             {
922                 // Add a hold action with a timeout
923 
924                 actionList.add(
925                     new STAXHoldAction("'main'", "1", fExecuteAndHold));
926             }
927         }
928 
929         // Add additional Python variables about the Job
930         thread.pySetVar("STAXJobID", new Integer(fJobNumber));
931         thread.pySetVar("STAXJobName", fJobName);
932         thread.pySetVar("STAXJobXMLFile", fXmlFile);
933         thread.pySetVar("STAXJobXMLMachine", fXmlMachine);
934         thread.pySetVar("STAXJobStartDate", fStartTimestamp.getDateString());
935         thread.pySetVar("STAXJobStartTime", fStartTimestamp.getTimeString());
936         thread.pySetVar("STAXJobSourceMachine", fSourceMachine);
937         thread.pySetVar("STAXJobSourceHandleName", fSourceHandleName);
938         thread.pySetVar("STAXJobSourceHandle", new Integer(fSourceHandle));
939         thread.pySetVar("STAXJobStartFunctionName",
940                         fDocument.getStartFunction());
941         thread.pySetVar("STAXJobStartFunctionArgs",
942                         fDocument.getStartFunctionArgs());
943         thread.pySetVar("STAXCurrentFunction", Py.None);
944         thread.pySetVar("STAXCurrentXMLFile", fXmlFile);
945         thread.pySetVar("STAXCurrentXMLMachine", fXmlMachine);
946 
947         if (!fScriptFileMachine.equals(""))
948             thread.pySetVar("STAXJobScriptFileMachine", fScriptFileMachine);
949         else
950             thread.pySetVar("STAXJobScriptFileMachine", fSourceMachine);
951 
952         thread.pySetVar("STAXJobScriptFiles", fScriptFiles.toArray());
953         thread.pySetVar("STAXJobHandle", fHandle);
954 
955         thread.pySetVar("STAXServiceName", fSTAX.getServiceName());
956         thread.pySetVar("STAXServiceMachine", fSTAX.getLocalMachineName());
957         thread.pySetVar("STAXServiceMachineNickname",
958                         fSTAX.getLocalMachineNickname());
959         thread.pySetVar("STAXEventServiceName", fSTAX.getEventServiceName());
960         thread.pySetVar("STAXEventServiceMachine",
961                         fSTAX.getEventServiceMachine());
962         thread.pySetVar("STAXServicePath", fSTAX.getServicePath());
963 
964         thread.pySetVar("STAXJobUserLog", new STAFLog(
965             STAFLog.MACHINE,
966             fSTAX.getServiceName().toUpperCase() + "_Job_" +
967             fJobNumber + "_User",
968             fHandle, 0));
969         thread.pySetVar("STAXJobLogName",
970                         fSTAX.getServiceName().toUpperCase() +
971                         "_Job_" + fJobNumber);
972         thread.pySetVar("STAXJobUserLogName",
973                         fSTAX.getServiceName().toUpperCase() +
974                         "_Job_" + fJobNumber + "_User");
975 
976         thread.pySetVar("STAXJobWriteLocation", fJobDataDir);
977         thread.pySetVar("STAXMessageLog", new Integer(0));
978         thread.pySetVar("STAXLogMessage", new Integer(0));
979 
980         if (fLogTCElapsedTime)
981             thread.pySetVar("STAXLogTCElapsedTime", new Integer(1));
982         else
983             thread.pySetVar("STAXLogTCElapsedTime", new Integer(0));
984 
985         if (fLogTCNumStarts)
986             thread.pySetVar("STAXLogTCNumStarts", new Integer(1));
987         else
988             thread.pySetVar("STAXLogTCNumStarts", new Integer(0));
989 
990         if (fLogTCStartStop)
991             thread.pySetVar("STAXLogTCStartStop", new Integer(1));
992         else
993             thread.pySetVar("STAXLogTCStartStop", new Integer(0));
994 
995         thread.pySetVar("STAXPythonOutput",
996                         STAXPythonOutput.getPythonOutputAsString(
997                             getPythonOutput()));
998 
999         thread.pySetVar("STAXPythonLogLevel", getPythonLogLevel());
1000 
1001         thread.pySetVar("STAXInvalidLogLevelAction",
1002                         STAXLogAction.getInvalidLogLevelActionAsString(
1003                             getInvalidLogLevelAction()));
1004 
1005         // Add default signal handlers to the actionList for the main block.
1006         addDefaultSignalHandlers(actionList);
1007 
1008         // Put the "main" function/block at the bottom of the stack,
1009         // with its action being a sequence group that contains the
1010         // default actions (scripts and signalhandlers) in the <stax>
1011         // element and then the call of the main function.
1012 
1013         LinkedList<STAXAction> defaultActions = fDocument.getDefaultActions();
1014 
1015         while (!defaultActions.isEmpty())
1016         {
1017             actionList.add(defaultActions.removeLast());
1018         }
1019 
1020         actionList.add(action);  // Add call of main function
1021 
1022         // Create a sequence action for the "main" function/block
1023         STAXActionDefaultImpl mainSequence = new STAXSequenceAction(actionList);
1024         mainSequence.setLineNumber("sequence", "<Internal>");
1025         mainSequence.setXmlFile(getXmlFile());
1026         mainSequence.setXmlMachine(getXmlMachine());
1027 
1028         STAXActionDefaultImpl mainBlock = new STAXBlockAction(
1029             "main", mainSequence);
1030         mainBlock.setLineNumber("block", "<Internal>");
1031         mainBlock.setXmlFile(getXmlFile());
1032         mainBlock.setXmlMachine(getXmlMachine());
1033 
1034         thread.pushAction(mainBlock);
1035 
1036         // Change the job's state from pending to running
1037         fState = RUNNING_STATE;
1038 
1039         // Generate the job is running event
1040 
1041         HashMap<String, String> jobRunningMap = new HashMap<String, String>();
1042         jobRunningMap.put("type", "job");
1043         jobRunningMap.put("status", "run");
1044         jobRunningMap.put("jobID", String.valueOf(fJobNumber));
1045         jobRunningMap.put("startFunction", fDocument.getStartFunction());
1046         jobRunningMap.put("jobName", fJobName);
1047         jobRunningMap.put("startTimestamp",
1048                           fStartTimestamp.getTimestampString());
1049 
1050         generateEvent(STAXJob.STAX_JOB_EVENT, jobRunningMap, true);
1051 
1052         thread.pySetVar("STAXJob", this);
1053 
1054         thread.setBreakpointFirstFunction(fBreakpointFirstFunction);
1055 
1056         // Schedule the thread to run
1057         thread.schedule();
1058     }
1059 
1060     /**
1061      * This is a recursive function that checks if a function needs to
1062      * import other xml files that contain functions it requires.
1063      *
1064      * Checks if the specified function has any function-import sub-elements
1065      * that specify files to be imported.  If so, it parses the files (if not
1066      * already in the cache) and adds their required functions to the job's
1067      * fFunctionMap. For any new functions added to the job's fFunctionMap,
1068      * it recursively calls the addImportedFunctions method.
1069      */
addImportedFunctions(STAXFunctionAction functionAction)1070     public void addImportedFunctions(STAXFunctionAction functionAction)
1071         throws STAFException, STAXException
1072     {
1073         List<STAXFunctionImport> importList = functionAction.getImportList();
1074 
1075         if (importList.size() == 0)
1076             return;
1077 
1078         // Process function-import list (since not empty)
1079 
1080         String evalElem = "function-import";
1081         int evalIndex = 0;
1082 
1083         for (STAXFunctionImport functionImport : importList)
1084         {
1085             String machine = functionImport.getMachine();
1086             String file = functionImport.getFile();
1087             String directory = functionImport.getDirectory();
1088 
1089             boolean directorySpecified = false;
1090 
1091             if (directory != null)
1092                 directorySpecified = true;
1093 
1094             boolean machineSpecified = false;
1095 
1096             if (machine != null)
1097             {
1098                 machineSpecified = true;
1099 
1100                 // Check if "machine" contains any STAF variables
1101 
1102                 if (machine.indexOf("{") != -1)
1103                 {
1104                     // Resolve variables on the local STAX service machine
1105 
1106                     STAFResult result = this.submitSync(
1107                         "local", "VAR", "RESOLVE STRING " +
1108                         STAFUtil.wrapData(machine));
1109 
1110                     if (result.rc != STAFResult.Ok)
1111                     {
1112                         String errorMsg = "Cause:  Error resolving STAF " +
1113                             "variables in the \"machine\" attribute " +
1114                             "for element type \" function-import\"," +
1115                             "\nRC: " + result.rc +
1116                             ", Result: " + result.result;
1117 
1118                         functionAction.setElementInfo(
1119                             new STAXElementInfo(
1120                                 evalElem, "machine", evalIndex, errorMsg));
1121 
1122                         throw new STAXFunctionImportException(
1123                             STAXUtil.formatErrorMessage(functionAction));
1124                     }
1125 
1126                     machine = result.result;
1127                 }
1128             }
1129             else
1130             {
1131                 machine = fXmlMachine;
1132             }
1133 
1134             // Check if file or directory contains any STAF variables.
1135 
1136             String evalAttr = "file";
1137             String unresValue = file;
1138 
1139             if (directorySpecified)
1140             {
1141                 evalAttr = "directory";
1142                 unresValue = directory;
1143             }
1144 
1145             if (unresValue.indexOf("{") != -1)
1146             {
1147                 // Resolve variables on the local STAX service machine
1148 
1149                 STAFResult result = this.submitSync(
1150                     "local", "VAR", "RESOLVE STRING " +
1151                     STAFUtil.wrapData(unresValue));
1152 
1153                 if (result.rc != STAFResult.Ok)
1154                 {
1155                     String errorMsg = "Cause:  Error resolving STAF " +
1156                         "variables in the \"" + evalAttr + "\" attribute " +
1157                         "for element type \"function-import\"." +
1158                         "\nRC: " + result.rc +
1159                         ", Result: " + result.result;
1160 
1161                     functionAction.setElementInfo(
1162                         new STAXElementInfo(
1163                             evalElem, evalAttr, evalIndex, errorMsg));
1164 
1165                     throw new STAXFunctionImportException(
1166                         STAXUtil.formatErrorMessage(functionAction));
1167                 }
1168 
1169                 if (!directorySpecified)
1170                     file = result.result;
1171                 else
1172                     directory = result.result;
1173             }
1174 
1175             // Handle any functions specified to be imported
1176 
1177             String functions = functionImport.getFunctions();
1178 
1179             Vector<String> importedFunctionList = new Vector<String>();
1180 
1181             if (functions != null)
1182             {
1183                 // Convert string containing a whitespace-separated list of
1184                 // functions to a vector.
1185                 //
1186                 // Note: The tokenizer uses the default delimiter set, which
1187                 // is " \t\n\r\f" and consists of the space character,
1188                 // the tab character, the newline character, the
1189                 // carriage-return character, and the form-feed character.
1190                 // Delimiter characters themselves will not be treated as
1191                 // tokens.
1192 
1193                 StringTokenizer st = new StringTokenizer(functions);
1194 
1195                 while (st.hasMoreTokens())
1196                 {
1197                     importedFunctionList.add(st.nextToken());
1198                 }
1199             }
1200 
1201             // Get the file separator for the machine where the import file/
1202             // directory resides as this is needed to normalize the path name
1203             // and may also be needed to determine if its a relative file
1204             // name, and to get the parent directory.
1205 
1206             STAFResult result = STAXFileCache.getFileSep(
1207                 machine, this.getSTAX().getSTAFHandle());
1208 
1209             if (result.rc != STAFResult.Ok)
1210             {
1211                 String errorMsg = "Cause:  No response from machine \"" +
1212                     machine + "\" when trying to get the contents of a " +
1213                     "file specified by element type \"function-import\"." +
1214                     "\nRC: " + result.rc + ", Result: " + result.result;
1215 
1216                 functionAction.setElementInfo(
1217                     new STAXElementInfo(
1218                         evalElem, "machine", evalIndex, errorMsg));
1219 
1220                 throw new STAXFunctionImportException(
1221                     STAXUtil.formatErrorMessage(functionAction));
1222             }
1223 
1224             String fileSep = result.result;
1225 
1226             // Set a flag to indicate if the file/directory name is
1227             // case-sensitive (e.g. true if it resides on a Unix machine,
1228             // false if Windows)
1229 
1230             boolean caseSensitiveFileName = true;
1231 
1232             if (fileSep.equals("\\"))
1233             {
1234                 // Windows machine so not case-sensitive
1235 
1236                 caseSensitiveFileName = false;
1237             }
1238 
1239             // Import the file specified in the function-import element.
1240             // If the file specified in the function-import element isn't
1241             // already in the file cache, get the file and parse it.
1242             // Then, add the functions defined in this file to the main job's
1243             // function map.
1244 
1245             // If no machine attribute is specified, then the file specified
1246             // could be a relative file name so need to assign its absolute
1247             // file name
1248 
1249             if (!machineSpecified)
1250             {
1251                 // Check if a relative path was specified
1252 
1253                 String entry = file;
1254 
1255                 if (directorySpecified)
1256                     entry = directory;
1257 
1258                 if (STAXUtil.isRelativePath(entry, fileSep))
1259                 {
1260                     // Assign the absolute name assuming it is relative
1261                     // to the parent xml file's path
1262 
1263                     String currentFile = functionAction.getXmlFile();
1264 
1265                     if (currentFile.equals(STAX.INLINE_DATA))
1266                     {
1267                         // Cannot specify a relative file path if the parent
1268                         // xml file is STAX.INLINE_DATA
1269 
1270                         String errorMsg = "Cause:  Cannot specify a " +
1271                             "relative path in attribute \"" + evalAttr +
1272                             "\" for element type \"function-import\" when " +
1273                             "the parent xml file is " + STAX.INLINE_DATA;
1274 
1275                         functionAction.setElementInfo(
1276                             new STAXElementInfo(
1277                                 evalElem, STAXElementInfo.NO_ATTRIBUTE_NAME,
1278                                 evalIndex, errorMsg));
1279 
1280                         throw new STAXFunctionImportException(
1281                             STAXUtil.formatErrorMessage(functionAction));
1282                     }
1283 
1284                     entry = STAXUtil.getParentPath(currentFile, fileSep) +
1285                         entry;
1286 
1287                     if (!directorySpecified)
1288                         file = entry;
1289                     else
1290                         directory = entry;
1291                 }
1292             }
1293 
1294             // Normalize the import file/directory name so that we have a
1295             // better chance at matching file names that are already cached
1296 
1297             if (!directorySpecified)
1298                 file = STAXUtil.normalizeFilePath(file, fileSep);
1299             else
1300                 directory = STAXUtil.normalizeFilePath(directory, fileSep);
1301 
1302             // Create a STAX XML Parser
1303 
1304             STAXParser parser = null;
1305 
1306             try
1307             {
1308                 parser = new STAXParser(getSTAX());
1309             }
1310             catch (Exception ex)
1311             {
1312                 String errorMsg = ex.getClass().getName() + "\n" +
1313                     ex.getMessage();
1314 
1315                 functionAction.setElementInfo(
1316                     new STAXElementInfo(
1317                         evalElem, STAXElementInfo.NO_ATTRIBUTE_NAME,
1318                         evalIndex, errorMsg));
1319 
1320                 throw new STAXFunctionImportException(
1321                     STAXUtil.formatErrorMessage(functionAction));
1322             }
1323 
1324             // Create a list of files to process
1325 
1326             List<String> theFileList = new ArrayList<String>();
1327 
1328             if (!directorySpecified)
1329             {
1330                 // There will be just one file to process
1331 
1332                 theFileList.add(file);
1333             }
1334             else
1335             {
1336                 // Submit a FS LIST DIRECTORY request for all *.xml files
1337                 // in the directory
1338 
1339                 result = submitSync(
1340                     machine, "FS", "LIST DIRECTORY " +
1341                     STAFUtil.wrapData(directory) +
1342                     " TYPE F EXT xml CASEINSENSITIVE");
1343 
1344                 if (result.rc != 0)
1345                 {
1346                     evalAttr = STAXElementInfo.NO_ATTRIBUTE_NAME;
1347                     String errorMsg = "Cause:  ";
1348 
1349                     if (result.rc == STAFResult.NoPathToMachine)
1350                     {
1351                         errorMsg = errorMsg + "No response from machine ";
1352                         evalAttr = "machine";
1353                     }
1354                     else
1355                     {
1356                         errorMsg = errorMsg + "Error ";
1357                     }
1358 
1359                     errorMsg = errorMsg + "when submitting a FS LIST " +
1360                         "DIRECTORY request to list all *.xml files in the " +
1361                         "directory specified by element type " +
1362                         "\"function-import\":" +
1363                         "\n  Directory: " + directory +
1364                         "\n  Machine: " + machine +
1365                         "\n\nRC: " + result.rc + ", Result: " + result.result;
1366 
1367                     functionAction.setElementInfo(
1368                         new STAXElementInfo(
1369                             evalElem, evalAttr, evalIndex, errorMsg));
1370 
1371                     throw new STAXFunctionImportException(
1372                         STAXUtil.formatErrorMessage(functionAction));
1373                 }
1374 
1375                 Iterator fileIter = ((List)result.resultObj).iterator();
1376 
1377                 while (fileIter.hasNext())
1378                 {
1379                     theFileList.add(
1380                         directory + fileSep + (String)fileIter.next());
1381                 }
1382             }
1383 
1384             for (String fileName : theFileList)
1385             {
1386                 Date dLastModified = null;
1387                 STAXJob job = null;
1388 
1389                 // If file caching is enabled, find the modification date of
1390                 // the file being imported
1391 
1392                 if (getSTAX().getFileCaching())
1393                 {
1394                     if (STAXFileCache.get().isLocalMachine(machine))
1395                     {
1396                         File fileObj = new File(fileName);
1397 
1398                         // Make sure the file exists
1399 
1400                         if (fileObj.exists())
1401                         {
1402                             long lastModified = fileObj.lastModified();
1403 
1404                             if (lastModified > 0)
1405                             {
1406                                 // Chop off the milliseconds because some
1407                                 // systems don't report modTime to milliseconds
1408 
1409                                 lastModified = ((long)(lastModified/1000))*1000;
1410 
1411                                 dLastModified = new Date(lastModified);
1412                             }
1413                         }
1414                     }
1415 
1416                     if (dLastModified == null)
1417                     {
1418                         // Find the remote file mod time using STAF
1419 
1420                         STAFResult entryResult = submitSync(
1421                             machine, "FS", "GET ENTRY " +
1422                             STAFUtil.wrapData(fileName) + " MODTIME");
1423 
1424                         if (entryResult.rc == 0)
1425                         {
1426                             String modDate = entryResult.result;
1427                             dLastModified = STAXFileCache.convertSTAXDate(
1428                                 modDate);
1429                         }
1430                     }
1431 
1432                     // Check for an up-to-date file in the cache
1433 
1434                     if ((dLastModified != null) &&
1435                         STAXFileCache.get().checkCache(
1436                             machine, fileName, dLastModified,
1437                             caseSensitiveFileName))
1438                     {
1439                         // Get the doc from cache
1440 
1441                         STAXDocument doc = STAXFileCache.get().getDocument(
1442                             machine, fileName, caseSensitiveFileName);
1443 
1444                         if (doc != null)
1445                         {
1446                             job = new STAXJob(getSTAX(), doc);
1447                         }
1448                     }
1449                 }
1450 
1451                 // If the file was not in cache, then retrieve it using STAF
1452 
1453                 if (job == null)
1454                 {
1455                     result = submitSync(
1456                         machine, "FS", "GET FILE " + STAFUtil.wrapData(
1457                             fileName));
1458 
1459                     if (result.rc != 0)
1460                     {
1461                         evalAttr = STAXElementInfo.NO_ATTRIBUTE_NAME;
1462                         String errorMsg = "Cause:  ";
1463 
1464                         if (result.rc == STAFResult.NoPathToMachine)
1465                         {
1466                             errorMsg = errorMsg + "No response from machine ";
1467                             evalAttr = "machine";
1468                         }
1469                         else
1470                         {
1471                             errorMsg = errorMsg + "Error ";
1472                         }
1473 
1474                         errorMsg = errorMsg + "when submitting a FS GET " +
1475                             "request to get a file specified by element type " +
1476                             "\"function-import\":" +
1477                             "\n  File: " + fileName +
1478                             "\n  Machine: " + machine +
1479                             "\n\nRC: " + result.rc + ", Result: " + result.result;
1480 
1481                         functionAction.setElementInfo(
1482                             new STAXElementInfo(
1483                                 evalElem, evalAttr, evalIndex, errorMsg));
1484 
1485                         throw new STAXFunctionImportException(
1486                             STAXUtil.formatErrorMessage(functionAction));
1487                     }
1488 
1489                     // Parse the XML document
1490 
1491                     try
1492                     {
1493                         job = parser.parse(result.result, fileName, machine);
1494                     }
1495                     catch (Exception ex)
1496                     {
1497                         String errorMsg = "Cause: " +
1498                             ex.getClass().getName() + "\n";
1499 
1500                         // Make sure that the error message contains the File:
1501                         // and Machine: where the error occurred
1502 
1503                         if (ex.getMessage().indexOf("File: ") == -1 ||
1504                             ex.getMessage().indexOf("Machine: ") == -1)
1505                         {
1506                             errorMsg = errorMsg + "\nFile: " + fileName +
1507                                 ", Machine: " + machine;
1508                         }
1509 
1510                         errorMsg = errorMsg + ex.getMessage();
1511 
1512                         functionAction.setElementInfo(
1513                             new STAXElementInfo(
1514                                 evalElem, STAXElementInfo.NO_ATTRIBUTE_NAME,
1515                                 evalIndex, errorMsg));
1516 
1517                         throw new STAXFunctionImportException(
1518                             STAXUtil.formatErrorMessage(functionAction));
1519                     }
1520 
1521                     // Add the XML document to the cache
1522 
1523                     if (getSTAX().getFileCaching() && (dLastModified != null))
1524                     {
1525                         STAXFileCache.get().addDocument(
1526                             machine, fileName, job.getSTAXDocument(),
1527                             dLastModified, caseSensitiveFileName);
1528                     }
1529                 }
1530 
1531                 // Get a map of the functions in this xml file being imported
1532 
1533                 HashMap<String, STAXFunctionAction> functionMap =
1534                     job.getSTAXDocument().getFunctionMap();
1535 
1536                 // Verify that if function names were specified in the
1537                 // function-import element, verify that all the function names
1538                 // specified exist in the xml file
1539 
1540                 if (!importedFunctionList.isEmpty())
1541                 {
1542                     for (String functionName : importedFunctionList)
1543                     {
1544                         if (!functionMap.containsKey(functionName))
1545                         {
1546                             // Function name specified in the function-import
1547                             // element's "functions" attribute does not exist
1548 
1549                             String errorMsg = "Cause:  Function \"" +
1550                                 functionName + "\" does not exist in file \"" +
1551                                 fileName + "\" on machine \"" + machine + "\".";
1552 
1553                             functionAction.setElementInfo(
1554                                 new STAXElementInfo(
1555                                     evalElem, STAXElementInfo.NO_ATTRIBUTE_NAME,
1556                                     evalIndex, errorMsg));
1557 
1558                             throw new STAXFunctionImportException(
1559                                 STAXUtil.formatErrorMessage(functionAction));
1560                         }
1561                     }
1562                 }
1563 
1564                 // Create a set containing the functions requested to be
1565                 // imported.  If no functions were specifically requested to
1566                 // be imported, add all the functions in the xml file to the
1567                 // set.
1568 
1569                 Set<String> functionSet;
1570 
1571                 if (importedFunctionList.isEmpty())
1572                     functionSet = functionMap.keySet();
1573                 else
1574                     functionSet = new LinkedHashSet<String>(importedFunctionList);
1575 
1576                 // Add the functions requested to be imported via a
1577                 // function-import element to the main job's fFunctionMap and
1578                 // add any other functions that these functions also require
1579                 // to the main job's fFunctionMap
1580 
1581                 Vector<String> requiredFunctionList = new Vector<String>();
1582 
1583                 for (String functionName : functionSet)
1584                 {
1585                     STAXFunctionAction function = functionMap.get(functionName);
1586 
1587                     // If this function does not exist, add the function to
1588                     // the job's fFunctionMap
1589 
1590                     if (!(functionExists(functionName)))
1591                     {
1592                         // Add the function to the job's function map
1593 
1594                         this.addFunction(function);
1595 
1596                         // Check if this function requires any other functions
1597                         // and if so, add them to the list of required
1598                         // functions
1599 
1600                         StringTokenizer requiredFunctions =
1601                             new StringTokenizer(function.getRequires(), " ");
1602 
1603                         while (requiredFunctions.hasMoreTokens())
1604                         {
1605                             requiredFunctionList.add(
1606                                 requiredFunctions.nextToken());
1607                         }
1608 
1609                         // Recursive call to add functions this functions
1610                         // requires if specified by function-import elements
1611 
1612                         addImportedFunctions(function);
1613                     }
1614                 }
1615 
1616                 // Process required functions (functions that the imported
1617                 // functions also require)
1618 
1619                 for (int i = 0; i < requiredFunctionList.size(); i++)
1620                 {
1621                     String functionName =
1622                         (String)requiredFunctionList.elementAt(i);
1623 
1624                     if (!functionExists(functionName))
1625                     {
1626                         addRequiredFunctions(functionName, functionMap);
1627                     }
1628                 }
1629             } // End while (fileListIter.hasNext())
1630 
1631             evalIndex++;
1632         } // End while
1633     }
1634 
1635     /**
1636      * This recursive method adds the specified required function to the
1637      * main job's fFunctionMap.  And, if this function has any function-import
1638      * elements, it recursively adds these functions that are required by the
1639      * imported function.  And, if this function has any required functions
1640      * from the same xml file, it recursively adds these required functions.
1641      */
addRequiredFunctions(String functionName, HashMap functionMap)1642     private void addRequiredFunctions(String functionName, HashMap functionMap)
1643         throws STAFException, STAXException
1644     {
1645         STAXFunctionAction function =
1646             (STAXFunctionAction)functionMap.get(functionName);
1647 
1648         this.addFunction(function);
1649 
1650         // If this function has any function-import elements, recursively
1651         // add these functions from the imported xml file that are
1652         // required by the imported function to the main job's
1653         // fFunctionMap if not already present
1654 
1655         addImportedFunctions(function);
1656 
1657         // If this function has a "requires" attribute specifying
1658         // additional functions in the same xml file that it requires,
1659         // recursively add these required functions to the main job's
1660         // fFunctionMap if not already present
1661 
1662         StringTokenizer requiredFunctions = new StringTokenizer(
1663             function.getRequires(), " ");
1664 
1665         while (requiredFunctions.hasMoreElements())
1666         {
1667             String reqFunctionName = (String)requiredFunctions.nextElement();
1668 
1669             if (!functionExists(reqFunctionName))
1670             {
1671                 addRequiredFunctions(reqFunctionName, functionMap);
1672             }
1673         }
1674     }
1675 
addDefaultSignalHandlers(ArrayList<STAXAction> actionList)1676     public void addDefaultSignalHandlers(ArrayList<STAXAction> actionList)
1677     {
1678         // Array of default SignalHandlers.  Each signal is described by
1679         // a signal name, an action, and a signal message variable name.
1680         // If the signal message variable name is not null, a message will
1681         // be sent to the STAX Monitor and logged with level 'error'.
1682 
1683         String[][] defaultSHArray =
1684         {
1685             {
1686                 "STAXPythonEvaluationError", "terminate", "STAXPythonEvalMsg"
1687             },
1688             {
1689                 "STAXProcessStartError", "continue", "STAXProcessStartErrorMsg"
1690             },
1691             {
1692                 "STAXProcessStartTimeout", "continue",
1693                 "STAXProcessStartTimeoutMsg"
1694             },
1695             {
1696                 "STAXCommandStartError", "terminate",
1697                 "STAXCommandStartErrorMsg"
1698             },
1699             {
1700                 "STAXFunctionDoesNotExist", "terminate",
1701                 "STAXFunctionDoesNotExistMsg"
1702             },
1703             {
1704                 "STAXInvalidBlockName", "terminate", "STAXInvalidBlockNameMsg"
1705             },
1706             {
1707                 "STAXBlockDoesNotExist", "continue", "STAXBlockDoesNotExistMsg"
1708             },
1709             {
1710                 "STAXLogError", "continue", "STAXLogMsg"
1711             },
1712             {
1713                 "STAXTestcaseMissingError", "continue",
1714                 "STAXTestcaseMissingMsg"
1715             },
1716             {
1717                 "STAXInvalidTcStatusResult", "continue",
1718                 "STAXInvalidTcStatusResultMsg"
1719             },
1720             {
1721                 "STAXInvalidTimerValue", "terminate",
1722                 "STAXInvalidTimerValueMsg"
1723             },
1724             {
1725                 "STAXNoSuchSignalHandler", "continue",
1726                 "STAXNoSuchSignalHandlerMsg"
1727             },
1728             {
1729                 "STAXEmptyList", "continue", null
1730             },
1731             {
1732                 "STAXMaxThreadsExceeded", "terminate",
1733                 "STAXMaxThreadsExceededMsg"
1734             },
1735             {
1736                 "STAXInvalidMaxThreads", "terminate",
1737                 "STAXInvalidMaxThreadsMsg"
1738             },
1739             {
1740                 "STAXFunctionArgValidate", "terminate",
1741                 "STAXFunctionArgValidateMsg"
1742             },
1743             {
1744                 "STAXImportError", "terminate", "STAXImportErrorMsg"
1745             },
1746             {
1747                 "STAXFunctionImportError", "terminate",
1748                 "STAXFunctionImportErrorMsg"
1749             },
1750             {
1751                 "STAXInvalidTestcaseMode", "continue",
1752                 "STAXInvalidTestcaseModeMsg"
1753             }
1754         };
1755 
1756         // Add default SignalHandlers to actionList
1757 
1758         for (int i = 0; i < defaultSHArray.length; i++)
1759         {
1760             ArrayList<STAXAction> signalHandlerActionList =
1761                 new ArrayList<STAXAction>();
1762 
1763             String signalName = defaultSHArray[i][0];
1764             String signalAction = defaultSHArray[i][1];
1765             String signalMsgVarName = defaultSHArray[i][2];
1766             String signalMsgText = "";
1767 
1768             if (signalAction.equals("terminate"))
1769             {
1770                 signalMsgText = "'" + signalName + " signal raised. " +
1771                     "Terminating job. '" + " + " + signalMsgVarName;
1772             }
1773             else if (signalAction.equals("continue"))
1774             {
1775                 signalMsgText = "'" + signalName + " signal raised. " +
1776                     "Continuing job. '" + " + " + signalMsgVarName;
1777             }
1778 
1779             if (signalMsgVarName == null)
1780             {
1781                 // Add a No Operation (nop) Action to the action list
1782 
1783                 signalHandlerActionList.add(new STAXNopAction());
1784             }
1785             else
1786             {
1787                 // Add a Log Action to the action list
1788 
1789                 int logfile = STAXJob.JOB_LOG;
1790 
1791                 if (signalName == "STAXLogError")
1792                 {
1793                     // Log the error message in the STAX JVM log (otherwise
1794                     // may get a duplicate STAXLogError signal)
1795 
1796                     logfile = STAXJob.JVM_LOG;
1797                 }
1798 
1799                 // Log the message with level 'Error' and send the message
1800                 // to the STAX Monitor
1801 
1802                 signalHandlerActionList.add(new STAXLogAction(
1803                     signalMsgText, "'error'", "1", "1", logfile));
1804             }
1805 
1806             if (signalAction.equals("terminate"))
1807             {
1808                 // Add a Terminate Job Action to the action list
1809 
1810                 signalHandlerActionList.add(
1811                     new STAXTerminateAction("'main'"));
1812             }
1813 
1814             // Add the signalhandler action to the action list
1815 
1816             if (signalHandlerActionList.size() == 1)
1817             {
1818                 // Don't need a STAXSequenceAction since only 1 action in list
1819 
1820                 actionList.add(new STAXSignalHandlerAction(
1821                     "'" + signalName + "'",
1822                     signalHandlerActionList.get(0)));
1823             }
1824             else
1825             {
1826                 // Wrap the action list is a STAXSequenceAction
1827 
1828                 actionList.add(new STAXSignalHandlerAction(
1829                     "'" + signalName + "'",
1830                     new STAXSequenceAction(signalHandlerActionList)));
1831             }
1832         }
1833     }
1834 
1835     //
1836     // Event methods
1837     //
1838 
generateEvent(String eventSubType, Map<String, String> details)1839     public void generateEvent(String eventSubType, Map<String, String> details)
1840     {
1841         generateEvent(eventSubType, details, false);
1842     }
1843 
generateEvent(String eventSubType, Map<String, String> details, boolean notifyAll)1844     public void generateEvent(String eventSubType, Map<String, String> details,
1845                               boolean notifyAll)
1846     {
1847         // Check if event generation has been disabled and if so, do nothing
1848 
1849         if (!getSTAX().getEventGeneration())
1850             return;
1851 
1852         // Generate an event that can be used to monitor a STAX job
1853 
1854         StringBuffer detailsString = new StringBuffer();
1855         String key = "";
1856         String value = "";
1857 
1858         for (Map.Entry<String, String> entry : details.entrySet())
1859         {
1860             key = entry.getKey();
1861             value = entry.getValue();
1862 
1863             if (value != null)
1864             {
1865                 detailsString.append("PROPERTY ").append(
1866                     STAFUtil.wrapData(key + "=" + value)).append(" ");
1867             }
1868         }
1869 
1870         // The type machine should always be the local machine
1871         // The details parm must already be in the :length: format
1872         submitSync(
1873             fSTAX.getEventServiceMachine(),
1874             fSTAX.getEventServiceName(),
1875             "GENERATE TYPE " + fSTAX.getServiceName().toUpperCase() + "/" +
1876             fSTAX.getLocalMachineName() + "/" + fJobNumber +
1877             " SUBTYPE " + STAFUtil.wrapData(eventSubType) +
1878             " ASYNC " + detailsString.toString());
1879 
1880         // Debug
1881         if (false)
1882         {
1883             STAX.logToJVMLog(
1884                 "Debug", fJobNumber, 0,
1885                 "STAXJob::GenerateEvent:\n" +  "GENERATE TYPE " +
1886                 fSTAX.getServiceName().toUpperCase() + "/" +
1887                 fSTAX.getLocalMachineName() + "/" + fJobNumber +
1888                 " SUBTYPE " + STAFUtil.wrapData(eventSubType) +
1889                 " ASYNC " + details);
1890         }
1891 
1892         if (notifyAll)
1893         {
1894             submitSync(
1895                 fSTAX.getEventServiceMachine(),
1896                 fSTAX.getEventServiceName(),
1897                 "GENERATE TYPE " + fSTAX.getServiceName().toUpperCase() + "/" +
1898                 fSTAX.getLocalMachineName() +
1899                 " SUBTYPE " + STAFUtil.wrapData(eventSubType) +
1900                 " ASYNC " + detailsString.toString());
1901 
1902             // Debug
1903             if (false)
1904             {
1905                 STAX.logToJVMLog(
1906                     "Debug", fJobNumber, 0,
1907                     "STAXJob::GenerateEvent:\n" + "GENERATE TYPE " +
1908                     fSTAX.getServiceName().toUpperCase() + "/" +
1909                     fSTAX.getLocalMachineName() +
1910                     " SUBTYPE " + STAFUtil.wrapData(eventSubType) +
1911                     " ASYNC " + details);
1912             }
1913         }
1914     }
1915 
log(int logfile, String level, String message)1916     public STAFResult log(int logfile, String level, String message)
1917     {
1918         if (logfile == STAXJob.JVM_LOG)
1919         {
1920             // Log to the JVM log instead of a STAX Job log
1921 
1922             STAX.logToJVMLog(level, fJobNumber, 0, message);
1923 
1924             return new STAFResult();
1925         }
1926 
1927         String serviceName = fSTAX.getServiceName().toUpperCase();
1928         String logName;
1929 
1930         // Don't resolve messages logged to the STAX_Service log and to
1931         // STAX Job logs to avoid possible RC 13 errors which would prevent
1932         // the message from being logged.
1933 
1934         boolean noResolveMessage = false;
1935 
1936         if (logfile == STAXJob.SERVICE_LOG)
1937         {
1938             logName = serviceName + "_Service";
1939             noResolveMessage = true;
1940         }
1941         else if (logfile == STAXJob.JOB_LOG)
1942         {
1943             logName = serviceName + "_Job_" + fJobNumber;
1944             noResolveMessage = true;
1945         }
1946         else if (logfile == STAXJob.USER_JOB_LOG)
1947         {
1948             logName = serviceName + "_Job_" + fJobNumber + "_User";
1949         }
1950         else
1951         {
1952             // Log to STAX_Service log if invalid logfile specified
1953 
1954             STAXTimestamp currentTimestamp = new STAXTimestamp();
1955             System.out.println(currentTimestamp.getTimestampString() +
1956                                " STAX Service Error: Invalid Logfile " +
1957                                logfile);
1958 
1959             logName = serviceName + "_Service";
1960             noResolveMessage = true;
1961         }
1962 
1963         String logRequest = "LOG MACHINE LOGNAME " +
1964             STAFUtil.wrapData(logName) + " LEVEL " + level;
1965 
1966         if (noResolveMessage) logRequest += " NORESOLVEMESSAGE";
1967 
1968         logRequest += " MESSAGE " + STAFUtil.wrapData(message);
1969 
1970         STAFResult result = submitSync("LOCAL", "LOG", logRequest);
1971 
1972         // Check if the result was unsuccessful except ignore the following
1973         // errors:
1974         // - UnknownService (2) in case the LOG service is not registered
1975         // - HandleDoesNotExist (5) in case the STAX job's handle has been
1976         //   unregistered indicating the job has completed
1977 
1978         if ((result.rc != STAFResult.Ok) &&
1979             (result.rc != STAFResult.UnknownService) &&
1980             (result.rc != STAFResult.HandleDoesNotExist))
1981         {
1982             if (logfile != STAXJob.USER_JOB_LOG)
1983             {
1984                 STAX.logToJVMLog(
1985                     "Error", fJobNumber, 0,
1986                     "STAXJob::log: Log request failed with RC " + result.rc +
1987                     " and Result " + result.result +
1988                     "  level: " + level +
1989                     "  logRequest: " + logRequest);
1990             }
1991             else if ((result.rc == STAFResult.VariableDoesNotExist) &&
1992                      (!noResolveMessage))
1993             {
1994                 // Retry logging the error message without resolving
1995                 // variables in the message
1996 
1997                 logRequest += " NORESOLVEMESSAGE";
1998 
1999                 result = submitSync("LOCAL", "LOG", logRequest);
2000             }
2001         }
2002 
2003         return result;
2004     }
2005 
clearLogs()2006     public void clearLogs()
2007     {
2008         String serviceName = fSTAX.getServiceName().toUpperCase();
2009         String jobLogName;
2010         String userJobLogName;
2011 
2012         jobLogName = serviceName + "_Job_" + fJobNumber;
2013         userJobLogName = serviceName + "_Job_" + fJobNumber + "_User";
2014 
2015         String logRequest = "DELETE MACHINE " +
2016             fSTAX.getLocalMachineNickname() + " LOGNAME " +
2017             STAFUtil.wrapData(jobLogName) + " CONFIRM";
2018 
2019         STAFResult result = submitSync("LOCAL", "LOG", logRequest);
2020 
2021         logRequest = "DELETE MACHINE " + fSTAX.getLocalMachineNickname() +
2022             " LOGNAME " +  STAFUtil.wrapData(userJobLogName) + " CONFIRM";
2023 
2024         result = submitSync("LOCAL", "LOG", logRequest);
2025     }
2026 
clearJobDataDir()2027     public void clearJobDataDir()
2028     {
2029         // Delete the job data directory and recreate it
2030 
2031         File dir = new File(fJobDataDir);
2032 
2033         if (dir.exists())
2034         {
2035             String deleteDirRequest = "DELETE ENTRY " +
2036                 STAFUtil.wrapData(fJobDataDir) + " RECURSE CONFIRM";
2037 
2038             submitSync("local", "FS", deleteDirRequest);
2039         }
2040 
2041         if (!dir.exists())
2042         {
2043             dir.mkdirs();
2044         }
2045     }
2046 
getBreakpointFunctionList()2047     public List<String> getBreakpointFunctionList()
2048     {
2049         return fBreakpointFunctionList;
2050     }
2051 
setBreakpointFunctionList(List<String> functionList)2052     public void setBreakpointFunctionList(List<String> functionList)
2053     {
2054         synchronized(fBreakpointFunctionList)
2055         {
2056             fBreakpointFunctionList = functionList;
2057         }
2058     }
2059 
addBreakpointFunction(String functionName)2060     public int addBreakpointFunction(String functionName)
2061     {
2062         int breakpointID = getNextBreakpointNumber();
2063 
2064         synchronized(fBreakpointFunctionList)
2065         {
2066             fBreakpointFunctionList.add(functionName);
2067         }
2068 
2069         synchronized (fBreakpointsMap)
2070         {
2071             fBreakpointsMap.put(String.valueOf(breakpointID),
2072                 new STAXBreakpoint(BREAKPOINT_FUNCTION,
2073                                   functionName, "", "", ""));
2074         }
2075 
2076         return breakpointID;
2077     }
2078 
isBreakpointFunction(String functionName)2079     public boolean isBreakpointFunction(String functionName)
2080     {
2081         synchronized (fBreakpointsMap)
2082         {
2083             for (STAXBreakpoint breakpoint : fBreakpointsMap.values())
2084             {
2085                 if (functionName.equals(breakpoint.getFunction()))
2086                 {
2087                     return true;
2088                 }
2089             }
2090         }
2091 
2092         return false;
2093     }
2094 
getBreakpointLineList()2095     public List<String> getBreakpointLineList()
2096     {
2097         return fBreakpointLineList;
2098     }
2099 
getBreakpointsMap()2100     public TreeMap<String, STAXBreakpoint> getBreakpointsMap()
2101     {
2102         return fBreakpointsMap;
2103     }
2104 
setBreakpointLineList(List<String> lineList)2105     public void setBreakpointLineList(List<String> lineList)
2106     {
2107         synchronized(fBreakpointLineList)
2108         {
2109             fBreakpointLineList = lineList;
2110         }
2111     }
2112 
removeBreakpoint(String id)2113     public STAFResult removeBreakpoint(String id)
2114     {
2115         synchronized(fBreakpointsMap)
2116         {
2117             if (!(fBreakpointsMap.containsKey(id)))
2118             {
2119                 return new STAFResult(STAFResult.DoesNotExist, id);
2120             }
2121             else
2122             {
2123                 fBreakpointsMap.remove(id);
2124 
2125                 return new STAFResult(STAFResult.Ok);
2126             }
2127         }
2128     }
2129 
addBreakpointLine(String line, String file, String machine)2130     public int addBreakpointLine(String line, String file, String machine)
2131     {
2132         int breakpointID = getNextBreakpointNumber();
2133 
2134         synchronized(fBreakpointLineList)
2135         {
2136             fBreakpointLineList.add(line + " " + machine + " " + file);
2137         }
2138 
2139         synchronized (fBreakpointsMap)
2140         {
2141             fBreakpointsMap.put(String.valueOf(breakpointID),
2142                 new STAXBreakpoint(BREAKPOINT_LINE,
2143                                    "", line, file, machine));
2144         }
2145 
2146         return breakpointID;
2147     }
2148 
isBreakpointLine(String line, String file, String machine)2149     public boolean isBreakpointLine(String line, String file, String machine)
2150     {
2151         synchronized (fBreakpointsMap)
2152         {
2153             for (STAXBreakpoint breakpoint : fBreakpointsMap.values())
2154             {
2155                 if (line.equals(breakpoint.getLine()) &&
2156                     file.equals(breakpoint.getFile()) &&
2157                     (breakpoint.getMachine().equals("") ||
2158                     machine.equalsIgnoreCase(breakpoint.getMachine())))
2159                 {
2160                     return true;
2161                 }
2162             }
2163         }
2164 
2165         return false;
2166     }
2167 
breakpointsEmpty()2168     public boolean breakpointsEmpty()
2169     {
2170         return (fBreakpointsMap.isEmpty());
2171     }
2172 
2173     // Submit methods
2174 
submitAsync(String location, String service, String request, STAXSTAFRequestCompleteListener listener)2175     public STAFResult submitAsync(String location, String service,
2176         String request, STAXSTAFRequestCompleteListener listener)
2177     {
2178         STAFResult result;
2179 
2180         synchronized(fRequestMap)
2181         {
2182             result = fHandle.submit2(
2183                 STAFHandle.ReqQueue, location, service, request);
2184 
2185             if (result.rc == STAFResult.Ok)
2186             {
2187                 try
2188                 {
2189                     // Convert request number to a negative integer if greater
2190                     // then Integer.MAX_VALUE
2191 
2192                     Integer requestNumber = STAXUtil.convertRequestNumber(
2193                         result.result);
2194 
2195                     fRequestMap.put(requestNumber, listener);
2196 
2197                     result.result = requestNumber.toString();
2198                 }
2199                 catch (NumberFormatException e)
2200                 {
2201                     STAX.logToJVMLog("Error", fJobNumber, 0,
2202                         "STAXJob::submitAsync - " + e.toString());
2203 
2204                     result.result = "0";
2205                 }
2206             }
2207         }
2208 
2209         return result;
2210     }
2211 
submitSync(String location, String service, String request)2212     public STAFResult submitSync(String location, String service,
2213                                  String request)
2214     {
2215         STAFHandle theHandle = fHandle;
2216 
2217         // If the STAX job's handle has not yet been assigned, use the
2218         // STAX service's handle to submit the STAF service request
2219 
2220         if (fHandle == null)
2221             theHandle = fSTAX.getSTAFHandle();
2222 
2223         STAFResult result = theHandle.submit2(
2224             STAFHandle.ReqSync, location, service, request);
2225 
2226         return result;
2227     }
2228 
submitAsyncForget(String location, String service, String request)2229     public STAFResult submitAsyncForget(String location, String service,
2230                                         String request)
2231     {
2232         STAFResult result = fHandle.submit2(STAFHandle.ReqFireAndForget,
2233             location, service, request);
2234 
2235         return result;
2236     }
2237 
2238     // STAXSTAFQueueListener method
2239 
handleQueueMessage(STAXSTAFMessage message, STAXJob job)2240     public void handleQueueMessage(STAXSTAFMessage message, STAXJob job)
2241     {
2242         int requestNumber = message.getRequestNumber();
2243 
2244         STAXSTAFRequestCompleteListener listener = null;
2245 
2246         synchronized (fRequestMap)
2247         {
2248             Integer key = new Integer(requestNumber);
2249 
2250             listener = fRequestMap.get(key);
2251 
2252             if (listener != null)
2253             {
2254                 fRequestMap.remove(key);
2255             }
2256         }
2257 
2258         if (listener != null)
2259         {
2260             listener.requestComplete(requestNumber, new STAFResult(
2261                 message.getRequestRC(), message.getRequestResult()));
2262         }
2263         else
2264         {   // Log a message in the job log
2265             String msg = "STAXJob.handleQueueMessage: " +
2266                          " No listener found for message:\n" +
2267                          STAFMarshallingContext.unmarshall(
2268                              message.getResult()).toString();
2269             job.log(STAXJob.JOB_LOG, "warning", msg);
2270         }
2271     }
2272 
2273     // STAXThreadCompleteListener method
2274 
threadComplete(STAXThread thread, int endCode)2275     public void threadComplete(STAXThread thread, int endCode)
2276     {
2277         // Debug:
2278         if (false)
2279         {
2280             STAX.logToJVMLog(
2281             	"Debug", thread,
2282             	"STAXJob::threadComplete: Thread #" +
2283             	thread.getThreadNumber() + " complete");
2284         }
2285 
2286         // Flush the stdout and stderr to make sure that all output is
2287         // printed.  This is because if a STAX job uses sys.stdout.write
2288         // or sys.stderr.write in a script element, it is possible that
2289         // these messages have not all been flushed from the buffer and,
2290         // thus, not logged by the STAXPythonOutput class.
2291         // This was added as requested by Bug #1510.
2292         try
2293         {
2294             thread.pyExec("sys.stdout.flush(); sys.stderr.flush()");
2295         }
2296         catch (STAXPythonEvaluationException e)
2297         {
2298             // Do nothing
2299         }
2300 
2301         boolean jobComplete = false;
2302 
2303         synchronized (fThreadMap)
2304         {
2305             fThreadMap.remove(thread.getThreadNumberAsInteger());
2306 
2307             if (fThreadMap.isEmpty())
2308                 jobComplete = true;
2309         }
2310 
2311         if (jobComplete == true)
2312         {
2313             // Perform terminateJob on interested parties
2314 
2315             fSTAX.visitJobManagementHandlers(new STAXVisitorHelper(this)
2316             {
2317                 public void visit(Object o, Iterator iter)
2318                 {
2319                     STAXJobManagementHandler handler =
2320                                              (STAXJobManagementHandler)o;
2321 
2322                     handler.terminateJob((STAXJob)fData);
2323                 }
2324             });
2325 
2326             // End the STAXTimedEventQueue thread for the job
2327             if (fSTAX.getTimedEventQueuePerJob())
2328                 fTimedEventQueue.end();
2329 
2330             // Set the result so that it is available upon job completion.
2331 
2332             String resultToEval = "STAXResult";
2333 
2334             try
2335             {
2336                 if (thread.pyBoolEval("isinstance(STAXResult, STAXGlobal)"))
2337                 {
2338                     // Use the STAXGlobal class's get() method so that the job
2339                     // result when using toString() will be it's contents and
2340                     // not org.python.core.PyFinalizableInstance
2341 
2342                     resultToEval = "STAXResult.get()";
2343                 }
2344             }
2345             catch (STAXPythonEvaluationException e)
2346             {   /* Ignore error and assume not a STAXGlobal object */ }
2347 
2348             try
2349             {
2350                 fResult = thread.pyObjectEval(resultToEval);
2351             }
2352             catch (STAXPythonEvaluationException e)
2353             {
2354                 fResult = Py.None;
2355             }
2356 
2357             // Log the result from the job in a Status message in the Job log
2358 
2359             STAFMarshallingContext mc = STAFMarshallingContext.
2360                 unmarshall(fResult.toString());
2361             log(STAXJob.JOB_LOG, "status", "Job Result: " + mc);
2362 
2363             // Log a Stop message for the job in the STAX Service and Job logs
2364 
2365             String msg = "JobID: " + fJobNumber;
2366             log(STAXJob.SERVICE_LOG, "stop", msg);
2367             log(STAXJob.JOB_LOG, "stop", msg);
2368 
2369             // Get the current date and time and set as job ending date/time
2370 
2371             fEndTimestamp = new STAXTimestamp();
2372 
2373             // Generate job completion event
2374 
2375             HashMap<String, String> jobEndMap = new HashMap<String, String>();
2376             jobEndMap.put("type", "job");
2377             jobEndMap.put("block", "main");
2378             jobEndMap.put("status", "end");
2379             jobEndMap.put("jobID", String.valueOf(fJobNumber));
2380             jobEndMap.put("result", fResult.toString());
2381             jobEndMap.put("jobCompletionStatus", getCompletionStatusAsString());
2382 
2383             generateEvent(STAXJob.STAX_JOB_EVENT, jobEndMap, true);
2384 
2385             // Query its job log for any messages with level "Error".
2386             // Save the result's marshalling context so it's available when
2387             // writing the job result to a file and when the RETURNRESULT
2388             // option is specified on a STAX EXECUTE request
2389 
2390             fJobLogErrorsMC = getJobLogErrors();
2391 
2392             // Write the job result information to files in the STAX job
2393             // directory
2394 
2395             writeJobResultsToFile();
2396 
2397             // Send a STAF/Service/STAX/End message to indicate the job has
2398             // completed
2399 
2400             submitSync("local", "QUEUE", "QUEUE TYPE STAF/Service/STAX/End " +
2401                        "MESSAGE " + STAFUtil.wrapData(""));
2402 
2403             // Debug
2404             if (COUNT_PYCODE_CACHES && STAX.CACHE_PYTHON_CODE)
2405             {
2406                 STAX.logToJVMLog(
2407                     "Debug", thread,
2408                     "STAXJob::threadComplete: Job " + fJobNumber + ": " +
2409                     " cacheGets=" + fCompiledPyCodeCacheGets +
2410                     " cacheAdds=" + fCompiledPyCodeCacheAdds);
2411             }
2412 
2413             while (!fCompletionNotifiees.isEmpty())
2414             {
2415                 STAXJobCompleteListener listener =
2416                     fCompletionNotifiees.removeFirst();
2417 
2418                 if (listener != null) listener.jobComplete(this);
2419             }
2420 
2421             try
2422             {
2423                 fHandle.unRegister();
2424             }
2425             catch (STAFException e)
2426             {
2427                 /* Do Nothing */
2428             }
2429 
2430             // Clear out job's private variables
2431 
2432             fScripts = new ArrayList<String>();
2433             fScriptFiles = new ArrayList<String>();
2434             fThreadMap = new LinkedHashMap<Integer, STAXThread>();
2435             fCompletionNotifiees = new LinkedList<STAXJobCompleteListener>();
2436             fCompiledPyCodeCache = new HashMap<String, PyCode>();
2437 
2438             // Commented out setting these variables to new (empty) objects,
2439             // as this could cause the "Testcases" and "Testcase Totals" values
2440             // in the job result to be empty due to timing issues
2441             //fTestcaseList = new ArrayList();
2442             //fTestcaseTotalsMap = new HashMap();
2443 
2444             // Commented out setting the following variables to null because
2445             // STAFQueueMonitor.notifyListeners() is running in another
2446             // thread and could access these variables and get a NPE.
2447             // fQueueListenerMap = null;
2448             // fHandle = null;
2449             // fSTAFQueueMonitor = null;
2450         }
2451     }
2452 
2453     /**
2454      * This method is called to clean up a job that is in a pending state.
2455      * This is a job that has had a job number assigned to it but did not
2456      * start execution because either an error occurred (e.g. xml parsing,
2457      * etc) before the job actually started execution or because the TEST
2458      * option was specified, which indicates to test that the job doesn't
2459      * contain xml parsing or Python compile errors and to not actually
2460      * run the job.
2461      */
cleanupPendingJob(STAFResult result)2462     public void cleanupPendingJob(STAFResult result)
2463     {
2464         STAXJob job = fSTAX.removeFromJobMap(getJobNumberAsInteger());
2465 
2466         if (job == null)
2467         {
2468             // The job is not in the job map
2469             return;
2470         }
2471 
2472         if (result.rc != STAFResult.Ok)
2473         {
2474             setCompletionStatus(TERMINATED_STATUS);
2475 
2476             // Log the error message in the job log
2477 
2478             if (STAFMarshallingContext.isMarshalledData(result.result))
2479             {
2480                 try
2481                 {
2482                     STAFMarshallingContext mc =
2483                         STAFMarshallingContext.unmarshall(result.result);
2484                     Map resultMap = (Map)mc.getRootObject();
2485                     result.result = (String)resultMap.get("errorMsg");
2486                 }
2487                 catch (Exception e)
2488                 {
2489                     STAX.logToJVMLog(
2490                         "Error", fJobNumber, 0,
2491                         "STAXJob::cleanupPendingJob: " + e.toString());
2492                 }
2493             }
2494 
2495             log(STAXJob.JOB_LOG, "error", result.result);
2496         }
2497 
2498         // Log the testcase totals in the Job log by first calling the
2499         // STAXTestcaseActionFactory's initJob() method (to initialize the
2500         // testcase totals to 0) and then by calling its terminateJob() method
2501         // which logs the testcase totals
2502 
2503         STAXJobManagementHandler testcaseHandler =
2504             (STAXJobManagementHandler)fSTAX.getActionFactory("testcase");
2505 
2506         testcaseHandler.initJob(this);
2507         testcaseHandler.terminateJob(this);
2508 
2509         // Log the result from the job in a Status message in the Job log
2510 
2511         STAFMarshallingContext mc = STAFMarshallingContext.
2512             unmarshall(fResult.toString());
2513         log(STAXJob.JOB_LOG, "status", "Job Result: " + mc);
2514 
2515         // Log a Stop message for the job in the STAX Service and Job logs
2516 
2517         String msg = "JobID: " + fJobNumber;
2518         log(STAXJob.SERVICE_LOG, "stop", msg);
2519         log(STAXJob.JOB_LOG, "stop", msg);
2520 
2521         if (fEndTimestamp == null)
2522             fEndTimestamp = new STAXTimestamp();
2523 
2524         // Generate job completion event
2525 
2526         HashMap<String, String> jobEndMap = new HashMap<String, String>();
2527         jobEndMap.put("type", "job");
2528         jobEndMap.put("block", "main");
2529         jobEndMap.put("status", "end");
2530         jobEndMap.put("jobID", String.valueOf(fJobNumber));
2531         jobEndMap.put("result", fResult.toString());
2532         jobEndMap.put("jobCompletionStatus", getCompletionStatusAsString());
2533 
2534         generateEvent(STAXJob.STAX_JOB_EVENT, jobEndMap, true);
2535 
2536         // Query its job log for any messages with level "Error".
2537         // Save the result's marshalling context so it's available when
2538         // writing the job result to a file and when the RETURNRESULT
2539         // option is specified on a STAX EXECUTE request
2540 
2541         fJobLogErrorsMC = getJobLogErrors();
2542 
2543         // Write the job result information to files in the STAX job
2544         // directory
2545 
2546         writeJobResultsToFile();
2547 
2548         if (fHandle != null)
2549         {
2550             // Unregister the job's handle
2551             try
2552             {
2553                 fHandle.unRegister();
2554             }
2555             catch (STAFException e)
2556             {
2557                 /* Do Nothing */
2558             }
2559         }
2560     }
2561 
2562     /**
2563      * Query the STAX Job Log to get any messages with level "Error" that
2564      * were logged for this job.
2565      *
2566      * @return STAFMarshallingContext Return a marshalling context for the
2567      * result from the LOG QUERY request whose root object is a list of
2568      * error messages.
2569      */
getJobLogErrors()2570     private STAFMarshallingContext getJobLogErrors()
2571     {
2572         String jobLogName = fSTAX.getServiceName().toUpperCase() + "_Job_" +
2573             getJobNumber();
2574 
2575         String request = "QUERY MACHINE " + fSTAX.getLocalMachineNickname() +
2576             " LOGNAME " + STAFUtil.wrapData(jobLogName) +
2577             " LEVELMASK Error" +
2578             " FROM " + getStartTimestamp().getTimestampString();
2579 
2580         STAFResult result = submitSync("local", "LOG", request);
2581 
2582         // RC 4010 from the LOG QUERY request means exceeded the default
2583         // maximum query records
2584 
2585         if ((result.rc == STAFResult.Ok) || (result.rc == 4010))
2586             return result.resultContext;
2587 
2588         // Ignore the following errors from the LOG QUERY request:
2589         // - UnknownService (2) in case the LOG service is not registered
2590         // - HandleDoesNotExist (5) in case the STAX job's handle has been
2591         //   unregistered indicating the job has completed
2592 
2593         if ((result.rc != STAFResult.UnknownService) &&
2594             (result.rc != STAFResult.HandleDoesNotExist))
2595         {
2596             // Log an error in the JVM log
2597 
2598             STAX.logToJVMLog(
2599                 "Error", getJobNumber(), 0,
2600                 "STAXJob::getJobLogErrors() failed for Job ID: " +
2601                 getJobNumber() + "\nSTAF local LOG " + request +
2602                 "  RC=" + result.rc + ", Result=" + result.result);
2603         }
2604 
2605         return null;
2606     }
2607 
2608     /*
2609      * Write the marshalled job result information to files in the STAX job
2610      * directory
2611      */
writeJobResultsToFile()2612     private void writeJobResultsToFile()
2613     {
2614         // Create the marshalling context for the results without the testcase
2615         // list
2616 
2617         STAFMarshallingContext resultMC = new STAFMarshallingContext();
2618         resultMC.setMapClassDefinition(fSTAX.fResultMapClass);
2619         resultMC.setMapClassDefinition(
2620             STAXTestcaseActionFactory.fTestcaseTotalsMapClass);
2621 
2622         Map<String, Object> resultMap = new TreeMap<String, Object>();
2623         resultMap.put("staf-map-class-name", fSTAX.fResultMapClass.name());
2624 
2625         resultMap = addCommonFieldsToResultMap(resultMap);
2626 
2627         resultMC.setRootObject(resultMap);
2628 
2629         String marshalledResultString = resultMC.marshall();
2630 
2631         // Write the marshalled string for the results without the testcase
2632         // list to file marshalledResults.txt in directory fJobDataDir
2633 
2634         writeStringToFile(fSTAX.getResultFileName(fJobNumber),
2635                           marshalledResultString);
2636 
2637         // Create the marshalling context for the results with the testcase
2638         // list
2639 
2640         resultMC = new STAFMarshallingContext();
2641         resultMC.setMapClassDefinition(fSTAX.fDetailedResultMapClass);
2642         resultMC.setMapClassDefinition(
2643             STAXTestcaseActionFactory.fTestcaseTotalsMapClass);
2644         resultMC.setMapClassDefinition(
2645             STAXTestcaseActionFactory.fQueryTestcaseMapClass);
2646 
2647         resultMap = new TreeMap<String, Object>();
2648         resultMap.put("staf-map-class-name",
2649                       fSTAX.fDetailedResultMapClass.name());
2650 
2651         resultMap = addCommonFieldsToResultMap(resultMap);
2652 
2653         resultMap.put("testcaseList", fTestcaseList);
2654 
2655         resultMC.setRootObject(resultMap);
2656 
2657         marshalledResultString = resultMC.marshall();
2658 
2659         // Write the marshalled string for the results with the testcase
2660         // list to file marshalledResultsLong.txt in directory fJobDataDir
2661 
2662         writeStringToFile(fSTAX.getDetailedResultFileName(fJobNumber),
2663                           marshalledResultString);
2664     }
2665 
addCommonFieldsToResultMap( Map<String, Object> inResultMap)2666     private Map<String, Object> addCommonFieldsToResultMap(
2667         Map<String, Object> inResultMap)
2668     {
2669         Map<String, Object> resultMap = new HashMap<String, Object>(inResultMap);
2670 
2671         if (!fJobName.equals(""))
2672             resultMap.put("jobName", fJobName);
2673 
2674         resultMap.put("startTimestamp",
2675                       fStartTimestamp.getTimestampString());
2676         resultMap.put("endTimestamp",
2677                       fEndTimestamp.getTimestampString());
2678         resultMap.put("status", getCompletionStatusAsString());
2679         resultMap.put("result", fResult.toString());
2680         resultMap.put("testcaseTotals", fTestcaseTotalsMap);
2681         resultMap.put("jobLogErrors", fJobLogErrorsMC);
2682         resultMap.put("xmlFileName", fXmlFile);
2683         resultMap.put("fileMachine", fXmlMachine);
2684         resultMap.put("function", getStartFunction());
2685         resultMap.put("arguments",
2686                       STAFUtil.maskPrivateData(getStartFuncArgs()));
2687         resultMap.put("scriptList", fScripts);
2688         resultMap.put("scriptFileList", fScriptFiles);
2689 
2690         if (!fScriptFileMachine.equals(""))
2691             resultMap.put("scriptMachine", fScriptFileMachine);
2692 
2693         return resultMap;
2694     }
2695 
2696     /**
2697      *  Write the contents of a string to the specified file name
2698      */
writeStringToFile(String fileName, String data)2699     private void writeStringToFile(String fileName, String data)
2700     {
2701         FileWriter out = null;
2702 
2703         try
2704         {
2705             File outFile = new File(fileName);
2706             out = new FileWriter(outFile);
2707             out.write(data);
2708         }
2709         catch (IOException e)
2710         {
2711             log(STAXJob.JVM_LOG, "Error",
2712                 "Error writing to job results file: " + fileName + ".\n" +
2713                 e.toString());
2714         }
2715         finally
2716         {
2717             try
2718             {
2719                 if (out != null) out.close();
2720             }
2721             catch (IOException e)
2722             {
2723                 // Do nothing
2724             }
2725         }
2726     }
2727 
    // ---- Instance state of the job ----

    private STAX fSTAX;
    private STAXDocument fDocument;
    private STAXCallAction fDefaultCallAction = null;
    private Object fNextThreadNumberSynch = new Object();
    private int fNextThreadNumber = 1;
    private int fJobNumber = 0;
    private Object fNextProcNumberSynch = new Object();
    private int fProcNumber = 1;
    private Object fNextCmdNumberSynch = new Object();
    private int fCmdNumber = 1;
    private Object fNextProcessKeySynch = new Object();
    private int fProcessKey = 1;
    private String fJobDataDir = new String();
    private String fJobName = new String();
    private String fXmlMachine = new String();
    private String fXmlFile = new String();
    private boolean fClearlogs;
    private int fMaxSTAXThreads;
    private String fWaitTimeout = null;
    private String fStartFunction = new String();
    private String fStartFuncArgs = new String();

    // fScripts and fScriptFiles are stored in the job result as
    // "scriptList" and "scriptFileList"; both are replaced by empty lists
    // at the end of the job completion processing
    private List<String> fScripts = new ArrayList<String>();
    private List<String> fScriptFiles = new ArrayList<String>();
    private String fScriptFileMachine = new String();

    // Replaced by an empty map at the end of the job completion processing
    private Map<Integer, STAXThread> fThreadMap =
        new LinkedHashMap<Integer, STAXThread>();

    // The job's handle.  It is unregistered when the job completes (or when
    // a pending job is cleaned up) but is intentionally not set to null
    // afterwards, since the queue monitor thread could still access it.
    private STAFHandle fHandle = null;
    private STAFQueueMonitor fSTAFQueueMonitor = null;

    // Listeners whose jobComplete() method is called when the job completes
    private LinkedList<STAXJobCompleteListener> fCompletionNotifiees =
        new LinkedList<STAXJobCompleteListener>();

    /**
     * Used if the STAX parameter TimedEventQueue is set to "Job" to indicate
     * that a STAXTimedEventQueue should be created for each job instead of
     * using a common STAXTimedEventQueue for all jobs.
     */
    private STAXTimedEventQueue fTimedEventQueue = null;

    // fExecuteAndHold:  null indicates no hold
    //                   blank indicates hold with no timeout
    //                   otherwise indicates hold with a timeout
    private String fExecuteAndHold = null;

    private String fSourceMachine = new String();  // Source == Originating
    private String fSourceHandleName = new String();
    private int fSourceHandle;
    private int fNotifyOnEnd = STAXJob.NO_NOTIFY_ONEND;
    private int fState = PENDING_STATE;

    // The job result; starts out as Python None.  Its string form is what
    // gets logged and stored as the job's "result".
    private PyObject fResult = Py.None;
    private int fCompletionStatus = STAXJob.NORMAL_STATUS;
    private STAXTimestamp fStartTimestamp;

    // Set when the job completes or when a pending job is cleaned up
    private STAXTimestamp fEndTimestamp;

    // Queue listeners keyed by message type; a null key matches every
    // message.  STAFQueueMonitor.notifyListeners() synchronizes on this map
    // while it notifies the listeners.
    private HashMap<String, TreeSet<STAXSTAFQueueListener>> fQueueListenerMap =
        new HashMap<String, TreeSet<STAXSTAFQueueListener>>();
    private HashMap<String, Object> fDataMap = new HashMap<String, Object>();

    // Map of active STAF requests
    private HashMap<Integer, STAXSTAFRequestCompleteListener> fRequestMap =
        new HashMap<Integer, STAXSTAFRequestCompleteListener>();

    private boolean fLogTCElapsedTime;
    private boolean fLogTCNumStarts;
    private boolean fLogTCStartStop;
    private int fPythonOutput;
    private String fPythonLogLevel;
    private int fInvalidLogLevelAction;

    // Replaced by an empty map at the end of the job completion processing.
    // The two counters are only reported (in a debug message written to the
    // JVM log) when both COUNT_PYCODE_CACHES and STAX.CACHE_PYTHON_CODE are
    // true.
    private HashMap<String, PyCode> fCompiledPyCodeCache =
        new HashMap<String, PyCode>();
    private long fCompiledPyCodeCacheAdds = 0;
    private long fCompiledPyCodeCacheGets = 0;

    // fTestcaseList and fTestcaseTotalsMap are stored in the job result as
    // "testcaseList" and "testcaseTotals".  They are deliberately not
    // cleared when the job completes, as that could leave those values
    // empty in the job result due to timing issues.
    private List<Map<String, Object>> fTestcaseList =
        new ArrayList<Map<String, Object>>();
    private List<String> fBreakpointFunctionList = new ArrayList<String>();
    private List<String> fBreakpointLineList = new ArrayList<String>();
    private Map<String, String> fTestcaseTotalsMap =
        new HashMap<String, String>();
    private TreeMap<String, STAXBreakpoint> fBreakpointsMap =
        new TreeMap<String, STAXBreakpoint>();
    private Object fNextBreakpointNumberSynch = new Object();
    private int fNextBreakpointNumber = 1;
    private boolean fBreakpointFirstFunction = false;
    private boolean fBreakpointSubjobFirstFunction = false;

    // Marshalling context returned by getJobLogErrors() once the job has
    // ended (stored in the job result as "jobLogErrors"); it is null
    // afterwards if the job log could not be queried
    private STAFMarshallingContext fJobLogErrorsMC =
        new STAFMarshallingContext();
2812 
2813     class STAXSTAFQueueListenerComparator
2814         implements Comparator<STAXSTAFQueueListener>
2815     {
compare(STAXSTAFQueueListener o1, STAXSTAFQueueListener o2)2816         public int compare(STAXSTAFQueueListener o1, STAXSTAFQueueListener o2)
2817         {
2818             if (o1.hashCode() < o2.hashCode()) return -1;
2819             else if (o1.hashCode() > o2.hashCode()) return 1;
2820 
2821             return 0;
2822         }
2823     }
2824 
2825     class STAFQueueMonitor extends Thread
2826     {
STAFQueueMonitor(STAXJob job)2827         STAFQueueMonitor(STAXJob job)
2828         {
2829             fJob = job;
2830         }
2831 
2832         /**
2833          *  Log an error message in the JVM log and the STAX Job log
2834          */
logMessage(String message, Throwable t)2835         public void logMessage(String message, Throwable t)
2836         {
2837             STAXTimestamp currentTimestamp = new STAXTimestamp();
2838 
2839             message = "STAXJob$STAFQueueMonitor.run(): " + message;
2840 
2841             if (t != null)
2842             {
2843                 // Add the Java stack trace to the message
2844 
2845                 StringWriter sw = new StringWriter();
2846                 t.printStackTrace(new PrintWriter(sw));
2847 
2848                 if (t.getMessage() != null)
2849                     message += "\n" + t.getMessage() + "\n" + sw.toString();
2850                 else
2851                     message += "\n" + sw.toString();
2852             }
2853 
2854             // Truncate the message to a maximum size of 3000 (in case
2855             // the result from the QUEUE GET WAIT request is very large)
2856 
2857             if (message.length() > 3000)
2858                 message = message.substring(0, 3000) + "...";
2859 
2860             // Log an error message in the STAX JVM log
2861 
2862             STAX.logToJVMLog(
2863                 "Error", fJob.getJobNumber(), 0, message);
2864 
2865             // Log an error message in the STAX Job log
2866 
2867             fJob.log(STAXJob.JOB_LOG, "Error", message);
2868         }
2869 
run()2870         public void run()
2871         {
2872             STAFMarshallingContext mc;
2873             List messageList;
2874             int maxMessages = fJob.getSTAX().getMaxGetQueueMessages();
2875             String request = "GET WAIT FIRST " + maxMessages;
2876             STAFResult result;
2877             int numErrors = 0;
2878 
2879             // Maximum consecutive errors submitting a local QUEUE GET WAIT
2880             // request before we decide to exit the infinite loop
2881             int maxErrors = 5;
2882 
2883             // Process messages on the STAX job handle's queue until we get
2884             // a "STAF/Service/STAX/End" message or until an error occurs 5
2885             // consecutive times submitting a STAF local QUEUE GET request
2886             // (so that we don't get stuck in an infinite loop eating CPU).
2887 
2888             for (;;)
2889             {
2890                 result = null;
2891 
2892                 try
2893                 {
2894                     // For better performance when more than 1 message is on
2895                     // the queue waiting to be processed, get multiple
2896                     // messages off the queue at a time (up to maxMessages)
2897                     // so that the total size of the messages hopefully won't
2898                     // cause an OutOfMemory problem.
2899                     // Note:  If an error occurs unmarshalling the list of
2900                     // messages, all these messages will be lost.
2901 
2902                     result = submitSync("local", "QUEUE", request);
2903 
2904                     if (result == null)
2905                     {
2906                         numErrors++;
2907 
2908                         logMessage(
2909                             "STAF local QUEUE " + request +
2910                             " returned null. This may have been caused by " +
2911                             "running out of memory creating the result.",
2912                             null);
2913 
2914                         if (numErrors < maxErrors)
2915                         {
2916                             continue;
2917                         }
2918                         else
2919                         {
2920                             logMessage(
2921                                 "Exiting this thread after the QUEUE GET " +
2922                                 "request failed " + maxErrors +
2923                                 " consecutive times.", null);
2924 
2925                             return;  // Exit STAFQueueMonitor thread
2926                         }
2927                     }
2928 
2929                     if (result.rc == STAFResult.Ok)
2930                     {
2931                         numErrors = 0;
2932                     }
2933                     else if (result.rc == STAFResult.HandleDoesNotExist)
2934                     {
2935                         // This means that the STAX job's handle has been
2936                         // unregistered which means that the STAX job is no
2937                         // longer running so we should exit this thread.
2938                         // We've seen this happen before this thread gets
2939                         // the message with type "STAF/Service/STAX/End" off
2940                         // the queue.
2941 
2942                         return;  // Exit STAFQueueMonitor thread
2943                     }
2944                     else
2945                     {
2946                         numErrors++;
2947 
2948                         logMessage(
2949                             "STAF local QUEUE " + request +
2950                             " failed with RC=" + result.rc + ", Result=" +
2951                             result.result, null);
2952 
2953                         if (numErrors < maxErrors)
2954                         {
2955                             continue;
2956                         }
2957                         else
2958                         {
2959                             logMessage(
2960                                 "Exiting this thread after the QUEUE GET " +
2961                                 "request failed " + maxErrors +
2962                                 " consecutive times", null);
2963 
2964                             return;  // Exit STAFQueueMonitor thread
2965                         }
2966                     }
2967                 }
2968                 catch (Throwable t)
2969                 {
2970                     // Note: One possible reason for an exception occurring
2971                     // submitting a QUEUE GET WAIT request can be because
2972                     // an error occurred when the result was auto-unmarshalled
2973                     // due to invalid marshalled data in the result (though
2974                     // in STAF V3.3.3 a fix was made to the Java unmarshall
2975                     // method so that this problem should no longer occur).
2976 
2977                     numErrors++;
2978 
2979                     logMessage(
2980                         "Exception getting message(s) off the queue.", t);
2981 
2982                     if (numErrors < maxErrors)
2983                     {
2984                         continue;
2985                     }
2986                     else
2987                     {
2988                         logMessage(
2989                             "Exiting this thread after the QUEUE GET " +
2990                             "request failed " + maxErrors +
2991                             " consecutive times", null);
2992 
2993                         return;  // Exit STAFQueueMonitor thread
2994                     }
2995                 }
2996 
2997                 try
2998                 {
2999                     // Unmarshall the result from a QUEUE GET request, but
3000                     // don't unmarshall indirect objects
3001 
3002                     mc = STAFMarshallingContext.unmarshall(
3003                         result.result,
3004                         STAFMarshallingContext.IGNORE_INDIRECT_OBJECTS);
3005 
3006                     messageList = (List)mc.getRootObject();
3007                 }
3008                 catch (Throwable t)
3009                 {
3010                     // Log an error message and continue
3011 
3012                     logMessage(
3013                         "Exception unmarshalling queued messages. " +
3014                         "\nMarshalled string: " + result.result, t);
3015                     continue;
3016                 }
3017 
3018                 // Iterate through the list of messages removed from our
3019                 // handle's queue and process each message
3020 
3021                 Iterator iter = messageList.iterator();
3022                 STAXSTAFMessage msg;
3023 
3024                 while (iter.hasNext())
3025                 {
3026                     // Process the message
3027 
3028                     try
3029                     {
3030                         msg = new STAXSTAFMessage((Map)iter.next());
3031                     }
3032                     catch (Throwable t)
3033                     {
3034                         // Log an error message and continue
3035 
3036                         logMessage("Exception handling a queued message.", t);
3037                         continue;
3038                     }
3039 
3040                     String type = msg.getType();
3041 
3042                     if (type != null &&
3043                         type.equalsIgnoreCase("STAF/Service/STAX/End"))
3044                     {
3045                         return;  // Exit STAFQueueMonitorThread
3046                     }
3047 
3048                     try
3049                     {
3050                         notifyListeners(msg);
3051                     }
3052                     catch (Throwable t)
3053                     {
3054                         // Log an error message and continue
3055 
3056                         logMessage(
3057                             "Exception notifying listeners for a message " +
3058                             "with type '" + type +
3059                             "' from handle " + msg.getHandle() +
3060                             " on machine " + msg.getMachine() +
3061                             " \nMessage: " + msg.getMessage(), t);
3062                         continue;
3063                     }
3064                 }
3065             }
3066         }
3067 
notifyListeners(STAXSTAFMessage msg)3068         public void notifyListeners(STAXSTAFMessage msg)
3069         {
3070             // Perform a lookup of registered message handlers for
3071             // this message type and pass the STAXSTAFMessage to
3072             // the registered message handlers.
3073 
3074             String theType = msg.getType();
3075             int listenersFound = 0;
3076 
3077             synchronized (fQueueListenerMap)
3078             {
3079                 for (Map.Entry<String, TreeSet<STAXSTAFQueueListener>> entry :
3080                      fQueueListenerMap.entrySet())
3081                 {
3082                     String msgType = entry.getKey();
3083 
3084                     if ((msgType == null) ||
3085                         // Messages from 3.x clients that STAX is interested
3086                         // in will have a type
3087                         (theType != null &&
3088                          theType.equalsIgnoreCase(msgType)) ||
3089                         // The following is for messages from 2.x clients
3090                         // which will have a null type and the message will
3091                         // begin with the type (which for processes will be
3092                         // STAF/PROCESS/END instead of STAF/Process/End
3093                         (theType == null && msg.getMessage().toUpperCase().
3094                          startsWith(msgType.toUpperCase())))
3095                     {
3096                         TreeSet<STAXSTAFQueueListener> listenerSet =
3097                             entry.getValue();
3098 
3099                         if (listenerSet == null) continue;
3100 
3101                         for (STAXSTAFQueueListener listener : listenerSet)
3102                         {
3103                             listener.handleQueueMessage(msg, fJob);
3104                             listenersFound++;
3105                         }
3106                     }
3107                 }
3108 
3109                 if ((listenersFound == 0) &&
3110                     (theType.equals("STAF/RequestComplete") ||
3111                      theType.equals("STAF/Process/End")))
3112                 {
3113                     // Log a warning message in the job log as this could
3114                     // indicate a problem in the how the STAX service is
3115                     // handling these messages
3116 
3117                     try
3118                     {
3119                         fJob.log(STAXJob.JOB_LOG, "warning",
3120                                  "STAXJob.notifyListeners: " +
3121                                  "No listener found for message:\n" +
3122                                  STAFMarshallingContext.unmarshall(
3123                                      msg.getResult()).toString());
3124                     }
3125                     catch (Throwable t)
3126                     {
3127                         // Ignore
3128                     }
3129                 }
3130             }
3131         }
3132 
3133         STAXJob fJob;
3134 
3135     } // end STAFQueueMonitor
3136 
3137 }
3138