import opcode

from rpyc.lib.compat import is_py_gte38
from types import CodeType, FunctionType
from rpyc.core import brine, netref
from dis import _unpack_opargs

# Marker placed in front of an exported nested code object so that the
# importing side can tell it apart from an ordinary 2-tuple constant.
CODEOBJ_MAGIC = "MAg1c J0hNNzo0hn ZqhuBP17LQk8"


# NOTE: dislike this kind of hacking on the level of implementation details,
# should search for a more reliable/future-proof way:
CODE_HAVEARG_SIZE = 3


def decode_codeobj(codeobj):
    """Yield an ``(opname, argval)`` pair for every instruction in *codeobj*.

    ``argval`` is the resolved argument where the opcode belongs to one of
    the ``opcode.has*`` tables handled below (constant, name, relative jump
    target, local variable, comparison operator, free/cell variable).  For
    any other opcode it is the raw ``oparg``, which is ``None`` for
    instructions that take no argument.
    """
    # adapted from dis.dis
    codestr = codeobj.co_code
    free = None
    for i, op, oparg in _unpack_opargs(codestr):
        opname = opcode.opname[op]
        # Reset on every iteration: previously an instruction that hit none
        # of the branches below either raised UnboundLocalError (first
        # instruction) or silently reused the previous instruction's value.
        argval = oparg
        if oparg is not None:
            if op in opcode.hasconst:
                argval = codeobj.co_consts[oparg]
            elif op in opcode.hasname:
                argval = codeobj.co_names[oparg]
            elif op in opcode.hasjrel:
                # NOTE(review): CODE_HAVEARG_SIZE = 3 matches the pre-3.6
                # bytecode layout -- confirm against the wordcode format
                # before relying on the computed jump target.
                argval = i + oparg + CODE_HAVEARG_SIZE
            elif op in opcode.haslocal:
                argval = codeobj.co_varnames[oparg]
            elif op in opcode.hascompare:
                argval = opcode.cmp_op[oparg]
            elif op in opcode.hasfree:
                if free is None:
                    # built lazily; only needed for closures
                    free = codeobj.co_cellvars + codeobj.co_freevars
                argval = free[oparg]

        yield (opname, argval)


def _export_codeobj(cobj):
    """Convert the code object *cobj* into a brine-dumpable tuple.

    Nested code objects found in ``co_consts`` are exported recursively.

    Returns:
        ``(CODEOBJ_MAGIC, exported)`` where ``exported`` is the tuple of
        code-object fields (16 fields when ``is_py_gte38`` is true, 15
        otherwise).

    Raises:
        TypeError: if a constant is neither brine-dumpable nor a code object.
    """
    consts2 = []
    for const in cobj.co_consts:
        if brine.dumpable(const):
            consts2.append(const)
        elif isinstance(const, CodeType):
            consts2.append(_export_codeobj(const))
        else:
            raise TypeError("Cannot export a function with non-brinable constants: %r" % (const,))

    if is_py_gte38:
        # Constructor was changed in 3.8 to support "advanced" programming styles
        exported = (cobj.co_argcount, cobj.co_posonlyargcount, cobj.co_kwonlyargcount, cobj.co_nlocals,
                    cobj.co_stacksize, cobj.co_flags, cobj.co_code, tuple(consts2), cobj.co_names, cobj.co_varnames,
                    cobj.co_filename, cobj.co_name, cobj.co_firstlineno, cobj.co_lnotab, cobj.co_freevars,
                    cobj.co_cellvars)
    else:
        exported = (cobj.co_argcount, cobj.co_kwonlyargcount, cobj.co_nlocals, cobj.co_stacksize, cobj.co_flags,
                    cobj.co_code, tuple(consts2), cobj.co_names, cobj.co_varnames, cobj.co_filename,
                    cobj.co_name, cobj.co_firstlineno, cobj.co_lnotab, cobj.co_freevars, cobj.co_cellvars)
    # internal sanity check: every field should be dumpable at this point
    assert brine.dumpable(exported)
    return (CODEOBJ_MAGIC, exported)


