1import opcode
2
3from rpyc.lib.compat import is_py_gte38
4from types import CodeType, FunctionType
5from rpyc.core import brine, netref
6from dis import _unpack_opargs
7
8CODEOBJ_MAGIC = "MAg1c J0hNNzo0hn ZqhuBP17LQk8"
9
10
11# NOTE: dislike this kind of hacking on the level of implementation details,
12# should search for a more reliable/future-proof way:
13CODE_HAVEARG_SIZE = 3
14
15
16def decode_codeobj(codeobj):
17    # adapted from dis.dis
18    codestr = codeobj.co_code
19    free = None
20    for i, op, oparg in _unpack_opargs(codestr):
21        opname = opcode.opname[op]
22        if oparg is not None:
23            if op in opcode.hasconst:
24                argval = codeobj.co_consts[oparg]
25            elif op in opcode.hasname:
26                argval = codeobj.co_names[oparg]
27            elif op in opcode.hasjrel:
28                argval = i + oparg + CODE_HAVEARG_SIZE
29            elif op in opcode.haslocal:
30                argval = codeobj.co_varnames[oparg]
31            elif op in opcode.hascompare:
32                argval = opcode.cmp_op[oparg]
33            elif op in opcode.hasfree:
34                if free is None:
35                    free = codeobj.co_cellvars + codeobj.co_freevars
36                argval = free[oparg]
37
38        yield (opname, argval)
39
40
41def _export_codeobj(cobj):
42    consts2 = []
43    for const in cobj.co_consts:
44        if brine.dumpable(const):
45            consts2.append(const)
46        elif isinstance(const, CodeType):
47            consts2.append(_export_codeobj(const))
48        else:
49            raise TypeError("Cannot export a function with non-brinable constants: %r" % (const,))
50
51    if is_py_gte38:
52        # Constructor was changed in 3.8 to support "advanced" programming styles
53        exported = (cobj.co_argcount, cobj.co_posonlyargcount, cobj.co_kwonlyargcount, cobj.co_nlocals,
54                    cobj.co_stacksize, cobj.co_flags, cobj.co_code, tuple(consts2), cobj.co_names, cobj.co_varnames,
55                    cobj.co_filename, cobj.co_name, cobj.co_firstlineno, cobj.co_lnotab, cobj.co_freevars,
56                    cobj.co_cellvars)
57    else:
58        exported = (cobj.co_argcount, cobj.co_kwonlyargcount, cobj.co_nlocals, cobj.co_stacksize, cobj.co_flags,
59                    cobj.co_code, tuple(consts2), cobj.co_names, cobj.co_varnames, cobj.co_filename,
60                    cobj.co_name, cobj.co_firstlineno, cobj.co_lnotab, cobj.co_freevars, cobj.co_cellvars)
61    assert brine.dumpable(exported)
62    return (CODEOBJ_MAGIC, exported)
63
64
65def export_function(func):
66    closure = func.__closure__
67    code = func.__code__
68    defaults = func.__defaults__
69    kwdefaults = func.__kwdefaults__
70    if kwdefaults is not None:
71        kwdefaults = tuple(kwdefaults.items())
72
73    if closure:
74        raise TypeError("Cannot export a function closure")
75    if not brine.dumpable(defaults):
76        raise TypeError("Cannot export a function with non-brinable defaults (__defaults__)")
77    if not brine.dumpable(kwdefaults):
78        raise TypeError("Cannot export a function with non-brinable defaults (__kwdefaults__)")
79
80    return func.__name__, func.__module__, defaults, kwdefaults, _export_codeobj(code)[1]
81
82
83def _import_codetup(codetup):
84    # Handle tuples sent from 3.8 as well as 3 < version < 3.8.
85    if len(codetup) == 16:
86        (argcount, posonlyargcount, kwonlyargcount, nlocals, stacksize, flags, code, consts, names, varnames,
87         filename, name, firstlineno, lnotab, freevars, cellvars) = codetup
88    else:
89        (argcount, kwonlyargcount, nlocals, stacksize, flags, code, consts, names, varnames,
90         filename, name, firstlineno, lnotab, freevars, cellvars) = codetup
91        posonlyargcount = 0
92
93    consts2 = []
94    for const in consts:
95        if isinstance(const, tuple) and len(const) == 2 and const[0] == CODEOBJ_MAGIC:
96            consts2.append(_import_codetup(const[1]))
97        else:
98            consts2.append(const)
99    consts = tuple(consts2)
100    if is_py_gte38:
101        codetup = (argcount, posonlyargcount, kwonlyargcount, nlocals, stacksize, flags, code, consts, names, varnames,
102                   filename, name, firstlineno, lnotab, freevars, cellvars)
103    else:
104        codetup = (argcount, kwonlyargcount, nlocals, stacksize, flags, code, consts, names, varnames, filename, name,
105                   firstlineno, lnotab, freevars, cellvars)
106    return CodeType(*codetup)
107
108
109def import_function(functup, globals=None, def_=True):
110    name, modname, defaults, kwdefaults, codetup = functup
111    if globals is None:
112        try:
113            mod = __import__(modname, None, None, "*")
114        except ImportError:
115            mod = __import__("__main__", None, None, "*")
116        globals = mod.__dict__
117    # function globals must be real dicts, sadly:
118    if isinstance(globals, netref.BaseNetref):
119        from rpyc.utils.classic import obtain
120        globals = obtain(globals)
121    globals.setdefault('__builtins__', __builtins__)
122    codeobj = _import_codetup(codetup)
123    funcobj = FunctionType(codeobj, globals, name, defaults)
124    if kwdefaults is not None:
125        funcobj.__kwdefaults__ = {t[0]: t[1] for t in kwdefaults}
126    if def_:
127        globals[name] = funcobj
128    return funcobj
129