import atexit
import logging
from multiprocessing.pool import ThreadPool
from threading import RLock
from typing import Any, Callable, List, Mapping, Optional, Tuple, Union

from errbot import Message
from errbot.backends.base import Identifier, Room, RoomOccupant

log = logging.getLogger(__name__)

#: A predicate receives the conversation context and tells whether a step may run automatically.
Predicate = Callable[[Mapping[str, Any]], bool]

EXECUTOR_THREADS = (
    5  # the maximum number of simultaneous flows in automatic mode at the same time.
)


class FlowNode:
    """
    This is a step in a Flow/conversation. It is linked to a specific botcmd and also a "predicate".

    The predicate is a function that tells the flow executor if the flow can enter the step without the user
    intervention (automatically). The predicate defaults to a function that always returns False.

    The predicate is a function that takes one parameter, the context of the conversation.
    """

    def __init__(self, command: Optional[str] = None, hints: bool = True):
        """
        Creates a FlowNode, takes the command to which the Node is linked to.

        :param command: the command this Node is linked to. Can only be None if this Node is a Root.
        :param hints: hints the users for the next steps in chat.
        """
        self.command = command
        self.children = []  # list of (predicate, node) tuples, in connection order
        self.hints = hints

    def connect(
        self,
        node_or_command: Union["FlowNode", str],
        predicate: Predicate = lambda _: False,
        hints: bool = True,
    ) -> "FlowNode":
        """
        Construct the flow graph by connecting this node to another node or a command.

        The predicate is a function that tells the flow executor if the flow can enter the step without the user
        intervention (automatically).

        :param node_or_command: the node or a string for a command you want to connect this Node to
                               (this node or command will be the follow up of this one)
        :param predicate: function with one parameter, the context, to determine if the flow executor can
                          continue automatically this flow with no user intervention.
        :param hints: hints the user on the next step possible. Only used when a new node is created from
                      a command string; an existing node keeps its own setting.
        :return: the newly created node if you passed a command or the node you gave it to be easily chainable.
        """
        if isinstance(node_or_command, FlowNode):
            node_to_connect_to = node_or_command
        else:
            node_to_connect_to = FlowNode(node_or_command, hints=hints)
        self.children.append((predicate, node_to_connect_to))
        return node_to_connect_to

    def predicate_for_node(self, node: "FlowNode") -> Optional[Predicate]:
        """
        Gets the predicate function for the specified child node.

        :param node: the child node
        :return: the predicate that allows the automatic execution of that node,
                 or None if the node is not a child of this one.
        """
        for predicate, possible_node in self.children:
            if node == possible_node:
                return predicate
        return None

    def __str__(self) -> str:
        # __str__ must return a str; command may legitimately be None (e.g. a bare node).
        return str(self.command)


class FlowRoot(FlowNode):
    """
    This represents the entry point of a flow description.
    """

    def __init__(self, name: str, description: str):
        """
        :param name: The name of the conversation/flow.
        :param description: A human description of what this flow does.
        """
        super().__init__()
        self.name = name
        self.description = description
        self.auto_triggers = set()  # commands that start this flow when executed in chat
        self.room_flow = False

    def connect(
        self,
        node_or_command: Union["FlowNode", str],
        predicate: Predicate = lambda _: False,
        auto_trigger: bool = False,
        room_flow: bool = False,
    ) -> FlowNode:
        """
        :see: FlowNode except for auto_trigger and room_flow

        :param predicate: :see: FlowNode
        :param node_or_command: :see: FlowNode
        :param auto_trigger: Flag this root as autotriggering: it will start a flow if this command is executed
                             in the chat.
        :param room_flow: Bind the flow to the room instead of a single person. Note that the value given
                          on the most recent call to connect is the one that is kept.
        :return: the connected node, :see: FlowNode
        """
        resp = super().connect(node_or_command, predicate)
        if auto_trigger:
            # Triggers are matched against command names, so always record the command string,
            # even when the caller handed us an already built FlowNode.
            self.auto_triggers.add(resp.command)
        self.room_flow = room_flow
        return resp

    def __str__(self) -> str:
        return self.name


class _FlowEnd(FlowNode):
    """Sentinel node type marking the end of a flow."""

    def __str__(self) -> str:
        return "END"


#: Flow marker indicating that the flow ends.
FLOW_END = _FlowEnd()


class InvalidState(Exception):
    """
    Raised when the Flow Executor is asked to do something contrary to the constraints it has been given.
    """

    pass


