"""Context management tools for xonsh."""
import sys
import textwrap
import builtins
from collections.abc import Mapping


class Block(object):
    """This is a context manager for obtaining a block of lines without actually
    executing the block. The lines are accessible as the 'lines' attribute.
    This must be used as a macro.
    """

    # Marker read by the macro machinery; str means the block is handed over
    # as source text (see ``macro_block`` usage in ``__enter__``).
    __xonsh_block__ = str

    def __init__(self):
        """
        Attributes
        ----------
        lines : list of str or None
            Block lines as if split by str.splitlines(), if available.
        glbs : Mapping or None
            Global execution context, ie globals().
        locs : Mapping or None
            Local execution context, ie locals().
        """
        self.lines = self.glbs = self.locs = None

    def __enter__(self):
        """Captures the macro block's lines and execution context.

        Reads the ``macro_block``, ``macro_globals`` and ``macro_locals``
        attributes, which must have been set on the instance before entry.

        Returns
        -------
        self

        Raises
        ------
        XonshError
            If the instance has no ``macro_block`` attribute, i.e. it was
            not entered as a macro.
        """
        if not hasattr(self, "macro_block"):
            # NOTE(review): XonshError is not imported in this module;
            # presumably xonsh injects it into builtins -- confirm.
            raise XonshError(self.__class__.__name__ + " must be entered as a macro!")
        self.lines = self.macro_block.splitlines()
        self.glbs = self.macro_globals
        if self.macro_locals is not self.macro_globals:
            # leave locals as None when it is the same as globals
            self.locs = self.macro_locals
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Does nothing; exceptions raised inside the block propagate."""
        pass


class Functor(Block):
    """This is a context manager that turns the block into a callable
    object, bound to the execution context it was created in.
    """

    def __init__(self, args=(), kwargs=None, rtn=""):
        """
        Parameters
        ----------
        args : Sequence of str, optional
            A tuple of argument names for the functor.
        kwargs : Mapping of str to values or list of item tuples, optional
            Keyword argument names and values, if available.
        rtn : str, optional
            Name of object to return, if available.

        Attributes
        ----------
        func : function
            The underlying function object. This defaults to None and is set
            when the block is entered.
        """
        super().__init__()
        self.func = None
        self.args = args
        if kwargs is None:
            self.kwargs = []
        elif isinstance(kwargs, Mapping):
            # sort so the generated signature is deterministic
            self.kwargs = sorted(kwargs.items())
        else:
            self.kwargs = kwargs
        self.rtn = rtn

    def __enter__(self):
        """Builds a function from the macro block and stores it as ``func``.

        The block text is wrapped in a ``def`` statement whose signature is
        made from ``args`` and ``kwargs``, optionally followed by a
        ``return`` of ``rtn``. The source is executed with
        ``builtins.__xonsh_execer__`` in the captured context, and the
        resulting function is looked up by name in locals, then globals.

        Returns
        -------
        self

        Raises
        ------
        XonshError
            If the instance was not entered as a macro.
        ValueError
            If the generated function cannot be found in the execution
            context after the source has been executed.
        """
        super().__enter__()
        indent = "    "  # shared by the body and the return line
        body = textwrap.indent(self.macro_block, indent)
        # hash() may be as low as -sys.maxsize - 1, so offset by one more
        # than sys.maxsize to guarantee a non-negative int and therefore a
        # valid identifier.
        uid = hash(body) + sys.maxsize + 1
        name = "__xonsh_functor_{uid}__".format(uid=uid)
        # construct signature string; keyword defaults are placeholders here
        # and are replaced with the real values through __defaults__ below
        sig = ", ".join(self.args)
        kwstr = ", ".join([k + "=None" for k, _ in self.kwargs])
        if kwstr:
            sig = sig + ", " + kwstr if sig else kwstr
        # construct return string
        rtn = str(self.rtn)
        if rtn:
            rtn = indent + "return " + rtn + "\n"
        # construct function string
        fstr = "def {name}({sig}):\n{body}\n{rtn}"
        fstr = fstr.format(name=name, sig=sig, body=body, rtn=rtn)
        glbs = self.glbs
        locs = self.locs
        execer = builtins.__xonsh_execer__
        execer.exec(fstr, glbs=glbs, locs=locs)
        if locs is not None and name in locs:
            func = locs[name]
        elif name in glbs:
            func = glbs[name]
        else:
            raise ValueError("Functor block could not be found in context.")
        if len(self.kwargs) > 0:
            # bind the actual default objects instead of the None placeholders
            func.__defaults__ = tuple(v for _, v in self.kwargs)
        self.func = func
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Does nothing; exceptions raised inside the block propagate."""
        pass

    def __call__(self, *args, **kwargs):
        """Dispatches to func.

        Raises
        ------
        AttributeError
            If ``func`` is still None because the block was never entered.
        """
        if self.func is None:
            msg = "{} block with 'None' func not callable"
            raise AttributeError(msg.format(self.__class__.__name__))
        return self.func(*args, **kwargs)