1import atexit
2import logging
3from multiprocessing.pool import ThreadPool
4from threading import RLock
5from typing import Any, Callable, List, Mapping, Optional, Tuple, Union
6
7from errbot import Message
8from errbot.backends.base import Identifier, Room, RoomOccupant
9
log = logging.getLogger(__name__)

# A predicate receives the context of the conversation and returns a boolean.
Predicate = Callable[[Mapping[str, Any]], bool]

# The maximum number of simultaneous flows in automatic mode at the same time.
EXECUTOR_THREADS = 5
17
18
class FlowNode:
    """
    This is a step in a Flow/conversation. It is linked to a specific botcmd and also a "predicate".

    The predicate is a function that tells the flow executor if the flow can enter the step without the user
    intervention (automatically). The default predicate always returns False.

    The predicate is a function that takes one parameter, the context of the conversation.
    """

    def __init__(self, command: Optional[str] = None, hints: bool = True):
        """
        Creates a FlowNode, takes the command to which the Node is linked to.
        :param command: the command this Node is linked to. Can only be None if this Node is a Root.
        :param hints: hints the users for the next steps in chat.
        """
        self.command = command
        # List of (predicate, node) tuples, in the order they were connected.
        self.children = []
        self.hints = hints

    def connect(
        self,
        node_or_command: Union["FlowNode", str],
        predicate: "Predicate" = lambda _: False,
        hints: bool = True,
    ) -> "FlowNode":
        """
        Construct the flow graph by connecting this node to another node or a command.
        The predicate is a function that tells the flow executor if the flow can enter the step without the user
        intervention (automatically).
        :param node_or_command: the node or a string for a command you want to connect this Node to
                               (this node or command will be the follow up of this one)
        :param predicate: function with one parameter, the context, to determine if the flow executor can continue
                           automatically this flow with no user intervention.
        :param hints: hints the user on the next step possible. Only used when a command string is given
                      and a new node has to be created; an existing node keeps its own setting.
        :return: the newly created node if you passed a command or the node you gave it to be easily chainable.
        """
        if isinstance(node_or_command, FlowNode):
            node_to_connect_to = node_or_command
        else:
            node_to_connect_to = FlowNode(node_or_command, hints=hints)
        self.children.append((predicate, node_to_connect_to))
        return node_to_connect_to

    def predicate_for_node(self, node: "FlowNode") -> Optional["Predicate"]:
        """
        gets the predicate function for the specified child node.
        :param node: the child node
        :return: the predicate that allows the automatic execution of that node, or None if the given node
                 is not a child of this one. If the node was connected several times, the first
                 predicate registered for it is returned.
        """
        for predicate, possible_node in self.children:
            if node == possible_node:
                return predicate
        return None

    def __str__(self) -> str:
        # __str__ has to return a str: the command is None for a node created without one,
        # and returning it as is would make str(node) raise a TypeError.
        return str(self.command)
77
78
class FlowRoot(FlowNode):
    """
    This represent the entry point of a flow description.
    """

    def __init__(self, name: str, description: str):
        """

        :param name: The name of the conversation/flow.
        :param description:  A human description of what this flow does.
        """
        super().__init__()
        self.name = name
        self.description = description
        # Names of the commands that start this flow when they are executed.
        self.auto_triggers = set()
        self.room_flow = False

    def connect(
        self,
        node_or_command: Union["FlowNode", str],
        predicate: Predicate = lambda _: False,
        auto_trigger: bool = False,
        room_flow: bool = False,
        hints: bool = True,
    ):
        """
        :see: FlowNode except for auto_trigger and room_flow
        :param predicate: :see: FlowNode
        :param node_or_command: :see: FlowNode
        :param auto_trigger: Flag this root as autotriggering: it will start a flow if this command is executed
                              in the chat.
        :param room_flow: Bind the flow to the room instead of a single person. This is stored on the root
                          itself, so the value given in the most recent call to connect wins.
        :param hints: :see: FlowNode
        :return: the connected node, :see: FlowNode
        """
        resp = super().connect(node_or_command, predicate, hints)
        if auto_trigger:
            # Always record the command name, even when an already built node was passed:
            # auto triggers are looked up with the name of the command that was executed.
            self.auto_triggers.add(resp.command)
        self.room_flow = room_flow
        return resp

    def __str__(self) -> str:
        return self.name
120
121
class _FlowEnd(FlowNode):
    """Node type used for the end-of-flow marker; it is displayed as END."""

    def __str__(self) -> str:
        return "END"