def export_function(func):
    """Export *func* as a tuple that can be rebuilt by ``import_function``.

    Returns:
        ``(name, module, defaults, kwdefaults, codetup)`` where
        ``kwdefaults`` is ``None`` or a tuple of ``(key, value)`` pairs and
        ``codetup`` is the exported code tuple (without the magic marker).

    Raises:
        TypeError: if *func* has a closure, or if its positional or
            keyword-only defaults are not brine-dumpable.
    """
    closure = func.__closure__
    code = func.__code__
    defaults = func.__defaults__
    kwdefaults = func.__kwdefaults__
    if kwdefaults is not None:
        # flattened to a tuple of pairs before the dumpable check below
        kwdefaults = tuple(kwdefaults.items())

    if closure:
        raise TypeError("Cannot export a function closure")
    if not brine.dumpable(defaults):
        raise TypeError("Cannot export a function with non-brinable defaults (__defaults__)")
    if not brine.dumpable(kwdefaults):
        raise TypeError("Cannot export a function with non-brinable defaults (__kwdefaults__)")

    return func.__name__, func.__module__, defaults, kwdefaults, _export_codeobj(code)[1]


def _import_codetup(codetup):
    """Rebuild a code object from a tuple produced by ``_export_codeobj``.

    Accepts both the 16-field layout (with ``posonlyargcount``) and the
    15-field layout; constants tagged with ``CODEOBJ_MAGIC`` are rebuilt
    recursively into nested code objects.
    """
    # Handle tuples sent from 3.8 as well as 3 < version < 3.8.
    if len(codetup) == 16:
        (argcount, posonlyargcount, kwonlyargcount, nlocals, stacksize, flags, code, consts, names, varnames,
         filename, name, firstlineno, lnotab, freevars, cellvars) = codetup
    else:
        (argcount, kwonlyargcount, nlocals, stacksize, flags, code, consts, names, varnames,
         filename, name, firstlineno, lnotab, freevars, cellvars) = codetup
        posonlyargcount = 0

    consts = tuple(
        _import_codetup(const[1])
        if isinstance(const, tuple) and len(const) == 2 and const[0] == CODEOBJ_MAGIC
        else const
        for const in consts
    )
    # The local constructor signature depends on the local interpreter,
    # not on the layout of the tuple that was received.
    if is_py_gte38:
        codetup = (argcount, posonlyargcount, kwonlyargcount, nlocals, stacksize, flags, code, consts, names, varnames,
                   filename, name, firstlineno, lnotab, freevars, cellvars)
    else:
        codetup = (argcount, kwonlyargcount, nlocals, stacksize, flags, code, consts, names, varnames, filename, name,
                   firstlineno, lnotab, freevars, cellvars)
    return CodeType(*codetup)


def import_function(functup, globals=None, def_=True):
    """Rebuild a function from a tuple produced by ``export_function``.

    Args:
        functup: ``(name, modname, defaults, kwdefaults, codetup)``.
        globals: namespace for the new function.  When ``None``, the
            ``__dict__`` of the module named ``modname`` is used, falling
            back to ``__main__`` if that module cannot be imported.  A
            netref is replaced by a local copy obtained from the remote side.
        def_: when true, the function is also stored in *globals* under
            its name.

    Returns:
        The reconstructed function object.
    """
    name, modname, defaults, kwdefaults, codetup = functup
    if globals is None:
        try:
            mod = __import__(modname, None, None, "*")
        except ImportError:
            mod = __import__("__main__", None, None, "*")
        globals = mod.__dict__
    # function globals must be real dicts, sadly:
    if isinstance(globals, netref.BaseNetref):
        from rpyc.utils.classic import obtain
        globals = obtain(globals)
    globals.setdefault('__builtins__', __builtins__)
    codeobj = _import_codetup(codetup)
    funcobj = FunctionType(codeobj, globals, name, defaults)
    if kwdefaults is not None:
        # kwdefaults travels as a tuple of (key, value) pairs
        funcobj.__kwdefaults__ = dict(kwdefaults)
    if def_:
        globals[name] = funcobj
    return funcobj