1 /*****************************************************************************/
2 /* Software Testing Automation Framework (STAF)                              */
3 /* (C) Copyright IBM Corp. 2001, 2004                                        */
4 /*                                                                           */
5 /* This software is licensed under the Eclipse Public License (EPL) V1.0.    */
6 /*****************************************************************************/
7 
8 package com.ibm.staf.service.event;
9 
10 import com.ibm.staf.*;
11 import com.ibm.staf.service.*;
12 import java.util.*;
13 import java.io.*;
14 
15 public class GenerationManager
16 implements Serializable, Runnable
17 {
18     static final long serialVersionUID = 1;
19     transient double version = 0.9;
20     transient double fileVersion;
21     transient static int fEventID = 0;
22     transient Hashtable fEventTable = new Hashtable();
23     transient PriorityQueue fQueue = new PriorityQueue();
24     transient Thread fNotifierThread;
25     transient Thread fQueueThread;
26     transient STAFHandle eventSTAFHandle = null;
27     transient String eventServiceName;
28     transient RegistrationManager eventRegManager = null;
29     transient String eventGenManagerFileName = null;
30     transient HashMap fRequestsAndNotifications = new HashMap();
31 
32     public class EventID
33     {
34         transient int id;
35         transient int generatingHandle;
36         transient String generatingMachine;
37         transient String generatingProcess;
38         transient String type;
39         transient String subType;
40         transient String[] properties;
41         transient boolean notify;  // Flag indicating if the event generator
42                                    // requested to be notified when all
43                                    // registered processes have acknowledged
44                                    // receiving the event.
45         transient Vector notificationList;
46 
EventID(int theID, int theGenHandle, String theGenMachine, String theGenProcess, String theType, String theSubType, String[] theProperties, boolean theNotify, Vector theNotificationList)47         EventID(int theID, int theGenHandle, String theGenMachine,
48                 String theGenProcess, String theType, String theSubType,
49                 String[] theProperties, boolean theNotify,
50                 Vector theNotificationList)
51         {
52             id                = theID;
53             generatingHandle  = theGenHandle;
54             generatingMachine = theGenMachine;
55             generatingProcess = theGenProcess;
56             type              = theType;
57             subType           = theSubType;
58             properties        = theProperties;
59             notify            = theNotify;
60             notificationList  = theNotificationList;
61         }
62 
getEventId()63         public int getEventId()
64         {
65             return id;
66         }
67 
getType()68         public String getType()
69         {
70             return type;
71         }
72 
getSubtype()73         public String getSubtype()
74         {
75             return subType;
76         }
77 
getPropertyMap()78         public Map getPropertyMap()
79         {
80             return getPropertyMap(false);
81         }
82 
getPropertyMap(boolean maskPrivateData)83         public Map getPropertyMap(boolean maskPrivateData)
84         {
85             Map propertyMap = new HashMap();
86 
87             for (int i = 0; i < properties.length; i++)
88             {
89                 int index = properties[i].indexOf('=');
90 
91                 if (index >= 0)
92                 {
93                     // Contains a "="
94                     String name = properties[i].substring(0, index);
95                     String value = properties[i].substring(index + 1);
96 
97                     if (!maskPrivateData)
98                         propertyMap.put(name, value);
99                     else
100                         propertyMap.put(name, STAFUtil.maskPrivateData(value));
101                 }
102                 else
103                 {
104                     // Does not contain an "=", so assign value = null
105                     String name = properties[i];
106                     String value = null;
107                     propertyMap.put(name, value);
108                 }
109             }
110 
111             return propertyMap;
112         }
113 
getNotificationList()114         public List getNotificationList()
115         {
116             List theNotificationList = new ArrayList();
117 
118             for (int i = 0; i < notificationList.size(); i++)
119             {
120                 Notification notification =
121                     (Notification)notificationList.get(i);
122                 theNotificationList.add(notification.getClientMap());
123             }
124 
125             return theNotificationList;
126         }
127 
getEventGeneratorMap()128         public Map getEventGeneratorMap()
129         {
130             Map eventGeneratorMap = EventService.fEventGeneratorMapClass.
131                 createInstance();
132             eventGeneratorMap.put("machine", generatingMachine);
133             eventGeneratorMap.put("handleName", generatingProcess);
134             eventGeneratorMap.put("handle", "" + generatingHandle);
135 
136             return eventGeneratorMap;
137         }
138     }
139 
140     public class Notification
141     {
142         Client client;
143         EventID eventID;
144         boolean async;
145 
Notification(Client client, EventID eventID, boolean async)146         public Notification(Client client, EventID eventID, boolean async)
147         {
148             this.client = client;
149             this.eventID = eventID;
150             this.async = async;
151         }
152 
getNotificationMap()153         synchronized public Map getNotificationMap()
154         {
155             Map notificationMap =
156                 EventService.fEventIDLongMapClass.createInstance();
157 
158             notificationMap.put("eventID", "" + eventID.getEventId());
159             notificationMap.put("type", eventID.getType());
160             notificationMap.put("subtype", eventID.getSubtype());
161 
162             // Currently, getNotificationMap() is only used when listing or
163             // querying Event IDs, so always get the properties with any
164             // private data masked, as indicated by passing true for the
165             // maskPrivateData argument.
166             notificationMap.put("propertyMap", eventID.getPropertyMap(true));
167 
168             notificationMap.put("generatedBy",
169                                 eventID.getEventGeneratorMap());
170             notificationMap.put("notificationList",
171                                 eventID.getNotificationList());
172 
173             return notificationMap;
174         }
175 
getClientMap()176         synchronized public Map getClientMap()
177         {
178             return client.getClientMap(EventService.fNotifieeMapClass);
179         }
180     }
181 
GenerationManager(STAFHandle theSTAFHandle, String serviceName, RegistrationManager regManager, String genManagerFileName)182     public GenerationManager(STAFHandle theSTAFHandle, String serviceName,
183                              RegistrationManager regManager,
184                              String genManagerFileName)
185     {
186         eventSTAFHandle = theSTAFHandle;
187         eventServiceName = serviceName;
188         eventRegManager = regManager;
189         eventGenManagerFileName = genManagerFileName;
190 
191         fNotifierThread = new Thread()
192         {
193             public void run()
194             {
195                 notificationThread();
196             }
197         };
198 
199         fNotifierThread.start();
200 
201         fQueueThread = new Thread(this);
202         fQueueThread.start(); // this calls the run() method
203     }
204 
serializeEventID(ObjectOutputStream os)205     synchronized public static void serializeEventID(ObjectOutputStream os)
206     throws IOException
207     {
208         os.writeInt(fEventID);
209     }
210 
deSerializeEventID(ObjectInputStream os)211     synchronized public static void deSerializeEventID(ObjectInputStream os)
212     throws IOException
213     {
214         fEventID = os.readInt();
215     }
216 
writeObject(ObjectOutputStream stream)217     private void writeObject(ObjectOutputStream stream)
218     throws IOException
219     {
220         stream.writeDouble(version);
221         serializeEventID(stream);
222     }
223 
readObject(ObjectInputStream stream)224     private void readObject(ObjectInputStream stream)
225     throws IOException, ClassNotFoundException
226     {
227         fileVersion = stream.readDouble();
228         deSerializeEventID(stream);
229     }
230 
serialize()231     synchronized void serialize()
232     {
233         try
234         {
235             ObjectOutputStream out = new ObjectOutputStream(
236                 new FileOutputStream(eventGenManagerFileName));
237 
238             out.writeObject(this);
239             out.close();
240         }
241         catch(Exception e)
242         {
243            if (EventService.DEBUG) e.printStackTrace();
244         }
245     }
246 
deSerialize()247     synchronized void deSerialize()
248     {
249         try
250         {
251             ObjectInputStream in =
252                 new ObjectInputStream(
253                 new FileInputStream(eventGenManagerFileName));
254 
255                 in.readObject();
256         }
257         catch(Exception e)
258         {
259             if (EventService.DEBUG) e.printStackTrace();
260         }
261     }
262 
generateEvent(String theGenMachine, String theGenProcess, int theGenHandle, String theType, String theSubtype, String[] theProperties, boolean notify, boolean async, Vector mergedClients)263     synchronized public int generateEvent(String theGenMachine,
264                                           String theGenProcess, int theGenHandle,
265                                           String theType, String theSubtype,
266                                           String[] theProperties,
267                                           boolean notify,
268                                           boolean async,
269                                           Vector mergedClients)
270     {
271         if ((mergedClients == null) || (mergedClients.size() == 0))
272             return EventService.kNoClientsForEvent;
273 
274         Integer nextID = new Integer(++fEventID);
275         Vector notificationList = new Vector();
276         EventID eventID = new EventID(nextID.intValue(), theGenHandle,
277                                       theGenMachine, theGenProcess, theType,
278                                       theSubtype, theProperties, notify,
279                                       notificationList);
280 
281         fEventTable.put(nextID, eventID);
282 
283         if ((mergedClients != null) && (mergedClients.size() != 0))
284         {
285             for (Enumeration e = mergedClients.elements(); e.hasMoreElements();)
286             {
287                 Client client = new Client((Client)e.nextElement());
288                 Notification notification = new Notification(client,
289                                                              eventID,
290                                                              async);
291 
292                 notificationList.addElement(notification);
293                 fQueue.enqueue(System.currentTimeMillis(), notification);
294             }
295         }
296         else return EventService.kNoClientsForEvent;
297 
298         synchronized(fNotifierThread)
299         {
300             fNotifierThread.notify();
301         }
302 
303         return nextID.intValue();
304     }
305 
sendMessage(Map propertyMap, Notification notification)306     synchronized private int sendMessage(Map propertyMap,
307                                          Notification notification)
308     {
309         String type = "STAF/Service/Event";
310         STAFResult fSTAFResult = null;
311         TimeStamp now = new TimeStamp();
312 
313         Map messageMap = new HashMap();
314         messageMap.put("eventServiceName", eventServiceName);
315         messageMap.put("eventID", "" + notification.eventID.id);
316         messageMap.put("machine", notification.eventID.generatingMachine);
317         messageMap.put("handleName", notification.eventID.generatingProcess);
318         messageMap.put("handle", "" + notification.eventID.generatingHandle);
319         messageMap.put("timestamp", now.currentDate() + "-" +
320                        now.currentTime());
321         messageMap.put("type", notification.eventID.type);
322         messageMap.put("subtype", notification.eventID.subType);
323         messageMap.put("propertyMap", propertyMap);
324 
325         STAFMarshallingContext mc = new STAFMarshallingContext();
326         mc.setRootObject(messageMap);
327         String message = mc.marshall();
328 
329         if (notification.client.fWho.handle != 0)
330         {
331             if (notification.async)
332             {
333                 // ASYNC is an undocumented option,
334                 // only used by the STAX service
335                 fSTAFResult = eventSTAFHandle.submit2(STAFHandle.ReqQueue,
336                     notification.client.getMachineName(),
337                     "QUEUE", "QUEUE HANDLE " + notification.client.fWho.handle +
338                     " PRIORITY " + notification.client.fHow.priority +
339                     " TYPE " + STAFUtil.wrapData(type) +
340                     " MESSAGE " + STAFUtil.wrapData(message));
341 
342                 String requestNumber = fSTAFResult.result;
343 
344                 fRequestsAndNotifications.put(requestNumber, notification);
345             }
346             else
347             {
348                 fSTAFResult = eventSTAFHandle.submit2(
349                     notification.client.getMachineName(),
350                     "QUEUE", "QUEUE HANDLE " + notification.client.fWho.handle +
351                     " PRIORITY " + notification.client.fHow.priority +
352                     " TYPE " + STAFUtil.wrapData(type) +
353                     " MESSAGE " + STAFUtil.wrapData(message));
354 
355                 if (fSTAFResult.rc != fSTAFResult.Ok)
356                 {
357                     if (fSTAFResult.rc == STAFResult.HandleDoesNotExist ||
358                         fSTAFResult.rc == STAFResult.NoPathToMachine ||
359                         fSTAFResult.rc == STAFResult.CommunicationError)
360                     {
361                         // Unregister the client (which was registered by handle)
362                         // since the handle/machine is no longer available
363 
364                         String[] subTypes = new String[1];
365                         subTypes[0] = notification.eventID.subType;
366 
367                         eventRegManager.unRegisterClient(
368                             notification.client.getMachineName(),
369                             notification.client.fWho.handleName,
370                             notification.client.fWho.handle,
371                             notification.eventID.type, subTypes);
372                     }
373                 }
374             }
375         }
376         else
377         {
378             fSTAFResult = eventSTAFHandle.submit2(
379                 notification.client.getMachineName(),
380                 "QUEUE", "QUEUE NAME " +
381                 STAFUtil.wrapData(notification.client.fWho.handleName) +
382                 " PRIORITY " + notification.client.fHow.priority +
383                 " TYPE " + STAFUtil.wrapData(type) +
384                 " MESSAGE " + STAFUtil.wrapData(message));
385         }
386 
387         return fSTAFResult.rc;
388     }
389 
ackEvent(int id, String machine, String handleName, int handle)390     synchronized public int ackEvent(int id, String machine,
391                                      String handleName, int handle)
392     {
393         EventID eventID = (EventID)fEventTable.get(new Integer(id));
394 
395         if (eventID == null) return EventService.kNoSuchID;
396 
397         Vector clientList = eventID.notificationList;
398 
399         if (clientList != null)
400         {
401             for (Enumeration e = clientList.elements(); e.hasMoreElements();)
402             {
403                 Notification notification = (Notification)e.nextElement();
404 
405                 int    nHandle  = notification.client.fWho.handle;
406                 String nName    = notification.client.fWho.handleName;
407                 String nMachine = notification.client.fWho.machineName;
408 
409                 if ((((nHandle == 0) && nName.equalsIgnoreCase(handleName)) ||
410                      ((nHandle != 0) && (nHandle == handle))) &&
411                     nMachine.equalsIgnoreCase(machine))
412                 {
413                     notification.client.fHow.maxAttempts = 0;
414                     clientList.removeElement(notification);
415 
416                     if (eventID.notify && clientList.isEmpty())
417                     {
418                         // Notify the generator that everyone has acknowledged
419 
420                         Map messageMap = new HashMap();
421                         messageMap.put("eventServiceName", eventServiceName);
422                         messageMap.put("eventID", new Integer(id));
423 
424                         STAFMarshallingContext mc = new STAFMarshallingContext();
425                         mc.setRootObject(messageMap);
426 
427                         eventSTAFHandle.submit2(
428                             STAFHandle.ReqFireAndForget,
429                             eventID.generatingMachine, "QUEUE",
430                             "QUEUE HANDLE " + eventID.generatingHandle +
431                             " TYPE STAF/Service/Event/AllAcksReceived" +
432                             " MESSAGE " + STAFUtil.wrapData(mc.marshall()));
433                     }
434 
435                     return STAFResult.Ok;  //kAckPending;
436                 }
437             }
438 
439             return EventService.kNoAckPending;
440         }
441 
442         return EventService.kNoSuchID;
443     }
444 
getClientsForEvent(int id)445     synchronized public Vector getClientsForEvent(int id)
446     {
447         Vector result = null;
448         EventID eventID = (EventID)fEventTable.get(new Integer(id));
449 
450         if (eventID != null) result = eventID.notificationList;
451 
452         return (result != null ? (Vector)result.clone() : new Vector());
453     }
454 
getNotifications()455     synchronized public Vector getNotifications()
456     {
457         Vector result = new Vector();
458 
459         PriorityQueue.PriorityQueueEntry[] entries = fQueue.getQueueCopy();
460 
461         for (int i = 0; i < fQueue.count(); i++)
462              result.addElement(entries[i]);
463 
464         return result;
465     }
466 
notificationThread()467     void notificationThread()
468     {
469         while (true)
470         {
471             try
472             {
473                 if (fQueue.front() == null)
474                 {
475                     synchronized (fNotifierThread)
476                     { fNotifierThread.wait(); }
477                 }
478                 else
479                 {
480                     long waitTime = fQueue.topPriority() -
481                                     System.currentTimeMillis();
482 
483                     if (waitTime  > 0)
484                     {
485                         synchronized (fNotifierThread)
486                         { fNotifierThread.wait(waitTime); }
487                     }
488 
489                     if (System.currentTimeMillis() >= fQueue.topPriority())
490                         notify((Notification)fQueue.dequeue());
491                 }
492             }
493             catch(InterruptedException e)
494             {
495                 if (EventService.DEBUG) e.printStackTrace();
496             }
497             catch(Exception e)
498             {
499                 if (EventService.DEBUG) e.printStackTrace();
500             }
501         }
502     }
503 
notify(Notification notification)504     synchronized private int notify(Notification notification)
505     {
506         Client client = notification.client;
507         EventID eventID = notification.eventID;
508         int id = eventID.id;
509         int fReturnCode = STAFResult.Ok;
510         Map propertyMap = eventID.getPropertyMap();
511 
512         if (fEventTable.get(new Integer(id)) == null)
513             return EventService.kNoSuchID;
514 
515         if ((client.getMaxAttempts() <= 0))
516         {
517             Vector clientList =
518                    ((EventID)fEventTable.get(new Integer(id))).notificationList;
519 
520             clientList.removeElement(notification);
521 
522             if (clientList.isEmpty())
523                 fEventTable.remove(new Integer(id));
524         }
525         else
526         {
527             fReturnCode = sendMessage(propertyMap, notification);
528 
529             if ((client.fHow.priority > 0) &&
530                 (client.fHow.priority >= client.fHow.priorityDelta))
531             {
532                     --client.fHow.priority;
533             }
534             else client.fHow.priority = 0;
535 
536             --client.fHow.maxAttempts;
537             fQueue.enqueue(client.getTimeout() + System.currentTimeMillis(),
538                            notification);
539         }
540 
541         return fReturnCode;
542     }
543 
handleReset()544     public void handleReset()
545     {
546         fEventTable.clear();
547     }
548 
handleRequestCompleteMsg(STAFQueueMessage queueMessage)549     private void handleRequestCompleteMsg(STAFQueueMessage queueMessage)
550     {
551         Map messageMap = (Map)queueMessage.message;
552 
553         String reqNum = (String)messageMap.get("requestNumber");
554         String rc = (String)messageMap.get("rc");
555 
556         if (!(fRequestsAndNotifications.containsKey(reqNum)))
557         {
558             return;
559         }
560 
561         Notification notification =
562             (Notification)(fRequestsAndNotifications.get(reqNum));
563 
564         int queueRC = (new Integer(rc)).intValue();
565 
566         if (queueRC == STAFResult.HandleDoesNotExist ||
567             queueRC == STAFResult.NoPathToMachine ||
568             queueRC == STAFResult.CommunicationError)
569         {
570             // Unregister the client (which was registered by handle)
571             // since the handle/machine is no longer available
572 
573             String[] subTypes = new String[1];
574             subTypes[0] = notification.eventID.subType;
575 
576             eventRegManager.unRegisterClient(
577                 notification.client.getMachineName(),
578                 notification.client.fWho.handleName,
579                 notification.client.fWho.handle,
580                 notification.eventID.type, subTypes);
581         }
582 
583         fRequestsAndNotifications.remove(reqNum);
584     }
585 
run()586     public void run()
587     {
588         STAFResult queueGetResult;
589 
590         for (;;)
591         {
592             queueGetResult = eventSTAFHandle.submit2("local",
593                 "QUEUE", "GET WAIT");
594 
595             if (queueGetResult.rc != 0)
596             {
597                 // XXX: Do anything?
598                 continue;
599             }
600 
601             // Need a try/catch block so can catch any errors and continue
602             // processing messages on the queue
603 
604             try
605             {
606                 STAFQueueMessage queueMessage = new STAFQueueMessage(
607                     queueGetResult.result);
608 
609                 String queueType = queueMessage.type;
610 
611                 if (queueType.equalsIgnoreCase("STAF/RequestComplete"))
612                 {
613                     handleRequestCompleteMsg(queueMessage);
614                 }
615                 else if (queueType.equalsIgnoreCase("STAF/Service/Event/End"))
616                 {
617                     return;
618                 }
619             }
620             catch (Exception e)
621             {
622                 e.printStackTrace();
623             }
624         }
625     }
626 }
627