class Flow:
    """
    This is a live Flow. It keeps context of the conversation (requestor and context).
    Context is just a python dictionary representing the state of the conversation.
    """

    def __init__(
        self, root: FlowRoot, requestor: Identifier, initial_context: Mapping[str, Any]
    ):
        """
        :param root: the root of this flow.
        :param requestor: the user requesting this flow.
        :param initial_context: any data we already have that could help executing this flow automatically.
        """
        self._root = root
        self._current_step = self._root
        self.ctx = dict(initial_context)  # private copy, the caller's mapping is left untouched
        self.requestor = requestor

    def next_autosteps(self) -> List[FlowNode]:
        """
        Get the next steps that can be automatically executed according to the set predicates.
        """
        return [
            node
            for predicate, node in self._current_step.children
            if predicate(self.ctx)
        ]

    def next_steps(self) -> List[FlowNode]:
        """
        Get all the possible next steps after this one (predicates satisfied or not).
        """
        return [node for _, node in self._current_step.children]

    def advance(self, next_step: FlowNode, enforce_predicate=True):
        """
        Move on along the flow.

        :param next_step: Which node you want to move the flow forward to.
        :param enforce_predicate: Do you want to check if the predicate is verified for this step or not.
                                  Usually, if it is a manual step, the predicate is irrelevant because the user
                                  will give the missing information as parameters to the command.
        :raises ValueError: if enforce_predicate is set and next_step is not a child of the current step.
        :raises InvalidState: if enforce_predicate is set and the predicate of next_step is false.
        """
        if enforce_predicate:
            predicate = self._current_step.predicate_for_node(next_step)
            if predicate is None:
                raise ValueError(f"There is no such children: {next_step}.")

            if not predicate(self.ctx):
                raise InvalidState(
                    "It is not possible to advance to this step because its predicate is false."
                )

        self._current_step = next_step

    @property
    def name(self) -> str:
        """
        Helper property to get the name of the flow.
        """
        return self._root.name

    @property
    def current_step(self) -> FlowNode:
        """
        The current step this Flow is waiting on.
        """
        return self._current_step

    @property
    def root(self) -> FlowRoot:
        """
        The original flowroot of this flow.
        """
        return self._root

    def check_identifier(self, identifier: Identifier):
        """
        Tell whether the given identifier is allowed to act on this flow.

        This is the case when the identifier is the requestor itself, or when the requestor
        is a Room and the identifier is an occupant of that very room.

        :param identifier: the identifier to check against the requestor of this flow.
        """
        in_requesting_room = (
            isinstance(self.requestor, Room)
            and isinstance(identifier, RoomOccupant)
            and self.requestor == identifier.room
        )
        return in_requesting_room or self.requestor == identifier

    def __str__(self) -> str:
        return f"{self._root} ({self.requestor}) with params {dict(self.ctx)}"


class BotFlow:
    """
    Defines a Flow plugin ie. a plugin that will define new flows from its methods with the @botflow decorator.
    """

    def __init__(self, bot, name=None):
        super().__init__()
        self._bot = bot
        self.is_activated = False
        self._name = name

    @property
    def name(self) -> str:
        """
        Get the name of this flow as described in its .plug file.

        :return: The flow name.
        """
        return self._name

    def activate(self) -> None:
        """
        Override if you want to do something at initialization phase (don't forget to
        super(Gnagna, self).activate())
        """
        self._bot.inject_flows_from(self)
        self.is_activated = True

    def deactivate(self) -> None:
        """
        Override if you want to do something at tear down phase (don't forget to super(Gnagna, self).deactivate())
        """
        self._bot.remove_flows_from(self)
        self.is_activated = False

    def get_command(self, command_name: str):
        """
        Helper to get a specific command.

        :param command_name: the name of the command to look up.
        :return: the matching entry of the bot's ``all_commands``, or None if there is none.
        """
        return self._bot.all_commands.get(command_name, None)