125
126
#: Flow marker indicating that the flow ends: when the executor reaches this node
#: automatically, it removes the flow from the in flight flows and stops executing it.
FLOW_END = _FlowEnd()
129
130
class InvalidState(Exception):
    """
    Raised when the Flow Executor is asked to do something contrary to the constraints it has been given.
    """
137
138
class Flow:
    """
    A live instance of a flow: it remembers who requested it, which step it is waiting on
    and the context of the conversation (a plain python dictionary).
    """

    def __init__(
        self, root: FlowRoot, requestor: Identifier, initial_context: Mapping[str, Any]
    ):
        """
        Creates a flow positioned on its root.

        :param root: the root of this flow.
        :param requestor: the user requesting this flow.
        :param initial_context: any data we already have that could help executing this flow automatically.
                                It is copied into a new dictionary.
        """
        self._root = root
        self._current_step = root
        self.requestor = requestor
        self.ctx = dict(initial_context)

    def next_autosteps(self) -> List[FlowNode]:
        """
        Lists the children of the current step whose predicate accepts the current context,
        i.e. the steps that can be executed automatically.
        """
        ready = []
        for accepts, child in self._current_step.children:
            if accepts(self.ctx):
                ready.append(child)
        return ready

    def next_steps(self) -> List[FlowNode]:
        """
        Lists every child of the current step, whether its predicate is satisfied or not.
        """
        return [child for _, child in self._current_step.children]

    def advance(self, next_step: FlowNode, enforce_predicate=True):
        """
        Moves the flow forward to the given step.

        :param next_step: Which node you want to move the flow forward to.
        :param enforce_predicate: Do you want to check if the predicate is verified for this step or not.
                                   Usually, if it is a manual step, the predicate is irrelevant because the user
                                   will give the missing information as parameters to the command.
        :raises ValueError: if the predicate is enforced and next_step is not a child of the current step.
        :raises InvalidState: if the predicate is enforced and rejects the current context.
        """
        if enforce_predicate:
            accepts = self._current_step.predicate_for_node(next_step)
            if accepts is None:
                raise ValueError(f"There is no such children: {next_step}.")
            if not accepts(self.ctx):
                raise InvalidState(
                    "It is not possible to advance to this step because its predicate is false."
                )

        self._current_step = next_step

    @property
    def name(self) -> str:
        """
        The name of the flow, as carried by its root.
        """
        return self._root.name

    @property
    def current_step(self) -> FlowNode:
        """
        The step this flow is currently waiting on.
        """
        return self._current_step

    @property
    def root(self) -> FlowRoot:
        """
        The flowroot this flow was created from.
        """
        return self._root

    def check_identifier(self, identifier: Identifier):
        """
        Tells if the given identifier is concerned by this flow: either it is the requestor itself,
        or the requestor is a room and the identifier is an occupant of that room.
        """
        in_requestor_room = (
            isinstance(self.requestor, Room)
            and isinstance(identifier, RoomOccupant)
            and self.requestor == identifier.room
        )
        return in_requestor_room or self.requestor == identifier

    def __str__(self):
        return f"{self._root} ({self.requestor}) with params {dict(self.ctx)}"
224
225
class BotFlow:
    """
    Defines a Flow plugin ie. a plugin that will define new flows from its methods with the @botflow decorator.
    """

    def __init__(self, bot, name=None):
        """
        :param bot: the bot this flow plugin is attached to.
        :param name: the name of this flow plugin.
        """
        super().__init__()
        self._bot = bot
        self.is_activated = False
        self._name = name

    @property
    def name(self) -> str:
        """
        Get the name of this flow as described in its .plug file.

        :return: The flow name.
        """
        return self._name

    def activate(self) -> None:
        """
        Override if you want to do something at initialization phase (don't forget to
        super(Gnagna, self).activate())
        """
        self._bot.inject_flows_from(self)
        self.is_activated = True

    def deactivate(self) -> None:
        """
        Override if you want to do something at tear down phase (don't forget to super(Gnagna, self).deactivate())
        """
        self._bot.remove_flows_from(self)
        self.is_activated = False

    def get_command(self, command_name: str):
        """
        Helper to get a specific command.

        :param command_name: the name of the command to look up in the commands of the bot.
        :return: the command registered under that name, or None if there is none.
        """
        # The result of the lookup has to be returned, otherwise this helper always yields None.
        return self._bot.all_commands.get(command_name, None)
