1# -*- coding: utf-8 -*-
2
3"""
4File Stream Operations
5"""
6from io import open as io_open
7import requests
8import sys
9import tempfile
10
11from typing import Optional
12
13import os.path as osp
14
15from mathics.settings import ROOT_DIR
16
# The current user's home directory.
HOME_DIR = osp.expanduser("~")
# Directories tried, in this order, when a file name is searched for:
# the current directory, the home directory, then two folders under ROOT_DIR.
PATH_VAR = [".", HOME_DIR] + [
    osp.join(ROOT_DIR, subdir) for subdir in ("data", "packages")
]
19
def create_temporary_file(suffix=None, delete=False):
    """Create a named temporary file and return its path.

    An empty-string ``suffix`` is treated the same as no suffix. The file
    is created through ``tempfile.NamedTemporaryFile`` and closed again
    before returning; ``delete`` is passed through to it unchanged.
    """
    handle = tempfile.NamedTemporaryFile(
        delete=delete, suffix=None if suffix == "" else suffix
    )
    # Leaving the ``with`` block closes the handle; only the name is kept.
    with handle:
        return handle.name
28
29
def urlsave_tmp(url, location=None, **kwargs):
    """Download ``url`` and store its content in a local file.

    Parameters:
        url: address to fetch with ``requests.get`` (redirects are followed).
        location: path to write to. When ``None``, a temporary file is
            created whose suffix is everything from the first ``.`` of the
            last ``/``-separated component of ``url`` onwards.
        **kwargs: accepted for call compatibility; not used here.

    Returns:
        The path of the written file, or ``None`` when ``url`` has fewer
        than four ``/``-separated parts (e.g. ``"http://host"`` with no
        path) or when the download or the write raises.
    """
    # Set up front so every path through the function has a value to
    # return; previously short URLs reached ``return`` with it unbound.
    result = None

    parts = url.split("/")
    if len(parts) <= 3:
        return result

    suffix = ""
    last_component = parts[-1]
    if last_component != "":
        # Keep everything from the first dot, e.g. "pkg.tar.gz" -> ".tar.gz".
        suffix = last_component[len(last_component.split(".")[0]):]

    try:
        # NOTE: the HTTP status is not inspected; whatever body comes back
        # is written out as-is.
        response = requests.get(url, allow_redirects=True)
        if location is None:
            location = create_temporary_file(suffix=suffix)
        with open(location, "wb") as fp:
            fp.write(response.content)
            result = fp.name
    except Exception:
        # Best effort: any failure is reported to the caller as ``None``.
        result = None
    return result
47
48
def path_search(filename):
    """Resolve ``filename`` to a path that can be opened.

    Resolution steps:

    * An empty ``filename`` resolves to ``None``.
    * A name ending in a backtick (``"name`"``) has that backtick removed
      and the remaining backticks replaced by the path separator; the
      result is searched for with the extensions ``.mx`` and ``.m``, and
      finally without any extension.
    * A name starting with ``http://``, ``https://`` or ``ftp://`` (and
      having at least one more character) is downloaded with
      ``urlsave_tmp`` and the path of the downloaded copy is returned.
    * Any other name is joined to each entry of ``PATH_VAR`` and finally
      used as given; the first existing path wins.
    * If the match is a directory, ``Kernel/init.m`` and then ``init.m``
      inside it are preferred when they exist.

    Returns:
        The resolved path, or ``None`` when nothing was found.
    """
    if not filename:
        # Nothing to look for; also avoids indexing into an empty string.
        return None

    # For names of the form "name`", search for name.mx and name.m
    if filename.endswith("`"):
        filename = filename[:-1].replace("`", osp.sep)
        for ext in (".mx", ".m"):
            found = path_search(filename + ext)
            if found is not None:
                return found
        # Neither file exists: carry on and look up the bare name below.

    # If filename is an internet address, download the file and store it
    # in a temporary location. A bare scheme such as "http://" with
    # nothing after it is deliberately not treated as an address.
    if any(
        filename.startswith(prefix) and len(filename) > len(prefix)
        for prefix in ("http://", "https://", "ftp://")
    ):
        return urlsave_tmp(filename)

    result = None
    for directory in PATH_VAR + [""]:
        candidate = osp.join(directory, filename)
        if osp.exists(candidate):
            result = candidate
            break

    # If the search resolves to a dir, look inside it for Kernel/init.m
    # and init.m; when neither exists the directory itself is returned.
    if result is not None and osp.isdir(result):
        for init_file in (osp.join("Kernel", "init.m"), "init.m"):
            candidate = osp.join(result, init_file)
            if osp.isfile(candidate):
                return candidate
    return result
