1# -*- coding: utf-8 -*- 2 3""" 4File Stream Operations 5""" 6from io import open as io_open 7import requests 8import sys 9import tempfile 10 11from typing import Optional 12 13import os.path as osp 14 15from mathics.settings import ROOT_DIR 16 17HOME_DIR = osp.expanduser("~") 18PATH_VAR = [".", HOME_DIR, osp.join(ROOT_DIR, "data"), osp.join(ROOT_DIR, "packages")] 19 20def create_temporary_file(suffix=None, delete=False): 21 if suffix == "": 22 suffix = None 23 24 fp = tempfile.NamedTemporaryFile(delete=delete, suffix=suffix) 25 result = fp.name 26 fp.close() 27 return result 28 29 30def urlsave_tmp(url, location=None, **kwargs): 31 suffix = "" 32 strip_url = url.split("/") 33 if len(strip_url) > 3: 34 strip_url = strip_url[-1] 35 if strip_url != "": 36 suffix = strip_url[len(strip_url.split(".")[0]) :] 37 try: 38 r = requests.get(url, allow_redirects=True) 39 if location is None: 40 location = create_temporary_file(suffix=suffix) 41 with open(location, "wb") as fp: 42 fp.write(r.content) 43 result = fp.name 44 except Exception: 45 result = None 46 return result 47 48 49def path_search(filename): 50 # For names of the form "name`", search for name.mx and name.m 51 if filename[-1] == "`": 52 filename = filename[:-1].replace("`", osp.sep) 53 for ext in [".mx", ".m"]: 54 result = path_search(filename + ext) 55 if result is not None: 56 filename = None 57 break 58 if filename is not None: 59 result = None 60 # If filename is an internet address, download the file 61 # and store it in a temporal location 62 lenfn = len(filename) 63 if ( 64 (lenfn > 7 and filename[:7] == "http://") 65 or (lenfn > 8 and filename[:8] == "https://") 66 or (lenfn > 6 and filename[:6] == "ftp://") 67 ): 68 result = urlsave_tmp(filename) 69 else: 70 for p in PATH_VAR + [""]: 71 path = osp.join(p, filename) 72 if osp.exists(path): 73 result = path 74 break 75 76 # If FindFile resolves to a dir, search within for Kernel/init.m and init.m 77 if result is not None and osp.isdir(result): 78 for ext in [osp.join("Kernel", "init.m"), "init.m"]: 79 tmp = osp.join(result, ext) 80 if osp.isfile(tmp): 81 return tmp 82 return result 83 84 85class StreamsManager(object): 86 __instance = None 87 STREAMS = {} 88 @staticmethod 89 def get_instance(): 90 """ Static access method. """ 91 if StreamsManager.__instance == None: 92 StreamsManager() 93 return StreamsManager.__instance 94 95 def __init__(self): 96 """ Virtually private constructor. """ 97 if StreamsManager.__instance != None: 98 raise Exception("this class is a singleton!") 99 else: 100 StreamsManager.__instance = self 101 102 def add(self, name: str, mode: Optional[str]=None, encoding=None, io=None, num: Optional[int]=None) -> Optional["Stream"]: 103 if num is None: 104 num = self.next 105 # In theory in this branch we won't find num. 106 # sanity check num 107 found = self.lookup_stream(num) 108 if found and found is not None: 109 raise Exception(f"Stream {num} already open") 110 stream = Stream(name, mode, encoding, io, num) 111 self.STREAMS[num] = stream 112 return stream 113 114 def delete(self, n: int) -> bool: 115 stream = self.STREAMS.get(n, None) 116 if stream is not None: 117 del self.STREAMS[stream.n] 118 return True 119 return False 120 121 def lookup_stream(self, n=None) -> Optional["Stream"]: 122 if n is None: 123 return None 124 return self.STREAMS.get(n, None) 125 126 @property 127 def next(self): 128 numbers = [stream.n for stream in self.STREAMS.values()] + [2] 129 return max(numbers)+1 130 131 132stream_manager = StreamsManager() 133 134class Stream(object): 135 """ 136 Opens a stream 137 138 This can be used in a context_manager like this: 139 140 with Stream(pypath, "r") as f: 141 ... 142 143 However see mathics_open which wraps this 144 """ 145 def __init__(self, name: str, mode="r", encoding=None, io=None, channel_num=None): 146 if channel_num is None: 147 channel_num = stream_manager.next 148 if mode is None: 149 mode = "r" 150 self.name = name 151 self.mode = mode 152 self.encoding = encoding 153 self.io = io 154 self.n = channel_num 155 156 if mode not in ["r", "w", "a", "rb", "wb", "ab"]: 157 raise ValueError("Can't handle mode {0}".format(mode)) 158 159 def __enter__(self): 160 # find path 161 path = path_search(self.name) 162 if path is None and self.mode in ["w", "a", "wb", "ab"]: 163 path = self.name 164 if path is None: 165 raise IOError 166 167 # determine encoding 168 if "b" not in self.mode: 169 encoding = self.encoding 170 else: 171 encoding = None 172 173 # open the stream 174 fp = io_open(path, self.mode, encoding=encoding) 175 stream_manager.add(name=path, mode=self.mode, encoding=encoding, io=fp) 176 return fp 177 178 def __exit__(self, type, value, traceback): 179 if self.io is not None: 180 self.io.close() 181 # Leave around self.io so we can call closed() to query its status. 182 stream_manager.delete(self.n) 183 184stream_manager.add("stdin", mode="r", num=0, io=sys.stdin) 185stream_manager.add("stdout", mode="w", num=1, io=sys.stdout) 186stream_manager.add("stderr", mode="w", num=2, io=sys.stderr) 187