class FlowExecutor:
    """
    This is an instance that can monitor and execute flow instances.
    """

    def __init__(self, bot):
        self._lock = RLock()  # guards flow_roots and in_flight
        self.flow_roots = {}  # flow name -> FlowRoot
        self.in_flight = []  # live Flow instances
        self._pool = ThreadPool(EXECUTOR_THREADS)
        atexit.register(self._pool.close)
        self._bot = bot

    def add_flow(self, flow: FlowRoot):
        """
        Register a flow with this executor.

        :param flow: the root describing the flow; it is stored under its name and replaces
                     any flow previously registered under the same name.
        """
        with self._lock:
            self.flow_roots[flow.name] = flow

    def trigger(
        self, cmd: str, requestor: Identifier, extra_context=None
    ) -> Optional[Flow]:
        """
        Trigger workflows that may have command cmd as a auto_trigger or an in flight flow waiting for command.
        This assume cmd has been correctly executed.

        :param requestor: the identifier of the person who started this flow
        :param cmd: the command that has just been executed.
        :param extra_context: extra context from the current conversation. When non-empty, a copy of it
                              replaces the context of the flow.
        :returns: The flow it triggered or None if none were matching.
        """
        flow, next_step = self.check_inflight_flow_triggered(cmd, requestor)
        if not flow:
            flow, next_step = self._check_if_new_flow_is_triggered(cmd, requestor)
        if not flow:
            return None

        # The command was run manually, so its predicate is irrelevant here.
        flow.advance(next_step, enforce_predicate=False)
        if extra_context:
            flow.ctx = dict(extra_context)
        self._enqueue_flow(flow)
        return flow

    def check_inflight_already_running(self, user: Identifier) -> bool:
        """
        Check if user is already running a flow.

        :param user: the user
        """
        with self._lock:
            return any(flow.requestor == user for flow in self.in_flight)

    def check_inflight_flow_triggered(
        self, cmd: str, user: Identifier
    ) -> Tuple[Optional[Flow], Optional[FlowNode]]:
        """
        Check if a command from a specific user was expected in one of the running flow.

        :param cmd: the command that has just been executed.
        :param user: the identifier of the person who started this flow
        :returns: The (flow, step) pair waiting for this command or (None, None) if none were matching.
        """
        log.debug("Test if the command %s is a trigger for an inflight flow ...", cmd)
        # TODO: What if 2 flows wait for the same command ?
        with self._lock:
            for flow in self.in_flight:
                if flow.check_identifier(user):
                    log.debug("Requestor has a flow %s in flight", flow.name)
                    for next_step in flow.next_steps():
                        if next_step.command == cmd:
                            log.debug(
                                "Requestor has a flow in flight waiting for this command !"
                            )
                            return flow, next_step
        log.debug("None matched.")
        return None, None

    def _check_if_new_flow_is_triggered(
        self, cmd: str, user: Identifier
    ) -> Tuple[Optional[Flow], Optional[FlowNode]]:
        """
        Trigger workflows that may have command cmd as a auto_trigger.
        This assume cmd has been correctly executed.

        :param cmd: the command that has just been executed.
        :param user: the identifier of the person who started this flow
        :returns: The (flow, step) pair it triggered or (None, None) if none were matching.
        """
        log.debug("Test if the command %s is an auto-trigger for any flow ...", cmd)
        with self._lock:
            for name, flow_root in self.flow_roots.items():
                if (
                    cmd in flow_root.auto_triggers
                    and not self.check_inflight_already_running(user)
                ):
                    log.debug(
                        "Flow %s has been auto-triggered by the command %s by user %s",
                        name,
                        cmd,
                        user,
                    )
                    return self._create_new_flow(flow_root, user, cmd)
        return None, None

    @staticmethod
    def _create_new_flow(
        flow_root, requestor: Identifier, initial_command
    ) -> Tuple[Optional[Flow], Optional[FlowNode]]:
        """
        Helper method to create a new Flow.

        :param flow_root: the root of the flow to instantiate.
        :param requestor: the identifier the new flow is bound to.
        :param initial_command: the command that has just been executed.
        :returns: the new flow and its first step matching initial_command, or (None, None) if no
                  direct child of the root is linked to that command.
        """
        empty_context = {}
        flow = Flow(flow_root, requestor, empty_context)
        for possible_next_step in flow.next_steps():
            if possible_next_step.command == initial_command:
                # The predicate is good as we just executed manually the command.
                return flow, possible_next_step
        return None, None

    def start_flow(
        self, name: str, requestor: Identifier, initial_context: Mapping[str, Any]
    ) -> Flow:
        """
        Starts the execution of a Flow.

        :param name: the name of a registered flow.
        :param requestor: who asks for the flow. If it is a room occupant and the flow is a room flow,
                          the flow is bound to the room instead.
        :param initial_context: the data the flow starts with.
        :raises ValueError: if the flow is unknown or if the requestor already runs a flow.
        :returns: the started flow.
        """
        if name not in self.flow_roots:
            raise ValueError(f"Flow {name} doesn't exist")
        if self.check_inflight_already_running(requestor):
            raise ValueError(f"User {requestor} is already running a flow.")

        flow_root = self.flow_roots[name]
        identity = requestor
        if isinstance(requestor, RoomOccupant) and flow_root.room_flow:
            identity = requestor.room

        flow = Flow(flow_root, identity, initial_context)
        self._enqueue_flow(flow)
        return flow

    def stop_flow(self, name: str, requestor: Identifier) -> Optional[Flow]:
        """
        Stops a specific flow. It is a no op if the flow doesn't exist.
        Returns the stopped flow if found.

        :param name: the name of the flow to stop.
        :param requestor: an identifier allowed to act on that flow (:see: Flow.check_identifier).
        """
        with self._lock:
            for flow in self.in_flight:
                if flow.name == name and flow.check_identifier(requestor):
                    log.debug("Removing flow %s.", flow)
                    # Safe although we are iterating: we return right after the removal.
                    self.in_flight.remove(flow)
                    return flow
        return None

    def _enqueue_flow(self, flow):
        """
        Track the flow as in flight (once) and schedule its execution on the thread pool.
        """
        with self._lock:
            if flow not in self.in_flight:
                self.in_flight.append(flow)
        self._pool.apply_async(self.execute, (flow,))

    def _send_hints(self, flow: Flow, steps: List[FlowNode]) -> None:
        """
        Tell the requestor of the flow which commands can be used to continue it.

        :param flow: the flow waiting for a manual step.
        :param steps: the possible next steps of that flow.
        """
        possible_next_steps = [
            f"You are in the flow **{flow.name}**, you can continue with:\n\n"
        ]
        for step in steps:
            cmd = step.command
            # NOTE(review): a step without a command (e.g. FLOW_END) would raise here - confirm
            # whether such a step can be a non automatic child of a hinting node.
            cmd_fnc = self._bot.all_commands[cmd]
            reg_cmd = cmd_fnc._err_re_command
            syntax_args = cmd_fnc._err_command_syntax
            reg_prefixed = cmd_fnc._err_command_prefix_required if reg_cmd else True
            syntax = self._bot.prefix if reg_prefixed else ""
            if not reg_cmd:
                syntax += cmd.replace("_", " ")
            if syntax_args:
                syntax += syntax_args
            possible_next_steps.append(f"- {syntax}")
        self._bot.send(flow.requestor, "\n".join(possible_next_steps))

    def execute(self, flow: Flow):
        """
        This is where the flow execution happens from one of the thread of the pool.

        The flow is advanced through every step whose predicate is satisfied until it either ends
        (it is then removed from the in flight flows) or has to wait for a manual command.

        :param flow: the flow to execute.
        """
        while True:
            autosteps = flow.next_autosteps()
            steps = flow.next_steps()

            if not steps:
                log.debug("Flow ended correctly.Nothing left to do.")
                with self._lock:
                    self.in_flight.remove(flow)
                break

            if not autosteps:
                # Nothing can run automatically: suspend until trigger() enqueues the flow again.
                # This must happen whether hints are enabled or not, otherwise this thread
                # would spin forever on a flow that cannot make progress.
                if flow.current_step.hints:
                    self._send_hints(flow, steps)
                break

            log.debug(
                "Steps triggered automatically %s.",
                ", ".join(str(node) for node in autosteps),
            )
            log.debug(
                "All possible next steps: %s.", ", ".join(str(node) for node in steps)
            )

            for autostep in autosteps:
                log.debug("Proceeding automatically with step %s", autostep)
                if autostep == FLOW_END:
                    log.debug("This flow ENDED.")
                    with self._lock:
                        self.in_flight.remove(flow)
                    return
                try:
                    msg = Message(frm=flow.requestor, flow=flow)
                    result = self._bot.commands[autostep.command](msg, None)
                    log.debug("Step result %s: %s", flow.requestor, result)

                except Exception as e:
                    # Best effort: report the failure to the requestor, the flow still advances below.
                    log.exception("%s errored at %s", flow, autostep)
                    self._bot.send(
                        flow.requestor, f'{flow} errored at {autostep} with "{e}"'
                    )
                flow.advance(
                    autostep
                )  # TODO: this is only true for a single step, make it forkable.
        log.debug("Flow execution suspended/ended normally.")