83
84
class StreamsManager(object):
    """Singleton registry that maps channel numbers to ``Stream`` objects."""

    __instance = None
    # Registered streams keyed by channel number (class-level attribute).
    STREAMS = {}

    @staticmethod
    def get_instance():
        """Return the single manager instance, creating it on first use."""
        if StreamsManager.__instance is None:
            StreamsManager()
        return StreamsManager.__instance

    def __init__(self):
        """Virtually private constructor.

        Raises:
            Exception: if an instance has already been created.
        """
        if StreamsManager.__instance is not None:
            raise Exception("this class is a singleton!")
        StreamsManager.__instance = self

    def add(
        self,
        name: str,
        mode: Optional[str] = None,
        encoding=None,
        io=None,
        num: Optional[int] = None,
    ) -> Optional["Stream"]:
        """Create a ``Stream`` and register it under channel ``num``.

        When ``num`` is ``None`` the next free channel number is used.

        Returns:
            The newly registered stream.

        Raises:
            Exception: if a stream is already registered under ``num``.
        """
        if num is None:
            # In theory a freshly allocated number is never already taken.
            num = self.next
        # Sanity check: refuse to overwrite an existing registration.
        if self.lookup_stream(num) is not None:
            raise Exception(f"Stream {num} already open")
        stream = Stream(name, mode, encoding, io, num)
        self.STREAMS[num] = stream
        return stream

    def delete(self, n: int) -> bool:
        """Unregister channel ``n``; return whether a stream was removed."""
        if self.STREAMS.get(n) is None:
            return False
        # Remove the key that was looked up rather than trusting the
        # stream's own ``n`` attribute to still match it.
        del self.STREAMS[n]
        return True

    def lookup_stream(self, n=None) -> Optional["Stream"]:
        """Return the stream registered under ``n``, or ``None``."""
        if n is None:
            return None
        return self.STREAMS.get(n)

    @property
    def next(self):
        """One more than the highest channel number in use, never below 3."""
        # The literal 2 is a floor, so numbers 0-2 are never handed out here.
        return max([2, *(stream.n for stream in self.STREAMS.values())]) + 1
131
# Module-wide manager instance, obtained through the singleton accessor
# (which constructs it on first use).
stream_manager = StreamsManager.get_instance()
133
class Stream(object):
    """
    Opens a stream

    This can be used in a context_manager like this:

    with Stream(pypath, "r") as f:
         ...

    However see mathics_open which wraps this

    Attributes set by the constructor:
        name: file name (or address) to open.
        mode: one of "r", "w", "a", "rb", "wb", "ab".
        encoding: text encoding; ignored for binary modes when opening.
        io: the underlying file object, if any.
        n: channel number of the stream.
    """

    def __init__(self, name: str, mode="r", encoding=None, io=None, channel_num=None):
        """Record the stream parameters; no file is opened here.

        A ``mode`` of ``None`` means "r". When ``channel_num`` is ``None``
        the manager's next free number is used.

        Raises:
            ValueError: if ``mode`` is not one of the supported modes.
        """
        if channel_num is None:
            channel_num = stream_manager.next
        if mode is None:
            mode = "r"
        if mode not in ("r", "w", "a", "rb", "wb", "ab"):
            raise ValueError("Can't handle mode {0}".format(mode))

        self.name = name
        self.mode = mode
        self.encoding = encoding
        self.io = io
        self.n = channel_num

    def __enter__(self):
        """Locate and open the file, register it, and return the file object.

        Raises:
            IOError: if the file cannot be found and the mode is a read mode.
        """
        # Find the path; when writing or appending, a file that does not
        # exist yet is created under the name as given.
        path = path_search(self.name)
        if path is None and self.mode in ("w", "a", "wb", "ab"):
            path = self.name
        if path is None:
            raise IOError

        # Binary modes must not be given an encoding.
        encoding = None if "b" in self.mode else self.encoding

        # Open the stream.
        fp = io_open(path, self.mode, encoding=encoding)
        try:
            registered = stream_manager.add(
                name=path, mode=self.mode, encoding=encoding, io=fp
            )
        except Exception:
            # Do not leak the handle if registration fails.
            fp.close()
            raise

        # Remember what was opened and under which number it was actually
        # registered, so that __exit__ closes this file and removes this
        # registration rather than a stale or foreign one.
        self.io = fp
        self.n = registered.n
        return fp

    def __exit__(self, type, value, traceback):
        """Close the underlying file, if any, and unregister the stream."""
        try:
            if self.io is not None:
                self.io.close()
        finally:
            # Leave around self.io so we can call closed() to query its status.
            stream_manager.delete(self.n)
183
# Register the process's standard streams under channel numbers 0, 1 and 2.
stream_manager.add(name="stdin", mode="r", io=sys.stdin, num=0)
stream_manager.add(name="stdout", mode="w", io=sys.stdout, num=1)
stream_manager.add(name="stderr", mode="w", io=sys.stderr, num=2)
187