266
267
class FlowExecutor:
    """
    This is an instance that can monitor and execute flow instances.

    The registered flow roots are kept by name in flow_roots and the live flows in in_flight.
    Flows are executed on a pool of EXECUTOR_THREADS threads.
    """

    def __init__(self, bot):
        """
        :param bot: the bot used to look up and run the commands and to send messages.
        """
        self._lock = RLock()  # guards flow_roots and in_flight
        self.flow_roots = {}
        self.in_flight = []
        self._pool = ThreadPool(EXECUTOR_THREADS)
        atexit.register(self._pool.close)
        self._bot = bot

    def add_flow(self, flow: FlowRoot):
        """
        Register a flow with this executor.
        A flow already registered under the same name is replaced.
        """
        with self._lock:
            self.flow_roots[flow.name] = flow

    def trigger(
        self, cmd: str, requestor: Identifier, extra_context=None
    ) -> Optional[Flow]:
        """
        Trigger workflows that may have command cmd as a auto_trigger or an in flight flow waiting for command.
        This assume cmd has been correctly executed.
        :param requestor: the identifier of the person who started this flow
        :param cmd: the command that has just been executed.
        :param extra_context: extra context from the current conversation. When it is not empty,
                              it replaces the context of the flow.
        :returns: The flow it triggered or None if none were matching.
        """
        flow, next_step = self.check_inflight_flow_triggered(cmd, requestor)
        if not flow:
            flow, next_step = self._check_if_new_flow_is_triggered(cmd, requestor)
        if not flow:
            return None

        # The command has just been run manually, so its predicate is not checked.
        flow.advance(next_step, enforce_predicate=False)
        if extra_context:
            flow.ctx = dict(extra_context)
        self._enqueue_flow(flow)
        return flow

    def check_inflight_already_running(self, user: Identifier) -> bool:
        """
        Check if user is already running a flow.
        :param user: the user
        :returns: True if one of the in flight flows has this user as its requestor.
        """
        with self._lock:
            return any(flow.requestor == user for flow in self.in_flight)

    def check_inflight_flow_triggered(
        self, cmd: str, user: Identifier
    ) -> Tuple[Optional[Flow], Optional[FlowNode]]:
        """
        Check if a command from a specific user was expected in one of the running flow.
        :param cmd: the command that has just been executed.
        :param user: the identifier of the person who started this flow
        :returns: The flow that was waiting for this command and the matching next step,
                  or (None, None) if none were matching.
        """
        log.debug("Test if the command %s is a trigger for an inflight flow ...", cmd)
        # TODO: What if 2 flows wait for the same command ?
        with self._lock:
            for flow in self.in_flight:
                if flow.check_identifier(user):
                    log.debug("Requestor has a flow %s in flight", flow.name)
                    for next_step in flow.next_steps():
                        if next_step.command == cmd:
                            log.debug(
                                "Requestor has a flow in flight waiting for this command !"
                            )
                            return flow, next_step
        log.debug("None matched.")
        return None, None

    def _check_if_new_flow_is_triggered(
        self, cmd: str, user: Identifier
    ) -> Tuple[Optional[Flow], Optional[FlowNode]]:
        """
        Trigger workflows that may have command cmd as a auto_trigger..
        This assume cmd has been correctly executed.
        :param cmd: the command that has just been executed.
        :param user: the identifier of the person who started this flow
        :returns: The newly created flow and its first step, or (None, None) if none were matching.
                  Only the first flow root having cmd as an auto trigger is considered, and only
                  if the user is not already the requestor of an in flight flow.
        """
        log.debug("Test if the command %s is an auto-trigger for any flow ...", cmd)
        with self._lock:
            for name, flow_root in self.flow_roots.items():
                if (
                    cmd in flow_root.auto_triggers
                    and not self.check_inflight_already_running(user)
                ):
                    log.debug(
                        "Flow %s has been auto-triggered by the command %s by user %s",
                        name,
                        cmd,
                        user,
                    )
                    return self._create_new_flow(flow_root, user, cmd)
        return None, None

    @staticmethod
    def _create_new_flow(
        flow_root, requestor: Identifier, initial_command
    ) -> Tuple[Optional[Flow], Optional[FlowNode]]:
        """
        Helper method to create a new Flow.
        :param flow_root: the root of the flow to create.
        :param requestor: the requestor of the new flow.
        :param initial_command: the command that has just been executed.
        :returns: The new flow (with an empty context) and the child of the root linked to
                  initial_command, or (None, None) if the root has no such child.
        """
        empty_context = {}
        flow = Flow(flow_root, requestor, empty_context)
        for possible_next_step in flow.next_steps():
            if possible_next_step.command == initial_command:
                # The predicate is good as we just executed manually the command.
                return flow, possible_next_step
        return None, None

    def start_flow(
        self, name: str, requestor: Identifier, initial_context: Mapping[str, Any]
    ) -> Flow:
        """
        Starts the execution of a Flow.
        :param name: the name of a registered flow.
        :param requestor: who starts the flow. If it is a room occupant and the flow is a room flow,
                          the flow is bound to the room instead.
        :param initial_context: the initial context of the conversation.
        :raises ValueError: if no flow is registered under that name or if the requestor is already
                            running a flow.
        :returns: The flow that has been started.
        """
        if name not in self.flow_roots:
            raise ValueError(f"Flow {name} doesn't exist")
        if self.check_inflight_already_running(requestor):
            raise ValueError(f"User {str(requestor)} is already running a flow.")

        flow_root = self.flow_roots[name]
        identity = requestor
        if isinstance(requestor, RoomOccupant) and flow_root.room_flow:
            identity = requestor.room

        flow = Flow(flow_root, identity, initial_context)
        self._enqueue_flow(flow)
        return flow

    def stop_flow(self, name: str, requestor: Identifier) -> Optional[Flow]:
        """
        Stops a specific flow. It is a no op if the flow doesn't exist.
        Returns the stopped flow if found.
        """
        with self._lock:
            for flow in self.in_flight:
                if flow.name == name and flow.check_identifier(requestor):
                    log.debug("Removing flow %s.", flow)
                    # Removing while iterating is fine here: we return right away.
                    self.in_flight.remove(flow)
                    return flow
        return None

    def _enqueue_flow(self, flow):
        """
        Records the flow as in flight if it is not already, and schedules its execution on the pool.
        """
        with self._lock:
            if flow not in self.in_flight:
                self.in_flight.append(flow)
        self._pool.apply_async(self.execute, (flow,))

    def _hint_next_steps(self, flow: Flow, steps: List[FlowNode]) -> None:
        """
        Sends to the requestor of the flow the list of the commands that can continue it.
        """
        possible_next_steps = [
            f"You are in the flow **{flow.name}**, you can continue with:\n\n"
        ]
        for step in steps:
            cmd = step.command
            if cmd is None:
                # Nodes without a command (such as FLOW_END) are not commands of the bot:
                # there is nothing the user could type for them, so they are left out.
                continue
            cmd_fnc = self._bot.all_commands[cmd]
            # NOTE(review): these attributes are presumably set by the errbot command
            # decorators (regex command flag, argument syntax, prefix requirement) - confirm.
            reg_cmd = cmd_fnc._err_re_command
            syntax_args = cmd_fnc._err_command_syntax
            reg_prefixed = cmd_fnc._err_command_prefix_required if reg_cmd else True
            syntax = self._bot.prefix if reg_prefixed else ""
            if not reg_cmd:
                syntax += cmd.replace("_", " ")
            if syntax_args:
                syntax += syntax_args
            possible_next_steps.append(f"- {syntax}")
        self._bot.send(flow.requestor, "\n".join(possible_next_steps))

    def execute(self, flow: Flow):
        """
        This is where the flow execution happens from one of the thread of the pool.

        The flow is run for as long as some step can be executed automatically. The execution stops
        when the current step has no next step or when FLOW_END is reached automatically (the flow
        is then removed from the in flight flows), or when no next step can be executed automatically
        (the flow stays in flight and waits for a command of the user).
        """
        while True:
            autosteps = flow.next_autosteps()
            steps = flow.next_steps()

            if not steps:
                log.debug("Flow ended correctly.Nothing left to do.")
                with self._lock:
                    self.in_flight.remove(flow)
                break

            if not autosteps:
                # Nothing can proceed automatically: hand over to the user. This has to stop the
                # loop whether hints are enabled or not, otherwise a step with hints disabled
                # would keep this thread spinning on the very same state.
                if flow.current_step.hints:
                    self._hint_next_steps(flow, steps)
                break

            log.debug(
                "Steps triggered automatically %s.",
                ", ".join(str(node) for node in autosteps),
            )
            log.debug(
                "All possible next steps: %s.", ", ".join(str(node) for node in steps)
            )

            for autostep in autosteps:
                log.debug("Proceeding automatically with step %s", autostep)
                if autostep == FLOW_END:
                    log.debug("This flow ENDED.")
                    with self._lock:
                        self.in_flight.remove(flow)
                    return
                try:
                    msg = Message(frm=flow.requestor, flow=flow)
                    result = self._bot.commands[autostep.command](msg, None)
                    log.debug("Step result %s: %s", flow.requestor, result)

                except Exception as e:
                    # The failure is logged and reported to the requestor; the flow still
                    # advances to this step afterwards.
                    log.exception("%s errored at %s", flow, autostep)
                    self._bot.send(
                        flow.requestor, f'{flow} errored at {autostep} with "{e}"'
                    )
                flow.advance(
                    autostep
                )  # TODO: this is only true for a single step, make it forkable.
        log.debug("Flow execution suspended/ended normally.")
489