1import atexit
2import logging
3from multiprocessing.pool import ThreadPool
4from threading import RLock
5from typing import Any, Callable, List, Mapping, Optional, Tuple, Union
7from errbot import Message
8from errbot.backends.base import Identifier, Room, RoomOccupant
10log = logging.getLogger(__name__)
12Predicate = Callable[[Mapping[str, Any]], bool]
15    5  # the maximum number of simultaneous flows in automatic mode at the same time.
19class FlowNode:
20    """
21    This is a step in a Flow/conversation. It is linked to a specific botcmd and also a "predicate".
23    The predicate is a function that tells the flow executor if the flow can enter the step without the user
24    intervention (automatically). The predicates defaults to False.
26    The predicate is a function that takes one parameter, the context of the conversation.
27    """
29    def __init__(self, command: str = None, hints: bool = True):
30        """
31        Creates a FlowNone, takes the command to which the Node is linked to.
32        :param command: the command this Node is linked to. Can only be None if this Node is a Root.
33        :param hints: hints the users for the next steps in chat.
34        """
35        self.command = command
36        self.children = []  # (predicate, node)
37        self.hints = hints
39    def connect(
40        self,
41        node_or_command: Union["FlowNode", str],
42        predicate: Predicate = lambda _: False,
43        hints: bool = True,
44    ):
45        """
46        Construct the flow graph by connecting this node to another node or a command.
47        The predicate is a function that tells the flow executor if the flow can enter the step without the user
48        intervention (automatically).
49        :param node_or_command: the node or a string for a command you want to connect this Node to
50                               (this node or command will be the follow up of this one)
51        :param predicate: function with one parameter, the context, to determine of the flow executor can continue
52                           automatically this flow with no user intervention.
53        :param hints: hints the user on the next step possible.
54        :return: the newly created node if you passed a command or the node you gave it to be easily chainable.
55        """
56        node_to_connect_to = (
57            node_or_command
58            if isinstance(node_or_command, FlowNode)
59            else FlowNode(node_or_command, hints=hints)
60        )
61        self.children.append((predicate, node_to_connect_to))
62        return node_to_connect_to
64    def predicate_for_node(self, node: "FlowNode"):
65        """
66        gets the predicate function for the specified child node.
67        :param node: the child node
68        :return: the predicate that allows the automatic execution of that node.
69        """
70        for predicate, possible_node in self.children:
71            if node == possible_node:
72                return predicate
73        return None
75    def __str__(self):
76        return self.command
79class FlowRoot(FlowNode):
80    """
81    This represent the entry point of a flow description.
82    """
84    def __init__(self, name: str, description: str):
85        """
87        :param name: The name of the conversation/flow.
88        :param description:  A human description of what this flow does.
89        :param hints: Hints for the next steps when triggered.
90        """
91        super().__init__()
92        self.name = name
93        self.description = description
94        self.auto_triggers = set()
95        self.room_flow = False
97    def connect(
98        self,
99        node_or_command: Union["FlowNode", str],
100        predicate: Predicate = lambda _: False,
101        auto_trigger: bool = False,
102        room_flow: bool = False,
103    ):
104        """
105        :see: FlowNode except fot auto_trigger
106        :param predicate: :see: FlowNode
107        :param node_or_command: :see: FlowNode
108        :param auto_trigger: Flag this root as autotriggering: it will start a flow if this command is executed
109                              in the chat.
110        :param room_flow: Bind the flow to the room instead of a single person
111        """
112        resp = super().connect(node_or_command, predicate)
113        if auto_trigger:
114            self.auto_triggers.add(node_or_command)
115        self.room_flow = room_flow
116        return resp
118    def __str__(self):
119        return self.name
122class _FlowEnd(FlowNode):
123    def __str__(self):
124        return "END"
127#: Flow marker indicating that the flow ends.
128FLOW_END = _FlowEnd()
131class InvalidState(Exception):
132    """
133    Raised when the Flow Executor is asked to do something contrary to the contraints it has been given.
134    """
136    pass
139class Flow:
140    """
141    This is a live Flow. It keeps context of the conversation (requestor and context).
142    Context is just a python dictionary representing the state of the conversation.
143    """
145    def __init__(
146        self, root: FlowRoot, requestor: Identifier, initial_context: Mapping[str, Any]
147    ):
148        """
150        :param root: the root of this flow.
151        :param requestor: the user requesting this flow.
152        :param initial_context: any data we already have that could help executing this flow automatically.
153        """
154        self._root = root
155        self._current_step = self._root
156        self.ctx = dict(initial_context)
157        self.requestor = requestor
159    def next_autosteps(self) -> List[FlowNode]:
160        """
161        Get the next steps that can be automatically executed according to the set predicates.
162        """
163        return [
164            node
165            for predicate, node in self._current_step.children
166            if predicate(self.ctx)
167        ]
169    def next_steps(self) -> List[FlowNode]:
170        """
171        Get all the possible next steps after this one (predicates statisfied or not).
172        """
173        return [node for predicate, node in self._current_step.children]
175    def advance(self, next_step: FlowNode, enforce_predicate=True):
176        """
177        Move on along the flow.
178        :param next_step: Which node you want to move the flow forward to.
179        :param enforce_predicate: Do you want to check if the predicate is verified for this step or not.
180                                   Usually, if it is a manual step, the predicate is irrelevant because the user
181                                   will give the missing information as parameters to the command.
182        """
183        if enforce_predicate:
184            predicate = self._current_step.predicate_for_node(next_step)
185            if predicate is None:
186                raise ValueError(f"There is no such children: {next_step}.")
188            if not predicate(self.ctx):
189                raise InvalidState(
190                    "It is not possible to advance to this step because its predicate is false."
191                )
193        self._current_step = next_step
195    @property
196    def name(self) -> str:
197        """
198        Helper property to get the name of the flow.
199        """
200        return self._root.name
202    @property
203    def current_step(self) -> FlowNode:
204        """
205        The current step this Flow is waiting on.
206        """
207        return self._current_step
209    @property
210    def root(self) -> FlowRoot:
211        """
212        The original flowroot of this flow.
213        """
214        return self._root
216    def check_identifier(self, identifier: Identifier):
217        is_room = isinstance(self.requestor, Room)
218        is_room = is_room and isinstance(identifier, RoomOccupant)
219        is_room = is_room and self.requestor == identifier.room
220        return is_room or self.requestor == identifier
222    def __str__(self):
223        return f"{self._root} ({self.requestor}) with params {dict(self.ctx)}"
226class BotFlow:
227    """
228    Defines a Flow plugin ie. a plugin that will define new flows from its methods with the @botflow decorator.
229    """
231    def __init__(self, bot, name=None):
232        super().__init__()
233        self._bot = bot
234        self.is_activated = False
235        self._name = name
237    @property
238    def name(self) -> str:
239        """
240        Get the name of this flow as described in its .plug file.
242        :return: The flow name.
243        """
244        return self._name
246    def activate(self) -> None:
247        """
248        Override if you want to do something at initialization phase (don't forget to
249        super(Gnagna, self).activate())
250        """
251        self._bot.inject_flows_from(self)
252        self.is_activated = True
254    def deactivate(self) -> None:
255        """
256        Override if you want to do something at tear down phase (don't forget to super(Gnagna, self).deactivate())
257        """
258        self._bot.remove_flows_from(self)
259        self.is_activated = False
261    def get_command(self, command_name: str):
262        """
263        Helper to get a specific command.
264        """
265        self._bot.all_commands.get(command_name, None)
268class FlowExecutor:
269    """
270    This is a instance that can monitor and execute flow instances.
271    """
273    def __init__(self, bot):
274        self._lock = RLock()
275        self.flow_roots = {}
276        self.in_flight = []
277        self._pool = ThreadPool(EXECUTOR_THREADS)
278        atexit.register(self._pool.close)
279        self._bot = bot
281    def add_flow(self, flow: FlowRoot):
282        """
283        Register a flow with this executor.
284        """
285        with self._lock:
286            self.flow_roots[flow.name] = flow
288    def trigger(
289        self, cmd: str, requestor: Identifier, extra_context=None
290    ) -> Optional[Flow]:
291        """
292        Trigger workflows that may have command cmd as a auto_trigger or an in flight flow waiting for command.
293        This assume cmd has been correctly executed.
294        :param requestor: the identifier of the person who started this flow
295        :param cmd: the command that has just been executed.
296        :param extra_context: extra context from the current conversation
297        :returns: The flow it triggered or None if none were matching.
298        """
299        flow, next_step = self.check_inflight_flow_triggered(cmd, requestor)
300        if not flow:
301            flow, next_step = self._check_if_new_flow_is_triggered(cmd, requestor)
302        if not flow:
303            return None
305        flow.advance(next_step, enforce_predicate=False)
306        if extra_context:
307            flow.ctx = dict(extra_context)
308        self._enqueue_flow(flow)
309        return flow
311    def check_inflight_already_running(self, user: Identifier) -> bool:
312        """
313            Check if user is already running a flow.
314        :param user: the user
315        """
316        with self._lock:
317            for flow in self.in_flight:
318                if flow.requestor == user:
319                    return True
320        return False
322    def check_inflight_flow_triggered(
323        self, cmd: str, user: Identifier
324    ) -> Tuple[Optional[Flow], Optional[FlowNode]]:
325        """
326        Check if a command from a specific user was expected in one of the running flow.
327        :param cmd: the command that has just been executed.
328        :param user: the identifier of the person who started this flow
329        :returns: The name of the flow it triggered or None if none were matching."""
330        log.debug("Test if the command %s is a trigger for an inflight flow ...", cmd)
331        # TODO: What if 2 flows wait for the same command ?
332        with self._lock:
333            for flow in self.in_flight:
334                if flow.check_identifier(user):
335                    log.debug("Requestor has a flow %s in flight", flow.name)
336                    for next_step in flow.next_steps():
337                        if next_step.command == cmd:
338                            log.debug(
339                                "Requestor has a flow in flight waiting for this command !"
340                            )
341                            return flow, next_step
342        log.debug("None matched.")
343        return None, None
345    def _check_if_new_flow_is_triggered(
346        self, cmd: str, user: Identifier
347    ) -> Tuple[Optional[Flow], Optional[FlowNode]]:
348        """
349        Trigger workflows that may have command cmd as a auto_trigger..
350        This assume cmd has been correctly executed.
351        :param cmd: the command that has just been executed.
352        :param user: the identifier of the person who started this flow
353        :returns: The name of the flow it triggered or None if none were matching.
354        """
355        log.debug("Test if the command %s is an auto-trigger for any flow ...", cmd)
356        with self._lock:
357            for name, flow_root in self.flow_roots.items():
358                if (
359                    cmd in flow_root.auto_triggers
360                    and not self.check_inflight_already_running(user)
361                ):
362                    log.debug(
363                        "Flow %s has been auto-triggered by the command %s by user %s",
364                        name,
365                        cmd,
366                        user,
367                    )
368                    return self._create_new_flow(flow_root, user, cmd)
369        return None, None
371    @staticmethod
372    def _create_new_flow(
373        flow_root, requestor: Identifier, initial_command
374    ) -> Tuple[Optional[Flow], Optional[FlowNode]]:
375        """
376        Helper method to create a new FLow.
377        """
378        empty_context = {}
379        flow = Flow(flow_root, requestor, empty_context)
380        for possible_next_step in flow.next_steps():
381            if possible_next_step.command == initial_command:
382                # The predicate is good as we just executed manually the command.
383                return flow, possible_next_step
384        return None, None
386    def start_flow(
387        self, name: str, requestor: Identifier, initial_context: Mapping[str, Any]
388    ) -> Flow:
389        """
390        Starts the execution of a Flow.
391        """
392        if name not in self.flow_roots:
393            raise ValueError(f"Flow {name} doesn't exist")
394        if self.check_inflight_already_running(requestor):
395            raise ValueError(f"User {str(requestor)} is already running a flow.")
397        flow_root = self.flow_roots[name]
398        identity = requestor
399        if isinstance(requestor, RoomOccupant) and flow_root.room_flow:
400            identity = requestor.room
402        flow = Flow(self.flow_roots[name], identity, initial_context)
403        self._enqueue_flow(flow)
404        return flow
406    def stop_flow(self, name: str, requestor: Identifier) -> Optional[Flow]:
407        """
408        Stops a specific flow. It is a no op if the flow doesn't exist.
409        Returns the stopped flow if found.
410        """
411        with self._lock:
412            for flow in self.in_flight:
413                if flow.name == name and flow.check_identifier(requestor):
414                    log.debug(f"Removing flow {str(flow)}.")
415                    self.in_flight.remove(flow)
416                    return flow
417        return None
419    def _enqueue_flow(self, flow):
420        with self._lock:
421            if flow not in self.in_flight:
422                self.in_flight.append(flow)
423        self._pool.apply_async(self.execute, (flow,))
425    def execute(self, flow: Flow):
426        """
427        This is where the flow execution happens from one of the thread of the pool.
428        """
429        while True:
430            autosteps = flow.next_autosteps()
431            steps = flow.next_steps()
433            if not steps:
434                log.debug("Flow ended correctly.Nothing left to do.")
435                with self._lock:
436                    self.in_flight.remove(flow)
437                break
439            if not autosteps and flow.current_step.hints:
440                possible_next_steps = [
441                    f"You are in the flow **{flow.name}**, you can continue with:\n\n"
442                ]
443                for step in steps:
444                    cmd = step.command
445                    cmd_fnc = self._bot.all_commands[cmd]
446                    reg_cmd = cmd_fnc._err_re_command
447                    syntax_args = cmd_fnc._err_command_syntax
448                    reg_prefixed = (
449                        cmd_fnc._err_command_prefix_required if reg_cmd else True
450                    )
451                    syntax = self._bot.prefix if reg_prefixed else ""
452                    if not reg_cmd:
453                        syntax += cmd.replace("_", " ")
454                    if syntax_args:
455                        syntax += syntax_args
456                    possible_next_steps.append(f"- {syntax}")
457                self._bot.send(flow.requestor, "\n".join(possible_next_steps))
458                break
460            log.debug(
461                "Steps triggered automatically %s.",
462                ", ".join(str(node) for node in autosteps),
463            )
464            log.debug(
465                "All possible next steps: %s.", ", ".join(str(node) for node in steps)
466            )
468            for autostep in autosteps:
469                log.debug("Proceeding automatically with step %s", autostep)
470                if autostep == FLOW_END:
471                    log.debug("This flow ENDED.")
472                    with self._lock:
473                        self.in_flight.remove(flow)
474                    return
475                try:
476                    msg = Message(frm=flow.requestor, flow=flow)
477                    result = self._bot.commands[autostep.command](msg, None)
478                    log.debug("Step result %s: %s", flow.requestor, result)
480                except Exception as e:
481                    log.exception("%s errored at %s", flow, autostep)
482                    self._bot.send(
483                        flow.requestor, f'{flow} errored at {autostep} with "{e}"'
484                    )
485                flow.advance(
486                    autostep
487                )  # TODO: this is only true for a single step, make it forkable.
488        log.debug("Flow execution suspended/ended normally.")