/*****************************************************************************/
/* Software Testing Automation Framework (STAF)                              */
/* (C) Copyright IBM Corp. 2001, 2004                                        */
/*                                                                           */
/* This software is licensed under the Eclipse Public License (EPL) V1.0.    */
/*****************************************************************************/

package com.ibm.staf.service.event;

import com.ibm.staf.*;
import com.ibm.staf.service.*;
import java.util.*;
import java.io.*;

/**
 * Tracks generated events and delivers them to the clients registered for
 * them.
 *
 * Each generated event gets an {@link EventID} entry in the event table and
 * one {@link Notification} per client.  Notifications sit in a queue ordered
 * by due time; a notifier thread sends each one as a STAF QUEUE message and
 * re-enqueues it until the client acknowledges the event (see
 * {@link #ackEvent}) or its attempts are used up.  A second thread (this
 * object's {@link #run()}) reads the service handle's own queue to process
 * completion messages for asynchronously submitted QUEUE requests.
 *
 * Only the format version and the event ID counter are persisted; every
 * field is transient and written explicitly by writeObject().
 */
public class GenerationManager
implements Serializable, Runnable
{
    static final long serialVersionUID = 1;
    transient double version = 0.9;
    transient double fileVersion;
    transient static int fEventID = 0;
    transient Hashtable fEventTable = new Hashtable();
    transient PriorityQueue fQueue = new PriorityQueue();
    transient Thread fNotifierThread;
    transient Thread fQueueThread;
    transient STAFHandle eventSTAFHandle = null;
    transient String eventServiceName;
    transient RegistrationManager eventRegManager = null;
    transient String eventGenManagerFileName = null;

    // Maps the request number of an asynchronous QUEUE request to the
    // Notification it was sent for.  It is written by the notifier thread
    // and read by the queue thread, so every access synchronizes on the map.
    transient HashMap fRequestsAndNotifications = new HashMap();

    /**
     * Describes one generated event: who generated it, its type, subtype
     * and properties, and the notifications still awaiting acknowledgement.
     */
    public class EventID
    {
        transient int id;
        transient int generatingHandle;
        transient String generatingMachine;
        transient String generatingProcess;
        transient String type;
        transient String subType;
        transient String[] properties;
        transient boolean notify;  // Flag indicating if the event generator
                                   // requested to be notified when all
                                   // registered processes have acknowledged
                                   // receiving the event.
        transient Vector notificationList;

        EventID(int theID, int theGenHandle, String theGenMachine,
                String theGenProcess, String theType, String theSubType,
                String[] theProperties, boolean theNotify,
                Vector theNotificationList)
        {
            id = theID;
            generatingHandle = theGenHandle;
            generatingMachine = theGenMachine;
            generatingProcess = theGenProcess;
            type = theType;
            subType = theSubType;
            properties = theProperties;
            notify = theNotify;
            notificationList = theNotificationList;
        }

        /** Returns the numeric ID of this event. */
        public int getEventId()
        {
            return id;
        }

        /** Returns the event type. */
        public String getType()
        {
            return type;
        }

        /** Returns the event subtype. */
        public String getSubtype()
        {
            return subType;
        }

        /** Returns the properties as a map, without masking private data. */
        public Map getPropertyMap()
        {
            return getPropertyMap(false);
        }

        /**
         * Converts the "name=value" property strings into a map.
         *
         * A property without an "=" is stored with a null value.
         *
         * @param maskPrivateData if true, each value is passed through
         *                        STAFUtil.maskPrivateData() before it is
         *                        stored
         * @return a new map of property name to value; empty if the event
         *         has no property array
         */
        public Map getPropertyMap(boolean maskPrivateData)
        {
            Map propertyMap = new HashMap();

            // An event generated without properties yields an empty map
            // instead of a NullPointerException.
            if (properties == null) return propertyMap;

            for (int i = 0; i < properties.length; i++)
            {
                String property = properties[i];
                int index = property.indexOf('=');

                if (index < 0)
                {
                    // Does not contain an "=", so assign value = null
                    propertyMap.put(property, null);
                    continue;
                }

                // Contains a "="
                String name = property.substring(0, index);
                String value = property.substring(index + 1);

                if (maskPrivateData)
                    value = STAFUtil.maskPrivateData(value);

                propertyMap.put(name, value);
            }

            return propertyMap;
        }

        /**
         * Returns the client map of every notification still in this
         * event's notification list.
         */
        public List getNotificationList()
        {
            List theNotificationList = new ArrayList();

            if (notificationList == null) return theNotificationList;

            // Walk a snapshot: ackEvent() and notify() remove elements from
            // this Vector, which could otherwise invalidate the index
            // between the size check and the element access.
            Object[] snapshot = notificationList.toArray();

            for (int i = 0; i < snapshot.length; i++)
            {
                Notification notification = (Notification)snapshot[i];
                theNotificationList.add(notification.getClientMap());
            }

            return theNotificationList;
        }

        /**
         * Returns a map with the machine, handle name and handle of the
         * process that generated this event.
         */
        public Map getEventGeneratorMap()
        {
            Map eventGeneratorMap =
                EventService.fEventGeneratorMapClass.createInstance();

            eventGeneratorMap.put("machine", generatingMachine);
            eventGeneratorMap.put("handleName", generatingProcess);
            eventGeneratorMap.put("handle", "" + generatingHandle);

            return eventGeneratorMap;
        }
    }

    /**
     * Pairs one client with one event: a single pending delivery.
     */
    public class Notification
    {
        Client client;
        EventID eventID;
        boolean async;

        public Notification(Client client, EventID eventID, boolean async)
        {
            this.client = client;
            this.eventID = eventID;
            this.async = async;
        }

        /**
         * Returns a map describing the event this notification belongs to,
         * including its generator and its remaining notification list.
         */
        synchronized public Map getNotificationMap()
        {
            Map notificationMap =
                EventService.fEventIDLongMapClass.createInstance();

            notificationMap.put("eventID", "" + eventID.getEventId());
            notificationMap.put("type", eventID.getType());
            notificationMap.put("subtype", eventID.getSubtype());

            // Currently, getNotificationMap() is only used when listing or
            // querying Event IDs, so always get the properties with any
            // private data masked, as indicated by passing true for the
            // maskPrivateData argument.
            notificationMap.put("propertyMap", eventID.getPropertyMap(true));

            notificationMap.put("generatedBy",
                                eventID.getEventGeneratorMap());
            notificationMap.put("notificationList",
                                eventID.getNotificationList());

            return notificationMap;
        }

        /** Returns the client's map built with the notifiee map class. */
        synchronized public Map getClientMap()
        {
            return client.getClientMap(EventService.fNotifieeMapClass);
        }
    }

    /**
     * Stores the collaborators and starts the notifier and queue threads.
     *
     * @param theSTAFHandle      handle used to submit STAF requests
     * @param serviceName        name placed in outgoing messages as
     *                           "eventServiceName"
     * @param regManager         used to unregister clients that can no
     *                           longer be reached
     * @param genManagerFileName file used by serialize() and deSerialize()
     */
    public GenerationManager(STAFHandle theSTAFHandle, String serviceName,
                             RegistrationManager regManager,
                             String genManagerFileName)
    {
        eventSTAFHandle = theSTAFHandle;
        eventServiceName = serviceName;
        eventRegManager = regManager;
        eventGenManagerFileName = genManagerFileName;

        fNotifierThread = new Thread()
        {
            public void run()
            {
                notificationThread();
            }
        };

        fNotifierThread.start();

        fQueueThread = new Thread(this);
        fQueueThread.start();  // this calls the run() method
    }

    /** Writes the current event ID counter to the stream. */
    synchronized public static void serializeEventID(ObjectOutputStream os)
        throws IOException
    {
        os.writeInt(fEventID);
    }

    /** Restores the event ID counter from the stream. */
    synchronized public static void deSerializeEventID(ObjectInputStream os)
        throws IOException
    {
        fEventID = os.readInt();
    }

    // Serialization hook: persists the format version followed by the
    // event ID counter.
    private void writeObject(ObjectOutputStream stream)
        throws IOException
    {
        stream.writeDouble(version);
        serializeEventID(stream);
    }

    // Serialization hook: reads back what writeObject() wrote, in the same
    // order.
    private void readObject(ObjectInputStream stream)
        throws IOException, ClassNotFoundException
    {
        fileVersion = stream.readDouble();
        deSerializeEventID(stream);
    }

    /**
     * Writes this object to the generation manager file.  Failures are
     * only reported (as a stack trace) when EventService.DEBUG is set.
     */
    synchronized void serialize()
    {
        FileOutputStream fileOut = null;

        try
        {
            fileOut = new FileOutputStream(eventGenManagerFileName);

            ObjectOutputStream out = new ObjectOutputStream(fileOut);
            out.writeObject(this);
            out.close();
        }
        catch (Exception e)
        {
            if (EventService.DEBUG) e.printStackTrace();
        }
        finally
        {
            // Releases the file even when writing failed part way.
            closeStream(fileOut);
        }
    }

    /**
     * Reads the generation manager file.  Deserializing runs readObject(),
     * which restores the static event ID counter; the deserialized object
     * itself is discarded.  Failures are only reported when
     * EventService.DEBUG is set.
     */
    synchronized void deSerialize()
    {
        FileInputStream fileIn = null;

        try
        {
            fileIn = new FileInputStream(eventGenManagerFileName);

            ObjectInputStream in = new ObjectInputStream(fileIn);
            in.readObject();
        }
        catch (Exception e)
        {
            if (EventService.DEBUG) e.printStackTrace();
        }
        finally
        {
            closeStream(fileIn);
        }
    }

    // Closes an output stream, ignoring null; a close failure is only
    // reported when EventService.DEBUG is set.
    private static void closeStream(OutputStream stream)
    {
        if (stream == null) return;

        try
        {
            stream.close();
        }
        catch (IOException e)
        {
            if (EventService.DEBUG) e.printStackTrace();
        }
    }

    // Closes an input stream, ignoring null; a close failure is only
    // reported when EventService.DEBUG is set.
    private static void closeStream(InputStream stream)
    {
        if (stream == null) return;

        try
        {
            stream.close();
        }
        catch (IOException e)
        {
            if (EventService.DEBUG) e.printStackTrace();
        }
    }

    /**
     * Records a new event and queues one notification per client.
     *
     * @param theGenMachine machine of the generating process
     * @param theGenProcess handle name of the generating process
     * @param theGenHandle  handle of the generating process
     * @param theType       event type
     * @param theSubtype    event subtype
     * @param theProperties "name=value" (or bare "name") property strings
     * @param notify        whether the generator wants a message once all
     *                      clients have acknowledged
     * @param async         whether QUEUE requests to clients registered by
     *                      handle are submitted asynchronously
     * @param mergedClients the Client objects to notify
     * @return the new event ID, or EventService.kNoClientsForEvent if
     *         mergedClients is null or empty
     */
    synchronized public int generateEvent(String theGenMachine,
                                          String theGenProcess,
                                          int theGenHandle,
                                          String theType, String theSubtype,
                                          String[] theProperties,
                                          boolean notify,
                                          boolean async,
                                          Vector mergedClients)
    {
        if ((mergedClients == null) || (mergedClients.size() == 0))
            return EventService.kNoClientsForEvent;

        Integer nextID = new Integer(++fEventID);
        Vector notificationList = new Vector();
        EventID eventID = new EventID(nextID.intValue(), theGenHandle,
                                      theGenMachine, theGenProcess, theType,
                                      theSubtype, theProperties, notify,
                                      notificationList);

        fEventTable.put(nextID, eventID);

        for (Enumeration e = mergedClients.elements(); e.hasMoreElements();)
        {
            // Each notification works on its own copy of the client
            Client client = new Client((Client)e.nextElement());
            Notification notification =
                new Notification(client, eventID, async);

            notificationList.addElement(notification);

            // Due immediately
            fQueue.enqueue(System.currentTimeMillis(), notification);
        }

        // Wake the notifier thread, which may be waiting on an empty queue
        synchronized (fNotifierThread)
        {
            fNotifierThread.notify();
        }

        return nextID.intValue();
    }

    /**
     * Sends the event described by a notification to its client as a STAF
     * QUEUE message.
     *
     * A client registered by handle (handle != 0) is addressed by handle,
     * otherwise by handle name.  For a synchronous send to a handle, a
     * result indicating that the handle or machine is gone unregisters the
     * client.  For an asynchronous send the request number is remembered so
     * that handleRequestCompleteMsg() can do the same once the request
     * completes.
     *
     * @param propertyMap  the event properties to include in the message
     * @param notification the client/event pair to deliver
     * @return the return code of the submitted request
     */
    synchronized private int sendMessage(Map propertyMap,
                                         Notification notification)
    {
        String type = "STAF/Service/Event";
        STAFResult fSTAFResult = null;
        TimeStamp now = new TimeStamp();
        EventID event = notification.eventID;
        Client client = notification.client;

        Map messageMap = new HashMap();
        messageMap.put("eventServiceName", eventServiceName);
        messageMap.put("eventID", "" + event.id);
        messageMap.put("machine", event.generatingMachine);
        messageMap.put("handleName", event.generatingProcess);
        messageMap.put("handle", "" + event.generatingHandle);
        messageMap.put("timestamp", now.currentDate() + "-" +
                       now.currentTime());
        messageMap.put("type", event.type);
        messageMap.put("subtype", event.subType);
        messageMap.put("propertyMap", propertyMap);

        STAFMarshallingContext mc = new STAFMarshallingContext();
        mc.setRootObject(messageMap);
        String message = mc.marshall();

        String machine = client.getMachineName();

        if (client.fWho.handle == 0)
        {
            // Registered by handle name
            String request = buildQueueRequest(
                "NAME " + STAFUtil.wrapData(client.fWho.handleName),
                client, type, message);

            fSTAFResult = eventSTAFHandle.submit2(machine, "QUEUE", request);

            return fSTAFResult.rc;
        }

        // Registered by handle
        String request = buildQueueRequest(
            "HANDLE " + client.fWho.handle, client, type, message);

        if (notification.async)
        {
            // ASYNC is an undocumented option,
            // only used by the STAX service
            fSTAFResult = eventSTAFHandle.submit2(
                STAFHandle.ReqQueue, machine, "QUEUE", request);

            // The result is only a request number when the submit itself
            // succeeded; recording anything else would leave an entry that
            // no completion message ever removes.
            if (fSTAFResult.rc == STAFResult.Ok)
            {
                synchronized (fRequestsAndNotifications)
                {
                    fRequestsAndNotifications.put(fSTAFResult.result,
                                                  notification);
                }
            }
        }
        else
        {
            fSTAFResult = eventSTAFHandle.submit2(machine, "QUEUE", request);

            unregisterUnreachableClient(fSTAFResult.rc, notification);
        }

        return fSTAFResult.rc;
    }

    // Builds the QUEUE service request string for one delivery; target is
    // either "HANDLE <handle>" or "NAME <wrapped handle name>".
    private String buildQueueRequest(String target, Client client,
                                     String type, String message)
    {
        return "QUEUE " + target +
               " PRIORITY " + client.fHow.priority +
               " TYPE " + STAFUtil.wrapData(type) +
               " MESSAGE " + STAFUtil.wrapData(message);
    }

    // Unregisters the notification's client (which was registered by
    // handle) when rc shows that the handle/machine is no longer available;
    // does nothing for any other return code.
    private void unregisterUnreachableClient(int rc,
                                             Notification notification)
    {
        if (rc != STAFResult.HandleDoesNotExist &&
            rc != STAFResult.NoPathToMachine &&
            rc != STAFResult.CommunicationError)
        {
            return;
        }

        String[] subTypes = new String[1];
        subTypes[0] = notification.eventID.subType;

        eventRegManager.unRegisterClient(
            notification.client.getMachineName(),
            notification.client.fWho.handleName,
            notification.client.fWho.handle,
            notification.eventID.type, subTypes);
    }

    /**
     * Acknowledges an event on behalf of a client.
     *
     * The client is matched against the event's pending notifications by
     * machine (ignoring case) and either by handle, if it registered by
     * handle, or by handle name (ignoring case) otherwise.  The matching
     * notification is removed and its remaining attempts set to 0.  If the
     * generator asked to be notified and no notifications remain, a
     * STAF/Service/Event/AllAcksReceived message is queued to the
     * generating handle.
     *
     * @param id         the event ID being acknowledged
     * @param machine    the acknowledging machine
     * @param handleName the acknowledging handle name
     * @param handle     the acknowledging handle
     * @return STAFResult.Ok if a pending notification was acknowledged,
     *         EventService.kNoAckPending if the event has none for this
     *         client, or EventService.kNoSuchID if the event is unknown
     */
    synchronized public int ackEvent(int id, String machine,
                                     String handleName, int handle)
    {
        EventID eventID = (EventID)fEventTable.get(new Integer(id));

        if (eventID == null) return EventService.kNoSuchID;

        Vector clientList = eventID.notificationList;

        if (clientList == null) return EventService.kNoSuchID;

        for (Enumeration e = clientList.elements(); e.hasMoreElements();)
        {
            Notification notification = (Notification)e.nextElement();

            int nHandle = notification.client.fWho.handle;
            String nName = notification.client.fWho.handleName;
            String nMachine = notification.client.fWho.machineName;

            boolean sameClient = (nHandle == 0)
                ? nName.equalsIgnoreCase(handleName)
                : (nHandle == handle);

            if (!sameClient || !nMachine.equalsIgnoreCase(machine))
                continue;

            // With no attempts left, notify() drops the queued
            // notification instead of sending it again.
            notification.client.fHow.maxAttempts = 0;

            // Removing while enumerating is safe here only because the
            // method returns without touching the enumeration again.
            clientList.removeElement(notification);

            if (eventID.notify && clientList.isEmpty())
            {
                // Notify the generator that everyone has acknowledged

                Map messageMap = new HashMap();
                messageMap.put("eventServiceName", eventServiceName);
                messageMap.put("eventID", new Integer(id));

                STAFMarshallingContext mc = new STAFMarshallingContext();
                mc.setRootObject(messageMap);

                eventSTAFHandle.submit2(
                    STAFHandle.ReqFireAndForget,
                    eventID.generatingMachine, "QUEUE",
                    "QUEUE HANDLE " + eventID.generatingHandle +
                    " TYPE STAF/Service/Event/AllAcksReceived" +
                    " MESSAGE " + STAFUtil.wrapData(mc.marshall()));
            }

            return STAFResult.Ok;  //kAckPending;
        }

        return EventService.kNoAckPending;
    }

    /**
     * Returns a copy of the pending notification list of an event, or an
     * empty Vector if the event is unknown or has no list.
     */
    synchronized public Vector getClientsForEvent(int id)
    {
        EventID eventID = (EventID)fEventTable.get(new Integer(id));

        if ((eventID == null) || (eventID.notificationList == null))
            return new Vector();

        return (Vector)eventID.notificationList.clone();
    }

    /**
     * Returns the entries currently in the notification queue.
     */
    synchronized public Vector getNotifications()
    {
        Vector result = new Vector();

        PriorityQueue.PriorityQueueEntry[] entries = fQueue.getQueueCopy();

        // The notifier thread dequeues without holding this object's lock,
        // so the count can differ from the copy taken above; never index
        // past the copy.
        int total = Math.min(entries.length, fQueue.count());

        for (int i = 0; i < total; i++)
            result.addElement(entries[i]);

        return result;
    }

    /**
     * Body of the notifier thread: waits until the notification at the
     * front of the queue is due, then processes it with notify().  Loops
     * forever; exceptions are only reported when EventService.DEBUG is set.
     */
    void notificationThread()
    {
        while (true)
        {
            try
            {
                // The emptiness check and the wait happen under the same
                // monitor that generateEvent() notifies on.  Checking
                // outside of it would allow a notify issued between the
                // check and the wait to be missed, leaving the new event
                // queued with nobody awake to send it.
                synchronized (fNotifierThread)
                {
                    if (fQueue.front() == null)
                    {
                        fNotifierThread.wait();
                        continue;
                    }

                    long waitTime = fQueue.topPriority() -
                                    System.currentTimeMillis();

                    if (waitTime > 0)
                        fNotifierThread.wait(waitTime);
                }

                // Deliberately outside the monitor: notify() takes this
                // object's lock, and generateEvent() takes that lock before
                // the notifier thread's monitor.
                if (System.currentTimeMillis() >= fQueue.topPriority())
                    notify((Notification)fQueue.dequeue());
            }
            catch (InterruptedException e)
            {
                if (EventService.DEBUG) e.printStackTrace();
            }
            catch (Exception e)
            {
                if (EventService.DEBUG) e.printStackTrace();
            }
        }
    }

    /**
     * Processes one dequeued notification.
     *
     * If the client has no attempts left, the notification is removed from
     * its event, and the event itself is removed once no notifications
     * remain.  Otherwise the message is sent, the attempt count is
     * decremented and the notification is re-enqueued to come due after
     * the client's timeout.
     *
     * @return EventService.kNoSuchID if the event is no longer in the event
     *         table, the send return code if a message was sent, and
     *         STAFResult.Ok otherwise
     */
    synchronized private int notify(Notification notification)
    {
        Client client = notification.client;
        Integer key = new Integer(notification.eventID.id);
        int fReturnCode = STAFResult.Ok;

        // Look the event up once: handleReset() clears the table without
        // holding this object's lock, so a second lookup could return null
        // after the first one succeeded.
        EventID registered = (EventID)fEventTable.get(key);

        if (registered == null) return EventService.kNoSuchID;

        if (client.getMaxAttempts() <= 0)
        {
            Vector clientList = registered.notificationList;

            clientList.removeElement(notification);

            if (clientList.isEmpty())
                fEventTable.remove(key);

            return fReturnCode;
        }

        Map propertyMap = notification.eventID.getPropertyMap();

        fReturnCode = sendMessage(propertyMap, notification);

        // NOTE(review): priorityDelta only gates the decrement here; the
        // priority itself always drops by 1.  Confirm whether it was meant
        // to drop by priorityDelta.
        if ((client.fHow.priority > 0) &&
            (client.fHow.priority >= client.fHow.priorityDelta))
        {
            --client.fHow.priority;
        }
        else client.fHow.priority = 0;

        --client.fHow.maxAttempts;
        fQueue.enqueue(client.getTimeout() + System.currentTimeMillis(),
                       notification);

        return fReturnCode;
    }

    /**
     * Forgets all events.  Notifications that are still queued are dropped
     * by notify() as they come due, because their event is no longer in
     * the table.
     */
    public void handleReset()
    {
        fEventTable.clear();
    }

    // Handles the completion message of an asynchronous QUEUE request:
    // if the request failed because the handle/machine is gone, the client
    // it was sent to is unregistered.  Messages for request numbers that
    // were not recorded by sendMessage() are ignored.
    private void handleRequestCompleteMsg(STAFQueueMessage queueMessage)
    {
        Map messageMap = (Map)queueMessage.message;

        String reqNum = (String)messageMap.get("requestNumber");
        String rc = (String)messageMap.get("rc");

        Notification notification;

        // Claim the entry atomically and up front, so it is released even
        // if the return code below turns out to be unparsable.
        synchronized (fRequestsAndNotifications)
        {
            notification =
                (Notification)fRequestsAndNotifications.remove(reqNum);
        }

        if (notification == null) return;

        unregisterUnreachableClient(Integer.parseInt(rc), notification);
    }

    /**
     * Body of the queue thread: reads messages from the service handle's
     * queue until a STAF/Service/Event/End message arrives.
     * STAF/RequestComplete messages are passed to
     * handleRequestCompleteMsg(); all other message types are ignored.
     */
    public void run()
    {
        STAFResult queueGetResult;

        for (;;)
        {
            queueGetResult = eventSTAFHandle.submit2("local",
                                                     "QUEUE", "GET WAIT");

            if (queueGetResult.rc != 0)
            {
                // XXX: Do anything?
                // NOTE(review): a persistent failure makes this loop retry
                // without any delay.
                continue;
            }

            // Need a try/catch block so can catch any errors and continue
            // processing messages on the queue

            try
            {
                STAFQueueMessage queueMessage = new STAFQueueMessage(
                    queueGetResult.result);

                String queueType = queueMessage.type;

                if (queueType.equalsIgnoreCase("STAF/RequestComplete"))
                {
                    handleRequestCompleteMsg(queueMessage);
                }
                else if (queueType.equalsIgnoreCase("STAF/Service/Event/End"))
                {
                    return;
                }
            }
            catch (Exception e)
            {
                e.printStackTrace();
            }
        }
    }
}